// #atLeastOnceBatch
Consumer.Control control =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1, msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset())
)
.batch(
20,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(3, c -> c.commitJavadsl())
.to(Sink.ignore())
.run(materializer);
// #atLeastOnceBatch我尝试测试Alpakka Kafka Connnector至少一次批处理示例,我得到了以下编译时错误
类型ConsumerMessage没有定义适用于此处的createCommittableOffsetBatch(ConsumerMessage.CommittableOffset)
并且类型ConsumerMessage.CommittableOffsetBatch没有定义适用于此处的更新(S,ConsumerMessage.CommittableOffset
发布于 2018-09-28 07:06:11
这些都在0.22版本中可用。不幸的是,与Akka文档相比,Alpakka的文档有所欠缺。
https://stackoverflow.com/questions/50971794
复制相似问题