Java 8 Stream with batch processing
Original question: http://stackoverflow.com/questions/30641383/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow.
Asked by Andy Dang
I have a large file that contains a list of items.
I would like to create a batch of items and make an HTTP request with this batch (all of the items are needed as parameters in the HTTP request). I can do this very easily with a for loop, but as a Java 8 lover, I want to try writing it with Java 8's Stream framework (and reap the benefits of lazy processing).
Example:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
    batch.add(data.get(i));
    if (batch.size() == BATCH_SIZE) {
        process(batch);
        batch.clear(); // start a fresh batch; otherwise the list keeps growing past BATCH_SIZE
    }
}
if (batch.size() > 0) process(batch);
I want to do something along the lines of
lazyFileStream.group(500).map(processBatch).collect(toList())
What would be the best way to do this?
Accepted answer by Lukas Eder
Note! This solution reads the whole file before running the forEach.
You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:
Seq.seq(lazyFileStream)                // Seq<String>
   .zipWithIndex()                     // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500)   // Map<Long, List<Tuple2<String, Long>>>
   .forEach((index, batch) -> {
       process(batch);
   });
Behind the scenes, zipWithIndex() is just:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();
    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;
        @Override
        public boolean hasNext() {
            return it.hasNext();
        }
        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }
    return seq(new ZipWithIndex());
}
... whereas groupBy() is API convenience for:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}
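Without jOOλ, the same index-then-group idea can be sketched with the plain JDK. This is only an illustration: the names `IndexGrouping` and `groupIntoBatches` are made up here and are not part of jOOλ. It also makes the cost visible: every element lands in the map before any batch can be processed, which is why the whole input is read first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

final class IndexGrouping {
    // Hypothetical helper: number the elements as they arrive and group them by index / batchSize.
    static <T> Map<Long, List<T>> groupIntoBatches(Stream<T> stream, int batchSize) {
        Map<Long, List<T>> batches = new TreeMap<>(); // TreeMap keeps batch numbers in ascending order
        Iterator<T> it = stream.iterator();
        long index = 0;
        while (it.hasNext()) {
            long batchNumber = index++ / batchSize;
            batches.computeIfAbsent(batchNumber, k -> new ArrayList<>()).add(it.next());
        }
        return batches; // the whole input has been consumed at this point
    }
}
```

Calling `IndexGrouping.groupIntoBatches(lines, 500).values().forEach(this::process)` would then process the batches in order, but only after the last line has been read.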
(Disclaimer: I work for the company behind jOOλ)
Answer by Tagir Valeev
A pure Java 8 implementation is also possible:
int BATCH = 500;
IntStream.range(0, (data.size() + BATCH - 1) / BATCH)
         .mapToObj(i -> data.subList(i * BATCH, Math.min(data.size(), (i + 1) * BATCH)))
         .forEach(batch -> process(batch));
Note that unlike jOOλ, it can work nicely in parallel (provided that your data is a random access list).
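The index arithmetic above can be wrapped in a small reusable method. The following is a sketch under assumptions: the class name `ListBatches` and method `of` are hypothetical, and `batchSize` is assumed to be positive. The batch count uses ceiling division, so 7 items with a batch size of 3 give (7 + 2) / 3 = 3 batches, the last one holding a single item.

```java
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ListBatches {
    // Hypothetical helper: a stream of subList views, each at most batchSize long.
    static <T> Stream<List<T>> of(List<T> data, int batchSize) {
        int batchCount = (data.size() + batchSize - 1) / batchSize; // ceiling division
        return IntStream.range(0, batchCount)
                .mapToObj(i -> data.subList(i * batchSize, Math.min(data.size(), (i + 1) * batchSize)));
    }
}
```

The batches are views onto the original list rather than copies, so the list should not be structurally modified while they are in use.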
Answer by John McClean
You could also take a look at cyclops-react; I am the author of this library. It implements the jOOλ interface (and by extension JDK 8 Streams), but unlike JDK 8 parallel Streams it focuses on asynchronous operations (such as potentially blocking async I/O calls). JDK parallel Streams, by contrast, focus on data parallelism for CPU-bound operations. cyclops-react works by managing aggregates of Future-based tasks under the hood, but presents a standard extended Stream API to end users.
This sample code may help you get started:
LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)
                .map(this::process)
                .run();
There is a tutorial on batching here
And a more general Tutorial here
To use your own thread pool (which is probably more appropriate for blocking I/O), you could start processing with:
LazyReact reactor = new LazyReact(40);
reactor.react(data)
       .grouped(BATCH_SIZE)
       .map(this::process)
       .run();
Answer by frhack
Answer by Ben Manes
For completeness, here is a Guava solution.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
In the question, the collection is already available, so a stream isn't needed and it can be written as:
Iterables.partition(data, batchSize).forEach(this::process);
Answer by rohitvats
Pure Java 8 solution:
We can create a custom collector to do this elegantly, which takes in a batch size and a Consumer to process each batch:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * fewer elements than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;
    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        this.batchSize = batchSize;
        this.batchProcessor = requireNonNull(batchProcessor);
    }
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }
    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }
    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            if (!ts.isEmpty()) batchProcessor.accept(ts);
            if (!ots.isEmpty()) batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }
    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            // skip the call when nothing is left over, so the processor never sees an empty batch
            if (!ts.isEmpty()) batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}
Optionally, then create a helper utility class:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}
Example usage:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));
I've posted my code on GitHub as well, if anyone wants to take a look:
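One detail of the collector above is worth noting: the accumulator hands its working list to the processor and then clears it, so a processor that stores the reference (rather than copying the elements, as `output.addAll(xs)` does) would later find it emptied. Below is a minimal, sequential-only sketch that passes a fresh copy instead, built with `Collector.of`. The names `CopyingBatchCollector` and `batching` are assumptions for illustration and are not part of the answer's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

final class CopyingBatchCollector {
    // Hypothetical variant: each batch is handed over as its own list, so the processor may keep it.
    static <T> Collector<T, List<T>, Void> batching(int batchSize, Consumer<List<T>> processor) {
        return Collector.of(
            ArrayList::new,
            (buffer, item) -> {
                buffer.add(item);
                if (buffer.size() >= batchSize) {
                    processor.accept(new ArrayList<>(buffer)); // defensive copy
                    buffer.clear();
                }
            },
            (left, right) -> {
                // this sketch deliberately does not support parallel streams
                throw new UnsupportedOperationException("sequential streams only");
            },
            buffer -> {
                if (!buffer.isEmpty()) processor.accept(new ArrayList<>(buffer)); // trailing partial batch
                return null;
            });
    }
}
```

With this variant, something like `stream.collect(CopyingBatchCollector.batching(3, batches::add))` can safely collect the batches themselves into a list.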
Answer by Bruce Hamilton
I wrote a custom Spliterator for scenarios like this. It fills lists of a given size from the input Stream. The advantage of this approach is that it processes lazily and works with other stream functions.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
    private final Spliterator<E> base;
    private final int batchSize;
    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }
    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }
    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
            : new BatchSpliterator<>(splitBase, batchSize);
    }
    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
            : (long) Math.ceil(baseSize / (double) batchSize);
    }
    @Override
    public int characteristics() {
        return base.characteristics();
    }
}
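The laziness claimed above can be checked against an infinite source. The following is a simplified, sequential-only sketch of the same idea built on `Spliterators.AbstractSpliterator`; it is not the answer's class. The names `LazyBatches` and `of` are hypothetical, and `batchSize` is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyBatches {
    // Hypothetical sequential-only variant: batches are built only when the downstream asks for one.
    static <T> Stream<List<T>> of(Stream<T> source, int batchSize) {
        Spliterator<T> base = source.spliterator();
        Spliterator<List<T>> batching =
            new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super List<T>> action) {
                    List<T> batch = new ArrayList<>(batchSize);
                    // pull at most batchSize elements from the source
                    while (batch.size() < batchSize && base.tryAdvance(batch::add)) { }
                    if (batch.isEmpty()) return false;
                    action.accept(batch);
                    return true;
                }
            };
        return StreamSupport.stream(batching, false);
    }
}
```

For example, `LazyBatches.of(Stream.iterate(0, i -> i + 1), 3).limit(2)` terminates even though the source never ends, because only the elements needed for the first two batches are ever requested.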
Answer by Ashley Frieze
We had a similar problem to solve. We wanted to take a stream that was larger than system memory (iterating through all objects in a database) and randomise the order as best as possible; we thought it would be OK to buffer 10,000 items and randomise them.
The target was a function which took in a stream.
Of the solutions proposed here, there seem to be a range of options:
- Use various non-Java 8 additional libraries
- Start with something that's not a stream, e.g. a random access list
- Have a stream which can be split easily in a spliterator
Our instinct was originally to use a custom collector, but this meant dropping out of streaming. The custom collector solution above is very good and we nearly used it.
Here's a solution which cheats by using the fact that Streams can give you an Iterator, which you can use as an escape hatch to let you do something extra that streams don't support. The Iterator is converted back to a stream using another bit of Java 8 StreamSupport sorcery.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Spliterator.ORDERED;
/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }
    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, ORDERED),
            false);
    }
    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> sourceIterator;
    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }
    @Override
    public boolean hasNext() {
        // only pull a new batch once the previous one has been handed out,
        // so repeated hasNext() calls do not silently drop a batch
        if (currentBatch == null) {
            prepareNextBatch();
        }
        return !currentBatch.isEmpty();
    }
    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = currentBatch;
        currentBatch = null;
        return batch;
    }
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}
A simple example of using this would look like this:
@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}
The above prints
[A, B, C]
[D, E, F]
For our use case, we wanted to shuffle the batches and then keep them as a stream. It looked like this:
@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}
This outputs something like the following (it's randomised, so different every time):
A
C
B
E
D
F
The secret sauce here is that there's always a stream, so you can either operate on a stream of batches, or do something to each batch and then flatMap it back to a stream. Even better, all of the above only runs as the final forEach or collect or other terminating expressions PULL the data through the stream.
It turns out that iterator is a special type of terminating operation on a stream and does not cause the whole stream to run and come into memory! Thanks to the Java 8 guys for a brilliant design!
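The pull-based behaviour of iterator() can be observed directly by counting how many elements an infinite source actually produces. This is a small sketch for illustration only; the class and method names (`LazyIteratorDemo`, `pulledAfterTwoItems`) are hypothetical.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class LazyIteratorDemo {
    // Returns how many source elements had been produced after taking two items from the iterator.
    static int pulledAfterTwoItems() {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = Stream.iterate(1, i -> i + 1)   // infinite source
                .peek(i -> pulled.incrementAndGet())           // count every element that is actually produced
                .iterator();
        it.next();
        it.next();
        return pulled.get();
    }
}
```

The method returns at all only because the stream is not run to completion, and the count stays small rather than growing with the size of the source.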
Answer by rhinmass
Simple example using Spliterator
// hypothetical wrapper method; the original snippet began inside an unnamed method body
static void processFile(String fileName) {
    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");
        }
    }
}
Bruce's answer is more comprehensive, but I was looking for something quick and dirty to process a bunch of files.
Answer by Nicolas Lacombe
Pure Java 8 example that works with parallel streams as well.
How to use:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
The method declaration and implementation:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);
    stream.forEach(element -> {
        List<ElementType> fullBatch;
        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }
        batchProcessor.accept(fullBatch);
    });
    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}