Spring Batch: Tasklet with multi-threaded executor has very bad performance related to throttling algorithm

Warning: the question and answers below are a translation of a popular StackOverflow thread, provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me). Original source: http://stackoverflow.com/questions/18262857/

Tags: java, multithreading, performance, spring, spring-batch

Asked by pmpm

Using Spring Batch 2.2.1, I have configured a Spring Batch job. The configuration is the following:

  • The Tasklet uses a ThreadPoolTaskExecutor limited to 15 threads

  • throttle-limit is equal to the number of threads

  • Chunk is used with:

    • 1 synchronized adapter around the JdbcCursorItemReader, to allow its use by many threads as per the Spring Batch documentation recommendation (a minimal sketch of such an adapter follows this list):

      You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration.

    • saveState is false on the JdbcCursorItemReader

    • A custom ItemWriter based on JPA. Note that the processing time of one item can vary from a few milliseconds to more than a minute (> 60 s).

    • commit-interval set to 1 (I know it could be bigger, but that's not the issue)

  • All JDBC pools are correctly sized, per the Spring Batch documentation recommendations
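
For reference, a synchronized reader adapter of the kind described above can be a simple delegating wrapper. Here is a minimal sketch (the class name is illustrative, not the asker's actual code):

import org.springframework.batch.item.ItemReader;

/**
 * Serializes calls to read() on a non-thread-safe delegate such as
 * JdbcCursorItemReader; processing and writing still run concurrently.
 */
public class SynchronizedItemReader<T> implements ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedItemReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    // Only read() is synchronized, so the cursor is consumed by one thread
    // at a time while the rest of the chunk stays multi-threaded.
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}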

Running the batch leads to very strange and poor results, due to the following:

  • at some point, if some items take a long time to process in the writer, nearly all threads in the thread pool end up doing nothing instead of processing; only the slow writer is working.

Looking at the Spring Batch code, the root cause seems to be in this package:

  • org/springframework/batch/repeat/support/

Is this way of working a feature, or is it a limitation/bug?

If it is a feature, is there a way, through configuration, to keep all threads from being starved by long-running items, without having to rewrite everything?

Note that if all items take the same amount of time, everything works fine and multi-threading is OK; but if one item takes much longer to process, then multi-threading is nearly useless for as long as the slow item is running.

Note that I opened an issue about this.

Accepted answer by UBIK LOAD PACK

As Alex said, this behaviour seems to be a contract, as per the javadoc:

Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads.

Look at:

TaskExecutorRepeatTemplate#waitForResults
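
To see why such a contract can starve the pool, here is a small self-contained illustration (conceptual demo code only, not the actual Spring Batch implementation): a batch of tasks is submitted, and the next batch cannot start until every result, including the slowest one, has been collected.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThrottleBarrierDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(15);
        List<Callable<String>> chunk = new ArrayList<Callable<String>>();
        for (int i = 0; i < 15; i++) {
            final int n = i;
            chunk.add(new Callable<String>() {
                public String call() throws Exception {
                    // One item out of 15 is pathologically slow, like the slow writer.
                    Thread.sleep(n == 0 ? 60000 : 100);
                    return "item-" + n;
                }
            });
        }
        // invokeAll() blocks until EVERY task is done: 14 threads finish in
        // ~100 ms and then sit idle for ~60 s before the next batch can start.
        pool.invokeAll(chunk);
        pool.shutdown();
    }
}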

Another option for you would be to use partitioning:

  • A TaskExecutorPartitionHandler that will execute items from the partitioned ItemReader, see below
  • A Partitioner implementation that provides the ranges to be processed by the ItemReader, see ColumnRangePartitioner below
  • A custom reader that will read data using the values the Partitioner has filled in, see the myItemReader configuration below

Michael Minella explains this in Chapter 11 of his book Pro Spring Batch:

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>       
</batch:job>
<!-- This one will create partitions of (number of rows / grid size) rows each -->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader"
                writer="manipulatableWriterForTests" commit-interval="1"
                skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
</batch:step>
 <!-- scope step is critical here-->
<bean id="myItemReader"    
                        class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <=  ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
 <!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>

ColumnRangePartitioner.java:

package ...;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    private JdbcTemplate jdbcTemplate;
    private String column;
    private String table;

    public Map<String, ExecutionContext> partition(int gridSize) {
        // Query the overall key range, then slice it into gridSize partitions.
        // (The original snippet called a bare queryForInt(); a JdbcTemplate is
        // needed to actually run these queries.)
        int min = jdbcTemplate.queryForObject(
                "SELECT MIN(" + column + ") FROM " + table, Integer.class);
        int max = jdbcTemplate.queryForObject(
                "SELECT MAX(" + column + ") FROM " + table, Integer.class);
        // +1 keeps targetSize from being zero for small ranges, which would
        // otherwise make the loop below spin forever.
        int targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // The step-scoped reader resolves these as
            // #{stepExecutionContext[minValue]} and #{stepExecutionContext[maxValue]}.
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }
}
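
A hypothetical smoke test for this partitioner (assuming H2 and spring-jdbc on the classpath, plus the setJdbcTemplate setter shown above; all names are illustrative) could look like this:

import java.util.Map;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

public class ColumnRangePartitionerDemo {
    public static void main(String[] args) {
        // In-memory H2 database with a toy customers table of ids 1..100.
        DriverManagerDataSource ds = new DriverManagerDataSource(
                "jdbc:h2:mem:demo;DB_CLOSE_DELAY=-1", "sa", "");
        JdbcTemplate jdbc = new JdbcTemplate(ds);
        jdbc.execute("CREATE TABLE customers(id INT PRIMARY KEY)");
        for (int id = 1; id <= 100; id++) {
            jdbc.update("INSERT INTO customers(id) VALUES (?)", id);
        }

        ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
        partitioner.setJdbcTemplate(jdbc);
        partitioner.setColumn("id");
        partitioner.setTable("customers");

        // Each ExecutionContext carries one [minValue, maxValue] range that a
        // step-scoped reader would pick up.
        for (Map.Entry<String, ExecutionContext> e : partitioner.partition(10).entrySet()) {
            System.out.println(e.getKey() + " -> [" + e.getValue().getInt("minValue")
                    + ", " + e.getValue().getInt("maxValue") + "]");
        }
    }
}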

Answer by Alex

Here's what I think is going on:

  • As you said, your ThreadPoolTaskExecutor is limited to 15 threads
  • The framework's "chunk" causes each item from the JdbcCursorItemReader (up to the thread limit) to be executed in a different thread
  • But the Spring Batch framework is also waiting for each of the threads (i.e., all 15) to complete their individual read/process/write flow before moving on to the next chunk, given your commit interval of 1. On occasion, this causes 14 threads to wait almost 60 seconds on a sibling thread that is taking forever to complete.

In other words, for this multi-threaded approach in Spring Batch to be helpful, each thread needs to process in about the same amount of time. Given your scenario, where there is a huge disparity between the processing times of certain items, you are experiencing a limitation where many of your threads are finished and waiting on a long-running sibling thread before they can move on to the next chunk of processing.

My suggestions:

  • Generally, I'd say that increasing your commit interval should help somewhat, since it allows more than one cursor item to be processed in a single thread between commits, even if one of the threads is stuck on a long-running write. However, if you're unlucky, multiple long transactions could occur in the same thread and make matters worse (e.g., 120 s between commits in a single thread, for a commit interval of 2).
  • Specifically, I'd suggest increasing your thread pool size to a big number, even exceeding your maximum number of database connections by 2x or 3x (see the sketch after this list). What should happen is that even though some of your threads will block trying to acquire a connection (because of the large thread pool size), you'll actually see an increase in throughput, as your long-running threads no longer stop other threads from taking new items from the cursor and continuing your batch job's work in the meantime. At the beginning of a chunk, your number of pending threads will greatly exceed your number of available database connections, so the OS scheduler will churn a bit as it activates and deactivates threads that are blocked on acquiring a database connection. However, since most of your threads complete their work and release their database connection relatively quickly, you should see that overall your throughput improves, as many threads keep acquiring connections, doing work, releasing connections, and allowing further threads to do the same, even while your long-running threads are doing their thing.
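
As a rough sketch of that second suggestion (the numbers are illustrative; size the pool against your actual database connection limit):

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ExecutorConfig {
    public static ThreadPoolTaskExecutor oversizedExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Assuming a JDBC pool of ~15 connections, go 2-3x bigger here so fast
        // threads keep pulling items while slow ones hold their connections.
        executor.setCorePoolSize(45);
        executor.setMaxPoolSize(45);
        executor.setThreadNamePrefix("batch-worker-");
        executor.initialize();
        return executor;
    }
}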

Answer by Harsh Gupta

In my case, if I don't set the throttle-limit, then only 4 threads enter the read() method of the ItemReader; that is also the default number of threads when none is specified in the tasklet tag, as per the Spring Batch documentation.

If I specify more threads, e.g. 10, 20 or 100, then only 8 threads enter the read() method of the ItemReader (see the sketch below for where throttle-limit is set).
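
For reference, a hedged sketch of where throttle-limit lives in Spring Batch's Java builder API (equivalent to the throttle-limit attribute on <batch:tasklet>; the reader/writer/executor parameters here are placeholders, not from the answer):

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.core.task.TaskExecutor;

public class ThrottleLimitConfig {
    public Step multiThreadedStep(StepBuilderFactory steps,
                                  ItemReader<Object> reader,
                                  ItemWriter<Object> writer,
                                  TaskExecutor taskExecutor) {
        return steps.get("step1")
                .<Object, Object>chunk(1)
                .reader(reader)
                .writer(writer)
                .taskExecutor(taskExecutor)
                // throttle-limit defaults to 4 when not set, matching the
                // behaviour described above.
                .throttleLimit(20)
                .build();
    }
}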

Answer by Filoche

The limit of 8 active threads, regardless of the value of throttle-limit, might be caused by contention on the Spring Batch job repository. Every time a chunk is processed, some information is written to the job repository. Increase its pool size to accommodate the number of threads you need!
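
The answer doesn't say which pool backs the job repository; purely as an illustration, with Commons DBCP2 (an assumption, not from the answer, as are the URL and credentials), the fix might look like:

import org.apache.commons.dbcp2.BasicDataSource;

public class JobRepositoryDataSourceConfig {
    public static BasicDataSource jobRepositoryDataSource() {
        BasicDataSource ds = new BasicDataSource();
        ds.setUrl("jdbc:h2:mem:batch");  // placeholder URL
        ds.setUsername("sa");
        ds.setPassword("");
        // At least one connection per concurrent chunk thread, plus headroom,
        // so job-repository writes don't serialize the worker threads.
        ds.setMaxTotal(32);
        return ds;
    }
}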