Estou tentando usar uma API impura ("java") no contexto de um IO-App com efeito de gato. A API impura se parece com isto:
import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture
trait ImpureProducer[A] {
/** the produced output - must be subscribed to before calling startProducing() */
def output: Flowable[A]
/** Makes the producer start publishing its output to any subscribers of `output`. */
def startProducing(): CompletableFuture[Unit]
}
(É claro que existem mais métodos, incluindo stopProduzindo(), mas esses não são relevantes para minha pergunta.)
Minha abordagem (talvez ingênua) para adaptar essa API é a seguinte (aproveitando o fato de que Flowable
é um org.reactivestreams.Publisher
):
import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*
class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
def output: Stream[IO, A] =
underlying.output.toStreamBuffered(1)
def startProducing: IO[Unit] =
IO.fromCompletableFuture(IO(underlying.startProducing()))
}
Minha pergunta é: como posso garantir que o output
-stream esteja inscrito antes de avaliar startProducing
?
Por exemplo, como eu poderia corrigir a seguinte tentativa de obter um IO do primeiro item produzido:
import cats.Parallel
import cats.effect.IO
def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
// this introduces a race condition: it is not ensured that the output-stream
// will already be subscribed to when startProducing is evaluated.
Parallel[IO].parProductL(firstOut)(producer.startProducing)
}