Java 复制流以避免“流已被操作或关闭”
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/23860533/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Copy a stream to avoid "stream has already been operated upon or closed"
提问by Toby
I'd like to duplicate a Java 8 stream so that I can deal with it twice. I can collect
as a list and get new streams from that;
我想复制一个 Java 8 流,以便我可以处理它两次。我可以collect
作为一个列表并从中获取新的流;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
But I kind of think there should be a more efficient/elegant way.
但我认为应该有一种更有效/更优雅的方式。
Is there a way to copy the stream without turning it into a collection?
有没有办法复制流而不将其转换为集合?
I'm actually working with a stream of Either
s, so want to process the left projection one way before moving onto the right projection and dealing with that another way. Kind of like this (which, so far, I'm forced to use the toList
trick with).
我实际上正在处理一个Either
s流,所以想在移动到右投影并以另一种方式处理之前以一种方式处理左投影。有点像这样(到目前为止,我不得不使用这个toList
技巧)。
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
回答by assylias
You could create a stream of runnables (for example):
您可以创建一个可运行的流(例如):
results.stream()
.flatMap(either -> Stream.<Runnable> of(
() -> failure(either.left()),
() -> success(either.right())))
.forEach(Runnable::run);
Where failure
and success
are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.
要应用的操作在哪里failure
和success
在哪里。然而,这将创建相当多的临时对象,并且可能不会比从集合开始并对其进行流/迭代两次更有效。
回答by Brian Goetz
I think your assumption about efficiency is kind of backwards. You get this huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let you flow the whole data efficiently through the pipeline.
我认为你对效率的假设有点倒退。如果您只使用一次数据,您将获得巨大的效率回报,因为您不必存储它,并且流为您提供强大的“循环融合”优化,让您可以有效地通过管道传输整个数据。
If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.
如果您想重复使用相同的数据,那么根据定义,您必须(确定性地)生成它两次或存储它。如果它已经出现在一个集合中,那太好了;然后迭代两次很便宜。
We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) at the expense of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.
我们在“分叉流”的设计中做了实验。我们发现支持这一点是有实际成本的;它以不常见的情况为代价增加了普通情况(使用一次)的负担。最大的问题是处理“当两条管道以不同的速率消耗数据时会发生什么”。现在你又回到了缓冲状态。这是一个显然没有重量的功能。
If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:
如果您想重复操作相同的数据,要么存储它,要么将您的操作构建为消费者并执行以下操作:
stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".
您还可以查看 RxJava 库,因为它的处理模型更适合这种“流分叉”。
回答by Lukas Eder
We've implemented a duplicate()
method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:
我们已经duplicate()
在jOOλ 中实现了一种流方法,这是一个我们创建的开源库,用于改进jOOQ 的集成测试。本质上,你可以只写:
Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
Internally, there is a buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed about at the same rate, and if you can live with the lack of thread-safety.
在内部,有一个缓冲区存储已从一个流但未从另一个流消耗的所有值。如果您的两个流以大致相同的速度消耗,并且您可以忍受缺乏 thread-safety ,那么这可能与它的效率一样高。
Here's how the algorithm works:
以下是算法的工作原理:
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
final List<T> gap = new LinkedList<>();
final Iterator<T> it = stream.iterator();
@SuppressWarnings("unchecked")
final Iterator<T>[] ahead = new Iterator[] { null };
class Duplicate implements Iterator<T> {
@Override
public boolean hasNext() {
if (ahead[0] == null || ahead[0] == this)
return it.hasNext();
return !gap.isEmpty();
}
@Override
public T next() {
if (ahead[0] == null)
ahead[0] = this;
if (ahead[0] == this) {
T value = it.next();
gap.offer(value);
return value;
}
return gap.poll();
}
}
return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
Tuple2
is probably like your Pair
type, whereas Seq
is Stream
with some enhancements.
Tuple2
大概是喜欢你的Pair
类型,而Seq
为Stream
一些增强功能。
回答by user4975679
You can use a local variable with a Supplier
to set up common parts of the stream pipeline.
您可以使用带有 的局部变量Supplier
来设置流管道的公共部分。
From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:
从http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:
Reusing Streams
Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception Calling `noneMatch` after `anyMatch` on the same stream results in the following exception: java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
To overcome this limitation we have to to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok
Each call to
get()
constructs a new stream on which we are save to call the desired terminal operation.
重用流
Java 8 流不能重用。只要您调用任何终端操作,流就会关闭:
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception Calling `noneMatch` after `anyMatch` on the same stream results in the following exception: java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
为了克服这个限制,我们必须为我们想要执行的每个终端操作创建一个新的流链,例如我们可以创建一个流供应商来构造一个新的流,其中所有中间操作都已经设置:
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok
每次调用都会
get()
构造一个新的流,我们将保存在该流上以调用所需的终端操作。
回答by Martin
Another way to handle the elements multiple times is to use Stream.peek(Consumer):
多次处理元素的另一种方法是使用Stream.peek(Consumer):
doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));
peek(Consumer)
can be chained as many times as needed.
peek(Consumer)
可以根据需要链接多次。
doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));
回答by John McClean
cyclops-react, a library I contribute to, has a static method that will allow you duplicate a Stream (and returns a jOOλ Tuple of Streams).
cyclops-react是我贡献的一个库,它有一个静态方法,可以让你复制一个流(并返回一个 jOOλ 流元组)。
Stream<Integer> stream = Stream.of(1,2,3);
Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);
See comments, there is performance penalty that will be incurred when using duplicate on an existing Stream. A more performant alternative would be to use Streamable :-
请参阅评论,在现有 Stream 上使用重复项会导致性能损失。一个更高效的替代方法是使用 Streamable :-
There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.
还有一个(懒惰的)Streamable 类,可以从 Stream、Iterable 或 Array 构造并重放多次。
Streamable<Integer> streamable = Streamable.of(1,2,3);
streamable.stream().forEach(System.out::println);
streamable.stream().forEach(System.out::println);
AsStreamable.synchronizedFromStream(stream) - can be used to create a Streamable that will lazily populate it's backing collection, in a way such that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.
AsStreamable.synchronizedFromStream(stream) - 可用于创建一个 Streamable ,它将以一种可以跨线程共享的方式延迟填充它的支持集合。Streamable.fromStream(stream) 不会产生任何同步开销。
回答by Lubomir Varga
For this particular problem you can use also partitioning. Something like
对于这个特殊问题,您还可以使用分区。就像是
// Partition Eighters into left and right
List<Either<Pair<A, Throwable>, A>> results = doSomething();
Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
passingFailing.get(true) <- here will be all passing (left values)
passingFailing.get(false) <- here will be all failing (right values)
回答by Rams
Use a Supplier
to produce the stream for each termination operation.
使用 aSupplier
为每个终止操作生成流。
Supplier<Stream<Integer>> streamSupplier = () -> list.stream();
Whenever you need a stream of that collection,
use streamSupplier.get()
to get a new stream.
每当您需要该集合的流时,请使用streamSupplier.get()
获取新流。
Examples:
例子:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
回答by Lokesh Singal
We can make use of Stream Builder at the time of reading or iterating a stream. Here's the document of Stream Builder.
我们可以在读取或迭代流时使用 Stream Builder。这是Stream Builder的文档。
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Use case
用例
Let's say we have employee stream and we need to use this stream to write employee data in excel file and then update the employee collection/table [This is just use case to show the use of Stream Builder]:
假设我们有员工流,我们需要使用此流将员工数据写入 excel 文件中,然后更新员工集合/表 [这只是展示 Stream Builder 使用的用例]:
Stream.Builder<Employee> builder = Stream.builder();
employee.forEach( emp -> {
//store employee data to excel file
// and use the same object to build the stream.
builder.add(emp);
});
//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
回答by Jeremy Hicks
I had a similar problem, and could think of three different intermediate structures from which to create a copy of the stream: a List
, an array and a Stream.Builder
. I wrote a little benchmark program, which suggested that from a performance point of view the List
was about 30% slower than the other two which were fairly similar.
我有一个类似的问题,可以想到三种不同的中间结构来创建流的副本: a List
、一个数组和 a Stream.Builder
。我写了一个小基准程序,它表明从性能的角度来看,List
它比其他两个相当相似的程序慢了大约 30%。
The only drawback of converting to an array is that it is tricky if your element type is a generic type (which in my case it was); therefore I prefer to use a Stream.Builder
.
转换为数组的唯一缺点是,如果您的元素类型是泛型类型(在我的情况下是),则它很棘手;因此我更喜欢使用Stream.Builder
.
I ended up writing a little function that creates a Collector
:
我最终编写了一个小函数来创建一个Collector
:
private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
b2.build().forEach(b1);
return b1;
}, Stream.Builder::build);
}
I can then make a copy of any stream str
by doing str.collect(copyCollector())
which feels quite in keeping with the idiomatic usage of streams.
然后我可以str
通过这样做str.collect(copyCollector())
来制作任何流的副本,这感觉非常符合流的惯用用法。