Java: How to correctly read Flux<DataBuffer> and convert it to a single InputStream
Original URL: http://stackoverflow.com/questions/46460599/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Question by Bk Santiago
I'm using WebClient and a custom BodyExtractor class for my Spring Boot application:
WebClient webClient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION_XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
});
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
// next() below only takes the first DataBuffer emitted by the Flux
return body.map(dataBuffer -> {
try {
JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream());
} catch(Exception e){
// Reactor's map() must not return null, so turn the failure into an error signal
throw Exceptions.propagate(e);
}
}).next();
}
The above code works with small payloads but not with large ones. I think it's because I'm only reading a single Flux value with next(), and I'm not sure how to combine and read all the DataBuffers.
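To see why a single next() loses data, here is a JDK-only sketch (no Spring or Reactor needed). It assumes the response body can be modelled as a List<byte[]> standing in for the DataBuffers a Flux<DataBuffer> emits; the class and method names are hypothetical. Reading only the first chunk gives a truncated XML fragment, while chaining every chunk with SequenceInputStream gives back the whole document.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: a List<byte[]> stands in for the DataBuffers of a Flux<DataBuffer>.
final class FirstChunkOnly {

    // Split a payload into fixed-size chunks, the way a large body can arrive in several buffers.
    static List<byte[]> chunk(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < payload.length; start += chunkSize) {
            int end = Math.min(payload.length, start + chunkSize);
            chunks.add(Arrays.copyOfRange(payload, start, end));
        }
        return chunks;
    }

    // Read a stream to the end and decode it as UTF-8.
    static String readAll(InputStream in) {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Like body.map(...).next(): only the first chunk is ever looked at.
    static String firstChunkOnly(List<byte[]> chunks) {
        return readAll(new ByteArrayInputStream(chunks.get(0)));
    }

    // Like combining every chunk into one stream before parsing.
    static String allChunks(List<byte[]> chunks) {
        List<InputStream> streams = new ArrayList<>();
        for (byte[] c : chunks) {
            streams.add(new ByteArrayInputStream(c));
        }
        return readAll(new SequenceInputStream(Collections.enumeration(streams)));
    }

    public static void main(String[] args) {
        byte[] xml = "<root><item>1</item><item>2</item></root>".getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = chunk(xml, 16);
        System.out.println(firstChunkOnly(chunks)); // prints "<root><item>1</i"
        System.out.println(allChunks(chunks));      // prints the whole document
    }
}
```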
I'm new to Reactor, so I don't know many tricks with Flux/Mono.
Accepted answer by Bk Santiago
I was able to make it work by using Flux#collect and SequenceInputStream:
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
// collect() already returns a Mono, so no next() is needed at the end
return body.collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
.map(InputStreamCollector::getInputStream)
.map(inputStream -> {
try {
JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(inputStream);
} catch(Exception e){
// Reactor's map() must not return null, so turn the failure into an error signal
throw Exceptions.propagate(e);
}
});
}
InputStreamCollector.java
public class InputStreamCollector {
private InputStream is;
public void collectInputStream(InputStream is) {
if (this.is == null) {
// first buffer: keep it as-is
this.is = is;
} else {
// later buffers: append after what has been collected so far
this.is = new SequenceInputStream(this.is, is);
}
}
public InputStream getInputStream() {
return this.is;
}
}
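The same supplier/accumulator idea can be tried without Reactor: java.util.stream.Stream#collect(Supplier, BiConsumer, BiConsumer) takes a container factory and an accumulator just like Flux#collect(Supplier, BiConsumer), plus a combiner for parallel streams. This is a minimal sketch, assuming byte[] chunks stand in for DataBuffers; StreamCollectorDemo and collectAll are hypothetical names. Note the else branch: without it, the first stream would be chained after itself. Returning an empty stream instead of null for an empty body is an assumption about the desired behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class StreamCollectorDemo {

    // Mutable container playing the same role as InputStreamCollector.
    static final class InputStreamCollector {
        private InputStream is;

        void collectInputStream(InputStream next) {
            if (this.is == null) {
                this.is = next;                                   // first chunk: store as-is
            } else {
                this.is = new SequenceInputStream(this.is, next); // later chunks: append
            }
        }

        // Only used when the stream runs in parallel: append the other collector's data.
        void combine(InputStreamCollector other) {
            if (other.is != null) {
                collectInputStream(other.is);
            }
        }

        // Assumption: an empty body should give an empty stream rather than null.
        InputStream getInputStream() {
            return is != null ? is : InputStream.nullInputStream();
        }
    }

    static String collectAll(List<byte[]> chunks) {
        InputStreamCollector collector = chunks.stream()
                .map(ByteArrayInputStream::new)
                .collect(InputStreamCollector::new,
                        InputStreamCollector::collectInputStream,
                        InputStreamCollector::combine);
        try (InputStream in = collector.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of("<a>".getBytes(StandardCharsets.UTF_8),
                "text".getBytes(StandardCharsets.UTF_8),
                "</a>".getBytes(StandardCharsets.UTF_8));
        System.out.println(collectAll(chunks)); // prints "<a>text</a>"
    }
}
```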
Answer by samanime
A slightly modified version of Bk Santiago's answer makes use of reduce() instead of collect(). It is very similar, but doesn't require an extra class:
Java:
body.<InputStream>reduce(new InputStream() {
public int read() { return -1; }
}, (InputStream s, DataBuffer d) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with the single InputStream */);
Or Kotlin:
body.reduce(object : InputStream() {
override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
.flatMap { inputStream -> /* do something with single InputStream */ }
The benefit of this approach over collect() is simply that you don't need a separate class to gather things up.
I created a new empty InputStream(), but if that syntax is confusing, you can instead use an empty ByteArrayInputStream as the initial value: ByteArrayInputStream("".toByteArray()) in Kotlin, or new ByteArrayInputStream(new byte[0]) in Java.
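A JDK-only sketch of the reduce() variant may help, assuming byte[] chunks stand in for DataBuffers (ReduceDemo and its methods are hypothetical). It uses new ByteArrayInputStream(new byte[0]) as the empty initial value. Each reduce step wraps the accumulated stream in one more SequenceInputStream, so N chunks produce N nested wrappers; building a single SequenceInputStream from an Enumeration of the streams yields the same bytes with a flat structure.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ReduceDemo {

    // Same shape as body.reduce(empty, (s, d) -> new SequenceInputStream(s, d.asInputStream())).
    // Sequential streams only: the single empty stream instance serves as the identity.
    static InputStream reduceChunks(List<byte[]> chunks) {
        return chunks.stream()
                .<InputStream>map(ByteArrayInputStream::new)
                .reduce(new ByteArrayInputStream(new byte[0]), SequenceInputStream::new);
    }

    // Alternative: one SequenceInputStream over an Enumeration, no nesting per chunk.
    static InputStream concatFlat(List<byte[]> chunks) {
        List<InputStream> streams = new ArrayList<>();
        for (byte[] c : chunks) {
            streams.add(new ByteArrayInputStream(c));
        }
        return new SequenceInputStream(Collections.enumeration(streams));
    }

    // Read a stream to the end and decode it as UTF-8.
    static String readAll(InputStream in) {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of("ab".getBytes(StandardCharsets.UTF_8),
                "cd".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(reduceChunks(chunks))); // prints "abcd"
        System.out.println(readAll(concatFlat(chunks)));   // prints "abcd"
    }
}
```

Either way, every chunk is still held in memory until the combined stream is read.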
Answer by user1585916
This is really not as complicated as other answers imply.
The only way to stream the data without buffering it all in memory is to use a pipe, as @jin-kwon suggested. However, it can be done very simply by using Spring's BodyExtractors and DataBufferUtils utility classes.
Example:
private InputStream readAsInputStream(String url) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION_XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....
Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t -> {
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
})
.doFinally(s -> {
try(osPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
});
DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());
return isPipe;
}
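The pipe pattern itself can be demonstrated with plain java.io, which may make the moving parts easier to see. In this sketch (hypothetical names; a List<byte[]> stands in for the response body and a dedicated thread stands in for the DataBufferUtils.write(...).subscribe(...) call), the method returns the reading end immediately while a producer fills the pipe. Closing the writing end is what lets the reader reach end-of-stream. The default PipedInputStream buffer is only 1024 bytes, so the producer blocks until the consumer catches up instead of buffering the whole body.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.List;

final class PipedDemo {

    // Returns the reading end right away; a background thread fills the pipe.
    static InputStream streamChunks(List<byte[]> chunks) {
        try {
            PipedOutputStream osPipe = new PipedOutputStream();
            PipedInputStream isPipe = new PipedInputStream(osPipe);
            Thread producer = new Thread(() -> {
                // Closing osPipe (via try-with-resources) lets the reader see end-of-stream.
                try (osPipe) {
                    for (byte[] c : chunks) {
                        osPipe.write(c); // blocks while the pipe buffer is full
                    }
                } catch (IOException e) {
                    // e.g. the reading side was closed or its thread died
                    System.err.println("Error writing to pipe: " + e);
                }
            }, "pipe-producer");
            producer.setDaemon(true);
            producer.start();
            return isPipe;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Drain and close the stream.
    static byte[] readAllBytes(InputStream in) {
        try (in) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] a = new byte[5_000];
        byte[] b = new byte[7_000];
        // 12,000 bytes in total, far more than the 1024-byte default pipe buffer
        System.out.println(readAllBytes(streamChunks(List.of(a, b))).length); // prints 12000
    }
}
```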
If you don't care about checking the response code or throwing an exception for a failure status code, you can skip the block() call and the intermediate ClientResponse variable by using
flatMapMany(r -> r.body(BodyExtractors.toDataBuffers()))
instead.
Answer by Jin Kwon
Here is another variant of the other answers. It is still not memory-friendly.
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
return response.bodyToFlux(DataBuffer.class)
.map(b -> b.asInputStream(true))
.reduce(SequenceInputStream::new);
}
static void doSome(WebClient.ResponseSpec response) {
asStream(response)
.doOnNext(stream -> {
// do some with stream
})
.block();
}
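One detail of reduce() without an initial value: on an empty Flux it completes with an empty Mono, so the doOnNext callback never runs for an empty body. java.util.stream.Stream#reduce(BinaryOperator) behaves the same way by returning Optional.empty(). Here is a JDK-only sketch with hypothetical names, assuming byte[] chunks stand in for DataBuffers:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

final class ReduceWithoutSeed {

    // Like bodyToFlux(DataBuffer.class).map(...).reduce(SequenceInputStream::new):
    // there is no initial value, so an empty input yields Optional.empty().
    static Optional<InputStream> asStream(List<byte[]> chunks) {
        return chunks.stream()
                .<InputStream>map(ByteArrayInputStream::new)
                .reduce(SequenceInputStream::new);
    }

    // Read the combined stream, or fall back when the body had no chunks at all.
    static String readOrDefault(List<byte[]> chunks, String fallback) {
        return asStream(chunks).map(ReduceWithoutSeed::readAll).orElse(fallback);
    }

    // Read a stream to the end and decode it as UTF-8.
    static String readAll(InputStream in) {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readOrDefault(List.of(), "<empty body>")); // prints "<empty body>"
        System.out.println(readOrDefault(List.of("ok".getBytes(StandardCharsets.UTF_8)), "<empty body>")); // prints "ok"
    }
}
```

In Reactor, a similar fallback could be expressed with Mono#defaultIfEmpty or Mono#switchIfEmpty.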
Answer by Jin Kwon
You can use pipes.
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source, final Executor executor,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> {
executor.execute(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer()));
return just(function.apply(p.source()));
},
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
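Here is a JDK-only sketch of the same java.nio.channels.Pipe idea without Reactor: an Executor writes the chunks into pipe.sink() while the calling thread applies the function to pipe.source(). NioPipeDemo, the List<byte[]> parameter and readAll are hypothetical stand-ins for the Publisher<DataBuffer> version above. The executor has to run the writer on a different thread: a pipe has a finite buffer, so writing everything on the calling thread before reading could block forever.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class NioPipeDemo {

    static <R> R pipeAndApply(List<byte[]> chunks, Executor executor,
                              Function<? super ReadableByteChannel, ? extends R> function) {
        final Pipe pipe;
        try {
            pipe = Pipe.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Writer: runs on the executor and closes the sink when done (end-of-stream for the reader).
        executor.execute(() -> {
            try (WritableByteChannel sink = pipe.sink()) {
                for (byte[] c : chunks) {
                    ByteBuffer buf = ByteBuffer.wrap(c);
                    while (buf.hasRemaining()) {
                        sink.write(buf); // blocks while the pipe is full
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        // Reader: runs on the calling thread and always closes the source.
        try (ReadableByteChannel source = pipe.source()) {
            return function.apply(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Example function: drain the channel into a byte array.
    static byte[] readAll(ReadableByteChannel channel) {
        try {
            return Channels.newInputStream(channel).readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            byte[] big = new byte[200_000];
            byte[] result = pipeAndApply(List.of(big, big), executor, NioPipeDemo::readAll);
            System.out.println(result.length); // prints 400000
        } finally {
            executor.shutdown();
        }
    }
}
```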
Or, using CompletableFuture:
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
.doFirst(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer())),
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
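The CompletableFuture variant swaps the roles: the reader is started with supplyAsync and the writing happens on the subscribing thread. Here is a plain-JDK sketch of that arrangement, with hypothetical names and a List<byte[]> standing in for the DataBuffer publisher. The reader runs on CompletableFuture's default async executor, and the writer blocks whenever the pipe is full until the reader catches up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class FuturePipeDemo {

    static <R> R pipeAndApply(List<byte[]> chunks,
                              Function<? super ReadableByteChannel, ? extends R> function) {
        final Pipe pipe;
        try {
            pipe = Pipe.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Reader: started first, asynchronously, like supplyAsync(() -> function.apply(p.source())).
        CompletableFuture<R> result = CompletableFuture.supplyAsync(() -> {
            try (ReadableByteChannel source = pipe.source()) {
                return function.apply(source);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        // Writer: runs on the calling thread; closing the sink signals end-of-stream.
        try (WritableByteChannel sink = pipe.sink()) {
            for (byte[] c : chunks) {
                ByteBuffer buf = ByteBuffer.wrap(c);
                while (buf.hasRemaining()) {
                    sink.write(buf);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result.join(); // wait for the reader's result
    }

    // Example function: drain the channel into a byte array.
    static byte[] readAll(ReadableByteChannel channel) {
        try {
            return Channels.newInputStream(channel).readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] big = new byte[150_000];
        System.out.println(pipeAndApply(List.of(big, big), FuturePipeDemo::readAll).length); // prints 300000
    }
}
```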
Answer by Abhijit Sarkar
Reconstructing the InputStream defeats the purpose of using WebClient in the first place, because nothing will be emitted until the collect operation completes. For a large stream, that can take a very long time. The reactive model doesn't deal with individual bytes, but with blocks of bytes (like Spring's DataBuffer). See my answer here for a more elegant solution: https://stackoverflow.com/a/48054615/839733
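To illustrate working on blocks of bytes instead of one reassembled InputStream, here is a small JDK-only sketch: each chunk updates a running java.util.zip.CRC32 as soon as it is available, so only the checksum state is kept rather than the whole payload. The CRC32 computation is just a hypothetical example of per-chunk work, and a List<byte[]> stands in for the Flux<DataBuffer>; in Reactor the per-chunk step would live inside an operator on the Flux.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

final class IncrementalChunks {

    // Handle each chunk as it arrives; only the running checksum is kept, not the payload.
    static long crcOfChunks(Iterable<byte[]> chunks) {
        CRC32 crc = new CRC32();
        for (byte[] chunk : chunks) {
            crc.update(chunk);
        }
        return crc.getValue();
    }

    // Reference value computed over the fully assembled payload.
    static long crcOfWhole(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] whole = "blocks of bytes, not single bytes".getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = List.of(Arrays.copyOfRange(whole, 0, 10),
                Arrays.copyOfRange(whole, 10, whole.length));
        // Same result, but crcOfChunks never needed the whole payload at once
        System.out.println(crcOfChunks(chunks) == crcOfWhole(whole)); // prints true
    }
}
```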