What determines the number of threads a Java ForkJoinPool creates?
Disclaimer: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me) on Stack Overflow.
Original question: http://stackoverflow.com/questions/10797568/
Asked by Holger Peine
As far as I had understood ForkJoinPool, that pool creates a fixed number of threads (default: number of cores) and will never create more threads (unless the application indicates a need for those by using managedBlock).
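For context, a minimal sketch of what indicating such a need via ForkJoinPool.managedBlock can look like; this is illustrative only and not part of the program discussed below (the LatchBlocker class and the CountDownLatch it wraps are assumptions made up for the example):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Wraps a blocking wait in a ManagedBlocker so the pool may add a
// compensating worker thread while the current worker is parked.
class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    @Override
    public boolean block() throws InterruptedException {
        latch.await();                 // the actual blocking operation
        return true;                   // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0;  // true if blocking is not needed at all
    }
}

// Usage from inside a ForkJoinTask:
//   ForkJoinPool.managedBlock(new LatchBlocker(latch));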
However, using ForkJoinPool.getPoolSize() I discovered that in a program that creates 30,000 tasks (RecursiveAction), the ForkJoinPool executing those tasks uses 700 threads on average (threads counted each time a task is created). The tasks don't do I/O, but pure computation; the only inter-task synchronization is calling ForkJoinTask.join() and accessing AtomicBooleans, i.e. there are no thread-blocking operations.
Since join() does not block the calling thread as I understand it, there is no reason why any thread in the pool should ever block, and so (I had assumed) there should be no reason to create any further threads (which is obviously happening nevertheless).
So, why does ForkJoinPool create so many threads? What factors determine the number of threads created?
I had hoped that this question could be answered without posting code, but here it comes upon request. This code is an excerpt from a program of four times the size, reduced to the essential parts; it does not compile as it is. If desired, I can of course post the full program, too.
The program searches a maze for a path from a given start point to a given end point using depth-first search. A solution is guaranteed to exist. The main logic is in the compute() method of SolverTask: a RecursiveAction that starts at some given point and continues with all neighbor points reachable from the current point. Rather than creating a new SolverTask at each branching point (which would create far too many tasks), it pushes all neighbors except one onto a backtracking stack to be processed later and continues with only the one neighbor not pushed to the stack. Once it reaches a dead end that way, the point most recently pushed to the backtracking stack is popped, and the search continues from there (cutting back the path built from the task's starting point accordingly). A new task is created once a task finds its backtracking stack larger than a certain threshold; from that time on, the task, while continuing to pop from its backtracking stack until that is exhausted, will not push any further points to its stack when reaching a branching point, but will create a new task for each such point. Thus, the size of the tasks can be adjusted using the stack limit threshold.
The numbers I quoted above ("30,000 tasks, 700 threads on average") are from searching a maze of 5000x5000 cells. So, here is the essential code:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
    // Once the backtrack stack has reached this size, the current task
    // will never add another cell to it, but create a new task for each
    // newly discovered branch:
    private static final int MAX_BACKTRACK_CELLS = 100*1000;

    /**
     * @return Tries to compute a path through the maze from local start to end
     *         and returns that (or null if no such path found)
     */
    @Override
    public ArrayDeque<Point> compute() {
        // Is this task still accepting new branches for processing on its own,
        // or will it create new tasks to handle those?
        boolean stillAcceptingNewBranches = true;
        Point current = localStart;
        ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); // Path from localStart to (including) current
        ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
        // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
        Direction[] allDirections = Direction.values();

        while (!current.equals(end)) {
            pathFromLocalStart.addLast(current);
            // Collect current's unvisited neighbors in random order:
            ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
            for (Direction directionToNeighbor: allDirections) {
                Point neighbor = current.getNeighbor(directionToNeighbor);
                // contains() and hasPassage() are read-only methods and thus need no synchronization
                if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                    neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
            }
            // Process unvisited neighbors
            if (neighborsToVisit.size() == 1) {
                // Current node is no branch: Continue with that neighbor
                current = neighborsToVisit.getFirst().getPoint();
                continue;
            }
            if (neighborsToVisit.size() >= 2) {
                // Current node is a branch
                if (stillAcceptingNewBranches) {
                    current = neighborsToVisit.removeLast().getPoint();
                    // Push all neighbors except one on the backtrack stack for later processing
                    for (PointAndDirection neighborAndDirection: neighborsToVisit)
                        backtrackStack.push(neighborAndDirection);
                    if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                        stillAcceptingNewBranches = false;
                    // Continue with the one neighbor that was not pushed onto the backtrack stack
                    continue;
                } else {
                    // Current node is a branch point, but this task does not accept new branches any more:
                    // Create new task for each neighbor to visit and wait for the end of those tasks
                    SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                    int t = 0;
                    for (PointAndDirection neighborAndDirection: neighborsToVisit) {
                        SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                        task.fork();
                        subTasks[t++] = task;
                    }
                    for (SolverTask task: subTasks) {
                        ArrayDeque<Point> subTaskResult = null;
                        try {
                            subTaskResult = task.join();
                        } catch (CancellationException e) {
                            // Nothing to do here: Another task has found the solution and cancelled all other tasks
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        if (subTaskResult != null) { // subtask found solution
                            pathFromLocalStart.addAll(subTaskResult);
                            // No need to wait for the other subtasks once a solution has been found
                            return pathFromLocalStart;
                        }
                    } // for subTasks
                } // else (not accepting any more branches)
            } // if (current node is a branch)
            // Current node is dead end or all its neighbors lead to dead ends:
            // Continue with a node from the backtracking stack, if any is left:
            if (backtrackStack.isEmpty()) {
                return null; // No more backtracking available: No solution exists => end of this task
            }
            // Backtrack: Continue with cell saved at latest branching point:
            PointAndDirection pd = backtrackStack.pop();
            current = pd.getPoint();
            Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
            // DEBUG System.out.println("Backtracking to " + branchingPoint);
            // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
            while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
                // DEBUG System.out.println("  Going back before " + pathSoFar.peekLast());
                pathFromLocalStart.removeLast();
            }
            // continue while loop with newly popped current
        } // while (current ...

        if (!current.equals(end)) {
            // this task was interrupted by another one that already found the solution
            // and should end now therefore:
            return null;
        } else {
            // Found the solution path:
            pathFromLocalStart.addLast(current);
            return pathFromLocalStart;
        }
    } // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze {
    // for each cell in the maze: Has the solver visited it yet?
    private final AtomicBoolean[][] visited;

    /**
     * Atomically marks this point as visited unless visited before
     * @return whether the point was visited for the first time, i.e. whether it could be marked
     */
    boolean visit(Point p) {
        return visited[p.getX()][p.getY()].compareAndSet(false, true);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
        // Start initial task
        long startTime = System.currentTimeMillis();
        // since SolverTask.compute() expects its starting point already visited,
        // must do that explicitly for the global starting point:
        maze.visit(maze.start);
        maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
        // One solution is enough: Stop all tasks that are still running
        pool.shutdownNow();
        pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
        long endTime = System.currentTimeMillis();
        System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
                width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
    }
} // class ParallelMaze
Answer by elusive-code
There are related questions on Stack Overflow:
ForkJoinPool stalls during invokeAll/join
ForkJoinPool seems to waste a thread
I made a runnable, stripped-down version of what is happening (JVM arguments I used: -Xms256m -Xmx1024m -Xss8m):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;          //recursive counter
        private int childrenCount=80; //amount of children to spawn
        private int idx;              // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {
            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());

            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }

            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}
Apparently, when you perform a join, the current thread sees that the required task is not yet completed and takes another task for itself to do.
It happens in java.util.concurrent.ForkJoinWorkerThread#joinTask.
However, this new task spawns more of the same tasks, but they cannot find threads in the pool, because the threads are locked in join. And since it has no way of knowing how much time they will need before being released (a thread could be in an infinite loop or deadlocked forever), new thread(s) are spawned (compensating for the joined threads, as Louis Wasserman mentioned): java.util.concurrent.ForkJoinPool#signalWork
So to prevent such a scenario you need to avoid recursive spawning of tasks.
For example, if in the above code you set the initial parameter to 1, the number of active threads will be 2, even if you increase childrenCount tenfold.
Also note that, while the number of active threads increases, the number of running threads is less than or equal to parallelism.
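One common way to limit how long a worker sits blocked in join() is the usual divide-and-conquer idiom: fork one half, compute the other half in the current thread, and perform a single join per task. Below is a minimal sketch of that idiom (my illustration, not code from the question or this answer; SumTask and THRESHOLD are made up for the example):

import java.util.concurrent.RecursiveTask;

// Each task forks at most one child and computes the other half itself,
// so the worker keeps doing useful work instead of parking in a long
// series of join() calls on many outstanding children.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000;
    private final long[] data;
    private final int from, to;

    SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        left.fork();                                              // run the left half asynchronously
        long rightResult = new SumTask(data, mid, to).compute();  // do the right half in this thread
        return left.join() + rightResult;                         // single join, innermost-first
    }
}

// Usage: new ForkJoinPool().invoke(new SumTask(data, 0, data.length))

With this shape, by the time a task calls join() it has already done half of the work itself, so far fewer workers end up parked waiting for their children.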
Answer by Louis Wasserman
From the source comments:
Compensating: Unless there are already enough live threads, method tryPreBlock() may create or re-activate a spare thread to compensate for blocked joiners until they unblock.
I think what's happening is that you're not finishing any of the tasks very quickly, and since there aren't available worker threads when you submit a new task, a new thread gets created.
Answer by edharned
strict, full-strict, and terminally-strict have to do with processing a directed acyclic graph (DAG). You can google those terms to get a full understanding of them. That is the type of processing the framework was designed to handle. Look at the code in the API for Recursive...; the framework relies on your compute() code to do other compute() links and then do a join(). Each Task does a single join(), just like processing a DAG.
You are not doing DAG processing. You are forking many new Tasks and waiting (join()) on each. Have a read of the source code. It's horrendously complex, but you may be able to figure it out. The framework does not do proper Task Management. Where is it going to put the waiting Task when it does a join()? There is no suspended queue; that would require a monitor thread to constantly look at the queue to see what is finished. This is why the framework uses "continuation threads". When one task does join(), the framework assumes it is waiting for a single lower Task to finish. When many join() methods are present, the thread cannot continue, so a helper or continuation thread needs to exist.
As noted above, you need a scatter-gather type fork-join process. There you can fork as many Tasks
Answer by Artёm Basov
Both code snippets posted by Holger Peine and elusive-code don't actually follow the recommended practice that appears in the javadoc for version 1.8:
In the most typical usages, a fork-join pair act like a call (fork) and return (join) from a parallel recursive function. As is the case with other forms of recursive calls, returns (joins) should be performed innermost-first. For example, a.fork(); b.fork(); b.join(); a.join(); is likely to be substantially more efficient than joining code a before code b.
In both cases the FJPool was instantiated via the default constructor. This leads to construction of the pool with asyncMode=false, which is the default:
@param asyncMode if true, establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use false.
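For comparison, here is roughly how a pool would be constructed with asyncMode chosen explicitly, using ForkJoinPool's four-argument constructor (a sketch; neither snippet under discussion does this, and the parallelism value of 2 simply mirrors the Test1 example):

import java.util.concurrent.ForkJoinPool;

// A pool with asyncMode=true: forked tasks that are never joined are
// scheduled FIFO instead of the default LIFO (stack) order.
ForkJoinPool asyncPool = new ForkJoinPool(
        2,                                                // parallelism
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,  // worker thread factory
        null,                                             // no UncaughtExceptionHandler
        true);                                            // asyncMode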
With the default asyncMode=false, the work queue is effectively LIFO:
head -> | t4 | t3 | t2 | t1 | ... | <- tail
So in the snippets they fork() all tasks, pushing them onto the stack, and then join() in the same order, that is from the deepest task (t1) to the topmost one (t4), effectively blocking until some other thread steals (t1), then (t2), and so on. Since there are enough tasks to block all pool threads (task_count >> pool.getParallelism()), compensation kicks in as Louis Wasserman described.
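As an illustration of that recommendation (my sketch, not part of the original answer), the joining loop in the Test1 example above could join in the reverse of fork order, so that joins are performed innermost-first and each join() targets the child most recently pushed onto the worker's own deque:

// Hypothetical variant of SomeAction.compute() from the Test1 example above,
// joining innermost-first as the javadoc recommends:
@Override
protected void compute() {
    if (counter <= 0) return;
    List<SomeAction> list = new ArrayList<>(childrenCount);
    for (int i = 0; i < childrenCount; i++) {
        SomeAction next = new SomeAction(counter - 1, i);
        list.add(next);
        next.fork();
    }
    for (int i = list.size() - 1; i >= 0; i--) {
        list.get(i).join();   // join the most recently forked child first
    }
}

This does not change what the tasks compute; it only changes the order in which the parent waits for them.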
Answer by Ivan Beziazychnyi
It is worth noting that the output of the code posted by elusive-code depends on the version of Java. Running the code on Java 8 I see this output:
...
counter=0.73 activeThreads=45 runningThreads=5 poolSize=49 queuedTasks=105 queuedSubmissions=0 parallelism=2 stealCount=3056
counter=0.75 activeThreads=46 runningThreads=1 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3158
counter=0.77 activeThreads=47 runningThreads=3 poolSize=51 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=3157
counter=0.74 activeThreads=45 runningThreads=3 poolSize=51 queuedTasks=5 queuedSubmissions=0 parallelism=2 stealCount=3153
But running the same code on Java 11, the output is different:
...
counter=0.75 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=4 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.76 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=3 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.77 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=2 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.78 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=1 queuedSubmissions=0 parallelism=2 stealCount=0
counter=0.79 activeThreads=1 runningThreads=1 poolSize=2 queuedTasks=0 queuedSubmissions=0 parallelism=2 stealCount=0