Java 使用 Reactor 抛出异常的正确方法

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/53595420/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 00:50:53  来源:igfitidea点击:

Correct way of throwing exceptions with Reactor

javareactive-programmingproject-reactorreactor

提问by davioooh

I'm new to project Reactorand reactive programming in general.

我是项目Reactor和响应式编程的新手。

I'm currently working on a piece of code similar to this:

我目前正在处理一段与此类似的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings

This example is probably silly and there are surely better ways of implementing this case, but the point is:

这个例子可能很愚蠢,而且肯定有更好的方法来实现这个案例,但重点是:

Is it wrong to use a throw newexception in a mapblock or should I replace this with a return Mono.error(new UserNotFoundException())?

throw newmap块中使用异常是错误的还是应该用 替换它return Mono.error(new UserNotFoundException())

Is there any actual difference in these two ways of doing?

这两种做事方式有什么实际区别吗?

采纳答案by Oleh Dokuka

There are a couple of ways that could be considered as a convenient way of exception throwing:

有几种方法可以被认为是一种方便的异常抛出方式:

Handle your element using Flux/Mono.handle

使用处理您的元素 Flux/Mono.handle

One of the way that could simplify handling of an element which may result in an error or empty stream is operator handle.

可以简化可能导致错误或空流的元素处理的方法之一是 operator handle

The following code shows how we can use it in order to solve our problem:

以下代码显示了我们如何使用它来解决我们的问题:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })

as we can see, the .handleoperator requires to pass BiConsumer<T, SynchronousSink<>in order to handle an element. Here we have two parameters in our BiConsumer. The first one is an element from the upstream where the second one is SynchronousSinkwhich helps us to supply element to downstream synchronously. Such a technique expands an ability to supply different results of our element's processing. For example, in case the element is invalid, we can supply error to the same SycnchronousSyncwhich will cancel upstream and produce onErrorsignal to downstream. In turn, we can "filter" using the same handleoperator. Once the handle BiConsumeris executed and no element has been supplied, Reactor will consider that as kind of filtering and will request for an additional element for us. Finally, in case the element is valid, we can simply call SynchronousSink#nextand propagate our element downstream or apply some mapping on it, so we will have handleas the mapoperator here. Moreover, we can safely use that operator with no-performance impact and provide complex element verification such as validation of element or error sending to downstream.

正如我们所见,.handle操作符需要传递BiConsumer<T, SynchronousSink<>才能处理元素。这里我们的 BiConsumer 中有两个参数。第一个是来自上游的元素,第二个是SynchronousSink帮助我们同步向下游提供元素的元素。这种技术扩展了提供元素处理不同结果的能力。例如,如果元素无效,我们可以向相同的元素提供错误,SycnchronousSync这将取消上游onError并向下游产生信号。反过来,我们可以使用相同的handle运算符“过滤” 。一旦把手BiConsumer执行并且没有提供任何元素,Reactor 会认为这是一种过滤,并会为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地SynchronousSink#next向下游调用和传播我们的元素或对其应用一些映射,因此我们将handlemap这里使用as操作符。此外,我们可以安全地使用该运算符而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。

Throws using #concatMap+ Mono.error

使用#concatMap+抛出Mono.error

One of the options to throw an exception during mapping is to replace mapwith concatMap. In its essence, concatMapdoes almost the same flatMapdoes. The only difference is that concatMapallows only one substream at a time. Such behavior simplifies internal implementation a lot and does not impact performance. So we can use the following code in order to throw an exception in a more functional way:

在映射期间抛出异常的选项之一是替换mapconcatMap。从本质上讲,concatMap几乎相同flatMap。唯一的区别是concatMap一次只允许一个子流。这种行为极大地简化了内部实现并且不会影响性能。因此,我们可以使用以下代码以更实用的方式抛出异常:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

In the sample above in case of an invalid user, we return exception using Mono.error. The same we can do for flux using Flux.error:

在上面的示例中,如果用户无效,我们使用Mono.error. 我们可以使用Flux.error以下方法对通量做同样的事情:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

Note, in both cases we return coldstream which has only one element. In Reactor, there is a couple of optimizations that improve performance in the case returned stream is a cold scalarstream. Thus, it is recommended to use Flux/Mono concatMap+ .just, empty, erroras a result when we need more complex mapping, that could end up with return nullor throw new ....

注意,在这两种情况下,我们都返回只有一个元素的冷流。在 Reactor 中,有一些优化可以在返回的流是冷标量流的情况下提高性能。因此,建议使用 Flux/Mono concatMap+ .just, emptyerror因此当我们需要更复杂的映射时,可能会以return null或结尾throw new ...

Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a nullvalue for you since this violates Reactive Streams spec (see Rule 2.13)Thus, in case if repo.findByIdreturns null, Reactor will throw NullPointerException for you.

注意力!永远不要检查传入元素的可空性。Reactor 项目永远不会null为您发送值,因为这违反了 Reactive Streams 规范(请参阅规则 2.13因此,如果repo.findById返回 null,Reactor 将为您抛出 NullPointerException。

Wait, Why concatMapis better than flatMap?

等等,为什么concatMapflatMap

In its essence, flatMapis designed to merge elements from the multiple substreams that is executing at a time. It means that flatMap should have asynchronous streams underneath so, they could potentially process data on the multiple threads or that could be a several network calls. Subsequently, such expectations impact implementation a lot so flatMapshould be able to handle data from the multiple streams (Threads) (means usage of concurrent data structures), enqueue elements if there is a draining from another stream (means additional memory allocation for Queues for each substream) and do not violate Reactive Streams specification rules (means really complex implementation). Counting all these facts and the fact that we replace a plain mapoperation (which is synchronous) onto the more convenient way of throwing an exception using Flux/Mono.error(which does not change synchronicity of execution) leads to the fact that we do not need such a complex operator and we can use much simpler concatMapwhich is designed for asynchronous handling of a single stream at a time and has a couple of optimization in order to handle scalar, cold stream.

从本质上讲,flatMap旨在合并来自一次执行的多个子流的元素。这意味着 flatMap 应该在下面有异步流,因此它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这种期望对实现有很大影响,因此flatMap应该能够处理来自多个流 ( Threads) 的数据(意味着使用并发数据结构),如果从另一个流中排空,则将元素排入队列(意味着Queue为每个流分配额外的内存)substream)并且不违反 Reactive Streams 规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们替换一个普通的事实map操作(它是同步的)到更方便的抛出异常的方式使用Flux/Mono.error(它不会改变执行的同步性)导致我们不需要这样一个复杂的操作符,我们可以使用更简单的concatMap为异步处理而设计的操作符一次处理单个流,并进行了一些优化以处理标量冷流。

Throws exception using switchOnEmpty

使用抛出异常 switchOnEmpty

So, another approach to throw an exception when the result is empty is switchOnEmptyoperator. The following code demonstrates how we can use that approach :

因此,另一种在结果为空时抛出异常的方法是switchOnEmptyoperator。以下代码演示了我们如何使用该方法:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))

As we can see, in this case repo::findByIdshould have Monoof Useras the return type. Therefore, in case a Userinstance will not be found, the result stream will be empty. Thus, Reactor will call an alternative Mono, specified as switchIfEmptyparameter.

正如我们所见,在这种情况下repo::findById应该有MonoofUser作为返回类型。因此,如果User找不到实例,结果流将为空。因此,Reactor 将调用一个替代项Mono,指定为switchIfEmpty参数。

Throw your exception as is

按原样抛出您的异常

It could be counted as a less readable code or bad practice (my own opinion), but you can throw your exception as is with Project Reactor. Even though, in someway doing so can violates Reactive Streams specification (in this context violatefrom the semantic perspective, because your operator under the hood is a Subscriberin a chain of Subscribers, therefore - semantically, throwing an exception in lambda could be mapped to throwing exception in the onNextmethod which violates the spec's rule 2.13). However, since Reactor will catch the thrown exception for you and propagate it then as the onErrorsignal to your downstream, it is not prohibited to do that.

它可以被视为可读性较差的代码或不好的做法(我自己的意见),但是您可以像 Project Reactor 一样抛出异常。即使在某种程度上这样做可能会违反 Reactive Streams 规范(在这种情况下,从语义的角度来看是违反的,因为引擎盖下的运算符是sSubscriber链中Subscriber的一个,因此 - 从语义上讲,在 lambda 中抛出异常可以映射为抛出onNext违反规范规则 2.13的方法中的异常)。但是,由于 Reactor 会为您捕获抛出的异常并将其作为onError信号传播到您的下游,因此不禁止这样做。

Takeaways

外卖

  1. Use .handleoperator in order to provide complex element processing
  2. Use concatMap+ Mono.errorwhen we need to throw an exception during mapping but such a technique is most suitable for cases of asynchronous element processing.
  3. Use flatMap+ Mono.errorwhen we have already had flatMapin place
  4. Nullas a return type is forbidden so instead of nullin your downstream mapyou will get unexpected onErrorwith NullPointerException
  5. Use switchIfEmptyin all cases when you need to send an error signal if the result of calling some specific function finished with the emptystream
  1. 使用.handle运算符来提供复杂的元素处理
  2. 当我们需要在映射期间抛出异常时使用concatMap+Mono.error但这种技术最适合异步元素处理的情况。
  3. 当我们已经就位时使用flatMap+Mono.errorflatMap
  4. Null因为返回类型是被禁止的所以而不是null在你的下游map你会得到意想不到onErrorNullPointerException
  5. switchIfEmpty如果调用某些特定函数的结果以流结束,则在需要发送错误信号的所有情况下使用