ItemReader正在从DB2读取数据,并提供了java对象ClaimDto。现在ClaimProcessor接受ClaimDto的对象,并返回由claimRecord1和claimRecord2组成的CompositeClaimRecord对象,该对象将被发送到两个不同的Kafka主题。如何将claimRecord1和claimRecord2分别写入topic1和topic2。
发布于 2020-12-14 18:11:39
只需编写一个自定义的ItemWriter就可以做到这一点。
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}或者,不是一次写入一条记录,而是将单个列表转换为2个列表并传递。但在这种情况下,错误处理可能有点困难。\
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}发布于 2020-12-14 17:25:04
您可以使用具有两个KafkaItemWriter的ClassifierCompositeItemWriter作为委派(每个主题一个)。
Classifier将根据项目的类型(claimRecord1或claimRecord2)对项目进行分类,并将它们路由到相应的kafka项目写入器(topic1或topic2)。
https://stackoverflow.com/questions/65286139
复制相似问题