Java block()/blockFirst()/blockLast() 在 exchange() 之后调用 bodyToMono 时阻塞错误
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/51449889/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
block()/blockFirst()/blockLast() are blocking error when calling bodyToMono AFTER exchange()
提问by DaithiG
I am trying to use Webflux to stream a generated file to another location, however, if the generation of the file ran into an error, the api returns success, but with a DTO detailing the errors while generating the file instead of the file itself. This is using a very old and poorly designed api so please excuse the use of post and the api design.
我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,api 将返回成功,但是在生成文件而不是文件本身时使用 DTO 详细说明错误。这是使用一个非常旧且设计糟糕的 api,所以请原谅使用 post 和 api 设计。
The response from the api call (exchange()) is a ClientResponse. From here I can either convert to a ByteArrayResource using bodyToMono which can be streamed to a file, or, if there is an error in creating the file, then I can convert to the DTO also using bodyToMono. However, I cannot seem to do either or depending on the contents of the header of ClientResponse.
来自 api 调用 (exchange()) 的响应是 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果创建文件时出现错误,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎无法执行或取决于 ClientResponse 标头的内容。
In run time I get an IllegalStateException caused by
在运行时我得到一个 IllegalStateException 引起的
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12
block()/blockFirst()/blockLast() 是阻塞的,线程 reactor-http-client-epoll-12 不支持
I think my issue is that I cannot call block() twice in the same function chain.
我认为我的问题是我不能在同一个函数链中调用 block() 两次。
My code snippet is like so:
我的代码片段是这样的:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
Basically I want to process the ClientResponse differently based on the MediaType which is defined in the header.
基本上,我想根据标头中定义的 MediaType 以不同方式处理 ClientResponse。
Is this possible?
这可能吗?
采纳答案by Brian Clozel
First, a few things that will help you understand the code snippet solving this use case.
首先,有几件事可以帮助您理解解决此用例的代码片段。
- You should never call a blocking method within a method that returns a reactive type; you will block one of the few threads of your application and it is very bad for the application
- Anyway as of Reactor 3.2, blocking within a reactive pipeline throws an error
- Calling
subscribe
, as suggested in the comments, is not a good idea either. It is more or less like starting that job as a task in a separate thread. You'll get a callback when it's done (thesubscribe
methods can be given lambdas), but you're in fact decoupling your current pipeline with that task. In this case, the client HTTP response could be closed and resources cleaned before you get a chance to read the full response body to write it to a file - If you don't want to buffer the whole response in memory, Spring provides
DataBuffer
(think ByteBuffer instances that can be pooled). - You can call block if the method you're implementing is itself blocking (returning
void
for example), for example in a test case.
- 你永远不应该在返回响应式类型的方法中调用阻塞方法;您将阻塞应用程序的少数线程之一,这对应用程序非常不利
- 无论如何,从 Reactor 3.2 开始,在反应式管道中阻塞会引发错误
subscribe
正如评论中所建议的那样,调用也不是一个好主意。这或多或少就像在单独的线程中作为一项任务开始这项工作。完成后你会得到一个回调(subscribe
方法可以被赋予 lambdas),但实际上你正在将当前的管道与该任务分离。在这种情况下,在您有机会读取完整的响应正文以将其写入文件之前,可能会关闭客户端 HTTP 响应并清理资源- 如果您不想在内存中缓冲整个响应,Spring 提供了
DataBuffer
(想想可以池化的 ByteBuffer 实例)。 - 如果您正在实现的方法本身是阻塞的(
void
例如返回),您可以调用 block ,例如在测试用例中。
Here's a code snippet that you could use to do this:
这是您可以用来执行此操作的代码片段:
Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
As you can see, we're not blocking anywhere and methods dealing with I/O are returning Mono<Void>
, which is the reactive equivalent of a done(error)
callback that signals when things are done and if an error happened.
正如您所看到的,我们不会在任何地方阻塞,并且处理 I/O 的方法正在返回Mono<Void>
,这相当于done(error)
回调的反应性,用于在事情完成和发生错误时发出信号。
Since I'm not sure what the createErrorFile
method should do, I've provided a sample for createSpreadsheet
that just writes the body bytes to a file. Note that since databuffers might be recycled/pooled, we need to release them once we're done.
由于我不确定该createErrorFile
方法应该做什么,因此我提供了一个示例,用于createSpreadsheet
将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/汇集,我们需要在完成后释放它们。
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}
With this implementation, your application will hold a few DataBuffer
instances in memory at a given time (the reactive operators are prefetching values for performance reasons) and will write bytes as they come in a reactive fashion.
使用此实现,您的应用程序将DataBuffer
在给定时间在内存中保存一些实例(响应式运算符出于性能原因预取值),并在它们以响应式方式出现时写入字节。
回答by Nurlan Rysbaev
RestResultMessage message= createWebClient()
.get()
.uri(uri)
.exchange()
.map(clientResponse -> {
//delegation
ClientResponseWrapper wrapper = new
ClientResponseWrapper(clientResponse);
return Mono.just(wrapper);
})
.block() //wait until request is not done
.map(result -> {
//convert to any data
if (!result.statusCode().isError()){
//extract the result from request
return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
} else {
return create(RestResultMessage.Result.error, result.statusCode().name());
}
})
.block();
回答by KernelMode
In my case, I just had to replace exchange
and block
with retrieve
.
就我而言,我只需要将exchange
和替换block
为retrieve
.
This one caused the error:
这导致了错误:
Mono<Boolean> booleanMono = webClient.get()
.exchange().block().bodyToMono(Boolean.class);
Replacing the above with the following line solved my issue:
用以下行替换上面的内容解决了我的问题:
Mono<Boolean> booleanMono = webClient.get()
.retrieve().bodyToMono(Boolean.class);