我目前正在编写一个方法,该方法需要在继续下游操作之前将每个元素添加到并发HashMap中。例如
public Flux<Bar> methodOne(final Flux<Foo> foos) {
Map<UUID, Foo> idToFoo = new ConcurrentHashMap<>();
return foos.doOnNext(foo -> idToFoo.put(foo.getId(), foo)
.flatMap(foo -> fooToBar(foo, idToFoo));
}
我是否可以确定fooToBar()
传递的地图中将填充相应的元素?
据我所知,doOnNext()
它是同步的,它会在继续下游之前执行并完成 lambda。然而,我遭到了一位高级开发人员的反对 - 他们认为这是一种副作用,是一种“发射后不管”的类型。
我正在努力更好地理解源代码,以便能够证明用例的合理性,但也想在这里检查。
谢谢
你是对的。除非另有编码,否则链中的消费者
doOnNext()
将在执行链中的下一个操作符之前完全执行。来自Reactor API 文档:
您可以使用以下示例展示此行为
运行此
main
方法时,无论你设置的延迟有多高,你都会按以下顺序看到以下输出Thread.sleep