相关:scala.concurrent.Future包装器用于java.util.concurrent.Future
这是我的另一个问题:
如何将akka流kafka (反应性-kafka)集成到akka http应用程序中?
我有一个AKKA应用程序,我想在我的路由中向onComplete函数中的Kafka发送一个消息/ProducerRecord,如下所示:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}但是,onComplete(生产者发送producerRecord)正在生成以下类型不匹配错误:
错误发现:未来需要org.apache.kafka.clients.producer.RecordMetadata错误:未来org.apache.kafka.clients.producer.RecordMetadata错误onCompleteRecordMetadata {_ =>
有办法解决这个问题吗?也许可以使用生产者作为接收器(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink),而不是使用java producer.send函数?
发布于 2017-09-18 09:49:02
您可以利用基于蛋糕的Scala卡夫卡客户端,它将运行Java并将Scala期货还给您。一旦您确保创建了一个cakesolutions.kafka.KafkaProducer而不是一个org.apache.kafka.clients.producer.KafkaProducer,您的其余代码实际上应该保持不变。
或者,您可以在保持使用高级别Akka的同时,利用反应性卡夫卡来解决这个问题。你可以通过在卡夫卡水槽上运行你的制作人唱片来做到这一点:
val producerSink = Producer.plainSink(producerSettings)
...
// inside the route
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1", "some message")
onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}发布于 2017-09-22 11:10:21
为了回答您的具体问题,scala-java8-compat库提供了java8和Scala之间的转换器。
具体来说,您可以使用FutureConverters.toScala(producer.send(producerRecord))将java.util.concurrent.Future转换为scala.concurrent.Future。
但是,使用具有Scala友好API本身的客户端库(如上面Stefano所建议的)可能会得到最好的结果。
https://stackoverflow.com/questions/46275913
复制相似问题