总之,我在使用Spring Batch CompositeItemWriter时遇到了事务回滚问题。Spring batch的版本是2.2.7。
我的配置与这里的帖子相同:https://stackoverflow.com/questions/32432519/compositeitemwriter-doesnt-roll-back-in-spring-batch
<bean id="compositeItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="firstItemWriter"/>
<ref bean="secondItemWriter"/>
<ref bean="thirdItemWriter"/>
</list>
</property>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="dataSource"
class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${batch.jdbc.driver}" />
<property name="url" value="${batch.jdbc.url}" />
<property name="username" value="${batch.jdbc.user}" />
<property name="password" value="${batch.jdbc.password}" />
</bean>当thirdItemWriter抛出异常时,由以前的编写器写入数据库的数据将不会回滚。
我不确定我错过了什么,但我的理解是,复合项编写器共享相同的事务,所有数据都应该在发生异常时回滚。
欢迎提出任何建议。
发布于 2015-11-16 06:12:12
经过几天的努力,终于有了一个解决方案。关键是将所有写入器放在一个事务中,并关闭自动提交。
public class TransactionAwareCompositeItemWritter<T> extends CompositeItemWriter<T> {
private static Logger LOG = LoggerFactory.getLogger(TransactionAwareCompositeItemWritter.class);
private List<ItemWriter<? super T>> delegates;
private PlatformTransactionManager transactionManager;
private JdbcTemplate jdbcTemplate;
@Override
public void write(final List<? extends T> item) throws Exception {
Connection con = DataSourceUtils.getConnection(jdbcTemplate.getDataSource());
if (con.getAutoCommit()) {
LOG.debug("Switching JDBC Connection [" + con + "] to manual commit");
con.setAutoCommit(false);
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setName("CompositeItemWriter Transaction");
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(def);
try {
for (ItemWriter<? super T> writer : delegates) {
writer.write(item);
}
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
transactionManager.commit(status);
if (!con.getAutoCommit()) {
LOG.debug("Switching JDBC Connection [" + con + "] to auto commit");
con.setAutoCommit(true);
}
}
public void setDelegates(List<ItemWriter<? super T>> delegates) {
super.setDelegates(delegates);
this.delegates = delegates;
}
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}}
发布于 2020-07-06 11:59:47
创建自定义compositeItemWrite。“@Transactional”适用于我的代码。
package com.krafton.intraplatform.batch.job.step;
import java.util.Iterator;
import java.util.List;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
public class CustomCompositeWriter<T> implements ItemStreamWriter<T>, InitializingBean {
private List<ItemWriter<? super T>> delegates;
private boolean ignoreItemStream = false;
public CustomCompositeWriter() {
}
public void setIgnoreItemStream(boolean ignoreItemStream) {
this.ignoreItemStream = ignoreItemStream;
}
@Transactional
public void write(List<? extends T> item) throws Exception {
Iterator var2 = this.delegates.iterator();
while(var2.hasNext()) {
ItemWriter<? super T> writer = (ItemWriter)var2.next();
writer.write(item);
}
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.delegates, "The 'delegates' may not be null");
Assert.notEmpty(this.delegates, "The 'delegates' may not be empty");
}
public void setDelegates(List<ItemWriter<? super T>> delegates) {
this.delegates = delegates;
}
public void close() throws ItemStreamException {
Iterator var1 = this.delegates.iterator();
while(var1.hasNext()) {
ItemWriter<? super T> writer = (ItemWriter)var1.next();
if (!this.ignoreItemStream && writer instanceof ItemStream) {
((ItemStream)writer).close();
}
}
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
Iterator var2 = this.delegates.iterator();
while(var2.hasNext()) {
ItemWriter<? super T> writer = (ItemWriter)var2.next();
if (!this.ignoreItemStream && writer instanceof ItemStream) {
((ItemStream)writer).open(executionContext);
}
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
Iterator var2 = this.delegates.iterator();
while(var2.hasNext()) {
ItemWriter<? super T> writer = (ItemWriter)var2.next();
if (!this.ignoreItemStream && writer instanceof ItemStream) {
((ItemStream)writer).update(executionContext);
}
}
}
}https://stackoverflow.com/questions/33603933
复制相似问题