目录
  1. 1. 一、响应式编程的核心思想
  2. 2. 二、Observable 族的四种类型
    1. 2.1. 2.1 Observable
    2. 2.2. 2.2 Flowable
    3. 2.3. 2.3 Single
    4. 2.4. 2.4 Completable
    5. 2.5. 2.5 Maybe
  3. 3. 三、Observable/Observer 订阅模型深入
  4. 4. 四、操作符链与装饰器模式
    1. 4.1. 4.1 flatMap 与内部 Observable 管理
    2. 4.2. 4.2 操作符融合(Operator Fusion)
  5. 5. 五、线程调度与 Scheduler 机制
    1. 5.1. 5.1 IoScheduler 内部实现
    2. 5.2. 5.2 AndroidSchedulers.mainThread() 内部实现
    3. 5.3. 5.3 subscribeOn 和 observeOn 的线程切换原理
  6. 6. 六、背压(Backpressure)策略
    1. 6.1. 6.1 Flowable 内部队列
  7. 7. 七、Disposable 与内存泄漏防护
    1. 7.1. 7.1 dispose 的原子性传播
  8. 8. 八、RxJava 2.x 与 3.x 的差异
    1. 8.1. 8.1 RxJava 与 Kotlin Coroutines 的对比
  9. 9. 九、RxJavaPlugins 全局钩子
  10. 10. 十、面试常问题目
解读开源框架系列-RxJava响应式编程框架设计

一、响应式编程的核心思想

响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。在命令式编程中,表达式 a = b + c 只在赋值那一刻计算一次;而在响应式编程中,当 b 或 c 发生变化时,a 会自动更新。RxJava 将这一思想带入 JVM 生态,用 Observable(可观察对象) 表示数据流,用 Observer(观察者) 订阅并响应这些数据流。

RxJava 的四个核心角色:

  1. ObservableSource:数据的生产者,负责发射数据(onNext)、发射完成信号(onComplete)或错误信号(onError)。
  2. Observer:数据的消费者,通过 subscribe(Observer) 与上游建立订阅关系。
  3. Disposable:用于取消订阅,防止内存泄漏。调用 dispose() 后,上游将停止发射数据。
  4. Operator(操作符):在上下游之间对数据流进行变换、过滤、聚合等操作。

RxJava 对 Reactive Streams 规范的实现体现在 Flowable 中,其核心接口定义在 org.reactivestreams 包的四个接口中:Publisher<T>Subscriber<T>SubscriptionProcessor<T,R>。这套规范已被纳入 JDK 9 的 java.util.concurrent.Flow

// 最简单的订阅模型
Observable.just("Hello", "World")
.map(String::toUpperCase)
.filter(s -> s.startsWith("H"))
.subscribe(System.out::println);

这条链路的执行流程是:just 发射两个字符串 → map 转换为大写 → filter 过滤 → 订阅者打印。每一层操作符都创建一个新的 Observable,形成装饰器模式的嵌套结构。

二、Observable 族的四种类型

RxJava 2.x/3.x 提供了五种响应式类型,它们各有适用场景:

2.1 Observable

发射 0 到 N 个数据,以 onComplete 或 onError 终止。不支持背压,适用于 GUI 事件、少量数据流(最多几千个元素)。典型场景:按钮点击事件、单个网络请求响应。

2.2 Flowable

发射 0 到 N 个数据,支持背压。实现了 Reactive Streams 规范,通过 request(n) 让下游控制上游的发射速率。适用于大量数据流:数据库查询结果、文件读取、传感器事件流。额外开销比 Observable 大(需要维护请求计数和队列)。

2.3 Single

发射恰好 1 个数据或 1 个错误。没有 onComplete 回调——onSuccess 替代了 onNext+onComplete。适用于”请求-响应”模式:网络请求、数据库查询。内部实现比 Observable 简单,不需要处理多元素和完成信号的关系。

Single.just("Hello")
.map(String::length)
.subscribe(
length -> System.out.println("Length: " + length),
Throwable::printStackTrace
);

2.4 Completable

只关心操作是否完成,不发射数据。只有 onComplete 和 onError 两种终止信号。适用于:写入数据库、文件操作、发送分析事件——只关心成功或失败,不需要返回值。

Completable.fromAction(() -> database.deleteAll())
.subscribe(
() -> Log.d("TAG", "Delete completed"),
throwable -> Log.e("TAG", "Delete failed", throwable)
);

2.5 Maybe

结合了 Single 和 Completable:可能发射 1 个数据,也可能不发射任何数据就完成,或发射一个错误。适用于:从缓存中查找数据——可能找到(onSuccess)、可能找不到(onComplete)、可能出错(onError)。

Maybe.fromCallable(() -> cache.get("key"))
.subscribe(
value -> System.out.println("Found: " + value),
Throwable::printStackTrace,
() -> System.out.println("Not found")
);

选择指南

类型 数据量 完成信号 背压 典型场景
Observable 0..N onComplete UI 事件、单个 API 调用
Flowable 0..N onComplete 大数据流、数据库查询
Single 1 onSuccess 网络请求
Completable 0 onComplete 写操作
Maybe 0..1 onComplete/onSuccess 缓存查询

三、Observable/Observer 订阅模型深入

在 RxJava 2.x 源码中,Observable 是一个抽象类,内部持有 ObservableSource<T> 接口引用。ObservableCreateObservable 最基础的实现类,构造函数接收 ObservableOnSubscribe<T>,后者定义了 subscribe(ObservableEmitter<T> emitter) 方法。

// RxJava 2.x 源码:io.reactivex.ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); // 触发数据发射
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}

注意这里的 subscribeActual 方法。Observable 的 subscribe(Observer) 方法最终会调用子类的 subscribeActual。这是模板方法模式——父类 Observable 定义 subscribe 的通用逻辑(如空检查、线程调度),子类实现 subscribeActual 完成具体的发射逻辑。

CreateEmitter 内部持有 Observer 的引用,负责将上游的 onNext/onError/onComplete 转发给下游,同时检查 isDisposed() 状态,一旦取消订阅就停止发射。这实现了背压(Backpressure)的最基本形式——通过 Disposable 终止数据流。

// CreateEmitter 的部分源码
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}
}

DisposableHelper 是一个工具类,使用 CAS 原子操作管理 Disposable 的状态。它将 Disposable 引用封装在 AtomicReference<Disposable> 中:

// io.reactivex.internal.disposables.DisposableHelper
public enum DisposableHelper implements Disposable {
DISPOSED;

public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}

public static boolean isDisposed(AtomicReference<Disposable> field) {
return field.get() == DISPOSED;
}
}

这个枚举实现非常巧妙:使用单例 DISPOSED 作为已取消状态的标记,CAS 保证线程安全。一旦 getAndSet(DISPOSED) 成功,后续所有 isDisposed() 检查都返回 true,且 dispose() 只执行一次。

四、操作符链与装饰器模式

操作符是 RxJava 的核心竞争力。常用的变换类操作符(map、flatMap、concatMap、switchMap)、过滤类操作符(filter、take、skip、distinct)、组合类操作符(zip、merge、combineLatest)都以 Observable 的子类实现。

map 操作符为例,RxJava 2.x 的 ObservableMap 类源码:

// io.reactivex.internal.operators.observable.ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); // source 是上游的 Observable
this.function = function;
}

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); // 包装下游 Observer
}

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

@Override
public void onNext(T t) {
U v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned null");
downstream.onNext(v); // 变换后传给下游
}
}
}

典型的操作符链 Observable.just(1).map(x -> x * 2).filter(x -> x > 0) 构建出的对象图:

ObservableJust → ObservableMap → ObservableFilter
↑ parent ↑ parent ↑ parent

每个操作符都持有上游的引用(upstream/source),订阅时自下而上逐层包装 Observer,数据发射时再自上而下逐层传递。这就是 assembly-time(装配期)subscription-time(订阅期) 的区分:

  • 装配期:操作符链中的对象被创建,形成嵌套引用。此时数据尚未流动。
  • 订阅期:subscribe() 被调用,从下游向上游逐层调用 subscribe,每一层创建对应的 Observer 包装,最终触发顶层的 subscribeActual

这一设计的精妙之处在于:数据流是惰性的(lazy),只有在订阅时才真正启动;而且每一层操作符之间是完全解耦的,可以自由组合。

4.1 flatMap 与内部 Observable 管理

flatMap 的操作更加复杂。它将每个上游元素映射为一个 Observable,然后将这些 Observable 合并(merge) 为一个输出流。注意 flatMap 不保证事件顺序,因为多个内部 Observable 可能交错发射。如果需要顺序保证,应使用 concatMap

// flatMap 合并内部 Observable 的核心逻辑(简化)
// io.reactivex.internal.operators.observable.ObservableFlatMap
@Override
public void onNext(T t) {
ObservableSource<? extends R> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned null");
} catch (Throwable e) {
onError(e);
return;
}
p.subscribe(new InnerObserver()); // 每个内部 Observable 独立订阅
}

concatMap 的区别在于它维护一个内部队列,前一个 Observable 完成后再订阅下一个,保证顺序。switchMap 则在新 Observable 到来时取消(dispose)前一个内部 Observable 的订阅——适用于搜索框输入联想场景(只需最新搜索词的结果)。

4.2 操作符融合(Operator Fusion)

RxJava 2.x 引入了操作符融合机制,这是一项重要的性能优化。BasicFuseableObserver 中的 “Fuseable” 就是这个概念。当相邻的两个操作符满足条件时,可以通过 QueueDisposable / QueueSubscription 接口直接传递数据,跳过 Observer 包装的开销。

融合分两种模式:

  1. 同步融合(SYNC):上游 Observable.just() 直接提供一个已计算完成的数据队列,下游 map 的 Observer 无需通过 onNext 回调,而是直接通过 poll() 获取数据并变换。
  2. 异步融合(ASYNC):上游是异步数据源(如 Observable.range()),下游操作符通过 requestFusion() 协商进入融合模式,用 poll() + drain() 的拉取模式替代逐次 onNext 推送。
// BasicFuseableObserver 中的融合协商
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
sourceMode = SYNC;
queue = qd;
done = true;
downstream.onSubscribe(this); // 立刻通知下游
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = ASYNC;
queue = qd;
}
}
downstream.onSubscribe(this);
}
}

融合情况下,map 不再用 onNext 逐个接收数据,而是从 queue.poll() 批量拉取。这显著减少了方法调用栈深度和 Observer 链上的逐层转发开销。

五、线程调度与 Scheduler 机制

RxJava 的线程切换通过 subscribeOnobserveOn 完成。这是面试中的高频考点。

subscribeOn:指定上游 Observable 的工作线程,即 subscribeActual 中发射数据的线程。如果在链中多次调用 subscribeOn,只有第一个(离源头最近的)生效。原因是下游调用 subscribeOn 时,上游已经在上一个 subscribeOn 指定的线程上被订阅了。

observeOn:指定下游 Observer 接收数据的线程。可以在链中多次调用,每次调用后的下游都在新线程上工作。

Scheduler 的实现核心是 Scheduler.Worker,它是一个可调度的执行单元。RxJava 内置几种 Scheduler:

Scheduler 底层实现 适用场景
Schedulers.io() 无上限的 CachedThreadPool 网络请求、文件 I/O
Schedulers.computation() 固定大小(CPU 核数)的线程池 计算密集型操作
Schedulers.newThread() 每次新建线程 需要完全隔离的任务
AndroidSchedulers.mainThread() Handler(Looper.getMainLooper()) UI 更新
Schedulers.single() 单线程 需要顺序执行的任务
Schedulers.trampoline() 当前线程的队列 递归调度避免栈溢出

5.1 IoScheduler 内部实现

// io.reactivex.internal.schedulers.IoScheduler
public final class IoScheduler extends Scheduler {
final AtomicReference<CachedWorkerPool> pool;

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

// CachedWorkerPool 维护一个 ScheduledExecutorService
static final class CachedWorkerPool implements Runnable {
private final ScheduledExecutorService executorService;
private final CompositeDisposable allWorkers;

ScheduledExecutorService create() {
return Executors.newScheduledThreadPool(1, threadFactory);
}

// 定期清理闲置线程(默认 60 秒无任务则回收)
@Override
public void run() {
// 检查并移除空闲超过 60 秒的 worker
}
}
}

关键设计:Schedulers.io() 虽然”无上限”,但并非每次创建新线程。它使用线程池缓存,核心线程数为 0,最大线程数为 Integer.MAX_VALUE,线程空闲 60 秒后回收。这使得 IO 调度器能应对大量并发 IO 请求,同时空闲时不会持有不必要的线程。

5.2 AndroidSchedulers.mainThread() 内部实现

AndroidSchedulers.mainThread() 的实现原理非常简单:

// rxandroid: AndroidSchedulers.java
public final class AndroidSchedulers {
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
() -> new HandlerScheduler(new Handler(Looper.getMainLooper()), true));
}

// HandlerScheduler 的 schedule 方法
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}

HandlerScheduler 通过 Handler.sendMessageDelayed() 将任务投递到主线程的 MessageQueue。ScheduledRunnable 实现了 Disposable,可以通过 handler.removeCallbacks() 取消尚未执行的任务。message.obj = this 的设置用于在取消时找到对应的 Message(通过 handler.removeCallbacksAndMessages(this))。

源码路径:rxandroid/src/main/java/rx/android/schedulers/HandlerScheduler.java

5.3 subscribeOn 和 observeOn 的线程切换原理

subscribeOn 的实现 —— 以 ObservableSubscribeOn 为例:

// io.reactivex.internal.operators.observable.ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

@Override
public void subscribeActual(final Observer<? super T> observer) {
// 将订阅动作包装成 Runnable,提交到 Scheduler
Scheduler.Worker w = scheduler.createWorker();
observer.onSubscribe(w); // 让下游可以取消整个调度

w.schedule(new SubscribeTask(new SubscribeOnObserver<T>(observer)));
}

final class SubscribeTask implements Runnable {
final SubscribeOnObserver<T> parent;

@Override
public void run() {
// 在 Scheduler 的线程中执行上游的 subscribe
source.subscribe(parent);
}
}
}

observeOn 的实现 —— 以 ObservableObserveOn 为例:

// io.reactivex.internal.operators.observable.ObservableObserveOn
@Override
public void onNext(T t) {
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); // 将数据放入队列
}
schedule(); // 调度到目标线程消费队列
}

void schedule() {
if (outputFused) {
worker.schedule(this);
}
}

observeOn 的核心是一个生产者-消费者队列:上游的 onNext 将数据放入 SpscArrayQueue(单生产者单消费者无锁队列),然后在目标 Scheduler 上调度一个 Worker 消费队列,消费时逐条调用下游 Observer 的 onNext。

六、背压(Backpressure)策略

当上游发射数据的速度超过下游消费的速度时,就会产生背压。RxJava 2.x 将 Observable 拆分为两类:

  • Observable:不支持背压(适合少量数据、GUI 事件)
  • Flowable:支持背压(适合大量数据、流式处理)

Flowable 的背压策略通过 BackpressureStrategy 枚举定义(源码路径 io.reactivex.BackpressureStrategy):

  1. MISSING:不做任何处理,下游需要自行通过 request(n) 控制速率。数据可能溢出导致 MissingBackpressureException
  2. ERROR:当数据堆积时直接抛出 MissingBackpressureException
  3. BUFFER:无界缓冲,所有数据先入队列。可能导致 OOM。
  4. DROP:丢弃下游来不及处理的数据。
  5. LATEST:仅保留最新的一个数据,旧的被丢弃。

Flowable 内部的背压协议基于 Reactive Streams 规范的 Subscription.request(n) 机制:

// 下游告诉上游:我准备好了,给我 n 个数据
subscription.request(10);

FlowableCreate 中,FlowableEmitter 实现了 request(n) 的感知:

// FlowableCreate.java - 关键字段
final AtomicLong requested = new AtomicLong(); // 下游请求的数量

// 每次发射检查是否需要
public void onNext(T t) {
if (requested.get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(requested, 1); // 减少请求计数
}
}

io.reactivex.internal.util.BackpressureHelper 包含用于管理请求计数的工具方法,使用 CAS(Compare-And-Swap)保证原子性:

public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE; // 无界模式
}
long update = current - n;
if (update < 0) {
// 上游发射超过了请求数量,这是 bug
throw new IllegalStateException("More produced than requested: " + update);
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}

Long.MAX_VALUE 的语义是”下游接受无限数据”(如调用 request(Long.MAX_VALUE)),此时 produced() 直接返回,无任何限制。这是 Flowable 内建的”退路”——如果某些操作符不支持背压,可以直接请求最大值来退化成无背压行为。

6.1 Flowable 内部队列

FlowableCreate 内部根据不同 BackpressureStrategy 使用不同的队列实现:

// FlowableCreate 内部
if (backpressure == BackpressureStrategy.MISSING) {
emitter = new MissingEmitter<T>(t);
} else if (backpressure == BackpressureStrategy.ERROR) {
emitter = new ErrorAsyncEmitter<T>(t);
} else if (backpressure == BackpressureStrategy.DROP) {
emitter = new DropAsyncEmitter<T>(t);
} else if (backpressure == BackpressureStrategy.LATEST) {
emitter = new LatestAsyncEmitter<T>(t);
} else {
emitter = new BufferAsyncEmitter<T>(t, capacity); // BUFFER
}

七、Disposable 与内存泄漏防护

任何订阅如果不取消,都可能导致 Activity/Fragment 无法被 GC 回收。RxJava 提供了三级防御机制:

第一级:手动管理 Disposable

private CompositeDisposable compositeDisposable = new CompositeDisposable();

@Override
protected void onCreate(Bundle savedInstanceState) {
Disposable d = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(count -> textView.setText(String.valueOf(count)));
compositeDisposable.add(d);
}

@Override
protected void onDestroy() {
compositeDisposable.clear(); // 批量取消所有订阅
}

CompositeDisposable 内部使用 OpenHashSet<Disposable> 存储,并提供原子性的 add/delete/clear 操作。clear() 不仅清空集合,还会对每个 Disposable 调用 dispose()

第二级:RxLifecycle / AutoDispose

AutoDispose 通过拦截 subscribe 调用,自动在生命周期结束时取消订阅。其核心是在 Observer 和上游之间插入一个 AutoDisposingObserver,监听 LifecycleOwner 的 ON_DESTROY 事件。

AutoDispose 使用 as() 操作符和 ScopeProvider 模式:

// 使用 AutoDispose
myObservable
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe(data -> updateUI(data));

AndroidLifecycleScopeProvider 监听 Lifecycle 事件,在 ON_DESTROY 时触发 dispose 信号。具体实现是在 LiveDataReactiveStreams 中将 Lifecycle 的 ON_DESTROY 事件映射为 emit(Unit)Maybe<Lifecycle.Event>,然后 AutoDisposingObserver 在收到事件时 dispose 上游。

第三级:UndeliverableException 处理

onError 发生后订阅已终止,后续的事件无法投递,会抛出 UndeliverableException。应通过 RxJavaPlugins.setErrorHandler() 设置全局异常处理器:

RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
Log.w("RxJava", "Undeliverable exception", e);
} else {
Thread.currentThread().getUncaughtExceptionHandler()
.uncaughtException(Thread.currentThread(), e);
}
});

7.1 dispose 的原子性传播

dispose 的传播方向是从下游向上游。DisposableHelper.dispose() 首先断开下游的 Disposable 引用(通过设置 CAS 为 DISPOSED),然后调用上游的 dispose。这是一个原子操作——一旦 CAS 成功,后续所有 onNext/onError/onComplete 调用都会在 isDisposed() 检查处被拦截。

八、RxJava 2.x 与 3.x 的差异

RxJava 3.x 基于 Java 8,引入了 java.util.concurrent.Flow 兼容性。关键变化包括:

  • 所有操作符、类使用 Java 8 方法引用和 lambda
  • CompletableSingleMaybe 的操作符增加
  • Flowable 可直接转换为 Flow.Publisher
  • 移除了 Observable.flatMap() 过时方法,统一到 flatMap 的规范实现
  • 引入了 @CheckReturnValue 注解,所有返回 Disposable/Subscription 的操作符方法都被标记,防止丢失订阅引用

生产环境建议使用 3.x,2.x 已进入维护模式。

8.1 RxJava 与 Kotlin Coroutines 的对比

随着 Kotlin 在 Android 开发中的普及,Coroutines 逐渐成为 RxJava 的主要替代方案。

维度 RxJava Kotlin Coroutines
编程模型 响应式流(Observable/Flowable) 结构化并发(suspend/Flow)
线程切换 subscribeOn/observeOn withContext/flowOn
背压 Flowable 原生支持 Flow 原生支持(Channel.BUFFERED 等)
错误处理 onError 回调链 try-catch / catch 操作符
操作符丰富度 极丰富(200+ 操作符) 基础齐全(正在快速增长)
学习曲线 陡峭 相对平缓
调试体验 困难(操作符链深,堆栈难读) 相对容易(suspend 堆栈可读性好)
生命周期绑定 需额外库(AutoDispose) 天然与 lifecycleScope 集成
Java 互操作 原生支持 需要少量适配
性能 有装饰器链开销 有协程切换开销,通常更轻量

RxJava 的优势在于操作符生态极其丰富,对于复杂的流转换(如 debounce + switchMap + distinctUntilChanged 的搜索框场景)有成熟的范式。Coroutines 的优势在于与 Kotlin 语言深度集成,代码更接近同步写法,调试更简单。

九、RxJavaPlugins 全局钩子

RxJavaPlugins 是 RxJava 提供的全局 Hook 机制,主要有三个用途:

  1. 全局错误处理setErrorHandler()
  2. 调度器替换setInitIoSchedulerHandler() / setInitComputationSchedulerHandler() 等在 Scheduler 创建前注入自定义实例
  3. Observable 创建拦截setOnObservableAssembly() 对所有 Observalbe 装配过程注入监控
// 全局监控所有 Observable 的创建(调试/性能分析用)
RxJavaPlugins.setOnObservableAssembly(observable -> {
// 在装配时记录堆栈
return observable;
});

这是测试和生产环境中实现统一日志、性能追踪的入口。

十、面试常问题目

Q1: subscribeOn 和 observeOn 的区别?多次调用 subscribeOn 会怎样?

subscribeOn 影响上游数据发射所在的线程,只第一个生效(最近的源头)。observeOn 影响下游数据接收所在的线程,可以多次调用以切换不同阶段的线程。例如:.subscribeOn(Schedulers.io()).map(...).observeOn(AndroidSchedulers.mainThread()).subscribe(...) 表示在 IO 线程发数据、做 map 变换,在主线程接收结果。

Q2: map 和 flatMap 的区别是什么?

map 是一对一变换,将 T 类型转换为 U 类型;flatMap 是一对多变换,将 T 映射为一个 Observable<U>,然后将所有内部 Observable 合并(merge)输出。flatMap 不能保证顺序,concatMap 保证顺序但有性能开销(需要等前一个内部 Observable 完成),switchMap 在新事件到来时取消旧的内部 Observable。

Q3: RxJava 如何防止内存泄漏?必须手动 dispose 吗?

RxJava 提供 CompositeDisposable 进行批量取消;推荐使用 AutoDispose 或 RxLifecycle 将订阅绑定到生命周期;也可使用 takeUntil() 操作符配合生命周期事件自动取消。核心原则:每个 subscribe 返回的 Disposable 必须在适当生命周期方法中 dispose。

Q4: Observable 和 Flowable 的核心区别是什么?

Observable 不支持背压,适用于少量、低频事件(GUI 点击、单个网络请求结果)。Flowable 支持背压,实现了 Reactive Streams 规范,通过 request(n) 机制让下游控制上游发射速率,适用于大量数据流(数据库查询结果、传感器事件流)。Flowable 额外开销更大(需维护原子计数器和队列),仅在需要背压时使用。

Q5: 操作符融合(Operator Fusion)解决了什么问题?

操作符融合解决了操作符链中逐层包装 Observer 带来的性能开销。当相邻的两个操作符满足条件时(如 Observable.just() + map),它们可以通过 QueueDisposable 接口直接推送/拉取数据,跳过 Observer 的一次包装和方法调用。这是 RxJava 2.x 的重要性能优化,在大量数据的场景(如 range().map().filter())下性能提升可达 30-50%。


参考源码路径:

  • RxJava 核心:https://github.com/ReactiveX/RxJava/tree/2.x/src/main/java/io/reactivex
  • Observable 创建:io.reactivex.internal.operators.observable.ObservableCreate
  • Map 操作符:io.reactivex.internal.operators.observable.ObservableMap
  • 背压策略:io.reactivex.BackpressureStrategy
  • Flowable 创建:io.reactivex.internal.operators.flowable.FlowableCreate
  • RxAndroid Scheduler:rxandroid/src/main/java/rx/android/schedulers/AndroidSchedulers.java
  • Reactive Streams 规范:http://www.reactive-streams.org/
  • AutoDispose:https://github.com/uber/AutoDispose
  • Operator Fusion:io.reactivex.internal.fuseable.QueueDisposable
打赏
  • 微信
  • 支付宝

评论