本文译自「How to Manage Backpressure in Kotlin Flow: collect • buffer • conflate • collectLatest」,原文链接https://proandroiddev.com/how-to-manage-backpressure-in-kotlin-flow-collect-buffer-conflate-collectlatest-b8102284d968,由Shbazhenov发布于2025年6月13日。
你是否曾遇到过快速数据源发送的数据量超出应用处理能力的情况,导致应用速度变慢甚至崩溃?Kotlin Flow 内置了一些方法,可让你的生产者和消费者保持同步。本文将介绍:
- 回压的含义
- Flow 默认的“互相等待”模式如何工作
- 何时使用 buffer() 添加小型队列
- conflate() 如何跳过旧数据项
- 为什么 collectLatest { } 会停止旧数据处理
- 如何根据你的情况选择合适的选项
回压的含义
回压的作用是确保快速的数据发送方不会压垮较慢的接收方。如果没有回压,你可能会在内存中存储过多的数据,或者浪费时间处理过时的信息。
回压可以帮助你:
- 控制内存使用量
- 避免不必要的工作
- 使应用性能更可预测
1. 默认“互相等待”模式
默认情况下,当你执行以下操作时:
1 2 3 4 5 6 7 8 9 10 11 |
|
发送方 ( emit ) 将暂停,直到处理方 ( collect ) 处理完最后一个值。没有队列,每个值都是一次发送和处理一个。
2. 使用 buffer() 添加一个小队列
如果你希望发送方提前一点,请使用:
1 2 3 4 5 |
|
- 现在,发送者最多可以将 2 个项目放入一个小队列中。
- 一旦队列满了,它就会再次暂停。
这给了你一个有限的队列:你仍然可以处理每个项目,但可以平滑速度峰值。
3. 使用 conflate() 跳过旧项目
当你只关心最新数据(例如更新进度条)时,你可以这样写:
1 2 3 4 5 6 |
|
- 如果处理程序繁忙,则仅保留最新未处理的项目。
- 较旧的项目将被丢弃,因此你无需处理过时的更新。
注意:conflate() 不会停止当前工作;它只是在下次读取时跳过旧值。
4. 使用 collectLatest { } 停止旧工作
要进一步操作并在新数据进入时取消任何正在进行的工作,请使用:
1 2 3 4 5 6 |
|
- 每次发出(emit)新的数据时,处理前一个值的块都会立即被丢弃。
- 只有当发送方的发送速度持续超出你的处理能力时,你才需要完成最后一个值的工作。
这非常适合边输入边搜索的情况,在这种情况下,你希望在用户再次输入时立即丢弃旧请求。
5. 选择合适的工具
普通 collect
- 功能:发送方和处理方互相等待,一个接一个
- 何时选择它:你必须按顺序处理每个项目
.buffer(n)
- 功能:大小为 n 的小队列;不丢弃任何项目
- 何时选择它:你需要少量缓冲,但仍要处理所有项目
.conflate()
- 功能:如果处理方繁忙,则仅保留最新项目
- 何时选择它:你需要最新数据,但仍要完成当前工作
collectLatest { }
- 功能:新数据到达后立即取消所有正在进行的工作
- 何时选择它:只考虑最新的结果;立即放下其他一切
6. 总结
- 回压机制可防止快速数据流过载慢速处理器。
- 默认模式没有队列:安全但速度可能较慢。
- buffer() 函数添加了一个小队列:更灵活,不会丢包。
- conflate() 函数跳过旧值:始终保持最新,但让当前工作完成。
- collectLatest { } 函数停止旧工作:仅完成最新项。
下次你的 Flow 感觉太快或太慢时,请问自己:
- 我需要处理每个值吗?
- 小型队列有帮助吗?
- 只有最新数据才重要吗?
- 当新数据到达时,我应该取消旧工作吗?
选择最合适的简单选项,Kotlin Flow 会处理余下的事情。