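I am writing a Spring Batch job with Spring, and I need to write to two different tables depending on a condition, so I am trying to use CompositeItemWriter. However, when I run the batch, the writer is never called.
Here is my job configuration class.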
@Configuration
public class JobConfiguration {
    ...
    ...
    ...

    @Bean
    public JdbcCursorItemReader<Notification> reader() {
        JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
        reader.setDataSource(dataSource);
        ...
        ...
        reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
        return reader;
    }

    @Bean
    public NotificationItemProcessor notificatonProcessor() {
        return new NotificationItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> updateWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        ...
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * Composite Exchange Writer
     * @return
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    @SuppressWarnings("unchecked")
    @Bean
    public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
        map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
        return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        writer.setSql(" INSERT INTO SOME TABLE..");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        writer.setSql("INSERT INTO SOME OTHER TABLE.");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public ItemWriter<Document> doNothing() {
        return new DummyWriter();
    }

    @Bean
    public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
        return jobBuilderFactory.get("generatePdf")
                .incrementer(new RunIdIncrementer())
                .flow(treatStock())
                .end()
                .build();
    }

    @Bean
    public Step treatStock() throws InstantiationException, IllegalAccessException {
        return stepBuilderFactory.get("treatStock")
                .<Notification, Notification>chunk(1)
                .reader(reader())
                .processor(notificatonProcessor())
                .writer(compositeExchangeWriter())
                .writer(updateWriter())
                .build();
    }
}

CompositeItemWriterBuilder.java
public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {

    public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
        BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
        classif.setRouterDelegate(routerDelegate.newInstance());
        classif.setMatcherMap(matcherMap);
        ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
        classifier.setClassifier(classif);
        this.delegates.add(classifier);
    }

    public CompositeItemWriterBuilder(List<Object> delegates) {
        this.delegates = delegates;
    }

    @Override
    protected Class<?> getCompositeItem() {
        return CompositeItemWriter.class;
    }
}

CompositeItemBuilder.java
public abstract class CompositeItemBuilder<T> {

    protected List<Object> delegates = new ArrayList<Object>();

    @SuppressWarnings("unchecked")
    public T build() throws InstantiationException, IllegalAccessException {
        Object compositeItem = getCompositeItem().newInstance();
        Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
        ReflectionUtils.invokeMethod(setDelegates, compositeItem, delegates);
        return (T) compositeItem;
    }

    abstract protected Class<?> getCompositeItem();
}

ExchangeWriterRouterClassifier.java (the classify method is never called)
public class ExchangeWriterRouterClassifier {

    @Classifier
    public String classify(Notification notification) {
        return notification.getExchangesWorkflow().getRouter().getActionName();
    }
}

How does Spring call the classifier? Am I missing something?
Posted on 2018-07-20 10:06:04
"I am trying to use CompositeItemWriter, but when I run the batch the writer is not called."
The problem is in your step definition:
@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
    return stepBuilderFactory.get("treatStock")
            .<Notification, Notification>chunk(1)
            .reader(reader())
            .processor(notificatonProcessor())
            .writer(compositeExchangeWriter())
            .writer(updateWriter())
            .build();
}

You are calling the writer() method twice, so updateWriter() overrides compositeExchangeWriter(). You need to call the method once, passing a composite writer on which you have already set the delegate writers.
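
To make the override concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: SimpleWriter, SimpleStepBuilder, SimpleCompositeWriter, and WriterOverrideDemo are hypothetical names invented for this illustration, and the builder is only assumed to behave like a plain setter, as described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an item writer; not the Spring Batch interface.
interface SimpleWriter<T> {
    void write(List<? extends T> items);
}

// Hypothetical builder: writer() is a plain setter, so a second call replaces the first.
class SimpleStepBuilder<T> {
    private SimpleWriter<T> writer;

    SimpleStepBuilder<T> writer(SimpleWriter<T> writer) {
        this.writer = writer;
        return this;
    }

    SimpleWriter<T> build() {
        return writer;
    }
}

// Hypothetical composite: forwards every chunk to each delegate, in order.
class SimpleCompositeWriter<T> implements SimpleWriter<T> {
    private final List<SimpleWriter<T>> delegates;

    SimpleCompositeWriter(List<SimpleWriter<T>> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void write(List<? extends T> items) {
        for (SimpleWriter<T> delegate : delegates) {
            delegate.write(items);
        }
    }
}

class WriterOverrideDemo {
    // Builds a "step" either way and returns a log of which writers were called.
    static List<String> run(boolean useComposite) {
        List<String> log = new ArrayList<>();
        SimpleWriter<String> first = items -> log.add("first:" + items);
        SimpleWriter<String> second = items -> log.add("second:" + items);

        SimpleStepBuilder<String> builder = new SimpleStepBuilder<>();
        if (useComposite) {
            // One writer() call, with both delegates inside the composite.
            builder.writer(new SimpleCompositeWriter<String>(Arrays.asList(first, second)));
        } else {
            // Two writer() calls: only the last one survives.
            builder.writer(first).writer(second);
        }
        builder.build().write(Arrays.asList("a"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [second:[a]]
        System.out.println(run(true));  // prints [first:[a], second:[a]]
    }
}
```

With two writer() calls only the second writer sees the chunk; with a single composite, both delegates do.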
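When using a composite writer, also note that a delegate that implements the ItemStream interface is not wired directly into the step, so you may need to register it as a stream yourself. For more details, see: https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering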
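"How does Spring call the classifier?"
When a ClassifierCompositeItemWriter is configured correctly, Spring Batch calls the classifier for each item to determine which writer to use, and then calls the appropriate writer to write that item.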
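
The per-item routing described above can be modelled in plain Java. This is a sketch of the idea only, not the library's implementation: RoutedWriter, RoutingWriterSketch, and RoutingDemo are hypothetical names, and the one-letter "action name" prefix is an assumption made for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for an item writer; not the Spring Batch interface.
interface RoutedWriter<T> {
    void write(List<T> items);
}

// Hypothetical model of a classifier-based composite writer.
class RoutingWriterSketch<T> implements RoutedWriter<T> {
    private final Function<T, RoutedWriter<T>> classifier;

    RoutingWriterSketch(Function<T, RoutedWriter<T>> classifier) {
        this.classifier = classifier;
    }

    @Override
    public void write(List<T> items) {
        // Ask the classifier once per item and group items by the chosen writer.
        Map<RoutedWriter<T>, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(classifier.apply(item), w -> new ArrayList<>()).add(item);
        }
        // Hand each group to the writer that was selected for it.
        for (Map.Entry<RoutedWriter<T>, List<T>> entry : groups.entrySet()) {
            entry.getKey().write(entry.getValue());
        }
    }
}

class RoutingDemo {
    // Routes three items and returns what each target writer received.
    static Map<String, List<String>> run() {
        Map<String, List<String>> written = new LinkedHashMap<>();
        RoutedWriter<String> notificationWriter =
                items -> written.put("notification", new ArrayList<>(items));
        RoutedWriter<String> packWriter =
                items -> written.put("pack", new ArrayList<>(items));

        // Action name -> writer, similar in spirit to the matcher map in the question.
        Map<String, RoutedWriter<String>> byAction = new LinkedHashMap<>();
        byAction.put("N", notificationWriter);
        byAction.put("P", packWriter);

        // The classifier here just looks at the first character of the item.
        RoutingWriterSketch<String> writer =
                new RoutingWriterSketch<String>(item -> byAction.get(item.substring(0, 1)));
        writer.write(Arrays.asList("N:1", "P:2", "N:3"));
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {notification=[N:1, N:3], pack=[P:2]}
    }
}
```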
In your configuration, the ClassifierCompositeItemWriter is not configured correctly here:
@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {
    Object compositeItem = getCompositeItem().newInstance();
    Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
    ReflectionUtils.invokeMethod(setDelegates, compositeItem, delegates);
    return (T) compositeItem;
}

I would not use reflection to set the delegates. The problem is that you expect the compositeExchangeWriter method to register a ClassifierCompositeItemWriter, but its return type is CompositeItemWriter, so the composite writer is not treated as a classifier.
You can find an example of how to use ClassifierCompositeItemWriter here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44
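
For reference, a reflection-free setup might look roughly like the sketch below. It is a configuration fragment, not a drop-in fix: Notification, ExchangeRouter, and ExchangeWriterRouterClassifier are the question's own types, ClassifierWriterSketch and the parameter names are hypothetical, and the no-op writer is assumed to be an ItemWriter<Notification> (the question declares it as ItemWriter<Document>).

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.classify.BackToBackPatternClassifier;

// Hypothetical helper; Notification, ExchangeRouter and
// ExchangeWriterRouterClassifier come from the question and are not defined here.
public final class ClassifierWriterSketch {

    private ClassifierWriterSketch() {
    }

    public static ClassifierCompositeItemWriter<Notification> classifierWriter(
            ItemWriter<Notification> workflowWriter,
            ItemWriter<Notification> packWriter,
            ItemWriter<Notification> noOpWriter) { // assumption: typed on Notification

        // Action name returned by the router -> writer that should handle the item.
        Map<String, ItemWriter<? super Notification>> matcherMap = new HashMap<>();
        matcherMap.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), workflowWriter);
        matcherMap.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), packWriter);
        matcherMap.put(ExchangeRouter.DO_NOTHING.getActionName(), noOpWriter);

        // The router delegate is the object whose @Classifier method yields the action name.
        BackToBackPatternClassifier<Notification, ItemWriter<? super Notification>> classifier =
                new BackToBackPatternClassifier<>();
        classifier.setRouterDelegate(new ExchangeWriterRouterClassifier());
        classifier.setMatcherMap(matcherMap);

        // Typed setter calls instead of reflection.
        ClassifierCompositeItemWriter<Notification> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(classifier);
        return writer;
    }
}
```

A writer built this way would then be passed to the step through a single writer(...) call, alone or as one delegate of a CompositeItemWriter alongside updateWriter().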
https://stackoverflow.com/questions/51435285