Spring 批处理 - 并行运行多个作业
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/45718888/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring batch - running multiple jobs in parallel
提问by ljustin
I am new to Spring batch and couldn't figure out how to do this..
我是 Spring Batch 的新手,不知道该怎么做。
Basically I have a spring file poller which runs every N mins to look for files with some name (ex: A.txt & B.txt) in certain directory. At any moment in time, there could be max 2 files in this directory (A and B). Through Spring Batch Job, these two files will be processed and persisted to 2 different DB tables.
基本上我有一个 spring 文件轮询器,它每 N 分钟运行一次,以在某个目录中查找具有某个名称(例如:A.txt 和 B.txt)的文件。在任何时候,此目录中最多可能有 2 个文件(A 和 B)。通过 Spring Batch Job,这两个文件将被处理并持久化到 2 个不同的 DB 表中。
These files are somewhat similar, so the same processor/writer is used.
这些文件有些相似,因此使用相同的处理器/写入器。
Right now the way I set up, every polling cycle 1 file is picked up and job is ran.
现在按照我的设置方式,每个轮询周期 1 文件都会被选中并运行。
Let's say there are 2 files in the directory (A.txt and B.txt), is there a way to create 2 jobs so that both jobs can be run in parallel?
假设目录中有 2 个文件(A.txt 和 B.txt),有没有办法创建 2 个作业以便两个作业可以并行运行?
回答by Daniel C.
There are very good approaches in order to run jobs in async mode with Spring, it is just a matter of how is configured the JobLauncher. The JobLauncherhas a taskExecutorproperty and the asynchronous execution could be activated depending on the implementation that is assigned to that property.
有一些很好的方法可以使用 Spring 以异步模式运行作业,这只是如何配置JobLauncher. 该JobLauncher有一个taskExecutor属性和异步执行可以根据分配给该属性的实现被激活。
You can find all the TaskExecutortypes that Spring can provide and depending on your needs select the best approach to accomplish your batch asynchronous jobs. Task Executors Types in Spring
您可以找到TaskExecutorSpring 可以提供的所有类型,并根据您的需要选择最佳方法来完成您的批处理异步作业。 Spring 中的任务执行器类型
For example SimpleAsyncTaskExecutoris a task executor that will create a new Threadon any invocation and that could generate a performance issue if the execution runs with high frequency. In the other hand there are also TaskExecutorstypes that provides pooling features in order to reuse resources and maximize the efficiency of the system.
例如SimpleAsyncTaskExecutor,一个任务执行器将Thread在任何调用时创建一个新的,如果执行运行频率高,它可能会产生性能问题。另一方面,也TaskExecutors有提供池化特性的类型,以重用资源并最大化系统效率。
Here is an small example of how configure a ThreadPoolTaskExecutor:
这是一个如何配置 a 的小例子ThreadPoolTaskExecutor:
A) Configure ThreadPoolTaskExecutor Bean
A) 配置 ThreadPoolTaskExecutor Bean
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(30);
return taskExecutor;
}
B) Configure JobLauncher Bean
B) 配置 JobLauncher Bean
@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
C) Inject your JobLauncherand your Jobsconfiguration
C) 注入你JobLauncher和你的Jobs配置
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("job1-file-A")
private Job job1;
@Autowired
@Qualifier("job2-file-B")
private Job job2;
D) Schedule the jobs
D) 安排工作
@Scheduled(cron = "*/1 * * * * *")
public void run1(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job1, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
@Scheduled(cron = "*/1 * * * * *")
public void run2(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job2, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
E) Finally on your SpringBoot Class @EnableBatchProcessingand @EnableScheduling
E) 最后在你的 SpringBoot 类@EnableBatchProcessing和@EnableScheduling
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {
回答by Lachezar Balev
I believe that you can. Since you are new in spring batch (just like me) I would recommend that you go through the domain language of a batchif you haven't done so already.
我相信你可以。由于您是春季批次的新手(就像我一样),如果您还没有这样做,我建议您通过批次的领域语言。
Then you may start by configuring your own asynchronousJobLauncher. For example:
然后你可以从配置你自己的异步开始JobLauncher。例如:
@Bean
public JobLauncher jobLauncher() throws Exception
{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
Pay special attention to SimpleAsyncTaskExecutor(the job repo can be autowired). This configuration will allow asynchronousexecution as visualized next:
特别注意SimpleAsyncTaskExecutor(工作仓库可以自动装配)。此配置将允许异步执行,如下所示:
Compare it with the synchronous execution flow:
与同步执行流程对比:
Maybe it would additionally help to quote the SimpleJobLauncherjava doc:
也许引用SimpleJobLauncherjava doc还会有帮助:
Simple implementation of the JobLauncher interface. The Spring Core TaskExecutor interface is used to launch a Job. This means that the type of executor set is very important. If a SyncTaskExecutor is used, then the job will be processed within the same thread that called the launcher. Care should be taken to ensure any users of this class understand fully whether or not the implementation of TaskExecutor used will start tasks synchronously or asynchronously. The default setting uses a synchronous task executor.
JobLauncher 接口的简单实现。Spring Core TaskExecutor 接口用于启动 Job。这意味着执行器集的类型非常重要。如果使用 SyncTaskExecutor,则作业将在调用启动程序的同一线程内处理。应注意确保此类的任何用户都完全了解所使用的 TaskExecutor 的实现是同步还是异步启动任务。默认设置使用同步任务执行器。
More details and configuration options - here.
更多细节和配置选项 -在这里。
At the end just create the jobs with differentnames and/or launch them with different parameter set. Naive example would be:
最后,只需创建具有不同名称的作业和/或使用不同的参数集启动它们。天真的例子是:
@Autowired
public JobBuilderFactory jobBuilderFactory;
public Job createJobA() {
return jobBuilderFactory.get("A.txt")
.incrementer(new RunIdIncrementer())
.flow(step1())
.next(step2())
.end()
.build();
}
public Job createJobB() {
return jobBuilderFactory.get("B.txt")
.incrementer(new RunIdIncrementer())
.flow(step1())
.next(step2())
.end()
.build();
}
Executing these jobs with your asynchronous job launcher will create two job instances that which will execute in parallel. This is just one option, that may or may not be suitable for your context.
使用异步作业启动器执行这些作业将创建两个并行执行的作业实例。这只是一种选择,可能适合也可能不适合您的上下文。


