首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring停止并使用CompositeItemWriter启动多线程步骤

Spring停止并使用CompositeItemWriter启动多线程步骤
EN

Stack Overflow用户
提问于 2021-05-20 05:08:42
回答 1查看 635关注 0票数 1

我正试图通过Scheduler停止并开始一个多线程步骤。但我得到的例外是

代码语言:javascript
复制
Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.

如果我正确理解,我们将无法重新启动多线程步骤。但我不会重新开始的。我通过stepExecution.setTerminateOnly()通过ChunkListener()停止作业,并试图在调度程序中通过jobLauncher.run()启动这一任务。这是我的密码;

代码语言:javascript
复制
public class BatchConfiguration {
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
        
    @Autowired
    public DataSource dataSource;
    
        @Bean
        public TaskExecutor taskExecutor(){
            SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
            asyncTaskExecutor.setConcurrencyLimit(5);
            return asyncTaskExecutor;
        }
    
        @Bean
        public Job userPurgeJob() {
            return jobBuilderFactory.get("userPurgeJob")
                    .start(userPurgeStep())
                    .listener(new JobLoggerListener())
                    .build();
        }   
            
        @Bean
        public Step userPurgeStep() {
            return stepBuilderFactory.get("userPurgeStep")
                    .<UserInfo, UserInfo> chunk(10)
                    .reader(userPurgeReader())
                    .writer(compositePurgeWriter())
                    .listener(new StopListener())
                    .taskExecutor(taskExecutor())
                    .build();
        }
        
        
        @Bean
        @StepScope
        public JdbcCursorItemReader<UserInfo> userPurgeReader(){
            JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
            reader.setDataSource(dataSource);
            reader.setSql("SELECT user_id, user_status "
                    + "FROM db3.user_purge "
                    + "WHERE user_status = 'I' "
                    + "AND purge_status = 'N'");
            reader.setRowMapper(new SoftDeleteMapper());
      
            return reader;
        }
        
        @Bean
        public CompositeItemWriter<UserInfo> compositePurgeWriter() {
            CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
            return compositeItemWriter;
        }
        
            
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.userinfo "
                    + "SET user_status = :userStatus, "
                        + "updated = NOW() "
                        + "WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            
            return writer;
        }
        
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delTableWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            return writer;
        }
}

StopListener.java此ChunkListner类实现用于在晚上10点至早上6点以外的任何时间终止执行。

代码语言:javascript
复制
public class StopListener implements ChunkListener{
    private StepExecution stepExecution;
    
    @Autowired
    AppUtils appUtils;
    
    @Override
    public void beforeChunk(ChunkContext context) {
        
    }
    
    @Override
    public void afterChunk(ChunkContext context) {
        if (stopJob()) {
            this.stepExecution.setTerminateOnly();            
        }       
    }
    
    @Override
    public void afterChunkError(ChunkContext context) {
                
    }

    //Check the time between 10pm and 6am
    private boolean terminateJob() {
        Date date = new Date();
        Calendar calendar = GregorianCalendar.getInstance(); 
        calendar.setTime(date); 
        calendar.get(Calendar.HOUR_OF_DAY);
        
        if(calendar.get(Calendar.HOUR_OF_DAY) >= 6 
                && calendar.get(Calendar.HOUR_OF_DAY) < 22) {           
            return true;
        }else {
            return false;
        }
    }

}

最后,在应用程序类中使用我的调度器方法。我使用CommandLneRunner来接受参数。

代码语言:javascript
复制
@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
    static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private ApplicationContext context;
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private JobOperator jobOperator;
    
    private String jobName;
    private JobParameters jobParameters;
    private String inputFile;
    private String usertype;
    private boolean jobStatus = false;
    private String completionStatus;    

    public static void main(String[] args) throws Exception{
        SpringApplication.run(UserPurgeBatchApplication.class, args);           
        
    }
    
    @Override
    public void run(String... args) throws Exception {
        this.jobName = args[0];
        
        this.jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
                
        
        LOG.info("||| Launching the JOB: " + jobName);  
        this.completionStatus  = jobSelector(jobName, jobParameters).getExitCode();
        LOG.info(">>> JOB completed with status: " + this.completionStatus);
    }


    public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {        
        Job job = this.context.getBean(jobName, Job.class);
        
        try {
            return this.jobLauncher.run(job,  jobParameters).getExitStatus();
        } catch (JobExecutionAlreadyRunningException | 
                JobRestartException | 
                JobInstanceAlreadyCompleteException | 
                JobParametersInvalidException e) {
            
            e.printStackTrace();
        }
        
        return new ExitStatus("FAILED");
    }
    
    
    @Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
    public void batchStartScheduler() {
        LOG.info("---Beginning of batchScheduler()---");
        
        Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
        String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
        Job job = this.context.getBean(jobName, Job.class);
        
        if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
            if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED) 
                    || jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
                    
                try {
                LOG.info("|||Starting the Job...");
                    this.jobParameters = new JobParametersBuilder(jobParameters)
                            .addLong("time", System.currentTimeMillis())
                            .toJobParameters();
                    
                    this.jobLauncher.run(job,  jobParameters);
                } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                        | JobParametersInvalidException e) {
                    e.printStackTrace();
                }
            }
        }else {
            LOG.info("Scheduler not executed!!");           
        }
                
        LOG.info("---End of batchScheduler()---");
    }
    
}

有一些困惑。如果以前的执行失败,run方法总是尝试重新启动吗?因为我可以看到这件事还在重新开始,这可能就是原因。我试着提供新的JobParameter,希望它能再次启动。我希望我从ChunkListener的停止方法是可以的。但不知何故,我想重新开始这项工作,从计划,我肯定需要一个多小时的步骤。我还希望多线程步骤中的CompositeWriter也可以。如果能提供帮助,我们将不胜感激。提前感谢!

Update:最后,我可以通过添加reader.setVerifyCursorPosition(false)来使其工作。但是我认为我需要像马哈茂德·本·哈辛所建议的那样使用线程安全阅读器。因此,我尝试使用JdbcPagingItemReader,但得到的错误为"sortKey必须指定“。我想我已经说明了,但不确定它是否正确。这是我的JdbcPagingItemReader

代码语言:javascript
复制
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
    JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();

    pagingItemReader.setDataSource(dataSource);
    pagingItemReader.setFetchSize(3);
    pagingItemReader.setRowMapper(new SoftDeleteMapper());

    MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
    mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
    mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
    mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
                                                + "AND purge_status = 'N'");

    Map<String, Order> orderByKeys = new HashMap<>();
    orderByKeys.put("user_id", Order.ASCENDING);

    mySqlPagingQueryProvider.setSortKeys(orderByKeys);
    pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);

    return pagingItemReader;
}

我最新的一步

代码语言:javascript
复制
@Bean
public Step userPurgeStep() {
    return stepBuilderFactory.get("userPurgeStep")
            .<UserInfo, UserInfo> chunk(10)
            .reader(jdbcPagingItemReader())
            .writer(compositeSoftDelWriter())
            .listener(new StopListener())
            .taskExecutor(taskExecutor())
            .build();
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-05-20 09:38:15

多线程与重新启动不兼容。正如在Javadoc中提到的,如果在多线程步骤中使用JdbcCursorItemReader,则应该将JdbcCursorItemReader设置为false。

此外,JdbcCursorItemReader不是线程安全的,因为它包装了一个不是线程安全的ResultSet对象,也因为它继承了AbstractItemCountingItemStreamItemReader (也是非螺纹安全 )。因此,在多线程步骤中使用它是不正确的。这实际上是引发Unexpected cursor position change问题的原因。并发线程无意中修改了游标位置。

您需要通过将读取器包装在SynchronizedIteamStreamReader中或使用JdbcPagingItemReader ( 线程安全吗? )来同步对读取器的访问。

编辑:添加JdbcPagingItemReader示例

下面是一个独立的基于码头的示例:

代码语言:javascript
复制
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {

    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");

    @ClassRule
    public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
    
    @Autowired
    private DataSource dataSource;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job;
    
    @Before
    public void setUp() {
        String schema = "/org/springframework/batch/core/schema-mysql.sql";
        String data = // the script is inline here to have a self contained example
                "create table person (ID int not null primary key, name varchar(20));" +
                "insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
                "insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.addScript(new ClassPathResource(schema));
        databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
        databasePopulator.execute(this.dataSource);
    }

    @Test
    public void testJob() throws Exception {
        // given
        JobParameters jobParameters = new JobParameters();
        
        // when
        JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);

        // then
        Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }

    @Configuration
    @EnableBatchProcessing
    static class TestConfiguration {

        @Bean
        public DataSource dataSource() throws Exception {
            MysqlDataSource datasource = new MysqlDataSource();
            datasource.setURL(mysql.getJdbcUrl());
            datasource.setUser(mysql.getUsername());
            datasource.setPassword(mysql.getPassword());
            datasource.setUseSSL(false);
            return datasource;
        }

        @Bean
        public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
            MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
            mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
            mySqlPagingQueryProvider.setFromClause("FROM person");
            Map<String, Order> orderByKeys = new HashMap<>();
            orderByKeys.put("id", Order.DESCENDING);
            mySqlPagingQueryProvider.setSortKeys(orderByKeys);

            JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
            pagingItemReader.setDataSource(dataSource());
            pagingItemReader.setFetchSize(2);
            pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
            pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
            return pagingItemReader;
        }

        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
            return jobs.get("job")
                    .start(steps.get("step").chunk(2)
                            .reader(jdbcPagingItemReader())
                            .writer(items -> items.forEach(System.out::println))
                            .build())
                    .build();
        }

        static class Person {
            int id;
            String name;

            public int getId() {
                return id;
            }

            public void setId(int id) {
                this.id = id;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "Person{" +
                        "id=" + id +
                        ", name='" + name + '\'' +
                        '}';
            }
        }

    }
}

这将按照预期按降序打印项,而不会抱怨缺少排序键:

代码语言:javascript
复制
Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67614305

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档