Java 使用ForkJoinPool实现Fork和Join
" ForkJoinPool"已添加到Java 7中的Java中。" ForkJoinPool"类似于Java ExecutorService,但有一个区别。 ForkJoinPool使任务很容易将其工作分解为较小的任务,然后再将这些任务提交给ForkJoinPool。只要可以拆分任务,任务就可以继续将其工作拆分为较小的子任务。听起来可能有点抽象,所以在此fork and join教程中,我将解释ForkJoinPool的工作方式以及拆分任务的工作方式。
Fork和Join解释
在查看ForkJoinPool之前,我想先解释一下fork和join原理是如何工作的。
分叉和联接原理包括两个步骤,这些步骤以递归方式执行。这两个步骤是fork步骤和join步骤。
Fork
使用派生和联接原理的任务可以将自己派生(拆分)为较小的子任务,这些子任务可以同时执行。如下图所示:
通过将自身拆分为多个子任务,每个子任务可以由不同的CPU或者同一CPU上的不同线程并行执行。
如果给定任务的工作足够大,则任务只能将自身拆分为子任务。将任务拆分为子任务会产生开销,因此对于少量工作,此开销可能会大于同时执行子任务所实现的加速。
将任务分叉到子任务中的合理时限也称为阈值。由每个任务决定一个合理的阈值。这在很大程度上取决于所完成的工作。
Join
当任务将自身拆分为子任务时,任务将等待直到子任务完成执行。
一旦子任务完成执行,该任务可以将所有结果合并(合并)为一个结果。如下图所示:
当然,并非所有类型的任务都可能返回结果。如果任务没有返回结果,则任务仅等待其子任务完成。这样就不会合并结果。
ForkJoinPool
ForkJoinPool是一个特殊的线程池,旨在与fork-and-join任务拆分一起很好地工作。 ForkJoinPool位于java.util.concurrent包中,因此完整的类名是java.util.concurrent.ForkJoinPool。
创建一个ForkJoinPool
我们使用其构造函数创建一个" ForkJoinPool"。作为ForkJoinPool构造函数的参数,我们可以传递所需的指示级别的并行性。并行级别指示要在传递给ForkJoinPool的任务上同时处理多少个线程或者CPU。这是一个" ForkJoinPool"创建示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
本示例创建一个并行级别为4的ForkJoinPool。
提交任务到ForkJoinPool
我们将任务提交到ForkJoinPool类似于将任务提交到ExecutorService。我们可以提交两种类型的任务。一个不返回任何结果的任务(一个"动作"),一个不返回结果的任务(一个"任务")。这两种任务由RecursiveAction和RecursiveTask类表示。以下各节将介绍如何使用这两项任务以及如何提交它们。
递归动作
RecursiveAction是不返回任何值的任务。它只是做一些工作,例如将数据写入磁盘,然后退出。
RecursiveAction可能仍需要将其工作分解为较小的块,这些块可以由独立的线程或者CPU执行。
我们可以通过将其子类化来实现" RecursiveAction"。这是一个" RecursiveAction"示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
public class MyRecursiveAction extends RecursiveAction {
private long workLoad = 0;
public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}
@Override
protected void compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
subtasks.addAll(createSubtasks());
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}
private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
这个例子非常简化。 MyRecursiveAction只是将虚构的workLoad作为其构造函数的参数。如果workLoad高于某个阈值,则将工作拆分为多个子任务,这些子任务也计划执行(通过子任务的.fork()方法。如果workLoad低于某个阈值,则该工作由MyRecursiveAction本身执行。
我们可以像这样安排MyRecursiveAction来执行:
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24); forkJoinPool.invoke(myRecursiveAction);
递归任务
RecursiveTask是一个返回结果的任务。它可以将工作分解为较小的任务,然后将这些较小的任务的结果合并为一个集合结果。拆分和合并可以在多个级别上进行。这是一个" RecursiveTask"示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask(long workLoad) {
this.workLoad = workLoad;
}
protected Long compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
subtasks.addAll(createSubtasks());
for(MyRecursiveTask subtask : subtasks){
subtask.fork();
}
long result = 0;
for(MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
return workLoad * 3;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
此示例与RecursiveAction示例相似,不同之处在于它返回结果。 MyRecursiveTask类扩展了RecursiveTask <Long>,这意味着任务返回的结果是Long。
MyRecursiveTask示例还将工作分解为子任务,并使用它们的fork()方法安排这些子任务的执行。
另外,此示例然后通过调用每个子任务的join()方法接收每个子任务返回的结果。子任务结果将合并为更大的结果,然后将其返回。子任务结果的这种合并/合并可能在递归的多个级别上递归发生。
我们可以像这样安排RecursiveTask:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);
注意如何从ForkJoinPool.invoke()方法调用中获得最终结果。

