Java: Can you split a stream into two streams?

Notice: this content comes from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/19940319/

Time: 2020-08-12 21:42:20  Source: igfitidea

Can you split a stream into two streams?

Tags: java, java-8, java-stream

Asked by user1148758

I have a data set represented by a Java 8 stream:

Stream<T> stream = ...;

I can see how to filter it to get a random subset - for example

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

I can also see how I could reduce this stream to get, for example, two lists representing two random halves of the data set, and then turn those back into streams. But, is there a direct way to generate two streams from the initial one? Something like

(heads, tails) = stream.[some kind of split based on filter]

Thanks for any insight.

Accepted answer by Louis Wasserman

Not exactly. You can't get two Streams out of one; this doesn't make sense -- how would you iterate over one without needing to generate the other at the same time? A stream can only be operated over once.

However, if you want to dump them into a list or something, you could do

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
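For reference, here is a minimal runnable sketch of that one-liner. The class name ForEachSplit, the method name split, and the choice of Integer elements are assumptions made for illustration; the original only shows the forEach call and assumes heads and tails already exist as collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical wrapper around the forEach one-liner; names are illustrative only.
final class ForEachSplit {
    /** Returns two lists: index 0 holds the zeros ("heads"), index 1 holds everything else ("tails"). */
    static List<List<Integer>> split(Stream<Integer> stream) {
        List<Integer> heads = new ArrayList<>();
        List<Integer> tails = new ArrayList<>();
        // Only safe for sequential streams: ArrayList is not thread-safe.
        stream.forEach(x -> (x == 0 ? heads : tails).add(x));
        return Arrays.asList(heads, tails);
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = split(Stream.of(0, 1, 0, 2, 3));
        System.out.println("heads: " + parts.get(0));
        System.out.println("tails: " + parts.get(1));
    }
}
```

The result is two lists rather than two streams; calling stream() on either list gives a fresh stream if one is needed.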

Answer by ZhongYu

This goes against the general mechanism of Stream. Say you could split Stream S0 into Sa and Sb as you wanted. Performing any terminal operation on Sa, say count(), would necessarily "consume" all elements in S0. Sb would therefore lose its data source.
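The single-use rule is easy to observe directly. The sketch below (class and method names are made up for the demonstration) runs a terminal operation twice on the same stream; the second call is rejected with an IllegalStateException.

```java
import java.util.stream.Stream;

// Illustrative demo only; SingleUseDemo is a hypothetical name.
final class SingleUseDemo {
    /** Returns true if a second terminal operation on the same stream is rejected. */
    static boolean secondUseFails() {
        Stream<String> s0 = Stream.of("a", "b", "c");
        s0.count();                  // first terminal operation consumes s0
        try {
            s0.count();              // second use of the same stream object
            return false;
        } catch (IllegalStateException e) {
            return true;             // the stream has already been operated upon
        }
    }

    public static void main(String[] args) {
        System.out.println("second use rejected: " + secondUseFails());
    }
}
```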

Previously, I think, Stream had a tee() method that duplicated a stream into two. It has since been removed.

Stream does have a peek() method, though; you might be able to use it to achieve what you need.
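One way to read that suggestion, as a rough sketch and not necessarily what the answer had in mind: use peek() to copy the non-matching elements into a side list while the main pipeline keeps the matching ones. The names PeekSideChannel and keepMatching are hypothetical. It assumes a sequential stream and a deterministic predicate, because the predicate is evaluated twice per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; assumes a sequential stream and a deterministic predicate.
final class PeekSideChannel {
    /** Returns the matching elements; non-matching ones are appended to rejected as a side effect. */
    static <T> List<T> keepMatching(Stream<T> stream, Predicate<T> keep, List<T> rejected) {
        return stream
                .peek(x -> { if (!keep.test(x)) rejected.add(x); }) // divert the "other half"
                .filter(keep)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> odds = new ArrayList<>();
        List<Integer> evens = keepMatching(Stream.of(1, 2, 3, 4, 5), n -> n % 2 == 0, odds);
        System.out.println(evens + " " + odds);
    }
}
```

The side list is only complete once the terminal operation has finished, so this still does not produce two live streams.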

Answer by Trevor Freeman

Unfortunately, what you ask for is directly frowned upon in the JavaDoc of Stream:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

You can work around this using peek or other methods should you truly desire that type of behaviour. In this case, instead of trying to back two streams from the same original Stream source with a forking filter, you would duplicate your stream and filter each of the duplicates appropriately.

However, you may wish to reconsider whether a Stream is the appropriate structure for your use case.
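A sketch of the "duplicate and filter" idea, under the assumption that the source can be opened more than once (here a List wrapped in a Supplier). DuplicateAndFilter and select are illustrative names. Note that this needs a deterministic predicate: with a random coin flip as in the question, the two filtered streams would not be complementary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; assumes the source can supply a fresh stream on every call.
final class DuplicateAndFilter {
    /** Opens a fresh stream from the supplier and keeps only the elements matching the predicate. */
    static <T> List<T> select(Supplier<Stream<T>> source, Predicate<T> predicate) {
        return source.get().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        Supplier<Stream<Integer>> source = data::stream;   // each get() yields a new stream
        Predicate<Integer> isEven = n -> n % 2 == 0;
        System.out.println(select(source, isEven));          // one "half"
        System.out.println(select(source, isEven.negate())); // the complementary "half"
    }
}
```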

Answer by aepurniet

Not exactly, but you may be able to accomplish what you need by invoking Collectors.groupingBy(). You create a new collection and can then instantiate streams on that new collection.
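A small sketch of what that could look like, grouping words by length and then streaming one of the resulting groups. The class name, method name, and sample data are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative only; GroupThenStream and byLength are hypothetical names.
final class GroupThenStream {
    /** Collects the stream into lists keyed by word length. */
    static Map<Integer, List<String>> byLength(Stream<String> words) {
        return words.collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups = byLength(Stream.of("a", "bb", "cc", "ddd"));
        // Each group is an ordinary List, so a new stream can be opened on it.
        groups.get(2).stream()
              .map(String::toUpperCase)
              .forEach(System.out::println);
    }
}
```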

Answer by Mark Jeronimus

A collector can be used for this.

  • For two categories, use the Collectors.partitioningBy() factory.

This will create a Map from Boolean to List, and put items in one or the other list based on a Predicate.

Note: Since the stream needs to be consumed whole, this can't work on infinite streams. And because the stream is consumed anyway, this method simply puts them in Lists instead of making a new stream-with-memory. You can always stream those lists if you require streams as output.

Also, no need for the iterator, not even in the heads-only example you provided.

  • Binary splitting looks like this:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • For more categories, use a Collectors.groupingBy() factory.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

If the stream is not a Stream but one of the primitive streams, such as IntStream, then this .collect(Collectors) method is not available. You'll have to do it the manual way, without a collector factory. Its implementation looks like this:

[Example 2.0 since 2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

In this example I initialize the ArrayLists with the full size of the initial collection (if this is known at all). This prevents resize events even in the worst-case scenario, but can potentially gobble up 2*N*T space (N = initial number of elements, T = number of threads). To save space at the cost of some speed, you can leave it out or use your best educated guess, such as the expected highest number of elements in one partition (typically just over N/2 for a balanced split).

I hope I don't offend anyone by using a Java 9 method. For the Java 8 version, look at the edit history.
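For readers who cannot use Map.of, the following is one possible Java 8-compatible sketch. It is not taken from the answer's edit history; it simply swaps Map.of for a HashMap filled in the supplier. The names IntPartition and partition are hypothetical, and the initial capacities are left at their defaults.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

// Hypothetical Java 8 variant; not the original author's code.
final class IntPartition {
    /** Partitions an IntStream into two boxed lists keyed by the predicate's result. */
    static Map<Boolean, List<Integer>> partition(IntStream stream, IntPredicate predicate) {
        return stream.collect(
                () -> {
                    // One fresh container per thread when the stream is parallel.
                    Map<Boolean, List<Integer>> map = new HashMap<>();
                    map.put(false, new ArrayList<>());
                    map.put(true, new ArrayList<>());
                    return map;
                },
                (map, value) -> map.get(predicate.test(value)).add(value),
                (left, right) -> {
                    // Merge the partial results produced by different threads.
                    left.get(false).addAll(right.get(false));
                    left.get(true).addAll(right.get(true));
                });
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups = partition(IntStream.range(0, 6), n -> n % 2 == 0);
        System.out.println(groups.get(true));
        System.out.println(groups.get(false));
    }
}
```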

Answer by Ludger

I stumbled across this question myself, and I feel that a forked stream has some valid use cases. I wrote the code below as a Consumer, so it does not produce a result, but you could apply the same idea to functions and anything else you might come across.

import java.util.function.Consumer;
import java.util.function.Predicate;

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Now your code implementation could be something like this:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));

Answer by Ian Jones

This was the least bad answer I could come up with.

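import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;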

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

This takes a stream of integers and splits them at 5. For those greater than 5 it filters only even numbers and puts them in a list. For the rest it joins them with |.

Output:

 ([6, 8],0|1|2|3|4|5)

It's not ideal, as it collects everything into intermediary collections, breaking the stream (and it has too many arguments!).

Answer by Matthew

How about:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));

Answer by Sebastian Hans

I stumbled across this question while looking for a way to filter certain elements out of a stream and log them as errors. So I did not really need to split the stream so much as attach a premature terminating action to a predicate with unobtrusive syntax. This is what I came up with:

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
        return x -> {
            if (pred.test(x)) {
                return true;
            }
            altAction.accept(x);
            return false;
        };
    }

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}

Answer by OneCricketeer

Shorter version that uses Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;

/**
 * Forks a Stream using a Predicate into positive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}