AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 77674891
Accepted
MartinHH
MartinHH
Asked: 2023-12-17 23:11:40 +0800 CST2023-12-17 23:11:40 +0800 CST 2023-12-17 23:11:40 +0800 CST

fs2:流启动后如何执行某些操作(“doOnSubscribe”)?

  • 772

我试图在猫效应 IO-App 的上下文中使用不纯的(“java”)API。不纯的 API 看起来有点像这样:

import  io.reactivex.Flowable
import java.util.concurrent.CompletableFuture

trait ImpureProducer[A] {
  /** the produced output - must be subscribed to before calling startProducing() */
  def output: Flowable[A]

  /** Makes the producer start publishing its output to any subscribers of `output`. */
  def startProducing(): CompletableFuture[Unit]
}

(当然,还有更多方法,包括 stopProducing(),但这些与我的问题无关。)

我的(也许是幼稚的)适应该 API 的方法如下所示(利用Flowablea 的事实org.reactivestreams.Publisher):

import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*

class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
  def output: Stream[IO, A] =
    underlying.output.toStreamBuffered(1)
  def startProducing: IO[Unit] = 
    IO.fromCompletableFuture(IO(underlying.startProducing()))
}

我的问题是:如何确保output在评估之前订阅 -stream startProducing?

例如,如何修复以下尝试以获取生成的第一个项目的 IO:

import cats.Parallel
import cats.effect.IO

def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
  val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
  // this introduces a race condition: it is not ensured that the output-stream
  // will already be subscribed to when startProducing is evaluated.
  Parallel[IO].parProductL(firstOut)(producer.startProducing)
}
scala
  • 1 1 个回答
  • 25 Views

1 个回答

  • Voted
  1. Best Answer
    Luis Miguel Mejía Suárez
    2023-12-18T01:09:32+08:002023-12-18T01:09:32+08:00

    这可以使用新的Java 互操作来完成flow。
    请注意,自从添加了反应流inteop 以来,它已被弃用flow。API 被重新设计以处理更多情况(如本例),并且从头开始重新实现实现以提高效率。
    如果您使用的 API 没有flow基础版本,您可以使用FlowAdapters来包装它。

    代码如下所示:

    import org.reactivestreams.FlowAdapters
    
    final class AdaptedProducer[A](underlying: ImpureProducer[A], chunkSize: Int) {
      val run: Stream[IO, A] =
        fs2.interop.flow.fromPublisher(chunkSize) { subscriber =>
           IO(
             underlying.output.subscribe(
                FlowAdapters.toFlowSubscriber(
                 subscriber
                )
             )
           ) >>
           IO.fromCompletableFuture(IO(underlying.startProducing()))
        }    
    }
    

    这确保startProducing仅当您实际开始使用Stream并且在调用之后才调用subscribe。

    希望这有帮助:D

    • 2

相关问题

  • 为什么这个 ZIO 层组合无法编译?

  • 如何在 Scala 3 中将 Expr 转换为树?

  • 如何正确匹配 Scala 3 宏注释中的 TypeDef?

  • Spark Scala 将多列合并为单列

  • 可变参数的多类型方法参数

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    使用 <font color="#xxx"> 突出显示 html 中的代码

    • 2 个回答
  • Marko Smith

    为什么在传递 {} 时重载解析更喜欢 std::nullptr_t 而不是类?

    • 1 个回答
  • Marko Smith

    您可以使用花括号初始化列表作为(默认)模板参数吗?

    • 2 个回答
  • Marko Smith

    为什么列表推导式在内部创建一个函数?

    • 1 个回答
  • Marko Smith

    我正在尝试仅使用海龟随机和数学模块来制作吃豆人游戏

    • 1 个回答
  • Marko Smith

    java.lang.NoSuchMethodError: 'void org.openqa.selenium.remote.http.ClientConfig.<init>(java.net.URI, java.time.Duration, java.time.Duratio

    • 3 个回答
  • Marko Smith

    为什么 'char -> int' 是提升,而 'char -> Short' 是转换(但不是提升)?

    • 4 个回答
  • Marko Smith

    为什么库中不调用全局变量的构造函数?

    • 1 个回答
  • Marko Smith

    std::common_reference_with 在元组上的行为不一致。哪个是对的?

    • 1 个回答
  • Marko Smith

    C++17 中 std::byte 只能按位运算?

    • 1 个回答
  • Martin Hope
    fbrereto 为什么在传递 {} 时重载解析更喜欢 std::nullptr_t 而不是类? 2023-12-21 00:31:04 +0800 CST
  • Martin Hope
    比尔盖子 您可以使用花括号初始化列表作为(默认)模板参数吗? 2023-12-17 10:02:06 +0800 CST
  • Martin Hope
    Amir reza Riahi 为什么列表推导式在内部创建一个函数? 2023-11-16 20:53:19 +0800 CST
  • Martin Hope
    Michael A fmt 格式 %H:%M:%S 不带小数 2023-11-11 01:13:05 +0800 CST
  • Martin Hope
    God I Hate Python C++20 的 std::views::filter 未正确过滤视图 2023-08-27 18:40:35 +0800 CST
  • Martin Hope
    LiDa Cute 为什么 'char -> int' 是提升,而 'char -> Short' 是转换(但不是提升)? 2023-08-24 20:46:59 +0800 CST
  • Martin Hope
    jabaa 为什么库中不调用全局变量的构造函数? 2023-08-18 07:15:20 +0800 CST
  • Martin Hope
    Panagiotis Syskakis std::common_reference_with 在元组上的行为不一致。哪个是对的? 2023-08-17 21:24:06 +0800 CST
  • Martin Hope
    Alex Guteniev 为什么编译器在这里错过矢量化? 2023-08-17 18:58:07 +0800 CST
  • Martin Hope
    wimalopaan C++17 中 std::byte 只能按位运算? 2023-08-17 17:13:58 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve