目录
  1. 1. 一、响应式编程的核心思想
  2. 2. 二、Observable/Observer 订阅模型深入
  3. 3. 三、操作符链与装饰器模式
  4. 4. 四、线程调度与 Scheduler 机制
  5. 5. 五、背压(Backpressure)策略
  6. 6. 六、Disposable 与内存泄漏防护
  7. 7. 七、RxJava 2.x 与 3.x 的差异
  8. 8. 八、面试常问题目
解读开源框架系列-RxJava响应式编程框架设计

一、响应式编程的核心思想

响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。在命令式编程中,表达式 a = b + c 只在赋值那一刻计算一次;而在响应式编程中,当 b 或 c 发生变化时,a 会自动更新。RxJava 将这一思想带入 JVM 生态,用 Observable(可观察对象) 表示数据流,用 Observer(观察者) 订阅并响应这些数据流。

RxJava 的四个核心角色:

  1. ObservableSource:数据的生产者,负责发射数据(onNext)、发射完成信号(onComplete)或错误信号(onError)。
  2. Observer:数据的消费者,通过 subscribe(Observer) 与上游建立订阅关系。
  3. Disposable:用于取消订阅,防止内存泄漏。调用 dispose() 后,上游将停止发射数据。
  4. Operator(操作符):在上下游之间对数据流进行变换、过滤、聚合等操作。

RxJava 对 Reactive Streams 规范的实现体现在 Flowable 中,其核心接口定义在 org.reactivestreams 包的四个接口中:Publisher<T>Subscriber<T>SubscriptionProcessor<T,R>。这套规范已被纳入 JDK 9 的 java.util.concurrent.Flow

// 最简单的订阅模型
Observable.just("Hello", "World")
.map(String::toUpperCase)
.filter(s -> s.startsWith("H"))
.subscribe(System.out::println);

这条链路的执行流程是:just 发射两个字符串 → map 转换为大写 → filter 过滤 → 订阅者打印。每一层操作符都创建一个新的 Observable,形成装饰器模式的嵌套结构。

二、Observable/Observer 订阅模型深入

在 RxJava 2.x 源码中,Observable 是一个抽象类,内部持有 ObservableSource<T> 接口引用。ObservableCreateObservable 最基础的实现类,构造函数接收 ObservableOnSubscribe<T>,后者定义了 subscribe(ObservableEmitter<T> emitter) 方法。

// RxJava 2.x 源码:io.reactivex.ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); // 触发数据发射
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}

注意这里的 subscribeActual 方法。Observable 的 subscribe(Observer) 方法最终会调用子类的 subscribeActual。这是模板方法模式——父类 Observable 定义 subscribe 的通用逻辑(如空检查、线程调度),子类实现 subscribeActual 完成具体的发射逻辑。

CreateEmitter 内部持有 Observer 的引用,负责将上游的 onNext/onError/onComplete 转发给下游,同时检查 isDisposed() 状态,一旦取消订阅就停止发射。这实现了背压(Backpressure)的最基本形式——通过 Disposable 终止数据流。

// CreateEmitter 的部分源码
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}
}

三、操作符链与装饰器模式

操作符是 RxJava 的核心竞争力。常用的变换类操作符(map、flatMap、concatMap、switchMap)、过滤类操作符(filter、take、skip、distinct)、组合类操作符(zip、merge、combineLatest)都以 Observable 的子类实现。

map 操作符为例,RxJava 2.x 的 ObservableMap 类源码:

// io.reactivex.internal.operators.observable.ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); // source 是上游的 Observable
this.function = function;
}

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); // 包装下游 Observer
}

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

@Override
public void onNext(T t) {
U v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned null");
downstream.onNext(v); // 变换后传给下游
}
}
}

典型的操作符链 Observable.just(1).map(x -> x * 2).filter(x -> x > 0) 构建出的对象图:

ObservableJust → ObservableMap → ObservableFilter
↑ parent ↑ parent ↑ parent

每个操作符都持有上游的引用(upstream/source),订阅时自下而上逐层包装 Observer,数据发射时再自上而下逐层传递。这就是 assembly-time(装配期)subscription-time(订阅期) 的区分:

  • 装配期:操作符链中的对象被创建,形成嵌套引用。此时数据尚未流动。
  • 订阅期:subscribe() 被调用,从下游向上游逐层调用 subscribe,每一层创建对应的 Observer 包装,最终触发顶层的 subscribeActual

这一设计的精妙之处在于:数据流是惰性的(lazy),只有在订阅时才真正启动;而且每一层操作符之间是完全解耦的,可以自由组合。

flatMap 的操作更加复杂。它将每个上游元素映射为一个 Observable,然后将这些 Observable 合并(merge) 为一个输出流。注意 flatMap 不保证事件顺序,因为多个内部 Observable 可能交错发射。如果需要顺序保证,应使用 concatMap

// flatMap 合并内部 Observable 的核心逻辑(简化)
// io.reactivex.internal.operators.observable.ObservableFlatMap
@Override
public void onNext(T t) {
ObservableSource<? extends R> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned null");
} catch (Throwable e) {
onError(e);
return;
}
p.subscribe(new InnerObserver()); // 每个内部 Observable 独立订阅
}

四、线程调度与 Scheduler 机制

RxJava 的线程切换通过 subscribeOnobserveOn 完成。这是面试中的高频考点。

subscribeOn:指定上游 Observable 的工作线程,即 subscribeActual 中发射数据的线程。如果在链中多次调用 subscribeOn,只有第一个(离源头最近的)生效。原因是下游调用 subscribeOn 时,上游已经在上一个 subscribeOn 指定的线程上被订阅了。

observeOn:指定下游 Observer 接收数据的线程。可以在链中多次调用,每次调用后的下游都在新线程上工作。

Scheduler 的实现核心是 Scheduler.Worker,它是一个可调度的执行单元。RxJava 内置几种 Scheduler:

Scheduler 底层实现 适用场景
Schedulers.io() 无上限的 CachedThreadPool 网络请求、文件 I/O
Schedulers.computation() 固定大小(CPU 核数)的线程池 计算密集型操作
Schedulers.newThread() 每次新建线程 需要完全隔离的任务
AndroidSchedulers.mainThread() Handler(Looper.getMainLooper()) UI 更新
Schedulers.single() 单线程 需要顺序执行的任务

AndroidSchedulers.mainThread() 的实现原理非常简单:

// rxandroid: AndroidSchedulers.java
public final class AndroidSchedulers {
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
() -> new HandlerScheduler(new Handler(Looper.getMainLooper()), true));
}

// HandlerScheduler 的 schedule 方法
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}

源码路径:rxandroid/src/main/java/rx/android/schedulers/HandlerScheduler.java

五、背压(Backpressure)策略

当上游发射数据的速度超过下游消费的速度时,就会产生背压。RxJava 2.x 将 Observable 拆分为两类:

  • Observable:不支持背压(适合少量数据、GUI 事件)
  • Flowable:支持背压(适合大量数据、流式处理)

Flowable 的背压策略通过 BackpressureStrategy 枚举定义(源码路径 io.reactivex.BackpressureStrategy):

  1. MISSING:不做任何处理,下游需要自行通过 request(n) 控制速率。数据可能溢出导致 MissingBackpressureException
  2. ERROR:当数据堆积时直接抛出 MissingBackpressureException
  3. BUFFER:无界缓冲,所有数据先入队列。可能导致 OOM。
  4. DROP:丢弃下游来不及处理的数据。
  5. LATEST:仅保留最新的一个数据,旧的被丢弃。

Flowable 内部的背压协议基于 Reactive Streams 规范的 Subscription.request(n) 机制:

// 下游告诉上游:我准备好了,给我 n 个数据
subscription.request(10);

FlowableCreate 中,FlowableEmitter 实现了 request(n) 的感知:

// FlowableCreate.java - 关键字段
final AtomicLong requested = new AtomicLong(); // 下游请求的数量

// 每次发射检查是否需要
public void onNext(T t) {
if (requested.get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(requested, 1); // 减少请求计数
}
}

io.reactivex.internal.util.BackpressureHelper 包含用于管理请求计数的工具方法,使用 CAS(Compare-And-Swap)保证原子性。

六、Disposable 与内存泄漏防护

任何订阅如果不取消,都可能导致 Activity/Fragment 无法被 GC 回收。RxJava 提供了三级防御机制:

第一级:手动管理 Disposable

private CompositeDisposable compositeDisposable = new CompositeDisposable();

@Override
protected void onCreate(Bundle savedInstanceState) {
Disposable d = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(count -> textView.setText(String.valueOf(count)));
compositeDisposable.add(d);
}

@Override
protected void onDestroy() {
compositeDisposable.clear(); // 批量取消所有订阅
}

第二级:RxLifecycle / AutoDispose

AutoDispose 通过拦截 subscribe 调用,自动在生命周期结束时取消订阅。其核心是在 Observer 和上游之间插入一个 AutoDisposingObserver,监听 LifecycleOwner 的 ON_DESTROY 事件。

第三级:UndeliverableException 处理

onError 发生后订阅已终止,后续的事件无法投递,会抛出 UndeliverableException。应通过 RxJavaPlugins.setErrorHandler() 设置全局异常处理器:

RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
Log.w("RxJava", "Undeliverable exception", e);
} else {
Thread.currentThread().getUncaughtExceptionHandler()
.uncaughtException(Thread.currentThread(), e);
}
});

七、RxJava 2.x 与 3.x 的差异

RxJava 3.x 基于 Java 8,引入了 java.util.concurrent.Flow 兼容性。关键变化包括:

  • 所有操作符、类使用 Java 8 方法引用和 lambda
  • CompletableSingleMaybe 的操作符增加
  • Flowable 可直接转换为 Flow.Publisher
  • 移除了 Observable.flatMap() 过时方法,统一到 flatMap 的规范实现

生产环境建议使用 3.x,2.x 已进入维护模式。

八、面试常问题目

Q1: subscribeOn 和 observeOn 的区别?多次调用 subscribeOn 会怎样?

subscribeOn 影响上游数据发射所在的线程,只第一个生效(最近的源头)。observeOn 影响下游数据接收所在的线程,可以多次调用以切换不同阶段的线程。例如:.subscribeOn(Schedulers.io()).map(...).observeOn(AndroidSchedulers.mainThread()).subscribe(...) 表示在 IO 线程发数据、做 map 变换,在主线程接收结果。

Q2: map 和 flatMap 的区别是什么?

map 是一对一变换,将 T 类型转换为 U 类型;flatMap 是一对多变换,将 T 映射为一个 Observable<U>,然后将所有内部 Observable 合并(merge)输出。flatMap 不能保证顺序,concatMap 保证顺序但有性能开销,switchMap 在新事件到来时取消旧的内部 Observable。

Q3: RxJava 如何防止内存泄漏?必须手动 dispose 吗?

RxJava 提供 CompositeDisposable 进行批量取消;推荐使用 AutoDispose 或 RxLifecycle 将订阅绑定到生命周期;也可使用 takeUntil() 操作符配合生命周期事件自动取消。核心原则:每个 subscribe 返回的 Disposable 必须在适当生命周期方法中 dispose。

Q4: Observable 和 Flowable 的核心区别是什么?

Observable 不支持背压,适用于少量、低频事件(GUI 点击、单个网络请求结果)。Flowable 支持背压,实现了 Reactive Streams 规范,通过 request(n) 机制让下游控制上游发射速率,适用于大量数据流(数据库查询结果、传感器事件流)。Flowable 额外开销更大,仅在需要背压时使用。


参考源码路径:

  • RxJava 核心:https://github.com/ReactiveX/RxJava/tree/2.x/src/main/java/io/reactivex
  • Observable 创建:io.reactivex.internal.operators.observable.ObservableCreate
  • Map 操作符:io.reactivex.internal.operators.observable.ObservableMap
  • 背压策略:io.reactivex.BackpressureStrategy
  • Flowable 创建:io.reactivex.internal.operators.flowable.FlowableCreate
  • RxAndroid Scheduler:rxandroid/src/main/java/rx/android/schedulers/AndroidSchedulers.java
  • Reactive Streams 规范:http://www.reactive-streams.org/
打赏
  • 微信
  • 支付宝

评论