跳到主要内容

RxJava 响应式编程

问题

RxJava 的核心概念和在 Android 中的典型用法有哪些?

答案

RxJava 核心模型

Observable 类型

类型发射背压用途
Observable0~N 项小量数据、UI 事件
Flowable0~N 项大量数据、数据库查询
Single1 项或错误网络请求
Maybe0~1 项或错误数据库查询可能为空
Completable无数据,只有完成/错误写操作、删除操作

基本使用

// Single:网络请求(返回一个结果)
interface ApiService {
@GET("users/{id}")
fun getUser(@Path("id") id: String): Single<User>
}

// 订阅
api.getUser("123")
.subscribeOn(Schedulers.io()) // 在 IO 线程请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理
.subscribe(
{ user -> showUser(user) }, // onSuccess
{ error -> showError(error) } // onError
)

常用操作符

// 1. 转换操作符
observable
.map { user -> user.name } // 1对1 转换
.flatMap { name -> api.search(name) } // 1对N 转换(并行)
.switchMap { query -> api.search(query) } // 1对N(取消前一个)
.filter { it.isActive } // 过滤

// 2. 组合操作符
Observable.zip(
api.getUser(),
api.getOrders(),
BiFunction { user, orders -> DashboardData(user, orders) }
)

Observable.combineLatest(
nameObservable,
emailObservable,
BiFunction { name, email -> isValid(name, email) }
)

// 3. 错误处理
api.getData()
.retry(3) // 失败重试 3 次
.onErrorReturn { emptyList() } // 错误时返回默认值
.onErrorResumeNext { api.getBackupData() } // 错误时切换数据源

// 4. 搜索防抖
searchSubject
.debounce(300, TimeUnit.MILLISECONDS) // 防抖
.distinctUntilChanged() // 去重
.switchMap { query -> // 取消前一次
api.search(query)
.toObservable()
.onErrorReturnItem(emptyList())
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results -> showResults(results) }

线程调度

// subscribeOn:指定上游(数据源)执行线程,只有第一个生效
// observeOn:指定下游(操作符和观察者)执行线程,可以多次切换

api.getUsers() // IO 线程
.subscribeOn(Schedulers.io()) // ↑ 上游在 IO 线程
.map { users -> sortUsers(users) } // IO 线程(跟上游一样)
.observeOn(Schedulers.computation()) // ↓ 切换到计算线程
.filter { it.isActive } // 计算线程
.observeOn(AndroidSchedulers.mainThread()) // ↓ 切换到主线程
.subscribe { users -> adapter.submitList(users) } // 主线程
Scheduler线程池用途
Schedulers.io()缓存线程池网络请求、文件 IO
Schedulers.computation()固定大小(CPU 核心数)CPU 计算
Schedulers.newThread()每次新建线程不推荐
AndroidSchedulers.mainThread()Android 主线程UI 更新

生命周期管理

class MyActivity : AppCompatActivity() {
private val disposables = CompositeDisposable()

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val disposable = api.getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ users -> showUsers(users) },
{ error -> showError(error) }
)

// 添加到 CompositeDisposable
disposables.add(disposable)
}

override fun onDestroy() {
super.onDestroy()
// 统一取消所有订阅
disposables.clear()
}
}
RxJava vs Kotlin Flow

新项目推荐使用 Kotlin Flow。Flow 是协程原生支持,与 Jetpack 深度集成,学习曲线更低。RxJava 仍是许多存量项目的核心依赖,面试中需要了解。

特性RxJavaFlow
语言Java/KotlinKotlin 原生
依赖需要额外库Kotlin 标准库
背压Flowable 显式支持Flow 天然支持(挂起)
生命周期CompositeDisposablerepeatOnLifecycle
操作符极其丰富够用,可自定义

常见面试问题

Q1: subscribeOn 和 observeOn 的区别?

答案

  • subscribeOn:指定上游(Observable 执行数据发射)的线程。多次调用只有第一个生效。
  • observeOn:指定下游(后续操作符和 Observer)的线程。每次调用都会切换线程,可以多次使用。

Q2: flatMap 和 switchMap 的区别?

答案

  • flatMap:将每个上游元素映射为一个 Observable 并合并输出。多个内部 Observable 并行执行,结果交错。
  • switchMap:同 flatMap,但每次新元素发射时取消上一个内部 Observable,只保留最新的。

搜索场景应使用 switchMap:用户快速输入时取消之前的搜索请求。

Q3: 如何防止 RxJava 内存泄漏?

答案

  1. 使用 CompositeDisposableonDestroy() 统一 clear()
  2. 使用 AutoDispose 库自动绑定生命周期
  3. 使用 takeUntil() 操作符在特定事件时终止订阅

相关链接