RxJava 响应式编程
问题
RxJava 的核心概念和在 Android 中的典型用法有哪些?
答案
RxJava 核心模型
Observable 类型
| 类型 | 发射 | 背压 | 用途 |
|---|---|---|---|
| Observable | 0~N 项 | ❌ | 小量数据、UI 事件 |
| Flowable | 0~N 项 | ✅ | 大量数据、数据库查询 |
| Single | 1 项或错误 | — | 网络请求 |
| Maybe | 0~1 项或错误 | — | 数据库查询可能为空 |
| Completable | 无数据,只有完成/错误 | — | 写操作、删除操作 |
基本使用
// Single:网络请求(返回一个结果)
interface ApiService {
@GET("users/{id}")
fun getUser(@Path("id") id: String): Single<User>
}
// 订阅
api.getUser("123")
.subscribeOn(Schedulers.io()) // 在 IO 线程请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理
.subscribe(
{ user -> showUser(user) }, // onSuccess
{ error -> showError(error) } // onError
)
常用操作符
// 1. 转换操作符
observable
.map { user -> user.name } // 1对1 转换
.flatMap { name -> api.search(name) } // 1对N 转换(并行)
.switchMap { query -> api.search(query) } // 1对N(取消前一个)
.filter { it.isActive } // 过滤
// 2. 组合操作符
Observable.zip(
api.getUser(),
api.getOrders(),
BiFunction { user, orders -> DashboardData(user, orders) }
)
Observable.combineLatest(
nameObservable,
emailObservable,
BiFunction { name, email -> isValid(name, email) }
)
// 3. 错误处理
api.getData()
.retry(3) // 失败重试 3 次
.onErrorReturn { emptyList() } // 错误时返回默认值
.onErrorResumeNext { api.getBackupData() } // 错误时切换数据源
// 4. 搜索防抖
searchSubject
.debounce(300, TimeUnit.MILLISECONDS) // 防抖
.distinctUntilChanged() // 去重
.switchMap { query -> // 取消前一次
api.search(query)
.toObservable()
.onErrorReturnItem(emptyList())
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results -> showResults(results) }
线程调度
// subscribeOn:指定上游(数据源)执行线程,只有第一个生效
// observeOn:指定下游(操作符和观察者)执行线程,可以多次切换
api.getUsers() // IO 线程
.subscribeOn(Schedulers.io()) // ↑ 上游在 IO 线程
.map { users -> sortUsers(users) } // IO 线程(跟上游一样)
.observeOn(Schedulers.computation()) // ↓ 切换到计算线程
.filter { it.isActive } // 计算线程
.observeOn(AndroidSchedulers.mainThread()) // ↓ 切换到主线程
.subscribe { users -> adapter.submitList(users) } // 主线程
| Scheduler | 线程池 | 用途 |
|---|---|---|
Schedulers.io() | 缓存线程池 | 网络请求、文件 IO |
Schedulers.computation() | 固定大小(CPU 核心数) | CPU 计算 |
Schedulers.newThread() | 每次新建线程 | 不推荐 |
AndroidSchedulers.mainThread() | Android 主线程 | UI 更新 |
生命周期管理
class MyActivity : AppCompatActivity() {
private val disposables = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
val disposable = api.getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ users -> showUsers(users) },
{ error -> showError(error) }
)
// 添加到 CompositeDisposable
disposables.add(disposable)
}
override fun onDestroy() {
super.onDestroy()
// 统一取消所有订阅
disposables.clear()
}
}
RxJava vs Kotlin Flow
新项目推荐使用 Kotlin Flow。Flow 是协程原生支持,与 Jetpack 深度集成,学习曲线更低。RxJava 仍是许多存量项目的核心依赖,面试中需要了解。
| 特性 | RxJava | Flow |
|---|---|---|
| 语言 | Java/Kotlin | Kotlin 原生 |
| 依赖 | 需要额外库 | Kotlin 标准库 |
| 背压 | Flowable 显式支持 | Flow 天然支持(挂起) |
| 生命周期 | CompositeDisposable | repeatOnLifecycle |
| 操作符 | 极其丰富 | 够用,可自定义 |
常见面试问题
Q1: subscribeOn 和 observeOn 的区别?
答案:
subscribeOn:指定上游(Observable 执行数据发射)的线程。多次调用只有第一个生效。observeOn:指定下游(后续操作符和 Observer)的线程。每次调用都会切换线程,可以多次使用。
Q2: flatMap 和 switchMap 的区别?
答案:
flatMap:将每个上游元素映射为一个 Observable 并合并输出。多个内部 Observable 并行执行,结果交错。switchMap:同 flatMap,但每次新元素发射时取消上一个内部 Observable,只保留最新的。
搜索场景应使用 switchMap:用户快速输入时取消之前的搜索请求。
Q3: 如何防止 RxJava 内存泄漏?
答案:
- 使用
CompositeDisposable在onDestroy()统一clear() - 使用
AutoDispose库自动绑定生命周期 - 使用
takeUntil()操作符在特定事件时终止订阅