Akka 文档Source.fromIterator
(https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html)说:
如果迭代器执行阻塞操作,请确保在单独的调度程序上运行它。
我有一个迭代器,我想用它构建一个 Akka Source
。但是,有可能在返回的元素可用之前hasNext
返回。如果发生这种情况,则调用必须阻塞,等待元素。请注意,不能 return ,因为 Akka 会认为已完成并停止处理。true
next
next
hasNext
false
Source
当然,Akka 不喜欢等待下一个元素。偶尔会弹出超时异常。那么,在这种情况下我该怎么办?在单独的调度程序上运行这样的迭代器意味着什么?如果我将代码包装起来next
以返回 a Future
(为此使用单独的调度程序),那么我会得到 aSource[Future[T]]
而不是Source[T]
,这会在下游产生问题,除非我以某种方式将其转换为Future[Source[T]]
。
有什么建议么?
正如您所注意到的,如果您将迭代器转换为一个
Iterator[Future[T]]
在另一个线程上完成 future 的东西,Source.fromIterator
则将是一个Source[Future[T]]
. 但是,始终可以通过附加Source[Future[T]]
a 将 a 转换为a :Source[T]
mapAsync(identity)
mapAsync
不会阻塞流调度程序中的线程。ActorAttributes.dispatcher
您还可以使用和设置在特定调度程序上运行的阶段withAttributes
:请注意,这将在源和下游之间引入异步边界(及其关联的缓冲区)。