java 执行 Flux.map() 时如何处理错误
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/36237230/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to handle error while executing Flux.map()
提问by Victor
I′m trying to figure out how handle errors when mapping elements inside a Flux.
我试图弄清楚在 Flux 中映射元素时如何处理错误。
For instance, I′m parsing a CSV string into one of my business POJOs:
例如,我正在将一个 CSV 字符串解析为我的业务 POJO 之一:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Some of this lines might contain errors, so what I get in the log is:
其中一些行可能包含错误,所以我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
I read in the API some error handling methods, but most refered to returning an "error value" or using a fallback Flux, like this one:
我在 API 中阅读了一些错误处理方法,但大多数是指返回“错误值”或使用回退 Flux,如下所示:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
However, using this with my myflux
means that the whole flux is processed again.
但是,将其与我一起使用myflux
意味着再次处理整个通量。
So, is there a way to handle errors while processing particular elements (I.e ignoring them/Logging them) and keep processing the rest of the flux?
那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?
UPDATE with @akarnokd workaround
使用@akarnokd 解决方法更新
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
This works as a charm, however, as you can see the code is less elegant than before. Does not the Flux API have any method to do what this code does?
然而,这很有用,因为您可以看到代码没有以前那么优雅。Flux API 没有任何方法可以执行此代码的操作吗?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
采纳答案by akarnokd
You need flatMap
instead which let's you return an empty sequence if the processing failed:
flatMap
如果处理失败,您需要让我们返回一个空序列:
myflux.flatMap(v -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
} catch (IllegalArgumentException ex) {
return Flux.empty();
}
});
回答by Tsvetan Ovedenski
If you want to use Reactor 3's methods for dealing with exceptions, you can use Mono.fromCallable
.
如果你想使用 Reactor 3 的方法来处理异常,你可以使用Mono.fromCallable
.
flatMap(x ->
Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
.flux()
.flatMap(Flux::fromIterable)
.onErrorResume(Flux::empty)
)
Unfortunately there is no Flux.fromCallable
, so assuming the callable returns a list, you have to convert it to a Flux manually.
不幸的是没有Flux.fromCallable
,所以假设可调用返回一个列表,您必须手动将其转换为 Flux。
回答by code4kix
With the current version of Reactor 3, quite a few methods have been added. So we could do something like so:
在当前版本的 Reactor 3 中,添加了相当多的方法。所以我们可以这样做:
Flux.onErrorResume(error -> {
System.out.println("Error decoding stock quotation: " + e);
return Flux.empty();
});
See more info on how to handle errors here
在此处查看有关如何处理错误的更多信息
回答by seanzxx
You can use onErrorContinue. It allows recovering from errors by dropping the trouble element and continuing with the subsequent elements.
您可以使用 onErrorContinue。它允许通过删除故障元素并继续后续元素来从错误中恢复。
回答by CanaBeasT
...
// Convert the string to POJOs
.flatMap(x ->
Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
.doOnError(IllegalArgumentException.class,
e -> System.out.println("Error decoding stock quotation: " + x))
//.onErrorStop()
.onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
)
...