Java 使用 Reactor 抛出异常的正确方法
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/53595420/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Correct way of throwing exceptions with Reactor
提问by davioooh
I'm new to project Reactorand reactive programming in general.
我是项目Reactor和响应式编程的新手。
I'm currently working on a piece of code similar to this:
我目前正在处理一段与此类似的代码:
Mono.just(userId)
.map(repo::findById)
.map(user-> {
if(user == null){
throw new UserNotFoundException();
}
return user;
})
// ... other mappings
This example is probably silly and there are surely better ways of implementing this case, but the point is:
这个例子可能很愚蠢,而且肯定有更好的方法来实现这个案例,但重点是:
Is it wrong to use a throw new
exception in a map
block or should I replace this with a return Mono.error(new UserNotFoundException())
?
throw new
在map
块中使用异常是错误的还是应该用 替换它return Mono.error(new UserNotFoundException())
?
Is there any actual difference in these two ways of doing?
这两种做事方式有什么实际区别吗?
采纳答案by Oleh Dokuka
There are a couple of ways that could be considered as a convenient way of exception throwing:
有几种方法可以被认为是一种方便的异常抛出方式:
Handle your element using Flux/Mono.handle
使用处理您的元素 Flux/Mono.handle
One of the way that could simplify handling of an element which may result in an error or empty stream is operator handle
.
可以简化可能导致错误或空流的元素处理的方法之一是 operator handle
。
The following code shows how we can use it in order to solve our problem:
以下代码显示了我们如何使用它来解决我们的问题:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
as we can see, the .handle
operator requires to pass BiConsumer<T, SynchronousSink<>
in order to handle an element. Here we have two parameters in our BiConsumer. The first one is an element from the upstream where the second one is SynchronousSink
which helps us to supply element to downstream synchronously. Such a technique expands an ability to supply different results of our element's processing. For example, in case the element is invalid, we can supply error to the same SycnchronousSync
which will cancel upstream and produce onError
signal to downstream. In turn, we can "filter" using the same handle
operator. Once the handle BiConsumer
is executed and no element has been supplied, Reactor will consider that as kind of filtering and will request for an additional element for us. Finally, in case the element is valid, we can simply call SynchronousSink#next
and propagate our element downstream or apply some mapping on it, so we will have handle
as the map
operator here. Moreover, we can safely use that operator with no-performance impact and provide complex element verification such as validation of element or error sending to downstream.
正如我们所见,.handle
操作符需要传递BiConsumer<T, SynchronousSink<>
才能处理元素。这里我们的 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是SynchronousSink
帮助我们同步向下游提供元素的元素。这种技术扩展了提供元素处理不同结果的能力。例如,如果元素无效,我们可以向相同的元素提供错误,SycnchronousSync
这将取消上游onError
并向下游产生信号。反过来,我们可以使用相同的handle
运算符“过滤” 。一旦把手BiConsumer
执行并且没有提供任何元素,Reactor 会认为这是一种过滤,并会为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地SynchronousSink#next
向下游调用和传播我们的元素或对其应用一些映射,因此我们将handle
在map
这里使用as操作符。此外,我们可以安全地使用该运算符而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。
Throws using #concatMap
+ Mono.error
使用#concatMap
+抛出Mono.error
One of the options to throw an exception during mapping is to replace map
with concatMap
. In its essence, concatMap
does almost the same flatMap
does. The only difference is that concatMap
allows only one substream at a time. Such behavior simplifies internal implementation a lot and does not impact performance. So we can use the following code in order to throw an exception in a more functional way:
在映射期间抛出异常的选项之一是替换map
为concatMap
。从本质上讲,concatMap
几乎相同flatMap
。唯一的区别是concatMap
一次只允许一个子流。这种行为极大地简化了内部实现并且不会影响性能。因此,我们可以使用以下代码以更实用的方式抛出异常:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
In the sample above in case of an invalid user, we return exception using Mono.error
. The same we can do for flux using Flux.error
:
在上面的示例中,如果用户无效,我们使用Mono.error
. 我们可以使用Flux.error
以下方法对通量做同样的事情:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
Note, in both cases we return coldstream which has only one element. In Reactor, there is a couple of optimizations that improve performance in the case returned stream is a cold scalarstream. Thus, it is recommended to use Flux/Mono concatMap
+ .just
, empty
, error
as a result when we need more complex mapping, that could end up with return null
or throw new ...
.
注意,在这两种情况下,我们都返回只有一个元素的冷流。在 Reactor 中,有一些优化可以在返回的流是冷标量流的情况下提高性能。因此,建议使用 Flux/Mono concatMap
+ .just
, empty
,error
因此当我们需要更复杂的映射时,可能会以return null
或结尾throw new ...
。
Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a
null
value for you since this violates Reactive Streams spec (see Rule 2.13)Thus, in case ifrepo.findById
returns null, Reactor will throw NullPointerException for you.
注意力!永远不要检查传入元素的可空性。Reactor 项目永远不会
null
为您发送值,因为这违反了 Reactive Streams 规范(请参阅规则 2.13)因此,如果repo.findById
返回 null,Reactor 将为您抛出 NullPointerException。
Wait, Why concatMap
is better than flatMap
?
等等,为什么concatMap
比flatMap
?
In its essence, flatMap
is designed to merge elements from the multiple substreams that is executing at a time. It means that flatMap should have asynchronous streams underneath so, they could potentially process data on the multiple threads or that could be a several network calls. Subsequently, such expectations impact implementation a lot so flatMap
should be able to handle data from the multiple streams (Thread
s) (means usage of concurrent data structures), enqueue elements if there is a draining from another stream (means additional memory allocation for Queue
s for each substream) and do not violate Reactive Streams specification rules (means really complex implementation). Counting all these facts and the fact that we replace a plain map
operation (which is synchronous) onto the more convenient way of throwing an exception using Flux/Mono.error
(which does not change synchronicity of execution) leads to the fact that we do not need such a complex operator and we can use much simpler concatMap
which is designed for asynchronous handling of a single stream at a time and has a couple of optimization in order to handle scalar, cold stream.
从本质上讲,flatMap
旨在合并来自一次执行的多个子流的元素。这意味着 flatMap 应该在下面有异步流,因此它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这种期望对实现有很大影响,因此flatMap
应该能够处理来自多个流 ( Thread
s) 的数据(意味着使用并发数据结构),如果从另一个流中排空,则将元素排入队列(意味着Queue
为每个流分配额外的内存)substream)并且不违反 Reactive Streams 规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们替换一个普通的事实map
操作(它是同步的)到更方便的抛出异常的方式使用Flux/Mono.error
(它不会改变执行的同步性)导致我们不需要这样一个复杂的操作符,我们可以使用更简单的concatMap
为异步处理而设计的操作符一次处理单个流,并进行了一些优化以处理标量冷流。
Throws exception using switchOnEmpty
使用抛出异常 switchOnEmpty
So, another approach to throw an exception when the result is empty is switchOnEmpty
operator. The following code demonstrates how we can use that approach :
因此,另一种在结果为空时抛出异常的方法是switchOnEmpty
operator。以下代码演示了我们如何使用该方法:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
As we can see, in this case repo::findById
should have Mono
of User
as the return type. Therefore, in case a User
instance will not be found, the result stream will be empty. Thus, Reactor will call an alternative Mono
, specified as switchIfEmpty
parameter.
正如我们所见,在这种情况下repo::findById
应该有Mono
ofUser
作为返回类型。因此,如果User
找不到实例,结果流将为空。因此,Reactor 将调用一个替代项Mono
,指定为switchIfEmpty
参数。
Throw your exception as is
按原样抛出您的异常
It could be counted as a less readable code or bad practice (my own opinion), but you can throw your exception as is with Project Reactor. Even though, in someway doing so can violates Reactive Streams specification (in this context violatefrom the semantic perspective, because your operator under the hood is a Subscriber
in a chain of Subscriber
s, therefore - semantically, throwing an exception in lambda could be mapped to throwing exception in the onNext
method which violates the spec's rule 2.13). However, since Reactor will catch the thrown exception for you and propagate it then as the onError
signal to your downstream, it is not prohibited to do that.
它可以被视为可读性较差的代码或不好的做法(我自己的意见),但是您可以像 Project Reactor 一样抛出异常。即使在某种程度上这样做可能会违反 Reactive Streams 规范(在这种情况下,从语义的角度来看是违反的,因为引擎盖下的运算符是sSubscriber
链中Subscriber
的一个,因此 - 从语义上讲,在 lambda 中抛出异常可以映射为抛出onNext
违反规范规则 2.13的方法中的异常)。但是,由于 Reactor 会为您捕获抛出的异常并将其作为onError
信号传播到您的下游,因此不禁止这样做。
Takeaways
外卖
- Use
.handle
operator in order to provide complex element processing - Use
concatMap
+Mono.error
when we need to throw an exception during mapping but such a technique is most suitable for cases of asynchronous element processing. - Use
flatMap
+Mono.error
when we have already hadflatMap
in place Null
as a return type is forbidden so instead ofnull
in your downstreammap
you will get unexpectedonError
withNullPointerException
- Use
switchIfEmpty
in all cases when you need to send an error signal if the result of calling some specific function finished with the emptystream
- 使用
.handle
运算符来提供复杂的元素处理 - 当我们需要在映射期间抛出异常时使用
concatMap
+Mono.error
但这种技术最适合异步元素处理的情况。 - 当我们已经就位时使用
flatMap
+Mono.error
flatMap
Null
因为返回类型是被禁止的所以而不是null
在你的下游map
你会得到意想不到onError
的NullPointerException
switchIfEmpty
如果调用某些特定函数的结果以空流结束,则在需要发送错误信号的所有情况下使用