Properly handling empty Observable in RxJava
Original source: http://stackoverflow.com/questions/29086444/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverFlow
Asked by babernathy
I have a situation where I am creating an Observable containing results from a database. I then apply a series of filters to them, and a subscriber logs the results. It may be the case that no elements make their way through the filters. My business logic states this is not an error. However, when this happens my onError is called and contains the following exception: java.util.NoSuchElementException: Sequence contains no elements
Is the accepted practice to just detect that type of exception and ignore it? Or is there a better way to handle this?
The RxJava version is 1.0.0.
Here is a simple test case that exposes what I'm seeing. It appears to be related to having all the events filtered out before they reach the map and reduce.
@Test
public void test()
{
    Integer values[] = new Integer[]{1, 2, 3, 4, 5};
    Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    })
    .subscribe(new Action1<String>()
    {
        @Override
        public void call(String s)
        {
            System.out.println(s);
        }
    });
}
Because I am using a safe subscriber, it initially throws an OnErrorNotImplementedException which wraps the following exception:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
Based on the answer from @davem below, I created a new test case:
@Test
public void testFromBlockingAndSingle()
{
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    }).toList().toBlocking().single();
    System.out.println("Test: " + results + " Size: " + results.size());
}
This test results in the following behavior:
When the input is:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
Then the results (as expected) are:
Test: [-2,-1] Size: 1
And when the input is:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
Then the result is the following stack trace:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
So it appears that the problem is definitely with the use of the reduce function. See the following test case that handles both situations:
@Test
public void testNoReduce()
{
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).toList().toBlocking().first();
    Iterator<String> itr = results.iterator();
    StringBuilder b = new StringBuilder();
    while (itr.hasNext())
    {
        b.append(itr.next());
        if (itr.hasNext())
            b.append(",");
    }
    System.out.println("Test NoReduce: " + b);
}
With the following input:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
I get the following result, which is expected:
Test NoReduce: -2,-1
And with the following input:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
I get the following output, which is expected:
Test NoReduce:
So, unless I am completely misunderstanding something, the only way to really handle a zero-element Observable that results from a filter followed by a map and reduce is to implement the reduce logic outside of the Observable chain. Do you all agree with that statement?
Final Solution
Here is my final solution after implementing what both Tomáš Dvořák and David Moten suggested. I think this solution is reasonable.
@Test
public void testWithToList()
{
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).toList().map(new Func1<List<Integer>, String>()
    {
        @Override
        public String call(List<Integer> integers)
        {
            Iterator<Integer> intItr = integers.iterator();
            StringBuilder b = new StringBuilder();
            while (intItr.hasNext())
            {
                b.append(intItr.next());
                if (intItr.hasNext())
                {
                    b.append(",");
                }
            }
            return b.toString();
        }
    }).subscribe(new Action1<String>()
    {
        @Override
        public void call(String s)
        {
            System.out.println("With a toList: " + s);
        }
    });
}
Here is how this test behaves when given the following inputs.
When given a stream that will have some values pass through the filters:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
The result is:
With a toList: -2,-1
When given a stream that will not have any values pass through the filters:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
The result is:
With a toList: <empty string>
Answered by Tomáš Dvořák
Now, after your update, the error is quite obvious. Reduce in RxJava will fail with an IllegalArgumentException if the observable it is reducing is empty, exactly as per the specification (http://reactivex.io/documentation/operators/reduce.html). (In the 1.0.0 stack trace above, the failure surfaces as java.util.NoSuchElementException.)
In functional programming, there are usually two generic operators that aggregate a collection into a single value, fold and reduce. In the accepted terminology, fold takes an initial accumulator value and a function that takes the running accumulator and a value from the collection and produces the next accumulator value. An example in pseudocode:
[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)
will start with 0 and successively add 1, 2, 3 and 4 to the running accumulator, finally yielding 10, the sum of the values.
Reduce is very similar, except that it doesn't take the initial accumulator value explicitly: it uses the first value as the initial accumulator and then accumulates all the remaining values. This makes sense if you are, e.g., looking for a minimum or maximum value.
[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))
Looking at fold and reduce a different way, you will probably use fold whenever the aggregated value makes sense even on an empty collection (for a sum, 0 makes sense), and reduce otherwise (a minimum makes no sense on an empty collection, and reduce will fail to operate on such a collection, in your case by throwing an exception).
You are doing a similar aggregation, interspersing a collection of strings with commas to produce a single string. That is a slightly more difficult situation. It probably makes sense on an empty collection (you probably expect an empty string), but on the other hand, if you start with an empty accumulator, you will have one more comma in the result than you expect. The correct solution is to first check whether the collection is empty, and either return a fallback string for an empty collection or do a reduce on a non-empty collection. You will probably observe that often you don't actually want an empty string in the empty-collection case, and something like "collection is empty" might be more appropriate, which further assures you that this solution is the clean one.
Btw, I'm freely using the word collection here instead of observable, just for educational purposes. Also, in RxJava, both fold and reduce are called the same, reduce; there are just two versions of that method, one taking one parameter and the other taking two.
As for your final question: you don't have to leave the Observable chain. Just use toList(), as David Moten suggests.
.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))
where intersperse could be implemented in terms of reduce, if it is not already a library function (it is quite common):
collection.intersperse(separator) =
    if (collection.isEmpty())
        ""
    else
        collection.reduce((accumulator, element) => accumulator + separator + element)
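For readers who want to try the fold/reduce distinction without an RxJava dependency, here is a minimal sketch using the JDK's java.util.stream API as a stand-in. This is an analogy, not RxJava code: Stream.reduce with an identity plays the role of fold, Stream.reduce without one plays the role of reduce (it returns an empty Optional on empty input instead of throwing), and the class and method names (FoldVsReduce, sum, min, intersperse) are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class FoldVsReduce {
    // "fold": an explicit initial accumulator, so empty input simply yields the seed.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, (accumulator, value) -> accumulator + value);
    }

    // "reduce": no seed; the first element starts the accumulation, so empty
    // input has no answer. The stream API reports that as an empty Optional.
    static Optional<Integer> min(List<Integer> values) {
        return values.stream().reduce((accumulator, value) -> Math.min(accumulator, value));
    }

    // intersperse: handle the empty case first, then reduce the non-empty list.
    static String intersperse(List<String> values, String separator) {
        if (values.isEmpty()) {
            return "";
        }
        return values.stream()
                .reduce((accumulator, element) -> accumulator + separator + element)
                .get(); // safe: the list is known to be non-empty here
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));              // 10
        System.out.println(min(Collections.<Integer>emptyList()));       // Optional.empty
        System.out.println(intersperse(Arrays.asList("-2", "-1"), ",")); // -2,-1
        System.out.println(intersperse(Collections.<String>emptyList(), ",").isEmpty()); // true
    }
}
```

The same shape carries over to the toList() approach: once the values are in a list, the empty case can be checked explicitly before any seedless reduction runs.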
Answered by Dave Moten
The reason this is happening is that you are using toBlocking().single() on an empty stream. If you expect 0 or 1 values from the stream, you could do toList().toBlocking().single() and then inspect the values in the list (which could be empty but won't provoke the exception you are getting).
Answered by Victor Mejia
You can use the version of reduce that takes an initial value, .reduce(0, (x,y) -> x+y ). It should work like the fold operator explained by Tomáš Dvořák.
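One caveat worth seeing in action: a seeded reduce suits sums, but for the comma-joining case in the question an empty-string seed produces a leading comma, as the first answer warns. The sketch below again uses java.util.stream as a dependency-free stand-in for the RxJava operator, and the names SeededReduce, sum and naiveJoin are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SeededReduce {
    // Seeded sum: 0 is a true identity for addition, so empty input gives 0.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, (x, y) -> x + y);
    }

    // Seeded join: "" is not an identity for (s, s2) -> s + "," + s2,
    // so every non-empty result gains a leading comma.
    static String naiveJoin(List<String> values) {
        return values.stream().reduce("", (s, s2) -> s + "," + s2);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3)));                  // 6
        System.out.println(sum(Collections.<Integer>emptyList()));        // 0
        System.out.println(naiveJoin(Arrays.asList("-2", "-1")));         // ,-2,-1
        System.out.println(String.join(",", Arrays.asList("-2", "-1")));  // -2,-1
    }
}
```

So the seeded form is a good fit when the seed is a genuine neutral element; for joining strings, collecting to a list and joining it (or checking for emptiness first) avoids the stray separator.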