Note: this page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and authors, and attribute it to the original authors (not me). Original Stack Overflow URL: http://stackoverflow.com/questions/22350288/

Time: 2020-08-13 15:08:31  Source: igfitidea

Parallel streams, collectors and thread safety

Tags: java, concurrency, parallel-processing, java-8, java-stream

Asked by assylias

See the simple example below, which counts the number of occurrences of each word in a list:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

At the end, wordsCount is {a=2, b=1, c=1}.

But my stream is very large and I want to parallelise the job, so I write:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

However, I have noticed that wordsCount is a simple HashMap, so I wonder whether I need to explicitly ask for a concurrent map to ensure thread safety:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

Accepted answer by Stuart Marks

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

It is safe to use a non-concurrent collector in a collect operation of a parallel stream.
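
As a quick check of that claim, here is a minimal sketch that runs the question's toMap collector both sequentially and in parallel and compares the results. The class name ParallelToMapDemo and the helper count are made up for illustration; Integer::sum stands in for the question's (i, j) -> i + j.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelToMapDemo {
    // Hypothetical helper: counts words with the ordinary (non-concurrent) toMap collector.
    static Map<String, Integer> count(List<String> words, boolean parallel) {
        Stream<String> stream = parallel ? words.parallelStream() : words.stream();
        return stream.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c");
        Map<String, Integer> sequential = count(words, false);
        Map<String, Integer> parallel = count(words, true);
        // Both runs produce the same counts, so this prints true.
        System.out.println(sequential.equals(parallel));
    }
}
```

A passing run does not prove thread safety by itself; the guarantee comes from the Collector specification quoted in this answer.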

In the specification of the Collector interface, in the section with half a dozen bullet points, is this:

For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.

This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.
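
For a home-grown collector, a minimal sketch might look like the following. The names CustomCollectorDemo and wordCounter are hypothetical; the collector is built with Collector.of, keeps its state in a plain HashMap, and declares no CONCURRENT characteristic, so it relies only on the confinement guarantee described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;

public class CustomCollectorDemo {
    // Hypothetical non-concurrent collector that counts words in a plain HashMap.
    static Collector<String, Map<String, Integer>, Map<String, Integer>> wordCounter() {
        return Collector.of(
                HashMap::new,                                     // one container per chunk
                (map, word) -> map.merge(word, 1, Integer::sum),  // accumulate within a chunk
                (left, right) -> {                                // merge two finished chunks
                    right.forEach((word, n) -> left.merge(word, n, Integer::sum));
                    return left;
                });
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c");
        Map<String, Integer> counts = words.parallelStream().collect(wordCounter());
        System.out.println(counts);
    }
}
```

The functions touch nothing except the containers handed to them, which is what keeps the collector within the rules listed above.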

I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.
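
The following sketch is modelled on that kind of example rather than copied from the documentation: a three-argument collect that gathers strings into ArrayList instances on a parallel stream. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MutableReductionDemo {
    // Hypothetical helper: converts numbers to strings using a parallel mutable reduction.
    static List<String> toStrings(List<Integer> numbers) {
        return numbers.parallelStream()
                .collect(() -> new ArrayList<String>(),         // fresh list per chunk
                         (list, n) -> list.add(n.toString()),   // fill the chunk's own list
                         (left, right) -> left.addAll(right));  // append right chunk to left
    }

    public static void main(String[] args) {
        // The source list is ordered, so encounter order is preserved: prints [1, 2, 3, 4]
        System.out.println(toStrings(Arrays.asList(1, 2, 3, 4)));
    }
}
```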

The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.

Answer by nosid

It is safe to use non-concurrent collections and non-atomic counters with parallel streams.

If you take a look at the documentation of Stream::collect, you find the following paragraph:

Like reduce(Object, BinaryOperator), collect operations can be parallelized without requiring additional synchronization.

And for the method Stream::reduce:

While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization and with greatly reduced risk of data races.

This might be a bit surprising. However, note that parallel streams are based on a fork-join model. That means the concurrent execution works as follows:

  • split sequence into two parts with about the same size
  • process each part individually
  • collect the results of both parts and combine them into one result

In the second step, the three steps are recursively applied to the sub-sequences.
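
The split/process/combine recursion can be sketched with a plain RecursiveTask. This illustrates the fork-join model only and is not the stream library's internal code; SumTask, the THRESHOLD value and the sum helper are all assumptions made for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumDemo {
    // Hypothetical task that sums data[from, to) by recursive splitting.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
        private final int[] data;
        private final int from;
        private final int to;

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {           // small enough: process directly
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;            // step 1: split into two halves
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                            // step 2: process each half (recursively)
            long rightSum = right.compute();
            return left.join() + rightSum;          // step 3: combine the two results
        }
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println(sum(data)); // 0 + 1 + ... + 9999 = 49995000
    }
}
```

Each partial sum lives in a local variable of exactly one task, so no synchronization is needed beyond fork and join.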

An example should make that clear:

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

The only purpose of the class Trace is to log the constructor and method calls. If you execute this statement, it prints the following lines:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

You can see that four Trace objects have been created, accumulate has been called once on each object, and combine has been used three times to combine the four objects into one. Each object can only be accessed by one thread at a time. That makes the code thread-safe, and the same applies to the method Collectors::toMap.
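
The answer does not show the Trace class itself, so the version below is a guessed reconstruction: it logs each call in the format of the output above and, as an addition of this sketch, keeps three atomic counters so the calls can be tallied. Thread ids and the order of the lines will differ from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class TraceDemo {
    // Counters are an addition for this sketch; the original Trace only logged.
    static final AtomicInteger CREATED = new AtomicInteger();
    static final AtomicInteger ACCUMULATED = new AtomicInteger();
    static final AtomicInteger COMBINED = new AtomicInteger();

    // Hypothetical reconstruction of the Trace class used in the answer.
    static class Trace {
        Trace() {
            CREATED.incrementAndGet();
            log("new");
        }

        void accumulate(int value) {
            ACCUMULATED.incrementAndGet();
            log("accumulate");
        }

        void combine(Trace other) {
            COMBINED.incrementAndGet();
            log("combine");
        }

        private static void log(String operation) {
            System.out.printf("thread: %2d  /  operation: %s%n",
                    Thread.currentThread().getId(), operation);
        }
    }

    // Resets the counters, runs the statement from the answer, and returns
    // {created, accumulated, combined}.
    static int[] run() {
        CREATED.set(0);
        ACCUMULATED.set(0);
        COMBINED.set(0);
        IntStream.range(0, 4)
                .parallel()
                .collect(Trace::new, Trace::accumulate, Trace::combine);
        return new int[] {CREATED.get(), ACCUMULATED.get(), COMBINED.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.printf("created=%d accumulated=%d combined=%d%n",
                counts[0], counts[1], counts[2]);
    }
}
```

Whatever the split turns out to be, accumulate runs once per element, and merging n containers into one takes n - 1 combine calls.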

Answer by Brian Goetz

All collectors, if they follow the rules in the specification, are safe to run in parallel or sequentially. Parallel-readiness is a key part of the design here.

The distinction between concurrent and non-concurrent collectors has to do with the approach to parallelization.

An ordinary (non-concurrent) collector operates by merging sub-results. So the source is partitioned into a bunch of chunks, each chunk is collected into a result container (like a list or a map), and then the sub-results are merged into a bigger result container. This is safe and order-preserving, but for some kinds of containers -- especially maps -- it can be expensive, since merging two maps by key is often expensive.
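
The order-preserving part is easy to observe with a list. In this minimal sketch (class and method names are made up for illustration), an ordered parallel stream collected with the ordinary toList collector yields the same list as the sequential run.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderedMergeDemo {
    // Hypothetical helper: collects 0..n-1 into a list, optionally in parallel.
    static List<Integer> collect(int n, boolean parallel) {
        IntStream range = IntStream.range(0, n);
        if (parallel) {
            range = range.parallel();
        }
        return range.boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sequential = collect(1_000, false);
        List<Integer> parallel = collect(1_000, true);
        // Chunks are merged left to right, so encounter order survives: prints true
        System.out.println(sequential.equals(parallel));
    }
}
```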

A concurrent collector instead creates one result container, whose insertion operations are guaranteed to be thread-safe, and blasts elements into it from multiple threads. With a highly concurrent result container like ConcurrentHashMap, this approach may well perform better than merging ordinary HashMaps.
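
One way to see the difference is to inspect the collectors' characteristics. This sketch (names are illustrative) checks for the CONCURRENT flag and then counts words with toConcurrentMap; whether the single-container path is actually faster depends on the data and hardware, which this example does not measure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ConcurrentCollectorDemo {
    // Hypothetical helper: true if the collector advertises CONCURRENT.
    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    // Hypothetical helper: counts words into one shared concurrent map.
    static ConcurrentMap<String, Integer> count(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.toConcurrentMap(s -> s, s -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        Collector<String, ?, Map<String, Integer>> ordinary =
                Collectors.toMap(s -> s, s -> 1, Integer::sum);
        Collector<String, ?, ConcurrentMap<String, Integer>> concurrent =
                Collectors.toConcurrentMap(s -> s, s -> 1, Integer::sum);

        System.out.println(isConcurrent(ordinary));   // prints false
        System.out.println(isConcurrent(concurrent)); // prints true
        System.out.println(count(Arrays.asList("a", "b", "a", "c")));
    }
}
```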

So, the concurrent collectors are strictly optimizations over their ordinary counterparts. And they don't come without a cost; because elements are being blasted in from many threads, concurrent collectors generally cannot preserve encounter order. (But, often you don't care -- when creating a word count histogram, you don't care which instance of "foo" you counted first.)
