稀有猿诉

十年磨一剑,历炼出锋芒,说话千百句,不如码二行。

用优雅的姿式应对Kotlin Flow的回压

本文译自「How to Manage Backpressure in Kotlin Flow: collect • buffer • conflate • collectLatest」,原文链接https://proandroiddev.com/how-to-manage-backpressure-in-kotlin-flow-collect-buffer-conflate-collectlatest-b8102284d968,由Shbazhenov发布于2025年6月13日。

你是否曾遇到过快速数据源发送的数据量超出应用处理能力的情况,导致应用速度变慢甚至崩溃?Kotlin Flow 内置了一些方法,可让你的生产者和消费者保持同步。本文将介绍:

  1. 回压的含义
  2. Flow 默认的“互相等待”模式如何工作
  3. 何时使用 buffer() 添加小型队列
  4. conflate() 如何跳过旧数据项
  5. 为什么 collectLatest { } 会停止旧数据处理
  6. 如何根据你的情况选择合适的选项

回压的含义

回压的作用是确保快速的数据发送方不会压垮较慢的接收方。如果没有回压,你可能会在内存中存储过多的数据,或者浪费时间处理过时的信息。

回压可以帮助你:

  • 控制内存使用量
  • 避免不必要的工作
  • 使应用性能更可预测

1. 默认“互相等待”模式

默认情况下,当你执行以下操作时:

1
2
3
4
5
6
7
8
9
10
11
flow {
  repeat(3) {
    emit(it)
    println("Sent $it")
    delay(100)            // 快速的发送者
  }
}
.collect { value ->
  println("Handling $value")
  delay(300)             // 慢速的处理者
}

发送方 ( emit ) 将暂停,直到处理方 ( collect ) 处理完最后一个值。没有队列,每个值都是一次发送和处理一个。

2. 使用 buffer() 添加一个小队列

如果你希望发送方提前一点,请使用:

1
2
3
4
5
flow {  }
  .buffer(capacity = 2)
  .collect { value ->
    // slow work here
  }
  • 现在,发送者最多可以将 2 个项目放入一个小队列中。
  • 一旦队列满了,它就会再次暂停。

这给了你一个有限的队列:你仍然可以处理每个项目,但可以平滑速度峰值。

3. 使用 conflate() 跳过旧项目

当你只关心最新数据(例如更新进度条)时,你可以这样写:

1
2
3
4
5
6
flow {  }
  .conflate()
  .collect { value ->
    println("Update to $value")
    delay(300)
  }
  • 如果处理程序繁忙,则仅保留最新未处理的项目。
  • 较旧的项目将被丢弃,因此你无需处理过时的更新。

注意:conflate() 不会停止当前工作;它只是在下次读取时跳过旧值。

4. 使用 collectLatest { } 停止旧工作

要进一步操作并在新数据进入时取消任何正在进行的工作,请使用:

1
2
3
4
5
6
flow {  }
  .collectLatest { value ->
    println("Start $value")
    delay(300)    // 可能会被切断
    println("Done $value")
  }
  • 每次发出(emit)新的数据时,处理前一个值的块都会立即被丢弃。
  • 只有当发送方的发送速度持续超出你的处理能力时,你才需要完成最后一个值的工作。

这非常适合边输入边搜索的情况,在这种情况下,你希望在用户再次输入时立即丢弃旧请求。

5. 选择合适的工具

普通 collect

  • 功能:发送方和处理方互相等待,一个接一个
  • 何时选择它:你必须按顺序处理每个项目

    .buffer(n)

  • 功能:大小为 n 的小队列;不丢弃任何项目
  • 何时选择它:你需要少量缓冲,但仍要处理所有项目

    .conflate()

  • 功能:如果处理方繁忙,则仅保留最新项目
  • 何时选择它:你需要最新数据,但仍要完成当前工作

    collectLatest { }

  • 功能:新数据到达后立即取消所有正在进行的工作
  • 何时选择它:只考虑最新的结果;立即放下其他一切

    6. 总结

  • 回压机制可防止快速数据流过载慢速处理器。
  • 默认模式没有队列:安全但速度可能较慢。
  • buffer() 函数添加了一个小队列:更灵活,不会丢包。
  • conflate() 函数跳过旧值:始终保持最新,但让当前工作完成。
  • collectLatest { } 函数停止旧工作:仅完成最新项。

下次你的 Flow 感觉太快或太慢时,请问自己:

  1. 我需要处理每个值吗?
  2. 小型队列有帮助吗?
  3. 只有最新数据才重要吗?
  4. 当新数据到达时,我应该取消旧工作吗?

选择最合适的简单选项,Kotlin Flow 会处理余下的事情。

Comments