Java Stream API: why the distinction between sequential and parallel execution mode?

Notice: this page is a Chinese-English translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me), citing the original address: http://stackoverflow.com/questions/22950642/

Time: 2020-08-13 19:33:28  Source: igfitidea

Tags: java, parallel-processing, java-8, java-stream

Asked by davnicwil

From the Stream javadoc:

Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. Streams are created with an initial choice of sequential or parallel execution.
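
As a minimal sketch of what "execution mode is a property of the stream" means in practice, the following uses only the standard `isParallel()`, `parallel()` and `sequential()` methods. The class name `ExecutionModeDemo` and the helper `numbers()` are made up for illustration.

```java
import java.util.stream.Stream;

class ExecutionModeDemo {
    // Hypothetical helper: a fresh stream each time, since a stream can only be consumed once.
    static Stream<Integer> numbers() {
        return Stream.of(1, 2, 3, 4, 5);
    }

    public static void main(String[] args) {
        // Stream.of creates a sequential stream.
        System.out.println(numbers().isParallel());                          // false

        // parallel() switches the mode of the whole pipeline.
        System.out.println(numbers().parallel().isParallel());               // true

        // sequential() switches it back.
        System.out.println(numbers().parallel().sequential().isParallel());  // false
    }
}
```

The mode applies to the pipeline as a whole once the terminal operation runs, so it is the last `parallel()` or `sequential()` call that counts.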

My assumptions:

  1. There is no functional difference between sequential and parallel streams. Output is never affected by execution mode.
  2. A parallel stream is always preferable, given an appropriate number of cores and problem size to justify the overhead, due to the performance gains.
  3. We want to write code once and run anywhere without having to care about the hardware (this is Java, after all).

Assuming these assumptions are valid (nothing wrong with a bit of meta-assumption), what's the value in having the execution mode exposed in the API?

It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc.

Sure, assuming parallel streams also work on a single core machine, perhaps just always using a parallel stream achieves this. But this is really ugly - why have explicit references to parallel streams in my code when it's the default option?

Even if there is a scenario where you'd deliberately want to hard code the use of a sequential stream - why is there not just a sub-interface SequentialStream for that purpose, rather than polluting Stream with an execution mode switch?

Accepted answer by Louis Wasserman

"It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc."

The reality is that a) streams are a library, and have no special JVM magic, and b) you can't really design a library smart enough to automagically figure out what the right decision is in this particular case. There's no sensible way to estimate how costly a particular function will be without running it -- even if you could introspect its implementation, which you can't -- and now you're introducing a benchmark into every stream operation, trying to figure out if parallelizing it will be worth the cost of the parallelism overhead. That's just not practical, especially given that you don't know in advance how bad the parallelism overhead is, either.

"A parallel stream is always preferable, given an appropriate number of cores and problem size to justify the overhead, due to the performance gains."

Not always, in practice. Some tasks are just so small that they're not worth parallelizing, and parallelism does always have some overhead. (And frankly, most programmers tend to overestimate the usefulness of parallelism, slapping it everywhere when it's really hurting performance.)

Basically, it's a hard enough problem that you have to shove it off onto the programmer.
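
To make the "too small to be worth parallelizing" point concrete, here is a rough sketch that sums a tiny range both ways and prints the elapsed time. The class and method names are invented for illustration, and a single `System.nanoTime()` measurement is not a proper benchmark (no warm-up, no repetition), so the printed timings are only indicative and will vary by machine.

```java
import java.util.stream.LongStream;

class SmallTaskOverhead {
    static long sumSequential(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    static long sumParallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 100; // deliberately tiny workload

        long t0 = System.nanoTime();
        long seq = sumSequential(n);
        long t1 = System.nanoTime();
        long par = sumParallel(n);
        long t2 = System.nanoTime();

        // Both modes compute the same value; only the cost of getting there differs.
        System.out.println("sequential: " + seq + " in " + (t1 - t0) + " ns");
        System.out.println("parallel:   " + par + " in " + (t2 - t1) + " ns");
    }
}
```

The library cannot know in advance whether `n` is 100 or 100 million, or how expensive each element is to process, which is the decision the answer says ends up with the programmer.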

Answer by Kanagavelu Sugumar

"There is no functional difference between sequential and parallel streams. Output is never affected by execution mode."

There is a difference between sequential and parallel stream execution. In the code below, the TEST_2 results show that parallel execution is much faster than the sequential approach.

"A parallel stream is always preferable, given an appropriate number of cores and problem size to justify the overhead, due to the performance gains."

Not really. If a task is too simple to be worth executing in parallel threads, then we are simply adding overhead to our code. The TEST_1 results show this. Also note that if all the worker threads are busy with one parallel execution task, other parallel stream operations elsewhere in your code will have to wait for it.
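
As a small sketch of why worker threads are a shared resource: in the reference implementation, parallel streams run their work on the common ForkJoinPool, whose size can be inspected with the standard `ForkJoinPool.getCommonPoolParallelism()` method. The class name `CommonPoolInfo` is made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    static int commonPoolParallelism() {
        // Target parallelism of the pool shared by parallel streams across the JVM.
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Because this pool is shared across the whole JVM, a parallel stream that blocks its workers (for example by sleeping or doing I/O) can starve unrelated parallel streams.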

"We want to write code once and run anywhere without having to care about the hardware (this is Java, after all)."

Only the programmer knows whether a given task is worth executing in parallel or sequentially, irrespective of the CPUs. So the Java API exposes both options to the developer.

import java.util.ArrayList;
import java.util.List;

/*
 * Performance test over internal(parallel/sequential) and external iterations.
 * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
 * 
 * 
 * Parallel computing involves dividing a problem into subproblems, 
 * solving those problems simultaneously (in parallel, with each subproblem running in a separate thread),
 *  and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, 
 *  which enables you to more easily implement parallel computing in your applications. However, with this framework, 
 *  you must specify how the problems are subdivided (partitioned). 
 *  With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you.
 * 
 * You can limit the parallelism that the common ForkJoinPool offers by supplying the JVM option
 *  -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, so that the pool size is limited to one
 *  and parallelization brings little or no gain.
 *  
 *  @see ForkJoinPool
 *  https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
 *  
 *  A ForkJoinPool creates a fixed number of threads (default: number of cores) and
 *  will never create more threads (unless the application indicates a need for those by using managedBlock).
 *  http://stackoverflow.com/questions/10797568/what-determines-the-number-of-threads-a-java-forkjoinpool-creates
 *  
 */
public class IterationThroughStream {
    private static boolean found = false;
    private static List<Integer> smallListOfNumbers = null;
    public static void main(String[] args) throws InterruptedException {


        // TEST_1
        List<String> bigListOfStrings = new ArrayList<>();
        for (long i = 1L; i <= 1_000_000L; i++) { // primitive long avoids boxing on every iteration
            bigListOfStrings.add("Counter no: "+ i);
        }

        System.out.println("Test Start");
        System.out.println("-----------");
        long startExternalIteration = System.currentTimeMillis();
        externalIteration(bigListOfStrings);
        long endExternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for externalIteration(bigListOfStrings) is :" + (endExternalIteration - startExternalIteration) + " , and the result found: "+ found);

        long startInternalIteration = System.currentTimeMillis();
        internalIteration(bigListOfStrings);
        long endInternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for internalIteration(bigListOfStrings) is :" + (endInternalIteration - startInternalIteration) + " , and the result found: "+ found);





        // TEST_2
        smallListOfNumbers = new ArrayList<Integer>();
        for(int i = 1; i <= 10; i++) {
            smallListOfNumbers.add(i);
        }

        long startExternalIteration1 = System.currentTimeMillis();
        externalIterationOnSleep(smallListOfNumbers);
        long endExternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :" + (endExternalIteration1 - startExternalIteration1));

        long startInternalIteration1 = System.currentTimeMillis();
        internalIterationOnSleep(smallListOfNumbers);
        long endInternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :" + (endInternalIteration1 - startInternalIteration1));




        // TEST_3
        Thread t1 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t2 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t3 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t4 = new Thread(IterationThroughStream :: internalIterationOnThread);

        t1.start();
        t2.start();
        t3.start();
        t4.start();

        Thread.sleep(30000);
    }


    private static boolean externalIteration(List<String> bigListOfStrings) {
        found = false;
        for(String s : bigListOfStrings) {
            if(s.equals("Counter no: 1000000")) {
                found = true;
            }
        }
        return found;
    }

    private static boolean internalIteration(List<String> bigListOfStrings) {
        found = false;
        bigListOfStrings.parallelStream().forEach(
                (String s) -> { 
                    if(s.equals("Counter no: 1000000")){  //Have a breakpoint to look how many threads are spawned.
                        found = true;
                    }

                }
            );
        return found;       
    }


    private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        for(Integer s : smallListOfNumbers) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return found;
    }

    private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        smallListOfNumbers.parallelStream().forEach( //Removing parallelStream() will behave as single threaded (sequential access).
                (Integer s) -> {
                    try {
                        Thread.sleep(100); //Have a breakpoint to look how many threads are spawned.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
        return found;       
    }

    public static void internalIterationOnThread() {
        smallListOfNumbers.parallelStream().forEach(
                (Integer s) -> {
                    try {
                        /*
                         * DANGEROUS
                         * This shows that if all the FJP (fork/join pool) worker threads (7 here) are blocked on behalf of
                         * a single thread (e.g. t1), then the other three threads (t2 - t4) are left waiting for free FJP worker threads.
                         */
                        Thread.sleep(100); //Have a breakpoint here.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
    }
}
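
The search in TEST_1 above records its result by writing to a shared static field from inside `forEach`. As an alternative sketch (not the answer author's code), the same check can be written without side effects using the standard `anyMatch` terminal operation, which also lets the stream stop early once a match is found. The class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SideEffectFreeSearch {
    static boolean containsSequential(List<String> values, String target) {
        return values.stream().anyMatch(target::equals);
    }

    static boolean containsParallel(List<String> values, String target) {
        // Same pipeline, different execution mode; no shared mutable state involved.
        return values.parallelStream().anyMatch(target::equals);
    }

    public static void main(String[] args) {
        List<String> bigListOfStrings = new ArrayList<>();
        for (long i = 1L; i <= 1_000_000L; i++) {
            bigListOfStrings.add("Counter no: " + i);
        }
        System.out.println(containsSequential(bigListOfStrings, "Counter no: 1000000")); // true
        System.out.println(containsParallel(bigListOfStrings, "Counter no: 1000000"));   // true
    }
}
```

With the result returned from the pipeline rather than written to a field, switching between `stream()` and `parallelStream()` cannot change the answer, only how long it takes.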

Answer by Tagir Valeev

There's an interesting case in this question showing that sometimes a parallel stream might be slower by orders of magnitude. In that particular example, the parallel version runs for ten minutes while the sequential one takes several seconds.

Answer by roookeee

"It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc."

To add to the already given answers:

That's a pretty bold assumption. Imagine simulating a board game for training some form of AI: it's pretty easy to parallelize the execution of different playthroughs. Just create a new instance and let it run on its own thread. As it doesn't share any state with another playthrough, you don't even have to consider multi-threading issues in your game logic. If, on the other hand, you parallelize the game logic itself, you get all sorts of multi-threading issues and most likely pay a steep price in complexity and even performance.
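
A minimal sketch of the playthrough idea, assuming a toy "game" that just sums dice rolls. `PlaythroughSimulation`, `playOnce` and `simulate` are hypothetical names; a real simulation would be far more involved. Each playthrough owns its own `Random`, so nothing is shared between threads and the caller decides at the outer level whether to run the games in parallel.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PlaythroughSimulation {
    // One self-contained playthrough: all state is local to this call.
    static int playOnce(int seed) {
        Random rng = new Random(seed);
        int score = 0;
        for (int turn = 0; turn < 1_000; turn++) {
            score += rng.nextInt(6) + 1; // roll a six-sided die
        }
        return score;
    }

    // The game logic stays sequential; only the set of independent games is parallelized.
    static List<Integer> simulate(int games, boolean parallel) {
        IntStream seeds = IntStream.range(0, games);
        if (parallel) {
            seeds = seeds.parallel();
        }
        return seeds.mapToObj(PlaythroughSimulation::playOnce).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sequential = simulate(100, false);
        List<Integer> parallel = simulate(100, true);
        System.out.println(sequential.equals(parallel)); // true
    }
}
```

Because the playthroughs are independent and seeded deterministically, both modes produce the same list of scores. Choosing where to parallelize is the kind of control the answer argues the library should leave to the caller.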

Having control over the behaviour of streams gives you (appropriately limited) flexibility which in and of itself is a key feature for good library design.
