三个方面分析RxJava2的实现:
- RxJava2基本流程的分析
- 操作符原理的分析
- 线程调度的分析
源码基于RxJava2(2.1.15版本)
RxJava2基本流程的分析
举个栗子
1 | Observable.create(new ObservableOnSubscribe<String>() { |
Observable.create()
创建一个被观察者new Observer<String>()
创建一个观察者subscribe()
两者产生订阅关系new ObservableOnSubscribe<String>()
被观察者通知观察者
Observable.create()
源码分析
A Observable.create()
1
2
3
4public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
传入ObservableOnSubscribe<T>
返回Observable<T>
B return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
传入 ObservableCreate<T>(source)
C onAssembly(new ObservableCreate<T>(source))
1 | public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { |
传入ObservableCreate<T>
返回Observable<T>
总结:A->B>C,Observable.create()
得到一个Observable
subscribe()
源码分析
1 | public final void subscribe(Observer<? super T> observer) { |
subscribeActual(observer)
源码分析
1 | protected void subscribeActual(Observer super T> observer) { |
source.subscribe(parent);
之后源码分析
1 | static final class CreateEmitter<T> |
ObservableOnSubscribe<T>
源码分析
A ObservableOnSubscribe<T>
1 | public interface ObservableOnSubscribe<T> { |
B subscribe(ObservableEmitter<T> e)
C ObservableEmitter
1 | public interface ObservableEmitter<T> extends Emitter<T> { |
D Emitter
1 | public interface Emitter<T> { |
总结:ObservableOnSubscribe
中实现subscribe()
方法里最常用的三个方法
总结
- 创建被观察者->创建观察者->两者订阅->被观察者发射数据(
ObservableEmitter
)->观察者接收数据 - 被观察者和观察者之间如果没有
diapose
,之后才会进行观察者的onNext(),onComplete(),onError()....
- 观察者的
onComplete()
和onError()
两者互斥,只能选其一 - 被观察者和观察者订阅之后,才会发射数据
总结
- 什么时候被观察者和观察者订阅:
subscribeActual()
- 被观察者和观察者订阅之后才发送数据
- 观察者的
onComplete()
和onError()
两者互斥,只能选其一
操作符原理的分析
我们这里就以map
操作符为例
先举个栗子
1 | Observable.just(1,2,3) |
map()
源码分析
1 | public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { |
ObservableMap
源码分析
1 | public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { |
MapObserver
源码分析
1 | static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { |
总结
- 订阅:下游(观察者)订阅上游(被观察者)
- 订阅之后,发送数据
- 数据流向:上游(被观察者)->下游(观察者)
- 变换操作:发生在
MapObserver
中
线程调度的分析
举个栗子
1 | Observable.just(1,2,3) |
subscribeOn
源码分析
1 | //套路都一样 |
ObservableSubscribeOn
源码分析
1 | public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { |
scheduleDirect
源码分析
1 | public Disposable scheduleDirect(@NonNull Runnable run) { |
scheduleDirect
源码分析
1 | public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { |
返回的Disposable对象
schedule
源码分析
1 | public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); |
延迟0秒执行Runnable,也就是直接执行
原话:
Schedules an Runnable for execution at some point in the future specified by a time delay relative to the current time.
结束之后,自动执行w.dispose();
总结subscribeOn(Schedulers.xxx())
过程
- 返回一个
ObservableSubscribeOn
对象 - 上一步返回的对象被订阅时,回调该类中的
subscribeActual()
方法,在其中会立刻将线程切换到对应的Schedulers.xxx()
线程 - 在切换后的线程中,执行
source.subscribe(parent)
;对上游(被观察者)Observable
订阅 - 上游(被观察者)
Observable
开始发送数据, - 上游发送数据仅仅是调用下游观察者对应的
onXXX()
方法而已,所以此时操作是在切换后的线程中进行
多次切换线程,以第一次为准?
- 因为订阅的流程是从下游(观察者)往上游(被观察者)方向,
- 切换线程发生在
source.subscribe(parent)
中,所以在最上面的线程最后才执行切换,所以以最上面的为准 - 数据从被观察者流向观察者,所以在最上面的线程中进行传递数据,所以就是以第一次为准啦
observeOn
源码分析
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
observeOn
源码分析
1 | public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { |
ObservableObserveOn
源码分析
1 | public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { |
总结observeOn
过程
- 先将来自于被观察者的数据存入
queue
中 - 然后开始切换到指定线程
- 切换线程后,从
queue
中取出数据,发送给观察者 onError
和onComplete
,会先将信息保存,切换线程后再发送
关于观察者多次调用生效问题。
- 对比
subscribeOn()
的切换线程是在subscribeActual()里进行的,主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。而直到真正发射数据之前,任何改变线程的行为,都会生效(影响发射数据的线程)。所以subscribeOn()只生效一次。 - observeOn()切换线程发生在
onXXX()
,是一个主动的行为,并且切换线程后会立刻发送数据,所以会生效多次 - 再解释一下,
subscribeOn
,先切换线程,后发送数据,切换时从下往上,切换后,发送的数据存储在queue
中,然后observeOn
,指定接受线程,切换一下线程,发送一次数据,从上到下,所以每次都有效(希望能够理解了)
背压策略
参考大神分析
同步情况
异步情况
当观察者每消费96个事件便会自动触发内部的request()去设置被观察者的requested的值啊