public static void main(String[] args) {
Flux.fromIterable(Arrays.asList("a","b","c"))
.flatMap(a -> Mono.just(a).subscribeOn(Schedulers.parallel()))
.doOnNext(
a ->
System.out.println(
"Receivedededed " + a + " on thread:" + Thread.currentThread().getName()))
.flatMap(
a -> {
System.out.println(
"Received: " + a + " on threads: " + Thread.currentThread().getName());
if ("a" == a) return getCreditScore(a);
else return Mono.just(a);
})
.subscribe(a -> System.out.println("afka" + a));
System.out.println("Main Thread Free");
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}我得到了以下输出:
Receivedededed a on thread:parallel-1
Received: a on threads: parallel-1
Main Thread Free然后是无限的等待
其中getCreditScore(a)有一个无限等待的线程。
现在,据我所知,flatMap热切地订阅了所有的内部流,而不是在订阅另一个之前等待一个完成。由于我使用了.subscribeOn(),第一个flatMap在两个不同的线程上发布事件。第二个flatMap应该订阅对应于"b“和"c”的事件,就像包含"a“的线程正在等待一样,其余的线程是空闲的,而flatMap在订阅另一个线程之前不会等待流完成。
调试器显示处于休眠状态的parallel-1 (这是正确的,因为getCreditScore(a)使线程无限期休眠),parallel-2和parallel-3处于等待状态。
我哪里出问题了?
发布于 2019-06-18 04:55:21
这是因为每个运算符每次都处理其中的项。这是反应的关键。在您的处理逻辑中,由于您无限期地阻塞线程,其他项("b“、"c")只是排队,从未处理过。当然,"a“首先被推送到队列中,因为Flux. fromIterable是按顺序执行的。
与其显式地控制线程,不如使用Mono.delay返回一个“承诺”。这将确保打印"b“和"c”。
https://stackoverflow.com/questions/56637649
复制相似问题