我有少数生产者,他们各自向自己的流发出数据,然后我将它们合并为一个,进一步处理这些值并使用它们:
fun makeInputFlow() = flow {
while (shouldMakeRequest()) {
// make (possibly blocking) network request
// do some CPU-intensive operations
results.forEach { emit(it) }
yield()
}
}
suspend fun runFlows() {
val flow1 = makeInputFlow()
val flow2 = makeInputFlow()
val flow3 = makeInputFlow()
val merged = listOf(flow1, flow2, flow3)
.merge()
merged.collect { println(it) }
}
这是可行的,但看起来只有一个流程实际上在当时产生了值。CPU 大部分时间都处于空闲状态。
有没有办法在自己的协程/线程中安全地运行每个流程,以便所有流程并行产生值?