我正在尝试使用spring应用程序中的kafka主题。我使用的是Spring云流和下面提到的版本
下面是代码和配置
application.yml
spring:
zipkin:
sender:
type: kafka
kafka:
bootstrap-servers:
- localhost:19091
cloud:
stream:
bindings:
audit-in-0:
destination: com.tonitingaurav.kafka.log
group: kafka-log-group
consumer:
concurrency: 10
max-attempts: 3
default-binder: kafka
kafka:
binder:
brokers:
- localhost:19091讯息使用者类别
@Configuration
public class LogConsumer {
@Bean
Consumer<Log> audit(){
return log -> {
System.out.println(log.getMessage());
};
}
}下面的消息发布者正在正确地发布邮件。Publisher是用不同的微服务编写的。
@Component
public class LogEventPublisher {
@Autowired
@Qualifier(LogProducerKafkaConfig.KAFKA_LOG_PUBLISHER)
MessageChannel messageChannel;
public void logMessage(Log log) {
Message<Log> message = MessageBuilder.withPayload(log).build();
messageChannel.send(message);
}
}pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>发布于 2021-11-29 09:21:18
您已经发布了一个非常类似的问题here,并提供了两种不同的解决方案的答复。另外,下面是可以用作起点的示例- https://github.com/spring-cloud/spring-cloud-stream-samples
https://stackoverflow.com/questions/70152139
复制相似问题