首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >两个Quarkus服务之间的非阻塞数据流(Vert.x与Java中的Mutiny )

两个Quarkus服务之间的非阻塞数据流(Vert.x与Java中的Mutiny )
EN

Stack Overflow用户
提问于 2020-08-29 16:20:35
回答 1查看 1.4K关注 0票数 3

更新!

在解决了一些与主要问题无关的问题后,我修正了示例代码中的小错误,这些问题仍然是关于服务之间的非阻塞流的。

背景信息

我在Quarkus下移植一个Spring WebFlux服务。该服务对多个大型数据集进行长时间搜索,并在一个Flux (文本/事件流)中返回可用的部分结果。

问题

现在,我正试图在Quarkus下使用Mutiny和Vert.x,但不知道消费者服务如何在不阻塞的情况下接收这个流。

在所有示例中,使用者要么是JS的前端页面,要么生产者的内容类型是application/json,在bluck看来,直到Multi完成之后才将其发送到一个JSON对象中(这在我的应用程序中没有任何意义)。

问题

WebClient?

  • If如何接收文本/事件流?问题是WebClient无法接收连续的蒸汽:在两个Quarkus服务之间传输数据的标准方法是什么?

这里是一个简化的示例

测试实体

代码语言:javascript
复制
public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}

生产者1.简单的无限流->挂起

代码语言:javascript
复制
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

生成器2.具有Vertx路径的无限流->挂起

代码语言:javascript
复制
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

生成器3.简单有限流->块直到完成

代码语言:javascript
复制
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

Consumer

在生产者和消费者双方都尝试了多种不同的解决方案,但是在任何情况下都会阻塞流,直到它完成或无限期地挂起,而不为无限流传输数据。我在httpie上得到了同样的结果。下面是最新的迭代:

代码语言:javascript
复制
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-31 06:02:54

Vert.xWeb客户端不使用SSE (没有配置)。来自https://vertx.io/docs/vertx-web-client/java/

完全缓冲

响应,使用BodyCodec.pipe将响应输送到写流

它一直等到响应完成。您可以使用原始Vert.xHTTP客户机,也可以使用pipe编解码器。在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses上给出了实例。

或者,您可以使用SSE客户机,如in:https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63649235

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档