Rxjava 源码系列 - 基础框架分析

  • 时间:2019-06-11 02:48 作者:Android进阶开发 来源:Android进阶开发 阅读:82
  • 扫一扫,手机访问
摘要:前言RxjavaRxAndroid本篇博客讲解的 Rxjava 的原理基于版本 2.1.4,RxAndroid 的原理的版本基于 2.0.2 。基本框架Rxjava 有四个基本的概念Observable (可观察者,即被观察者)Observer (观察者)subscribe (订阅) 通过该方法,将

前言

Rxjava

RxAndroid

本篇博客讲解的 Rxjava 的原理基于版本 2.1.4,RxAndroid 的原理的版本基于 2.0.2 。


基本框架

Rxjava 有四个基本的概念

  • Observable (可观察者,即被观察者)
  • Observer (观察者)
  • subscribe (订阅) 通过该方法,将 Observable 与 Observer 关联起来
  • 事件 (包括 onNext,onComplete,onError 等事件)

简单来说:Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

用一张简单的图来形容大概如下

该图片来源于 给 Android 开发者的 RxJava 详解

Observable

public abstract class Observable<T> implements ObservableSource<T> {}

可以看到 Observable 是一个笼统类,实现了 ObservableSource 接口

Observer

Observer 其实也是一个接口,里面定义了若干方法,onSubscribe ,onNext,onError,onComplete 方法。

public interface Observer<T> {    void onSubscribe(@NonNull Disposable d);    void onNext(@NonNull T t);    void onError(@NonNull Throwable e);    void onComplete();}
  • 一个正常的事件序列的调用顺序会是这样的 onSubscribe > onNext > onComplete,若中途出错了,那调用顺序可能是这样的 onSubscribe > onNext > onError
  • onSubscribe 方法,当我们调用 Observable 的 subscribe 方法的时候,会先回调 Observer 的 onSubscribe 方法,此方法的调用顺序先于 onNext,onError ,onComplete 方法。
  • onError 方法与 onComplete 方法可以说是互斥的,调用了其中一个方法就不会调用另外一个方法

源码解析

基本使用

在讲解原理之前,我们先来看一下 Rxjava 的一个基本使用。

Observable           .create(new ObservableOnSubscribe<String>() {                @Override                public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    emitter.onNext("a");                    emitter.onNext("b");                    emitter.onNext("c");                    emitter.onComplete();                }            })            .subscribe(new Observer<String>() {                @Override                public void onSubscribe(Disposable d) {                    Log.e("TAG", "onSubscribe():  ");                }                @Override                public void onNext(String s) {                    Log.e("TAG", "onNext():  " + s);                }                @Override                public void onError(Throwable e) {                }                @Override                public void onComplete() {                    Log.e("TAG", "onComplete():  ");                }            });
E/TAG: onSubscribe():  E/TAG: onNext():  aE/TAG: onNext():  bE/TAG: onNext():  cE/TAG: onComplete():

首先我们先从上面简单的例子回顾起:

先来看 Observable 的 create 方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {    ObjectHelper.requireNonNull(source, "source is null");    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}

在 create 方法中,其实很简单,只是对 source 进行判空解决,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate 是什么东西?

public final class ObservableCreate<T> extends Observable<T> {    final ObservableOnSubscribe<T> source;    public ObservableCreate(ObservableOnSubscribe<T> source) {        this.source = source;    }    @Override    protected void subscribeActual(Observer<? super T> observer) {        CreateEmitter<T> parent = new CreateEmitter<T>(observer);        observer.onSubscribe(parent);        try {            source.subscribe(parent);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            parent.onError(ex);        }    }

ObservableCreate 其实也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法。

接下来我们来看重点了,即 Observable 的 subscribe 方法,在该方法中,他会将 Observalble 与 observer 关联起来。

@SchedulerSupport(SchedulerSupport.NONE)@Overridepublic final void subscribe(Observer<? super T> observer) {    // 检查 observer 能否为 null,为 null 抛出异常    ObjectHelper.requireNonNull(observer, "observer is null");    try {       // RxJavaPlugins 插件的,暂时不论        observer = RxJavaPlugins.onSubscribe(this, observer);      // 检查 observer 能否为 null,为 null 抛出异常        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");        subscribeActual(observer);    } catch (NullPointerException e) { // NOPMD        throw e;    } catch (Throwable e) {        Exceptions.throwIfFatal(e);        // can't call onError because no way to know if a Disposable has been set or not        // can't call onSubscribe because the call might have set a Subscription already        RxJavaPlugins.onError(e);        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");        npe.initCause(e);        throw npe;    }}

subscribe 方法也比较简单,大概可以分为以下两步:

  • 首先检查 observer 能否为空,为 null 抛出异常
  • 第二步,调用 subscribeActual 方法,而我们知道在 Observable 类中 subscribeActual 是笼统方法,因而,我们只要要关注其实现类的 subscribeActual 方法。从上面的分析,我们知道,当我们调用 Observable<T> create(ObservableOnSubscribe<T> source) 方法的时候,最终会返回 ObservableCreate 实例。因而,我们只要要关注 ObservableCreate 的 subscribeActual 方法
public final class ObservableCreate<T> extends Observable<T> {    final ObservableOnSubscribe<T> source;    public ObservableCreate(ObservableOnSubscribe<T> source) {        this.source = source;    }    @Override    protected void subscribeActual(Observer<? super T> observer) {        CreateEmitter<T> parent = new CreateEmitter<T>(observer);        observer.onSubscribe(parent);        try {            source.subscribe(parent);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            parent.onError(ex);        }    }    ----}

ObservableCreate 的核心代码主要也只有几行,source 是上游 ObservableOnSubscribe 的引用,而 CreateEmitter 这个类,它是 ObservableCreate 的一个静态内部类,实现了 ObservableEmitter,Disposable 接口 它持有 observer 的引用,当我们调用 CreateEmitter 的 next 方法的时候,它会判断当前的 CreateEmitter 有没有被 dispose 掉,假如没有,调用他持有的 observer 的 onNext 方法, 同理 onComplete 方法逐个样,只不过执行完 onComplete 方法的时候,还会执行 dispose 方法,dispose 当前的 CreateEmitter。(dispose 方法这里先记住以下,下面会讲到

static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {    private static final long serialVersionUID = -3434801548987643227L;    final Observer<? super T> observer;    CreateEmitter(Observer<? super T> observer) {        this.observer = observer;    }    @Override    public void onNext(T t) {        if (t == null) {            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));            return;        }        if (!isDisposed()) {            observer.onNext(t);        }    }    @Override    public void onError(Throwable t) {        if (!tryOnError(t)) {            RxJavaPlugins.onError(t);        }    }    @Override    public boolean tryOnError(Throwable t) {        if (t == null) {            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");        }        if (!isDisposed()) {            try {                observer.onError(t);            } finally {                dispose();            }            return true;        }        return false;    }    @Override    public void onComplete() {        if (!isDisposed()) {            try {                observer.onComplete();            } finally {                dispose();            }        }    }    @Override    public void setDisposable(Disposable d) {        DisposableHelper.set(this, d);    }    @Override    public void setCancellable(Cancellable c) {        setDisposable(new CancellableDisposable(c));    }    @Override    public ObservableEmitter<T> serialize() {        return new SerializedEmitter<T>(this);    }    @Override    public void dispose() {        DisposableHelper.dispose(this);    }    @Override    public boolean isDisposed() {        return DisposableHelper.isDisposed(get());    }}

好,看完上面的代码,我们回到 ObservableCreate 的 subscribeActual 方法,我们调用 observer.onSubscribe 方法的时候,会将 parent 对象作为方法参数暴露出去(而这个 parent 正是我们的 CreateEmitter,通过 CreateEmitter 的 dispose 方法可以取消订阅关系)。接着,当我们调用 source.subscribe(parent) 的时候,会调用 ObservableOnSubscribe 的 subscribe 方法。

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);    observer.onSubscribe(parent);    try {        source.subscribe(parent);    } catch (Throwable ex) {        Exceptions.throwIfFatal(ex);        parent.onError(ex);    }

因而,在我们上面的例子中,若不出错,调用顺序

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onComplete 方法,而不会调用 onError 方法)

若在调用 onNext 方法的过程中出错,那调用顺序可能是这样的

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(@NonNull ObservableEmitter<T> emitter)
(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onError 方法,而不会调用 onComplete 方法 )


observable 与 Observer 是如何取消订阅关系的

在上面讲解的时候,其实我们已经有提到 CreateEmitter 的 dispose 方法,该方法就是用来取消订阅关系的。

假设这样一个场景,当我们收到的 value 的值大于等于 2 的时候,这个时候认为是异常的,处理两者之间的订阅关系

    Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {            @Override            public void subscribe(ObservableEmitter<Integer> e) throws Exception {                e.onNext(1);                e.onNext(2);                e.onNext(3);                e.onNext(4);                e.onComplete();            }        });    Observer<Integer> observer = new Observer<Integer>() {            private Disposable disposable;            @Override            public void onSubscribe(Disposable d) {                disposable = d;            }            @Override            public void onNext(Integer value) {                Log.d("xujun", value.toString());                if (value >=2) {   // >=2  时为异常数据,解除订阅                    disposable.dispose();                }            }            @Override            public void onError(Throwable e) {            }            @Override            public void onComplete() {            }        };    observable.subscribe(observer); //建立订阅关系
D/xujun: 1   2

总结

Rxjava 的原理其实不难,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

用一张简单的流程图形容如下:


关于RXJava的一律学习内容,我们这边都有系统的知识体系以及进阶视频资料,有需要的朋友可以加群免费领取安卓进阶视频教程,源码,面试资料,群内有大牛一起交流探讨技术;点击链接加入群聊【腾讯@Android高级架构】
(包括自己设置控件、NDK、架构设计、混合式开发工程师(React native,Weex)、性能优化、完整商业项目开发等)

Android高级进阶视频教程
  • 全部评论(0)
最新发布的资讯信息
【系统环境|】极客时间-数据分析实战45讲【完结】(2021-09-02 16:26)
【系统环境|windows】字节跳动前台面试题解析:盛最多水的容器(2021-03-20 21:27)
【系统环境|windows】DevOps敏捷60问,肯定有你想理解的问题(2021-03-20 21:27)
【系统环境|windows】字节跳动最爱考的前台面试题:JavaScript 基础(2021-03-20 21:27)
【系统环境|windows】JavaScript 的 switch 条件语句(2021-03-20 21:27)
【系统环境|windows】解决 XML 数据应用实践(2021-03-20 21:26)
【系统环境|windows】20个编写现代CSS代码的建议(2021-03-20 21:26)
【系统环境|windows】《vue 3.0探险记》- 运行报错:Error:To install them, you can run: npm install --save core-js/modules/es.arra...(2021-03-20 21:24)
【系统环境|windows】浅谈前台可视化编辑器的实现(2021-03-20 21:24)
【系统环境|windows】产品经理入门迁移学习指南(2021-03-20 21:23)
手机二维码手机访问领取大礼包
返回顶部