alpakka-kafka就是alpakka项目里的kafka-connector。对于我们来说:可以用alpakka-kafka来对接kafka,使用kafka提供的功能。 或者从另外一个角度讲:alpakka-kafka就是一个用akka-streams实现kafka功能的scala开发工具。 本篇我们先介绍alpakka-kafka的producer功能及其使用方法。如前所述:alpakka是用akka-streams实现了kafka-producer功能。 alpakka-kafka提供了一个最基本的producer,非akka-streams组件,sendProducer。 alpakka-kafka streams组件使用这个消息类型作为流元素,最终把它转换成一或多条ProducerRecord写入kafka。
alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。 ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") //自动commit间隔 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") alpakka-kafka alpakka-kafka还有一个atMostOnceSource。
在下篇我们可以讨论一下用alpakka-kafka实现这个案例所需要考虑的一些技术方案。
在alpakka-kafka里用一个普通的Source就可以实现at-least-once消费模式了: val consumerSettings = ConsumerSettings(consumerConfig
我们把这个库存更新功能的实现作为典型的kafka应用案例来介绍,然后再在过程中对akka系列alpakka-kafka的使用进行讲解和示范。 首先,后端业务功能与前端数据采集是松散耦合的。 alpakka-kafka提供了很多类型的sink来实现写produce功能。 producerKafka.publisherSettings.topic, doc.shopId, toJson(doc))) } else FastFuture.successful(Completed) } SendProducer.send就是alpakka-kafka 这个平台是一个以alpakka-kafka-stream为主要运算框架的流计算软件。我们可以通过这次示范深入了解alpakka-kafka-stream的原理和应用。 在alpakka-kafka,reader可以用一个stream-source来表示,如下: val commitableSource = Consumer .committableSource
alpakka-kafka提供了一种CommittableSource: def committableSource[K, V](settings: ConsumerSettings[K, V], alpakka-kafka提供了Committer,通过Committer.sink, Committer.Flow帮助实现offset-commit,Committer.flow如下: Consumer 对于at-most-once消费模式的实现,alpakka-kafka提供了atMostOnceSource: def atMostOnceSource[K, V](settings: ConsumerSettings
所谓consumer就是alpakka-kafka的一个stream。 还有一个问题需要考虑的:alpakka-kafka提供了一个独特的分片部署策略kafkaSharding,能实现partition与某分片在同一节点对应,这样可以节省消息跨节点传递,把消息读取和业务处理放到同一节点上去完成
随着网上购物消费模式热度的不断提高,网上销售平台上各种促销手段也层出不穷,其中“秒购”已经是各种网站普遍流行的促销方式了。“秒购”对数据的实效性和精确性要求非常高,所以通过分布式运算实现高并发数据处理应该是正确的选择。不过,高并发也意味着高频率的数据操作冲突,而高频使用“锁”又会严重影响效率及容易造成不可控异常,所以又被迫选择单线程运行模式。单线程、分布式虽然表面相悖,不过如上篇博文所述:可以利用akka-cluster-sharding分片可指定调用的特性将一种商品的所有操作放到同一个shard上运算(因为shard即是actor,mailbox里的运算指令是按序执行的)可容许在一个分布式环境下有多个分片来同时操作。如此可在获取分布式运算高效率的同时又保证了数据的安全性和完整性。
kafka具备的分布式、高吞吐、高可用特性,以及所提供的各种消息消费模式可以保证在一个多节点集群环境里消息被消费的安全性:即防止每条消息遗漏处理或重复消费。特别是exactly-once消费策略:可以保证每条消息肯定只被消费一次。换句话说就是在分布式运算环境里kafka的消息消费是能保证唯一性的。
在进入具体的kafka应用设计之前我们先把kafka集群环境配置介绍一下。多节点kafka-cluster的安装、配置非常简单,所以应该不用太多篇幅就可以完成一个完整可用的kafka-cluster环境了:
不过akka在alpakka社区提供了alpakka-kafka:这个东西是个基于akka-streams的kafka scala终端编程工具,稍微过了一下,感觉功能比较全面,那就是它了。 那么下面的一系列讨论我就会尝试用alpakka-kafka来构建一个基于CQRS模式的实时交易系统,并和大家进行交流分享。
, "com.typesafe.akka" %% "akka-persistence" % akkaversion, "com.lightbend.akka" %% "akka-stream-alpakka-cassandra 1.0.1", "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb
HTTP服务端和客户端;Akka Cluster可以帮你实现一个分布式集群系统;Cluster Sharding可以帮你做集群的分片处理;Distributed Data可以帮助你在集群之间分享数据;Alpakka
, "com.typesafe.akka" %% "akka-persistence" % akkaversion, "com.lightbend.akka" %% "akka-stream-alpakka-cassandra 1.0.1", "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb
在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。 3.4.0", "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb % "2.5.4", "com.typesafe.akka" %% "akka-stream" % "2.5.4", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra scala.collection.generic.CanBuildFrom import scala.concurrent.duration.Duration import akka.NotUsed import akka.stream.alpakka.cassandra.scaladsl fileName) } MongoEngine.scala import java.text.SimpleDateFormat import akka.NotUsed import akka.stream.alpakka.mongodb.scaladsl
, "com.typesafe.akka" %% "akka-persistence" % akkaVersion, "com.lightbend.akka" %% "akka-stream-alpakka-cassandra 1.0.1", "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb
akka在alpakka工具包里提供了对cassandra数据库的streaming功能。 % "2.5.4", "com.typesafe.akka" %% "akka-stream" % "2.5.4", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra scala.collection.generic.CanBuildFrom import scala.concurrent.duration.Duration import akka.NotUsed import akka.stream.alpakka.cassandra.scaladsl
classicSystem: ActorSystem) = { implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra
com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra 4.0 "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb
Seq( "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb