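Custom thread pool in Java 8 parallel stream
Note: this page reproduces a popular StackOverflow question under the CC BY-SA 4.0 license. If you use it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/21163108/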
Question by Lukas
Is it possible to specify a custom thread pool for a Java 8 parallel stream? I cannot find it anywhere.
Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded, so I want to compartmentalize it. I do not want a slow-running task in one module of the application to block tasks from another module.
If I cannot use different thread pools for different modules, it means I cannot safely use parallel streams in most real-world situations.
Try the following example. Some CPU-intensive tasks are executed in separate threads, and the tasks use parallel streams. The first task is broken, so each step takes 1 second (simulated by a thread sleep). The issue is that the other threads get stuck waiting for the broken task to finish. This is a contrived example, but imagine a servlet app and someone submitting a long-running task to the shared fork-join pool.
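import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.lang.Math.sqrt;
import static java.util.stream.LongStream.range;
import static java.util.stream.LongStream.rangeClosed;
public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(() -> runTask(1000)); // incorrect task: sleeps 1 s for every prime it finds
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }
    private static void runTask(int delay) {
        // All of these parallel streams share ForkJoinPool.commonPool()
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }
    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
    // Stand-in for the Utils.sleep helper used in the question (its code was not shown)
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}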
Accepted answer by Lukas
There is actually a trick for executing a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
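import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example (PrimesPrint.isPrime is a prime test like the one in the question)
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown(); // release the pool's worker threads
    }
}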
The trick is based on ForkJoinTask.fork, which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()".
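To see the trick at work, here is a small sketch (PoolCheck and poolsUsed are illustrative names, not part of any API). It relies on ForkJoinTask.getPool(), which returns the pool hosting the current thread, or null when the thread is not a ForkJoinPool worker. Called from main, the elements are processed by the main thread and, typically, common-pool workers; submitted to a custom pool, every element should be processed by that pool's workers.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

public class PoolCheck {
    // Runs a parallel stream and records which pool hosted each thread that processed an element
    static Set<String> poolsUsed(ForkJoinPool custom) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> {
            ForkJoinPool p = ForkJoinTask.getPool(); // null outside any ForkJoinPool
            if (p == null) seen.add("none");
            else if (p == custom) seen.add("custom");
            else if (p == ForkJoinPool.commonPool()) seen.add("common");
            else seen.add("other");
        });
        return seen;
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(4);
        try {
            // Called directly: the calling thread plus, typically, common-pool workers
            System.out.println("direct:    " + poolsUsed(custom));
            // Submitted as a task: the forked subtasks stay in the custom pool
            System.out.println("submitted: " + custom.submit(() -> poolsUsed(custom)).join());
        } finally {
            custom.shutdown();
        }
    }
}
```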
Answer by assylias
Parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than you have processors, as returned by Runtime.getRuntime().availableProcessors() (this means that parallel streams use all your processors, because they also use the main thread):
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
This also means that if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (the number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
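A quick way to check these numbers on your own machine is the sketch below (the class name is illustrative). ForkJoinPool.getCommonPoolParallelism() reports the common pool's target parallelism, and getParallelism() reports it for a pool you create yourself. The java.util.concurrent.ForkJoinPool.common.parallelism property is read only when the common pool is created, so it only has an effect if it is set before the common pool is first used, for example with -D on the command line.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        // Target parallelism of the shared pool used by parallel streams:
        // availableProcessors() - 1, but at least 1, unless the system property overrides it
        int common = ForkJoinPool.getCommonPoolParallelism();
        // A pool built with the no-arg constructor targets all available processors
        ForkJoinPool custom = new ForkJoinPool();
        int customParallelism = custom.getParallelism();
        custom.shutdown();
        System.out.println("processors=" + processors
                + " commonPool=" + common
                + " new ForkJoinPool()=" + customParallelism);
    }
}
```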
To change the way parallel streams are executed, you can either
- submit the parallel stream execution to your own ForkJoinPool:
  yourFJP.submit(() -> stream.parallel().forEach(doSomething)).get();
- or change the size of the common pool using a system property:
  System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
  for a target parallelism of 20 threads. However, this no longer works after the backported patch https://bugs.openjdk.java.net/browse/JDK-8190974.
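The first option can be wrapped in a small helper so that callers do not repeat the submit-and-wait boilerplate. This is a sketch; PoolRunner and callIn are hypothetical names, not part of the JDK. It uses ForkJoinTask.join() instead of get(), which avoids checked exceptions at the call site.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public final class PoolRunner {
    private PoolRunner() {}

    // Hypothetical helper: runs the task (typically a parallel stream pipeline) inside the given pool
    // and waits for it. join() declares no checked exceptions, and a RuntimeException thrown by the
    // task is rethrown with the same type.
    static <T> T callIn(ForkJoinPool pool, Callable<T> task) {
        return pool.submit(task).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(20); // target parallelism of 20, as in the property example
        try {
            long sum = callIn(pool, () -> IntStream.range(0, 1_000).parallel().asLongStream().sum());
            System.out.println(sum); // 499500
        } finally {
            pool.shutdown();
        }
    }
}
```

One design trade-off: join() waits uninterruptibly, so keep get() if the calling thread must respond to interrupts.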
Here is an example of the latter on my machine, which has 8 processors. If I run the following program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});
The output is:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
Answer by Mario Fusco
As an alternative to triggering the parallel computation inside your own ForkJoinPool, you can also pass that pool to the CompletableFuture.supplyAsync method, as in:
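// range and toList are static imports of IntStream.range and Collectors.toList
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
        //parallel task here, for example; boxed() is needed because IntStream has no collect(Collector)
        range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(toList()),
    forkJoinPool
);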
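A runnable version of the idea above, as a sketch: the class SupplyAsyncDemo and the method primesBelow are illustrative, and isPrime repeats the check from the question because PrimesPrint is not shown. The supplier runs on a worker of the given pool, so the stream's forked subtasks stay there too. CompletableFuture.join() waits for the result and reports a failure as an unchecked CompletionException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class SupplyAsyncDemo {
    // Same prime test as in the question
    static boolean isPrime(long n) {
        return n > 1 && LongStream.rangeClosed(2, (long) Math.sqrt(n)).noneMatch(d -> n % d == 0);
    }

    static List<Integer> primesBelow(int limit, ForkJoinPool pool) {
        CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
                IntStream.range(1, limit).parallel()
                        .filter(SupplyAsyncDemo::isPrime)
                        .boxed()                      // IntStream has no collect(Collector); box first
                        .collect(Collectors.toList()),
                pool);                                // the supplier runs on a worker of this pool
        return primes.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(primesBelow(30, pool)); // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
        } finally {
            pool.shutdown();
        }
    }
}
```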
Answer by charlie
To measure the actual number of threads used, you can check Thread.activeCount():
Runnable r = () -> IntStream
        .range(-42, +42)
        .parallel()
        .map(i -> Thread.activeCount())
        .max()
        .ifPresent(System.out::println);
ForkJoinPool.commonPool().submit(r).join();
new ForkJoinPool(42).submit(r).join();
On a 4-core CPU, this can produce output like:
5 // common pool
23 // custom pool
Without .parallel() it gives:
3 // common pool
4 // custom pool
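Thread.activeCount() only returns an estimate of all active threads in the current thread group, not just the ones working on the stream. A more direct sketch records the name of each thread that actually processes an element. Here the custom pool gets a thread factory that names its workers "custom-worker-N" (an illustrative naming scheme; ThreadCounter, namedPool and threadsUsed are hypothetical names) so they are easy to tell apart from common-pool threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ThreadCounter {
    // Custom pool whose workers are named "custom-worker-1", "custom-worker-2", ...
    static ForkJoinPool namedPool(int parallelism) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            t.setName("custom-worker-" + counter.incrementAndGet());
            return t;
        };
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Names of the threads that processed at least one element
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = namedPool(8);
        try {
            Set<String> names = pool.submit(() -> threadsUsed(100_000)).join();
            System.out.println(names.size() + " threads: " + names);
        } finally {
            pool.shutdown();
        }
    }
}
```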
Answer by Stefan Ferstl
Until now, I used the solutions described in the answers to this question. Now I have come up with a little library called Parallel Stream Support for that:
ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
        .filter(PrimesPrint::isPrime)
        .boxed()
        .collect(toList());
But as @PabloMatiasGomez pointed out in the comments, there are drawbacks regarding the splitting mechanism of parallel streams, which depends heavily on the size of the common pool. See "Parallel stream from a HashSet doesn't run in parallel".
I am using this solution only to have separate pools for different types of work, but I cannot set the size of the common pool to 1 even if I don't use it.
Answer by Tod Casasent
The original solution (setting the ForkJoinPool common parallelism property) no longer works. Looking at the links in the original answer, an update that breaks it has been backported to Java 8. As mentioned in the linked threads, this solution was not guaranteed to work forever. Based on that, the solution is the ForkJoinPool.submit(...).get() approach discussed in the accepted answer. I think the backport also fixes the unreliability of that approach.
ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
Answer by user_3380739
Get AbacusUtil: with it, the number of threads can be specified for a parallel stream. Here is sample code:
LongStream.range(4, 1_000_000).parallel(threadNum)...
Disclosure: I'm the developer of AbacusUtil.
Answer by John McClean
If you don't mind using a third-party library, with cyclops-react you can mix sequential and parallel Streams within the same pipeline and provide custom ForkJoinPools. For example:
ReactiveSeq.range(1, 1_000_000)
.foldParallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
.max(Comparator.naturalOrder()));
Or, if we wished to continue processing within a sequential Stream:
ReactiveSeq.range(1, 1_000_000)
.parallel(new ForkJoinPool(10),
s->s.filter(i->true)
.peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
.map(this::processSequentially)
.forEach(System.out::println);
[Disclosure: I am the lead developer of cyclops-react]
Answer by Hearen
I tried a custom ForkJoinPool as follows to adjust the pool size:
// Thread-safe set: peek() below runs concurrently on many worker threads, so a plain HashSet would race
private static Set<String> ThreadNameSet = ConcurrentHashMap.newKeySet();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}
private static void testForkJoinPool() {
    final int parallelism = 10;
    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}
Here is the output, showing that the pool uses more threads than the default 4:
50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]
But something odd happened when I tried to achieve the same result using a ThreadPoolExecutor, as follows:
BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
but it did not work.
It only starts the parallelStream in a new thread, and everything else stays the same, which again shows that the parallelStream uses the (common) ForkJoinPool to run its subtasks.
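The observation above can be checked with a short sketch (ExecutorVsCommonPool and workersSeen are illustrative names). A parallel stream started on a plain ExecutorService thread has its forked subtasks sent to the common pool, so each element is processed either by the executor thread itself, for which ForkJoinTask.getPool() returns null, or by a common-pool worker. Only running the stream inside a ForkJoinPool task, as in the accepted answer, keeps the work in a custom pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

public class ExecutorVsCommonPool {
    // Runs a parallel stream on an ordinary executor thread and reports who processed the elements
    static Set<String> workersSeen() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return executor.submit(() -> {
                Set<String> seen = ConcurrentHashMap.newKeySet();
                IntStream.range(0, 10_000).parallel().forEach(i -> {
                    ForkJoinPool p = ForkJoinTask.getPool();
                    if (p == null) seen.add("executor thread"); // not a ForkJoinPool worker
                    else if (p == ForkJoinPool.commonPool()) seen.add("common pool");
                    else seen.add("other pool");
                });
                return seen;
            }).get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workersSeen());
    }
}
```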
Answer by Scott Langley
Note: There appears to be a fix implemented in JDK 10 that ensures a custom thread pool uses the expected number of threads:
Parallel stream execution within a custom ForkJoinPool should obey the parallelism: https://bugs.openjdk.java.net/browse/JDK-8190974