我试图在猫效应 IO-App 的上下文中使用不纯的(“java”)API。不纯的 API 看起来有点像这样:
import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture
trait ImpureProducer[A] {
/** the produced output - must be subscribed to before calling startProducing() */
def output: Flowable[A]
/** Makes the producer start publishing its output to any subscribers of `output`. */
def startProducing(): CompletableFuture[Unit]
}
(当然,还有更多方法,包括 stopProducing(),但这些与我的问题无关。)
我的(也许是幼稚的)适应该 API 的方法如下所示(利用Flowable
a 的事实org.reactivestreams.Publisher
):
import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*
class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
def output: Stream[IO, A] =
underlying.output.toStreamBuffered(1)
def startProducing: IO[Unit] =
IO.fromCompletableFuture(IO(underlying.startProducing()))
}
我的问题是:如何确保output
在评估之前订阅 -stream startProducing
?
例如,如何修复以下尝试以获取生成的第一个项目的 IO:
import cats.Parallel
import cats.effect.IO
def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
// this introduces a race condition: it is not ensured that the output-stream
// will already be subscribed to when startProducing is evaluated.
Parallel[IO].parProductL(firstOut)(producer.startProducing)
}