我是新来卡夫卡的。我的要求是,我有两个分区,例如Partition-0和Partition-1,并且我有一个值列表,其中也包含键值。我想根据我的key来存储数据,比如key-1将存储到Partition-0,key-2将存储到Partition-1。使用旧的API可以实现像我们需要实现Partition接口一样的功能,但是我可以使用新的API来实现这一点。谢谢你
发布于 2016-07-29 01:55:44
使用新的生产者,您还可以实现Partitioner接口(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java)来实现循环分发。
您可以使用DefaultPartitioner作为参考- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
发布于 2018-03-01 01:54:56
如果你想要轮询行为,只要在写给Producer时不要传递密钥就行了,DefaultPartitioner会帮你完成这项工作。您不需要编写自定义实现。在javadoc中:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/发布于 2016-07-29 03:27:44
您可以通过覆盖kafka生产者的default partitioner,以循环方式向kafka生产。
一个伪实现
class RRPartitioner():
def __init__():
# Using topic metadata get total number of partitions
self.total_partitions = client[topic].get_number_partitions()
self.part_offset = 0
def partitioner(self, key, msg):
if self.part_offset > self.total_partitions:
self.part_offset = 0
return self.part_offset
else:
self.part_offset += 1
return self.part_offset上面的实现是纯循环的,如果你想让消息按键排序并且有循环,你必须在自定义分区程序中做更多的事情。
https://stackoverflow.com/questions/38640513
复制相似问题