我有少数生产者,他们各自向自己的流发出数据,然后我将它们合并为一个,进一步处理这些值并使用它们:
fun makeInputFlow() = flow {
while (shouldMakeRequest()) {
// make (possibly blocking) network request
// do some CPU-intensive operations
results.forEach { emit(it) }
yield()
}
}
suspend fun runFlows() {
val flow1 = makeInputFlow()
val flow2 = makeInputFlow()
val flow3 = makeInputFlow()
val merged = listOf(flow1, flow2, flow3)
.merge()
merged.collect { println(it) }
}
这是可行的,但看起来只有一个流程实际上在当时产生了值。CPU 大部分时间都处于空闲状态。
有没有办法在自己的协程/线程中安全地运行每个流程,以便所有流程并行产生值?
我们通常不允许在协程内部执行阻塞 I/O,因为这可能会导致许多不同类型的问题,类似于您观察到的问题。
为了执行阻塞 I/O,我们需要暂时切换到针对此类工作负载的特殊线程池:
请注意,
Dispatchers.IO
对于 CPU 密集型操作来说, 并不是最佳选择。为此,最好使用Dispatchers.Default
。理想情况下,您应该将生产者逻辑设为可挂起的代码,根据工作负载在调度程序之间切换。如果您做不到这一点,Dispatchers.IO
可能是您最好的选择。此外,
yield()
可能不需要,因为withContext
和emit
都是暂停函数,如果需要,调用它们就足以释放线程。如果使用运行 CPU 密集型代码Dispatchers.Default
,最好定期在那里让步。makeInputFlow()
创建冷流。冷流本身无法运行,需要调用者的协程才能运行。1由于您只有一个协程来收集这三个流,因此它们无法并行运行。除非……您调用
merge
这三个流。merge
内部收集流并为每个流使用单独的协程。摘自文档:因此,从单个协程收集合并的流仍然应该并行工作,因为内部三个流中的每一个都是在单独的协程中执行的。
仍然不行的原因是您的代码当前在主调度程序上运行。这是一个只有一个线程的调度程序。在单独的协程中运行这三个流程无助于并行执行它们,因为它们无法分布到单独的线程中,因为没有线程。
但这很容易解决:只需切换
runFlows
到另一个具有多个线程的调度程序:IO 调度程序有很多线程,但它们大多被设计为暂停,就像网络或文件系统访问等 IO 的情况一样。当您执行 CPU 密集型操作时,您应该选择
Dispatchers.Default
它。它的线程数与 CPU 内核数相同,因此它可以有效地最大限度地利用硬件。1与热SharedFlow相比,它将在其自己的协程中运行,而与其如何收集(甚至是否收集)无关。