我正在尝试使用GetKafka处理器获取消息,但我无法获取。我使用Kafka命令行消费者测试了消费消息,它起作用了。我还能够成功地使用PutKafka processor将消息放入主题中。附加了我设置Zookeeper连接字符串和主题名称的设置。当我运行流时,我在处理器中看不到任何错误。
GetKafka Processor
我在nifi-app.log中看到一个异常:
2016-08-03 09:34:33,722 WARN [70e1df87-6097-4ed0-9a40-7e36f9be6921_mydomain.com-1470231250839-1fbd0cfe-leader-finder-thread] kafka.client.ClientUtils$ Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:DataSlave1.CSE-RD.com,
port:9092] failed
java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.8.0_101]
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:957) ~[na:1.8.0_101]Kafka (0.8):2.10-0.8.2.1,Nifi: 0.7.0我错过了什么吗?谢谢。
发布于 2016-08-04 01:57:53
重新启动后,异常消失了。GetKafka现在能够在生产者中发送消息时获取消息。它没有收到主题中之前的消息(相当于kafka控制台消费者中的--from-kafka)。我在处理器中看不到这样的设置。
https://stackoverflow.com/questions/38745500
复制相似问题