java 有没有一种优雅的方式来分块处理流?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/27583623/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Is there an elegant way to process a stream in chunks?
提问by Bohemian
My exact scenario is inserting data to database in batches, so I want to accumulate DOM objects then every 1000, flush them.
我的确切场景是将数据批量插入数据库,所以我想累积 DOM 对象,然后每 1000 个,刷新它们。
I implemented it by putting code in the accumulator to detect fullness then flush, but that seems wrong - the flush control should come from the caller.
我通过将代码放入累加器来检测充满度然后刷新来实现它,但这似乎是错误的 - 刷新控制应该来自调用者。
I could convert the stream to a List then use subList in an iterative fashion, but that too seems clunky.
我可以将流转换为 List,然后以迭代方式使用 subList,但这似乎也很笨拙。
It there a neat way to take action every n elements then continue with the stream while only processing the stream once?
有没有一种巧妙的方法可以每 n 个元素采取行动,然后继续处理流,同时只处理一次流?
回答by Misha
Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy
, you can do this:
优雅在旁观者的眼中。如果您不介意在 中使用有状态函数groupingBy
,您可以这样做:
AtomicInteger counter = new AtomicInteger();
stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
.values()
.forEach(database::flushChunk);
This doesn't win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.
这不会比您的原始解决方案赢得任何性能或内存使用点,因为它仍然会在执行任何操作之前实现整个流。
If you want to avoid materializing the list, stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:
如果您想避免物化列表,流 API 将无济于事。您将必须获得流的迭代器或拆分器并执行以下操作:
Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;
while(true) {
List<Integer> chunk = new ArrayList<>(size);
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){};
if (chunk.isEmpty()) break;
database.flushChunk(chunk);
}
回答by user2814648
If you have guava dependency on your project you could do this:
如果你的项目依赖番石榴,你可以这样做:
StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);
回答by Nazarii Bardiuk
Using library StreamExsolution would look like
使用库StreamEx解决方案看起来像
Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;
StreamEx.of(stream)
.groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
.forEach(chunk -> System.out.println(chunk));
Output:
输出:
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]
groupRuns
accepts predicate that decides whether 2 elements should be in the same group.
groupRuns
接受决定 2 个元素是否应该在同一组中的谓词。
It produces a group as soon as it finds first element that does not belong to it.
一旦找到第一个不属于它的元素,它就会生成一个组。
回答by Peter Walser
You can create a stream of chunks(List<T>
) of a stream of itemsand a given chunk sizeby
您可以创建一个数据块的数据流(List<T>
一)的项目流和给定块大小由
- grouping the items by the chunk index (element index / chunk size)
- ordering the chunks by their index
- reducing the map to their ordered elements only
- 按块索引(元素索引/块大小)对项目进行分组
- 按索引对块进行排序
- 仅将地图缩减为其有序元素
Code:
代码:
public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
AtomicInteger index = new AtomicInteger(0);
return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
.entrySet().stream()
.sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}
Example usage:
用法示例:
Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));
Output:
输出:
Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
回答by findusl
As Misha rightfully said, Elegance is in the eye of the beholder. I personally think an elegant solution would be to let the class that inserts to the database do this task. Similar to a BufferedWriter
. This way it does not depend on your original data structure and can be used even with multiple streams after one and another. I am not sure if this is exactly what you mean by having the code in the accumulator which you thought is wrong. I don't think it is wrong, since the existing classes like BufferedWriter
work this way. You have some flush control from the caller this way by calling flush()
on the writer at any point.
正如米沙所说,优雅在旁观者的眼中。我个人认为一个优雅的解决方案是让插入到数据库的类来完成这项任务。类似于BufferedWriter
. 通过这种方式,它不依赖于您的原始数据结构,甚至可以与多个流一个接一个地使用。我不确定这是否正是您在累加器中包含您认为错误的代码的意思。我不认为这是错误的,因为现有的类就像这样BufferedWriter
工作。通过flush()
在任何时候调用编写器,您可以通过这种方式从调用者那里获得一些刷新控制。
Something like the following code.
类似于以下代码。
class BufferedDatabaseWriter implements Flushable {
List<DomObject> buffer = new LinkedList<DomObject>();
public void write(DomObject o) {
buffer.add(o);
if(buffer.length > 1000)
flush();
}
public void flush() {
//write buffer to database and clear it
}
}
Now your stream gets processed like this:
现在你的流被这样处理:
BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
stream.forEach(o -> writer.write(o));
//if you have more streams stream2.forEach(o -> writer.write(o));
writer.flush();
If you want to work multithreaded, you could run the flush asynchronous. The taking from the stream can't go in parallel but I don't think there is a way to count 1000 elements from a stream in parallel anyway.
如果你想多线程工作,你可以运行刷新异步。从流中获取不能并行进行,但我认为无论如何都没有办法从流中并行计算 1000 个元素。
You can also extend the writer to allow setting of the buffer size in constructor or you can make it implement AutoCloseable
and run it in a try with ressources and more. The nice things you have from a BufferedWriter
.
您还可以扩展编写器以允许在构造函数中设置缓冲区大小,或者您可以AutoCloseable
使用资源等尝试实现并运行它。您从BufferedWriter
.
回答by Yura
Look's like no, cause creating chunks means reducing stream, and reduce means termination. If you need to maintain stream nature and process chunks without collecting all data before here is my code (does not work for parallel streams):
看起来不是,因为创建块意味着减少流,减少意味着终止。如果您需要在我的代码(不适用于并行流)之前在不收集所有数据的情况下保持流性质和处理块:
private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
return (data, element) -> {
if (data.size() < chunkSize) {
data.addAll(element);
return data;
} else {
consumer.accept(data);
return element; // in fact it's new data list
}
};
}
private static <T> Function<T, List<T>> createList(int chunkSize) {
AtomicInteger limiter = new AtomicInteger(0);
return element -> {
limiter.incrementAndGet();
if (limiter.get() == 1) {
ArrayList<T> list = new ArrayList<>(chunkSize);
list.add(element);
return list;
} else if (limiter.get() == chunkSize) {
limiter.set(0);
}
return Collections.singletonList(element);
};
}
and how to use
以及如何使用
Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);
int chunkSize = 3;
Stream.generate(StrTokenizer::getInt).limit(13)
.map(createList(chunkSize))
.reduce(processChunks(chunkProcessor, chunkSize))
.ifPresent(chunkProcessor);
static Integer i = 0;
static Integer getInt()
{
System.out.println("next");
return i++;
}
it will print
它会打印
next next next next 0 1 2 next next next 3 4 5 next next next 6 7 8 next next next 9 10 11 12
next next next next 0 1 2 next next next 3 4 5 next next next 6 7 8 next next next 9 10 11 12
the idea behind is to create lists in a map operation with 'pattern'
背后的想法是使用“模式”在地图操作中创建列表
[1,,],[2],[3],[4,,]...
[1,,],[2],[3],[4,,]...
and merge (+process) that with reduce.
并将其与reduce合并(+处理)。
[1,2,3],[4,5,6],...
[1,2,3],[4,5,6],...
and don't forget to process the last 'trimmed' chunk with
并且不要忘记处理最后一个“修剪”的块
.ifPresent(chunkProcessor);
回答by dmitryvim
Most of answers above do not use stream benefits like saving your memory. You can try to use iterator to resolve the problem
上面的大多数答案都没有使用像节省内存这样的流好处。可以尝试使用迭代器来解决问题
Stream<List<T>> chunk(Stream<T> stream, int size) {
Iterator<T> iterator = stream.iterator();
Iterator<List<T>> listIterator = new Iterator<>() {
public boolean hasNext() {
return iterator.hasNext();
}
public List<T> next() {
List<T> result = new ArrayList<>(size);
for (int i = 0; i < size && iterator.hasNext(); i++) {
result.add(iterator.next());
}
return result;
}
};
return StreamSupport.stream(((Iterable<List<T>>) () -> listIterator).spliterator(), false);
}