Java 8 streams conditional processing
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/35854574/
Asked by gdiazc
I'm interested in separating a stream into two or more substreams, and processing the elements in different ways. For example, a (large) text file might contain lines of type A and lines of type B, in which case I'd like to do something like:
File.lines(path)
    .filter(line -> isTypeA(line))
    .forEachTrue(line -> processTypeA(line))
    .forEachFalse(line -> processTypeB(line))
The previous is my attempt at abstracting the situation. In reality I have a very large text file where each line is tested against a regex; if the line passes, then it is processed, whereas if it is rejected, then I want to update a counter. This further processing of rejected strings is why I don't simply use filter.
Is there any reasonable way to do this with streams, or will I have to fallback to loops? (I would like this to run in parallel as well, so streams are my first choice).
Accepted answer by Cosu
Java 8 streams weren't designed to support this kind of operation. From the jdk:
A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.
If you can store it in memory, you can use Collectors.partitioningBy if you have just two types and work with a Map<Boolean, List>. Otherwise use Collectors.groupingBy.
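For example, a minimal sketch of the partitioningBy approach; isTypeA, processTypeA and input.txt are hypothetical stand-ins for the question's real logic and input, not part of the original answer:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionExample {
    public static void main(String[] args) throws Exception {
        try (Stream<String> lines = Files.lines(Paths.get("input.txt"))) {
            // true -> lines passing the type-A test, false -> everything else
            Map<Boolean, List<String>> parts =
                    lines.collect(Collectors.partitioningBy(PartitionExample::isTypeA));
            parts.get(true).forEach(PartitionExample::processTypeA);
            System.out.println("Rejected lines: " + parts.get(false).size());
        }
    }

    static boolean isTypeA(String line) { return line.matches("A.*"); }   // placeholder test
    static void processTypeA(String line) { System.out.println("A: " + line); }
}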
Answer by Louis Wasserman
The way I'd deal with this is not to split this up at all, but rather, write
Files.lines(path)
     .map(line -> {
         if (condition(line)) {
             return doThingA(line);
         } else {
             return doThingB(line);
         }
     })...
Details vary depending on exactly what you want to do and how you plan to do it.
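A rough, runnable sketch of that idea; condition, doThingA, doThingB and input.txt here are made-up placeholders rather than anything from the original answer:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapBothTypes {
    public static void main(String[] args) throws Exception {
        try (Stream<String> lines = Files.lines(Paths.get("input.txt"))) {
            // every line goes through map(); the lambda decides per line which processing to apply
            List<String> results = lines
                    .map(line -> condition(line) ? doThingA(line) : doThingB(line))
                    .collect(Collectors.toList());
            results.forEach(System.out::println);
        }
    }

    static boolean condition(String line) { return line.startsWith("A"); } // placeholder predicate
    static String doThingA(String line) { return "A-result: " + line; }
    static String doThingB(String line) { return "B-result: " + line; }
}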
Answer by JB Nizet
Well, you can simply do
Counter counter = new Counter();
Files.lines(path)
     .forEach(line -> {
         if (isTypeA(line)) {
             processTypeA(line);
         }
         else {
             counter.increment();
         }
     });
Not very functional-style, but it does it in a similar way as your example. Of course, if parallel, both Counter.increment() and processTypeA() have to be thread-safe.
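A minimal sketch of a thread-safe variant, assuming an AtomicLong in place of the Counter class (isTypeA, processTypeA and input.txt remain placeholders):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ForEachWithCounter {
    public static void main(String[] args) throws Exception {
        AtomicLong rejected = new AtomicLong();                // safe to increment from multiple threads
        try (Stream<String> lines = Files.lines(Paths.get("input.txt")).parallel()) {
            lines.forEach(line -> {
                if (isTypeA(line)) {
                    processTypeA(line);                        // must itself be thread-safe in a parallel stream
                } else {
                    rejected.incrementAndGet();
                }
            });
        }
        System.out.println("Rejected: " + rejected.get());
    }

    static boolean isTypeA(String line) { return line.startsWith("A"); }  // placeholder test
    static void processTypeA(String line) { System.out.println("A: " + line); }
}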
Answer by erickson
Simply test each element, and act accordingly.
lines.forEach(line -> {
    if (isTypeA(line)) processTypeA(line);
    else processTypeB(line);
});
This behavior could be hidden in a helper method:
public static <T> Consumer<T> branch(Predicate<? super T> test,
                                     Consumer<? super T> t,
                                     Consumer<? super T> f) {
    return o -> {
        if (test.test(o)) t.accept(o);
        else f.accept(o);
    };
}
Then the usage would look like this:
lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB));
Tangential Note
The Files.lines() method does not close the underlying file, so you must use it like this:
try (Stream<String> lines = Files.lines(path, encoding)) {
    lines.forEach(...);
}
Variables of Stream type throw up a bit of a red flag for me, so I prefer to manage a BufferedReader directly:
try (BufferedReader lines = Files.newBufferedReader(path, encoding)) {
    lines.lines().forEach(...);
}
Answer by Holger
While side effects in behavioral parameters are discouraged, they are not forbidden, as long as there's no interference, so the simplest, though not cleanest solution is to count right in the filter:
AtomicInteger rejected=new AtomicInteger();
Files.lines(path)
     .filter(line -> {
         boolean accepted=isTypeA(line);
         if(!accepted) rejected.incrementAndGet();
         return accepted;
     })
     // chain processing of matched lines
As long as you are processing all items, the result will be consistent. Only if you are using a short-circuiting terminal operation (in a parallel stream), the result will become unpredictable.
Updating an atomic variable may not be the most efficient solution, but in the context of processing lines from a file, the overhead will likely be negligible.
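To make that concrete, here is one way the chain might be completed; the forEach terminal step, the helper methods and input.txt are my assumptions, not part of the answer:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class CountInFilter {
    public static void main(String[] args) throws Exception {
        AtomicInteger rejected = new AtomicInteger();
        try (Stream<String> lines = Files.lines(Paths.get("input.txt"))) {
            lines.filter(line -> {
                     boolean accepted = isTypeA(line);
                     if (!accepted) rejected.incrementAndGet();   // count rejected lines as a side effect
                     return accepted;
                 })
                 .forEach(CountInFilter::processTypeA);           // processing of matched lines
        }
        System.out.println("Rejected: " + rejected.get());
    }

    static boolean isTypeA(String line) { return line.matches("A.*"); }   // placeholder regex test
    static void processTypeA(String line) { System.out.println("A: " + line); }
}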
If you want a clean, parallel friendly solution, one general approach is to implement a Collector which can combine the processing of two collect operations based on a condition. This requires that you are able to express the downstream operation as a collector, but most stream operations can be expressed as a collector (and the trend is going towards the possibility to express all operations that way, i.e. Java 9 will add the currently missing filtering and flatMapping).
You'll need a pair type to hold two results, so assuming a sketch like
class Pair<A,B> {
    final A a;
    final B b;
    Pair(A a, B b) {
        this.a=a;
        this.b=b;
    }
}
the combining collector implementation will look like
public static <T, A1, A2, R1, R2> Collector<T, ?, Pair<R1,R2>> conditional(
        Predicate<? super T> predicate,
        Collector<T, A1, R1> whenTrue, Collector<T, A2, R2> whenFalse) {
    Supplier<A1> s1=whenTrue.supplier();
    Supplier<A2> s2=whenFalse.supplier();
    BiConsumer<A1, T> a1=whenTrue.accumulator();
    BiConsumer<A2, T> a2=whenFalse.accumulator();
    BinaryOperator<A1> c1=whenTrue.combiner();
    BinaryOperator<A2> c2=whenFalse.combiner();
    Function<A1,R1> f1=whenTrue.finisher();
    Function<A2,R2> f2=whenFalse.finisher();
    return Collector.of(
        ()->new Pair<>(s1.get(), s2.get()),
        (p,t)->{
            if(predicate.test(t)) a1.accept(p.a, t); else a2.accept(p.b, t);
        },
        (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)),
        p -> new Pair<>(f1.apply(p.a), f2.apply(p.b)));
}
and can be used, for example for collecting matching items into a list and counting the non-matching, like this:
Pair<List<String>, Long> p = Files.lines(path)
    .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting()));
List<String> matching = p.a;
long nonMatching = p.b;
The collector is parallel friendly and allows arbitrarily complex delegate collectors, but note that with the current implementation, the stream returned by Files.lines might not perform so well with parallel processing; compare to "Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator". Improvements are scheduled for the Java 9 release.
Answer by Oleg Mikheev
It seems that in reality you do want to process each line, but process it differently based on some condition (type).
I think this is, more or less, a functional way to implement it:
public static void main(String[] args) {
    Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println);
}

static Supplier<Integer> processor(int i) {
    return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i);
}

static boolean tellType(int i) {
    return i % 2 == 0;
}

static int processTypeA(int i) {
    return i * 100;
}

static int processTypeB(int i) {
    return i * 10;
}
Answer by tom
Here's an approach (which ignores the cautions about forcing conditional processing into a stream) that wraps a predicate and consumer into a single predicate-with-side-effect:
public static class StreamProc {
    public static <T> Predicate<T> process( Predicate<T> condition, Consumer<T> operation ) {
        Predicate<T> p = t -> { operation.accept(t); return false; };
        return (t) -> condition.test(t) ? p.test(t) : true;
    }
}
Then filter the stream:
someStream
    .filter( StreamProc.process( cond1, op1 ) )
    .filter( StreamProc.process( cond2, op2 ) )
    ...
    .collect( ... )
Elements remaining in the stream have not yet been processed.
For example, a typical filesystem traversal using external iteration looks like
File[] files = dir.listFiles();
for ( File f : files ) {
    if ( f.isDirectory() ) {
        this.processDir( f );
    } else if ( f.isFile() ) {
        this.processFile( f );
    } else {
        this.processErr( f );
    }
}
With streams and internal iteration this becomes
Arrays.stream( dir.listFiles() )
      .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) )
      .filter( StreamProc.process( f -> f.isFile(), this::processFile ) )
      .forEach( this::processErr );
I would like Stream to implement the process method directly. Then we could have
Arrays.stream( dir.listFiles() )
      .process( f -> f.isDirectory(), this::processDir )
      .process( f -> f.isFile(), this::processFile )
      .forEach( this::processErr );
Thoughts?