首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >与StreamObserver进行异步交互(即与Java 8+ CompletableFutures )是否安全?

与StreamObserver进行异步交互(即与Java 8+ CompletableFutures )是否安全?
EN

Stack Overflow用户
提问于 2020-05-05 07:17:40
回答 2查看 1.7K关注 0票数 1

我在看Grpc.io基础教程中的简单RPC示例

代码语言:javascript
复制
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

是否对从其他线程与StreamObserver交互有任何警告?例如,,比方说,checkFeature()异步访问另一个服务,返回一个CompletableFuture:

代码语言:javascript
复制
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> responseObserver.onNext(feature));
  responseObserver.onCompleted();
}

当然,上面的内容不能工作,因为第一个线程将在返回特性之前执行onCompleted()。所以让我们解决这个问题:

代码语言:javascript
复制
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> {
        responseObserver.onNext(feature);
        responseObserver.onCompleted();
      });
}

我认为这是可行的,但我对Java并不熟悉,所以我想知道有什么影响。例如,

  • Context.current()是一致的吗?
  • 对于一元调用和StreamObserver,除了onNext()之外,还有什么东西会导致onError()过早破坏或关闭?
  • 有更好的练习吗?

如果有人也能让我了解他们的推理方式,那就太好了。我试着查找StreamObserver的实际实现,但不知道该查找什么。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-06 17:35:05

使用thenAccept()调用onNext()onCompleted()很好,因为观察者不是从多个线程并发调用的。

单独命名为onCompleted()的“坏”示例之所以失败,也是因为它可以在没有任何形式的同步的情况下从多个线程调用观察者。不能同时从多个线程调用StreamObservers

然而,使用thenAccept()并不完全正确,因为它不能处理未来失败的情况。所以您也需要接收Throwable,这可以用whenComplete()来完成

代码语言:javascript
复制
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -> {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

在处理该lambda时,上下文很容易出现“错误”。通常,我们会寻找“体系结构”解决方案来确保上下文被传播,比如在创建应用程序线程池时将所有应用程序线程池包装在Context.currentContextExecutor()中,这样单个的调用站点就不需要关心传播了。我对CompletableFuture还不太熟悉,无法为其提供策略。

票数 2
EN

Stack Overflow用户

发布于 2020-05-05 17:10:38

Context.current()是否一致?

Context.current()正在使用ThreadLocal。如果您是在另一个线程上访问它,则它将不一致。可以在线程之间传播上下文。您可能会发现这个职位很有用。

除了对一元调用和StreamObserver ()的onNext()之外,还有什么东西会导致onError()过早地破坏或关闭吗?

是的,StreamObserver的正常流以onError或onCompleted结尾。

正如StreamObserver javadoc所述,“由于单个StreamObservers不是线程安全的,如果多个线程将并发写入StreamObserver,则应用程序必须同步调用”。如果要调用concurrently,,则需要同步调用。换句话说,如果您确实知道即使您使用的是多个线程,也不会同时调用它,那么它应该是可以的。

如果在多个线程上访问相同的StreamObserver,我会尝试同步它,除非性能是关键的,因为它容易出错。至少,它应该得到一个好的评论。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61607744

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档