Original question: http://stackoverflow.com/questions/38846457/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow.
How can you restart a failed spring batch job and let it pick up where it left off?
Asked by Tranquilized
According to the Spring Batch documentation, restarting a job is supported out of the box, but I cannot get it to start from where it left off. For example, if my step processed 10 records, it should resume processing at record 11 whenever I restart it. In practice this doesn't happen: it reads from the beginning and reprocesses everything.
Does anybody have a Java-based configuration of a simple job that reads a delimited file, writes the content to a DB table, and can be restarted from the point where it stopped?
@Configuration
public class BatchConfiguration {

    @Value("${spring-batch.databaseType}")
    private String databaseType;

    @Value("${spring-batch.databaseSchema}")
    private String schemaName;

    @Bean
    public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
            final PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    @Bean
    public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) {
        final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDatabaseType(databaseType);
        bean.setDataSource(dataSource);
        if (StringUtils.isNotBlank(schemaName)) {
            bean.setTablePrefix(schemaName);
        }
        bean.setTransactionManager(transactionManager);
        try {
            bean.afterPropertiesSet();
            return bean.getObject();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
        }
    }

    @Bean
    public JobLauncher jobLauncher(final JobRepository jobRepository) {
        final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }
}
@Configuration
@EnableScheduling
@ComponentScan("com.some.package")
public class BatchJobConfiguration {

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Value("${savings-transaction.file}")
    private String savingsTransactionFile;

    @Value("${savings-balance.file}")
    private String savingsBalanceFile;

    @Value("${processed-directory}")
    private String processedDirectory;

    private static final Integer IMPORT_CHUNKSIZE = 10;

    @Bean
    @DependsOn("stepBuilderFactory")
    public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
            @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
            @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
            @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
            @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {
        return stepBuilderFactory.get("savingsTransactionStep")
                .transactionManager(transactionManager)
                .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE)
                .reader(savingsTransactionItemReader)
                .processor(processor)
                .writer(savingsTransactionItemWriter)
                .listener(listener)
                .build();
    }

    @Bean
    public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
            final JobRepository jobRepository) {
        final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");
        final FileMovingTasklet tasklet = new FileMovingTasklet();
        tasklet.setFileNamePattern(savingsTransactionFile);
        tasklet.setProcessedDirectory(processedDirectory);
        taskletStep.setTasklet(tasklet);
        taskletStep.setTransactionManager(transactionManager);
        taskletStep.setJobRepository(jobRepository);
        try {
            taskletStep.afterPropertiesSet();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Failed to configure tasklet!", e);
        }
        return taskletStep;
    }

    @Bean
    @DependsOn("jobBuilderFactory")
    public Job job(final Step savingsTransactionStep,
            final Step savingsTransactionCleanUpStep) {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(savingsTransactionStep)
                .next(savingsTransactionCleanUpStep)
                .on("FINISHED")
                .end()
                .build()
                .build();
    }
}
Unit test code that restarts the job
final Date now = new Date();
jobMananger.processRegistrations(now);
final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
assertEquals(9, savingsBalances.size());
FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));
jobMananger.processRegistrations(now);
final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
System.out.println(savingsBalances2.size());
int found = 0;
for (final SavingsBalance savingsBalance : savingsBalances2) {
    final String id = savingsBalance.getId();
    if ("12345".equals(id)) {
        found++;
    }
}
assertEquals("Invalid number of found balances!", 1, found);
The job manager implementation
public class JobManager {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private Job job;

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void processRegistrations(final Date date) {
        try {
            final Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("START_DATE", new JobParameter(date));
            final JobParameters jobParameters = new JobParameters(parameters);
            final JobExecution execution = jobLauncher.run(job, jobParameters);
            LOG.info("Exit Status : " + execution.getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOG.error("Failed to process registrations.", e);
        }
    }
}
Accepted answer by Tranquilized
It seems you have to configure the following beans to be able to restart your job.
@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
        final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
    final SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(jobLauncher);
    jobOperator.setJobRepository(jobRepository);
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    return jobOperator;
}

@Bean
public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
    final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
    bean.setDataSource(dataSource);
    bean.setTablePrefix("BATCH_");
    bean.setJdbcOperations(new JdbcTemplate(dataSource));
    bean.afterPropertiesSet();
    return bean.getObject();
}
Then you need to retrieve the ID of the failed job execution from the batch tables so that you can restart that specific run with the jobOperator (JobOperator.restart takes a job execution ID and returns the ID of the new execution).
final Long restartId = jobOperator.restart(id);
final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
Answer by DevG
Inside your JobManager class, use the JobOperator.restart() method instead of JobLauncher.
Your job does not resume from the last failed step because, with JobLauncher, you are starting one more new job, which therefore begins again at the first step.
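To make the difference concrete, here is a minimal, library-free sketch of what "picking up where it left off" relies on: the read position is saved after each committed chunk, and only a restart that reuses the saved context can skip the records already processed. This is a simplified model, not Spring Batch code; the class name, the `read.count` key, and the `failAfter` parameter are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of checkpointed reading; this is not Spring Batch code. */
class CheckpointedReaderDemo {

    static final String READ_COUNT = "read.count"; // hypothetical context key

    /**
     * Reads items in chunks and saves the read position in the context after
     * each committed chunk. Stops early once failAfter items were committed,
     * which simulates a failure in the middle of the run.
     */
    static List<Integer> run(List<Integer> items, Map<String, Integer> context,
                             int chunkSize, int failAfter) {
        int position = context.getOrDefault(READ_COUNT, 0); // resume point
        List<Integer> committed = new ArrayList<>();
        while (position < items.size() && committed.size() < failAfter) {
            int end = Math.min(position + chunkSize, items.size());
            committed.addAll(items.subList(position, end)); // "write" the chunk
            position = end;
            context.put(READ_COUNT, position); // checkpoint at the chunk boundary
        }
        return committed;
    }

    /** Builds the sample input 1..count. */
    static List<Integer> numbers(int count) {
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = numbers(25);
        Map<String, Integer> context = new HashMap<>();

        run(items, context, 10, 10); // first run stops after one chunk of 10
        List<Integer> resumed = run(items, context, 10, Integer.MAX_VALUE);
        System.out.println("restart with saved context begins at " + resumed.get(0));

        List<Integer> fresh = run(items, new HashMap<>(), 10, Integer.MAX_VALUE);
        System.out.println("new run with empty context begins at " + fresh.get(0));
    }
}
```

In this model the restart that reuses the saved context begins at record 11, while the run that starts with an empty context begins at record 1 again, which mirrors the behaviour described in the question.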
Also make sure that the job's "restartable" property is set to true (this is the default).
Here is some sample code:
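public boolean resumeWorkflow(long executionId)
        throws WorkflowResumeServiceException {
    JobOperator jobOperator = (JobOperator) ApplicationContextProvider.getApplicationContext().getBean("jobOperator");
    try {
        // Log the state of the failed execution before restarting it.
        LOGGER.info("SUMMARY BEFORE RESTART:" + jobOperator.getSummary(executionId));
        jobOperator.restart(executionId);
        return true;
    } catch (JobExecutionException e) {
        // JobExecutionException is the common superclass of the checked
        // exceptions thrown by getSummary() and restart().
        LOGGER.error("Failed to restart execution " + executionId, e);
        return false;
    }
}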
You need to get the job execution ID of the failed job and pass it to the method above.
Please note that a job that finished successfully (batch status COMPLETED) cannot be restarted.
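The selection rule described above (restart only a failed or stopped execution, never a completed one) can be sketched without any library as follows. This is a simplified model under stated assumptions: the `Execution` class and the `Status` enum are hypothetical stand-ins for the execution records you would read from the batch tables, not Spring Batch types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Simplified model of choosing an execution to restart; not Spring Batch code. */
class RestartCandidateDemo {

    /** Hypothetical subset of execution statuses. */
    enum Status { COMPLETED, FAILED, STOPPED }

    /** Hypothetical stand-in for one row of execution metadata. */
    static final class Execution {
        final long id;
        final Status status;

        Execution(long id, Status status) {
            this.id = id;
            this.status = status;
        }
    }

    /**
     * Returns the id of the most recent execution if it can be restarted.
     * The list is expected to be ordered newest first.
     */
    static Optional<Long> restartableExecutionId(List<Execution> newestFirst) {
        if (newestFirst.isEmpty()) {
            return Optional.empty();
        }
        Execution latest = newestFirst.get(0);
        boolean restartable = latest.status == Status.FAILED || latest.status == Status.STOPPED;
        return restartable ? Optional.of(latest.id) : Optional.empty();
    }

    public static void main(String[] args) {
        List<Execution> failedRun = Arrays.asList(new Execution(7, Status.FAILED));
        List<Execution> finishedRun = Arrays.asList(
                new Execution(9, Status.COMPLETED), new Execution(8, Status.FAILED));

        System.out.println(restartableExecutionId(failedRun).isPresent());
        System.out.println(restartableExecutionId(finishedRun).isPresent());
    }
}
```

The id returned for the failed run is the value you would hand to a method such as resumeWorkflow; for the run whose latest execution completed, nothing is returned because there is nothing left to resume.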
See also the post "Restarting a job".
Answer by Dmitry
You start your job with new JobParameters, so Spring Batch doesn't resume the job but starts a new one.
If you want to resume the job, you should remove the incrementer from the Job bean config.
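As a rough illustration of this point, the sketch below models a job instance as being identified by the job name plus its parameters: launching with identical parameters maps to the same instance (so a failed run can be resumed), while any changed parameter, such as an incremented `run.id`, yields a new instance. This is a simplified, library-free model, not Spring Batch code; the class and method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of job-instance identity; this is not Spring Batch code. */
class JobInstanceIdentityDemo {

    private final Map<String, Long> instances = new HashMap<>();
    private long nextId = 1;

    /** Returns the existing instance id for this name and parameters, or creates one. */
    long instanceFor(String jobName, Map<String, String> parameters) {
        // Sort the parameters so the key does not depend on insertion order.
        String key = jobName + new TreeMap<String, String>(parameters);
        return instances.computeIfAbsent(key, k -> nextId++);
    }

    public static void main(String[] args) {
        JobInstanceIdentityDemo repository = new JobInstanceIdentityDemo();

        Map<String, String> firstRun = new HashMap<>();
        firstRun.put("START_DATE", "2016-06-10");
        firstRun.put("run.id", "1");

        // Same parameters again: the same instance is found, so it can be resumed.
        long original = repository.instanceFor("job", firstRun);
        long relaunched = repository.instanceFor("job", new HashMap<>(firstRun));
        System.out.println("same parameters, same instance: " + (original == relaunched));

        // An incremented run.id changes the identity: a new instance is created.
        Map<String, String> incremented = new HashMap<>(firstRun);
        incremented.put("run.id", "2");
        long fresh = repository.instanceFor("job", incremented);
        System.out.println("incremented run.id, new instance: " + (fresh != original));
    }
}
```

Both lines print true in this model, which is why a launch that changes the parameters on every call never resumes the earlier, failed instance.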