首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >找到java.util.concurrent.Future必需的scala.concurrent.Future

找到java.util.concurrent.Future必需的scala.concurrent.Future
EN

Stack Overflow用户
提问于 2017-09-18 09:28:00
回答 2查看 1.2K关注 0票数 0

相关:scala.concurrent.Future包装器用于java.util.concurrent.Future

这是我的另一个问题:

如何将akka流kafka (反应性-kafka)集成到akka http应用程序中?

我有一个AKKA应用程序,我想在我的路由中向onComplete函数中的Kafka发送一个消息/ProducerRecord,如下所示:

代码语言:javascript
复制
val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

但是,onComplete(生产者发送producerRecord)正在生成以下类型不匹配错误:

错误发现:未来需要org.apache.kafka.clients.producer.RecordMetadata错误:未来org.apache.kafka.clients.producer.RecordMetadata错误onCompleteRecordMetadata {_ =>

有办法解决这个问题吗?也许可以使用生产者作为接收器(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink),而不是使用java producer.send函数?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-09-18 09:49:02

您可以利用基于蛋糕的Scala卡夫卡客户端,它将运行Java并将Scala期货还给您。一旦您确保创建了一个cakesolutions.kafka.KafkaProducer而不是一个org.apache.kafka.clients.producer.KafkaProducer,您的其余代码实际上应该保持不变。

或者,您可以在保持使用高级别Akka的同时,利用反应性卡夫卡来解决这个问题。你可以通过在卡夫卡水槽上运行你的制作人唱片来做到这一点:

代码语言:javascript
复制
val producerSink = Producer.plainSink(producerSettings)

...
        // inside the route
        val producerRecord =
          new ProducerRecord[Array[Byte], String]("topic1", "some message")

        onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }
票数 3
EN

Stack Overflow用户

发布于 2017-09-22 11:10:21

为了回答您的具体问题,scala-java8-compat库提供了java8和Scala之间的转换器。

具体来说,您可以使用FutureConverters.toScala(producer.send(producerRecord))java.util.concurrent.Future转换为scala.concurrent.Future

但是,使用具有Scala友好API本身的客户端库(如上面Stefano所建议的)可能会得到最好的结果。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46275913

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档