Fork me on GitHub

RxJava2(5)

三个方面分析RxJava2的实现:

  1. RxJava2基本流程的分析
  2. 操作符原理的分析
  3. 线程调度的分析

源码基于RxJava2(2.1.15版本)

RxJava2基本流程的分析

举个栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world!");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}

@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
  • Observable.create() 创建一个被观察者
  • new Observer<String>() 创建一个观察者
  • subscribe() 两者产生订阅关系
  • new ObservableOnSubscribe<String>() 被观察者通知观察者

Observable.create()源码分析

A Observable.create()

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

传入ObservableOnSubscribe<T>
返回Observable<T>

B return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

传入 ObservableCreate<T>(source)

C onAssembly(new ObservableCreate<T>(source))

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

传入ObservableCreate<T>
返回Observable<T>

总结:A->B>C,Observable.create() 得到一个Observable对象,也就是一个被观察者对象。

subscribe()源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//hook相关,如果没有hook,即相应的对象是null,则是传入什么返回什么的
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//订阅的地方,重点
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

subscribeActual(observer)源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected void subscribeActual(Observer<? super T> observer) {
//1 创建CreateEmitter,也是一个适配器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);

//2 onSubscribe()参数是Disposable(实现了Disposable接口) ,所以CreateEmitter可以将Observer->Disposable 。
//还有一点要注意的是`onSubscribe()`是在我们执行`subscribe()`这句代码的那个线程回调的,并不受线程调度影响。
//此时,*** 进行onSubscribe(被观察者和观察者)
observer.onSubscribe(parent);
//log 订阅

try {
//3 将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来
source.subscribe(parent);
//4 调用parent.onNext() 和parent.onComplete(),parent是CreateEmitter对象,
//就是emitter.onNext("hello world!");emitter.onComplete();
//log onNext: helloworld!
//log onComplete
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

source.subscribe(parent);之后源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
...
//如果没有被dispose,会调用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
...
//1 如果没有被dispose,会调用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//2 一定会自动dispose()
dispose();
}
} else {
//3 如果已经被dispose了,会抛出异常。所以onError、onComplete彼此互斥,只能被调用一次
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
//1 如果没有被dispose,会调用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//2 一定会自动dispose()
dispose();
}
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

ObservableOnSubscribe<T>源码分析

A ObservableOnSubscribe<T>

1
2
3
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}

B subscribe(ObservableEmitter<T> e)

C ObservableEmitter

1
2
3
4
5
6
7
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}

D Emitter

1
2
3
4
5
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}

总结:ObservableOnSubscribe中实现subscribe()方法里最常用的三个方法

总结

  1. 创建被观察者->创建观察者->两者订阅->被观察者发射数据(ObservableEmitter)->观察者接收数据
  2. 被观察者和观察者之间如果没有diapose,之后才会进行观察者的onNext(),onComplete(),onError()....
  3. 观察者的onComplete()onError()两者互斥,只能选其一
  4. 被观察者和观察者订阅之后,才会发射数据

总结

  1. 什么时候被观察者和观察者订阅:subscribeActual()
  2. 被观察者和观察者订阅之后才发送数据
  3. 观察者的onComplete()onError()两者互斥,只能选其一

操作符原理的分析

我们这里就以map操作符为例

先举个栗子

1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1,2,3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer){
return "string: " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s){
Log.i(TAG, "accept: " + s);
}
});

map()源码分析

1
2
3
4
5
6
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判断是否非空
ObjectHelper.requireNonNull(mapper, "mapper is null");
//上面介绍过了,重点在ObservableMap,返回Observable
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

ObservableMap源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

//Function就是我们的转换函数,T:Integer,U:String
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//source:上游(被观察者)保存
super(source);
this.function = function;
}

//被观察者和观察者订阅的地方
@Override
public void subscribeActual(Observer<? super U> t) {
//source:上游(被观察者)
//MapObserver(t 观察者)订阅source(被观察者)
source.subscribe(new MapObserver<T, U>(t, function));
}

.....
}

MapObserver源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {

//super()将actual(观察者)保存起来,mapper保存Function变量
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) {

//protected boolean done;默认为false,onError 和 onComplete以后才会是true
/**
* @Override
* public void onError(Throwable t) {
* if (done) {
* RxJavaPlugins.onError(t);
* return;
* }
* done = true;
* actual.onError(t);
* }
* @Override
* public void onComplete() {
* if (done) {
* return;
* }
* done = true;
* actual.onComplete();
* }
**/
if (done) {
return;
}

if (sourceMode != NONE) {
actual.onNext(null);
return;
}

U v;

//进行变换,将上游传过来的T,利用Function转换成下游需要的U。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}

//变换后传递给下游Observer
actual.onNext(v);
}

@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}

总结

  1. 订阅:下游(观察者)订阅上游(被观察者)
  2. 订阅之后,发送数据
  3. 数据流向:上游(被观察者)->下游(观察者)
  4. 变换操作:发生在MapObserver

线程调度的分析

举个栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just(1,2,3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer){
return "string: " + integer;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s){
Log.i(TAG, "accept: " + s);
}
});

subscribeOn源码分析

1
2
3
4
5
6
7
//套路都一样
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判断非空
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//重点ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
// 保存线程调度器
final Scheduler scheduler;

//保存source上游(被观察者)
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

//重点 订阅起作用的地方
@Override
public void subscribeActual(final Observer<? super T> s) {

//创建下游(观察者)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//调用下游(观察者)Observer.onSubscribe()方法,
//所以被观察者订阅观察者订阅是发生在指定的线程中
s.onSubscribe(parent);
//setDisposable()是为了将子线程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

private static final long serialVersionUID = 8094547886072529208L;

//真正的下游(观察者)
final Observer<? super T> actual;
//用于保存上游的Disposable,以便在自身dispose时,连同上游一起dispose
final AtomicReference<Disposable> s;

SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}

@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游调用,传入Disposable。在本类中赋值给this.s,加入管理。
DisposableHelper.setOnce(this.s, s);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public void dispose() {
//s,上游(被观察者)的dispose
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}
}

scheduleDirect源码分析

1
2
3
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
scheduleDirect源码分析
1
2
3
4
5
6
7
8
9
10
11
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

返回的Disposable对象

schedule源码分析
1
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

延迟0秒执行Runnable,也就是直接执行
原话:

Schedules an Runnable for execution at some point in the future specified by a time delay relative to the current time.

结束之后,自动执行w.dispose();

总结subscribeOn(Schedulers.xxx())过程

  1. 返回一个ObservableSubscribeOn对象
  2. 上一步返回的对象被订阅时,回调该类中的subscribeActual()方法,在其中会立刻将线程切换到对应的Schedulers.xxx()线程
  3. 在切换后的线程中,执行source.subscribe(parent);对上游(被观察者)Observable订阅
  4. 上游(被观察者)Observable开始发送数据,
  5. 上游发送数据仅仅是调用下游观察者对应的onXXX()方法而已,所以此时操作是在切换后的线程中进行

多次切换线程,以第一次为准?

  • 因为订阅的流程是从下游(观察者)往上游(被观察者)方向,
  • 切换线程发生在source.subscribe(parent)中,所以在最上面的线程最后才执行切换,所以以最上面的为准
  • 数据从被观察者流向观察者,所以在最上面的线程中进行传递数据,所以就是以第一次为准啦

observeOn源码分析

1
2
3
4
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
//注:BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));

observeOn源码分析

1
2
3
4
5
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
//128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建指定的线程
Scheduler.Worker w = scheduler.createWorker();
//订阅被观察者的数据源
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

private static final long serialVersionUID = 6576896619930983584L;
//观察者
final Observer<? super T> actual;
//指定的线程
final Scheduler.Worker worker;
//是否延迟
final boolean delayError;
//128
final int bufferSize;
//被观察者传递的数据都存放在这里
SimpleQueue<T> queue;

Disposable s;

Throwable error;
volatile boolean done;

volatile boolean cancelled;

int sourceMode;

boolean outputFused;

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//同步或异步
//创建保存数据的队列
//订阅观察者的onSubscribe方法
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
//存入来自于被观察者的数据
queue.offer(t);
}
//进入指定线程,在queue中取出数据,发送给观察者
schedule();
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}

@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}

@Override
public boolean isDisposed() {
return cancelled;
}

void schedule() {
if (getAndIncrement() == 0) {
//传入一个线程,查看对应的run()方法
worker.schedule(this);
}
}

void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
//检查queue是否为空
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
//poll取出一个数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

//检查是否满足终止条件
if (checkTerminated(d, empty, a)) {
return;
}

//上游还没结束数据发送,但是这边处理的队列已经是空的,不会push给观察者 Observer
if (empty) {
//结束此次循环
break;
}

//发送给观察者
a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

void drainFused() {
int missed = 1;

for (;;) {
if (cancelled) {
return;
}

boolean d = done;
Throwable ex = error;

if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}

actual.onNext(null);

if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

@Override
public void run() {
if (outputFused) {
drainFused();
} else {
//取出数据,发送
drainNormal();
}
}

//检查是否结束,没有数据发送
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}

@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}

总结observeOn过程

  • 先将来自于被观察者的数据存入queue
  • 然后开始切换到指定线程
  • 切换线程后,从queue中取出数据,发送给观察者
  • onErroronComplete,会先将信息保存,切换线程后再发送

关于观察者多次调用生效问题。

  • 对比subscribeOn()的切换线程是在subscribeActual()里进行的,主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。而直到真正发射数据之前,任何改变线程的行为,都会生效(影响发射数据的线程)。所以subscribeOn()只生效一次。
  • observeOn()切换线程发生在onXXX(),是一个主动的行为,并且切换线程后会立刻发送数据,所以会生效多次
  • 再解释一下,subscribeOn,先切换线程,后发送数据,切换时从下往上,切换后,发送的数据存储在queue中,然后observeOn,指定接受线程,切换一下线程,发送一次数据,从上到下,所以每次都有效(希望能够理解了)

背压策略

参考大神分析

同步情况

异步情况

当观察者每消费96个事件便会自动触发内部的request()去设置被观察者的requested的值啊