新的Kafka,最近我试图从春季批量获取数据,然后写入Kafka,但我不知道如何做到这一点。有人能帮我弄清楚如何将数据写入Kafka吗?下面是我用SpringBatch编写的获取数据的演示代码:
@Configuration公共类FileReader {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
// TODO Auto-generated method stub
return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
.writer(flatFileWriter).build();
}
@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
// TODO Auto-generated method stub
FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
reader.setResource(new ClassPathResource("Demo1.csv"));
reader.setLinesToSkip(1);
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String [] {"id","first","last"});
DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {
@Override
public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
Demo1 demo1 = new Demo1();
demo1.setId(fieldSet.readLong("id"));
demo1.setFirst(fieldSet.readString("first"));
demo1.setLast(fieldSet.readString("last"));
// TODO Auto-generated method stub
return demo1;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}}
发布于 2019-09-25 15:57:53
即将发布的Spring Batch v4.2 GA将提供对Apache Kafka主题的读/写数据的支持。你已经可以在4.2.0.RC1 release上试用了。
对于KafkaItemWriter,您需要配置KafkaTemplate。下面是一个编写器的例子:
@Bean
public KafkaItemWriter<String, Demo1> kafkaItemWriter(KafkaTemplate<String, Demo1> kafkaTemplate) {
return new KafkaItemWriterBuilder<String, Demo1>()
.kafkaTemplate(kafkaTemplate)
.build();
}你也可以看看Josh Long在Spring Batch中关于Kafka支持的Spring Tips installment。
https://stackoverflow.com/questions/58091972
复制相似问题