Note: this page reproduces a popular Stack Overflow question under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and authors, and attribute it to the original authors (not me). Original Stack Overflow URL: http://stackoverflow.com/questions/36947336/

Time: 2020-11-03 02:01:49  Source: igfitidea

Why does the parallel stream not use all the threads of the ForkJoinPool?

Tags: java, multithreading, concurrency, java-8, java-stream

Asked by Pablo Matias Gomez

So I know that if you use parallelStream without a custom ForkJoinPool, it will use the default ForkJoinPool, which by default has one fewer thread than you have processors.
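
A minimal sketch for checking that default on a given machine. The class name `CommonPoolInfo` is hypothetical; `ForkJoinPool.getCommonPoolParallelism()` and `Runtime.availableProcessors()` are standard JDK calls. The reported value can differ from "processors minus one" if the `java.util.concurrent.ForkJoinPool.common.parallelism` system property is set.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, for illustration only.
final class CommonPoolInfo {

    // Target parallelism of the common pool used by parallel streams by default.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors:    " + cpus);
        System.out.println("common pool parallelism: " + commonParallelism());
    }
}
```

The thread that invokes the terminal operation also takes part in the work, which is why a plain parallel stream can keep as many threads busy as there are processors even though the pool itself has one fewer.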

So, as stated here (and also in the other answer to that question), in order to have more parallelism, you have to:

submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));

So, I did this:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

I made a Set of thread names to see how many threads are created, and I also logged the number of active threads in the pool. Neither number grows beyond 16, which means the parallelism here never exceeds 16 (why 16, of all numbers?). If I do not use the forkJoinPool, I get a parallelism of 4, which matches the number of processors I have.

Why does it give me 16 and not 1000?

Answered by Dimitar Dimitrov

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That's incorrect.

The actual answer is provided in the original question of which this one was marked as a duplicate: using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior.
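
To make the spliterator point concrete, here is a rough model of the splitting, not the JDK's actual code. It assumes, based on my reading of the JDK 8 sources, that the stream keeps halving its spliterator until a chunk is no larger than `size / (parallelism * 4)`; the factor 4 and the names `SplitModel`, `countLeaves`, and `leavesFor` are assumptions for illustration, and other JDK versions may behave differently.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

// Simplified model of how a parallel forEach might carve up its source.
// Assumption: leaf size threshold = size / (parallelism << 2), as in JDK 8.
final class SplitModel {

    // Keep splitting until a chunk is at or below the threshold; count the leaf chunks.
    static int countLeaves(Spliterator<Integer> right, long threshold) {
        if (right.estimateSize() <= threshold) {
            return 1;
        }
        Spliterator<Integer> left = right.trySplit(); // 'right' now covers the remainder
        if (left == null) {
            return 1;
        }
        return countLeaves(left, threshold) + countLeaves(right, threshold);
    }

    // Number of leaf chunks for the range used in the question, given a pool parallelism.
    static int leavesFor(int parallelism) {
        Spliterator<Integer> source = IntStream.range(0, 999999).spliterator();
        long target = (long) parallelism << 2;
        long threshold = Math.max(1L, source.estimateSize() / target);
        return countLeaves(source, threshold);
    }

    public static void main(String[] args) {
        // 3 = common-pool parallelism on a 4-processor machine, as in the question.
        System.out.println("leaves with parallelism 3:    " + leavesFor(3));
        System.out.println("leaves with parallelism 1000: " + leavesFor(1000));
    }
}
```

With a common-pool parallelism of 3 this model gives 16 leaf chunks, which is consistent with the 16 threads reported in the question: if only 16 chunks of work exist, no more than 16 threads can be busy, however large the custom pool is.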

Here's an example of how, when tasks are manually submitted to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)

Answered by edharned

It seems to me that when you submit a lambda to the FJP that lambda will use the common pool and not the FJP. Sotirios Delimanolis proved this with his comment, above. What you are submitting is a Task that runs in your FJP.

Try profiling this code to see what threads are actually being used.
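
Short of a full profiler, one way to see which threads do the work is to record, for each element, the current thread and the pool it belongs to. This is only a sketch: the class `WhichPool`, the method `threadsUsed`, and the "custom" / "common" / "other" labels are made up for illustration, while `ForkJoinWorkerThread.getPool()` is a standard JDK method.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

// Hypothetical diagnostic helper, for illustration only.
final class WhichPool {

    // Runs a small parallel stream inside 'pool' and returns "<owner>:<thread name>"
    // for every thread that processed at least one element.
    static Set<String> threadsUsed(ForkJoinPool pool) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        try {
            pool.submit(() -> IntStream.range(0, 1_000).parallel().forEach(n -> {
                Thread t = Thread.currentThread();
                String owner = "other";
                if (t instanceof ForkJoinWorkerThread) {
                    ForkJoinPool p = ((ForkJoinWorkerThread) t).getPool();
                    owner = (p == pool) ? "custom"
                          : (p == ForkJoinPool.commonPool()) ? "common" : "other";
                }
                seen.add(owner + ":" + t.getName());
            })).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(8);
        try {
            threadsUsed(pool).forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
    }
}
```

Each printed line shows which pool the worker belongs to, so the output can be compared directly against the claim being tested.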

You cannot name the threads within the FJP.
