对 Java 8 流进行分区
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32434592/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Partition a Java 8 Stream
提问by Trader001
How to implement "partition" operation on Java 8 Stream? By partition I mean, divide a stream into sub-streams of a given size. Somehow it will be identical to Guava Iterators.partition()method, just it's desirable that the partitions are lazily-evaluated Streams rather than List's.
如何在 Java 8 Stream 上实现“分区”操作?分区我的意思是,将流划分为给定大小的子流。不知何故,它与 Guava Iterators.partition()方法相同,只是希望分区是延迟评估的 Streams 而不是 List 的。
采纳答案by Tagir Valeev
It's impossible to partition the arbitrary source stream to the fixed size batches, because this will screw up the parallel processing. When processing in parallel you may not know how many elements in the first sub-task after the split, so you cannot create the partitions for the next sub-task until the first is fully processed.
不可能将任意源流划分为固定大小的批次,因为这会破坏并行处理。并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区。
However it is possible to create the stream of partitions from the random access List
. Such feature is available, for example, in my StreamEx
library:
然而,可以从随机访问创建分区流List
。例如,在我的StreamEx
图书馆中可以使用此类功能:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
Or if you really want the stream of streams:
或者,如果您真的想要流的流:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
If you don't want to depend on third-party libraries, you can implement such ofSubLists
method manually:
如果不想依赖第三方库,可以ofSubLists
手动实现这样的方法:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1) / length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
This implementation looks a little bit long, but it takes into account some corner cases like close-to-MAX_VALUE list size.
这个实现看起来有点长,但它考虑了一些极端情况,比如接近 MAX_VALUE 列表大小。
If you want parallel-friendly solution for unordered stream (so you don't care which stream elements will be combined in single batch), you may use the collector like this (thanks to @sibnick for inspiration):
如果您想要无序流的并行友好解决方案(因此您不关心哪些流元素将在单个批处理中组合),您可以像这样使用收集器(感谢@sibnick 的启发):
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
Usage example:
用法示例:
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
Result:
结果:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
Such collector is perfectly thread-safe and produces ordered batches for sequential stream.
这种收集器是完全线程安全的,并为顺序流生成有序批次。
If you want to apply an intermediate transformation for every batch, you may use the following version:
如果要对每个批次应用中间转换,可以使用以下版本:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
For example, this way you can sum the numbers in every batch on the fly:
例如,通过这种方式,您可以即时对每批中的数字求和:
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
回答by Trader001
It seem like, as Jon Skeet has shown in his comment, it's not possible to make partitions lazy. For non-lazy partitions, I already have this code:
正如 Jon Skeet 在他的评论中所表明的那样,似乎不可能使分区变得懒惰。对于非惰性分区,我已经有了这个代码:
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable = () -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
回答by sibnick
I think it is possible with some sort of hack inside:
我认为内部有某种黑客攻击是可能的:
create utility class for batch:
为批处理创建实用程序类:
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement()) / batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
and method:
和方法:
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
回答by John McClean
Provided you want to use the Stream sequentially, it is possible to partition a Stream (as well as perform related functions such as windowing - which I think is what you really want in this case). Two libraries that will support partitoning for standard Streams are cyclops-react(I am the author) and jOOλwhich cyclops-react extends (to add functionality such as Windowing).
如果您想按顺序使用 Stream,则可以对 Stream 进行分区(以及执行相关功能,例如窗口化 - 我认为在这种情况下这是您真正想要的)。支持标准流分区的两个库是cyclops-react(我是作者)和cyclops-react 扩展的jOOλ(添加窗口等功能)。
cyclops-streams has a collection of static functions StreamUtilsfor operating on Java Streams, and a series of functions such as splitAt, headAndTail, splitBy, partition for partitioning.
cyclops-streams 有一组静态函数StreamUtils用于对Java Streams 进行操作,还有splitAt、headAndTail、splitBy、partition 等一系列用于分区的函数。
To window a Stream into a Stream of nested Streams of size 30 you can use the window method.
要将流窗口化为大小为 30 的嵌套流的流,您可以使用 window 方法。
To the OPs point, in Streaming terms, splitting a Stream into multiple Streams of a given size is a Windowing operation (rather than a Partitioning operation).
就 OP 而言,在 Streaming 术语中,将一个 Stream 拆分为多个给定大小的 Streams 是一个窗口操作(而不是分区操作)。
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
There is a Stream extension class called ReactiveSeqthat extends jool.Seqand adds Windowing functionality, that may make the code a little cleaner.
有一个名为ReactiveSeq的 Stream 扩展类,它扩展了jool.Seq并添加了窗口功能,这可能会使代码更简洁一些。
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
As Tagir points out above though, this isn't suitable for parallel Streams. If you want to window or batch a Stream you wish to executed in a multithreaded fashion. LazyFutureStream in cyclops-reactmight be useful (Windowing is on the to-do list, but plain old batching is available now).
正如 Tagir 在上面指出的那样,这不适合并行流。如果要对希望以多线程方式执行的流进行窗口化或批处理。cyclops -react 中的LazyFutureStream可能很有用(窗口化在待办事项列表中,但现在可以使用普通的旧批处理)。
In this case data will be passed from the multiple threads executing the Stream to a Multi-Producer/Single-Consumer wait-free Queue and the sequential data from that queue can be windowed before being distributed to threads again.
在这种情况下,数据将从执行 Stream 的多个线程传递到多生产者/单消费者等待队列,并且该队列中的顺序数据可以在再次分发给线程之前被窗口化。
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
回答by user_3380739
Here is quick solution by AbacusUtil
这是AbacusUtil 的快速解决方案
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
Disclaimer:I'm the developer of AbacusUtil.
免责声明:我是 AbacusUtil 的开发者。
回答by rloeffel
The most elegant and pure java 8 solution for this problem i found:
我发现针对这个问题的最优雅、最纯粹的 java 8 解决方案:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1) / batchSize;
}
回答by Hei
This is a pure Java solution that's evaluated lazily instead of using List.
这是一个纯粹的 Java 解决方案,它被懒惰地评估而不是使用 List。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
The method returns Stream<List<T>>
for flexibility. You can convert it to Stream<Stream<T>>
easily by partition(something, 10).map(List::stream)
.
该方法返回Stream<List<T>>
以获得灵活性。您可以Stream<Stream<T>>
轻松地将其转换为partition(something, 10).map(List::stream)
.
回答by WarGoth
I found an elegant solution: Iterable parts = Iterables.partition(stream::iterator, size)
我找到了一个优雅的解决方案: Iterable parts = Iterables.partition(stream::iterator, size)
回答by domax
Here is a pure Java 8 solution - both sequential and parallel:
这是一个纯 Java 8 解决方案 - 顺序和并行:
public <T> Collection<List<T>> chunk(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.stream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// LinkedHashMap is used here just to preserve order
.collect(groupingBy(Entry::getKey, LinkedHashMap::new, mapping(Entry::getValue, toList())))
.values();
}
public <T> Collection<List<T>> chunkParallel(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.parallelStream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// So far it is parallel processing ordering cannot be preserved,
// but we have to make it thread safe - using e.g. ConcurrentHashMap
.collect(groupingBy(Entry::getKey, ConcurrentHashMap::new, mapping(Entry::getValue, toList())))
.values();
}