Original question: http://stackoverflow.com/questions/46460599/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
How to correctly read Flux<DataBuffer> and convert it to a single inputStream
Question by Bk Santiago
I'm using WebClient and a custom BodyExtractor class for my spring-boot application:
WebClient webClient = WebClient.create();
webClient.get()
        .uri(url, params)
        .accept(MediaType.APPLICATION_XML)
        .exchange()
        .flatMap(response -> {
            return response.body(new BodyExtractor());
        });
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
    Flux<DataBuffer> body = response.getBody();
    return body.map(dataBuffer -> {
        try {
            JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
            Unmarshaller unmarshaller = jc.createUnmarshaller();
            return (T) unmarshaller.unmarshal(dataBuffer.asInputStream());
        } catch (Exception e) {
            // map() must not return null in Reactor; signal an error instead
            throw new IllegalStateException(e);
        }
    }).next();
}
The above code works with a small payload but not with a large one. I think it's because I'm only reading a single Flux value with next(), and I'm not sure how to combine and read all the DataBuffers.
I'm new to Reactor, so I don't know many tricks with Flux/Mono.
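The diagnosis above can be made concrete. A large response body usually reaches the client as several DataBuffers, and next() passes only the first one to the unmarshaller. The following JDK-only sketch simulates that. It uses byte[] chunks as stand-ins for DataBuffer, and the 16-byte chunk size is an arbitrary assumption, because real buffer sizes depend on the HTTP client.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedBodyDemo {

    // Splits a payload into fixed-size chunks, the way a large HTTP body
    // may be delivered as several DataBuffers (chunk size is arbitrary here).
    static List<byte[]> split(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int from = 0; from < payload.length; from += chunkSize) {
            int to = Math.min(from + chunkSize, payload.length);
            chunks.add(Arrays.copyOfRange(payload, from, to));
        }
        return chunks;
    }

    public static void main(String[] args) {
        String xml = "<root><item>a</item><item>b</item></root>";
        List<byte[]> chunks = split(xml.getBytes(StandardCharsets.UTF_8), 16);

        // What next() would hand to the unmarshaller: only the first chunk,
        // which is not a well-formed XML document on its own.
        String firstOnly = new String(chunks.get(0), StandardCharsets.UTF_8);
        System.out.println(firstOnly);                    // prints: <root><item>a</i
        System.out.println(chunks.size() + " chunks in total"); // prints: 3 chunks in total
    }
}
```

Small payloads often fit in one buffer, which explains why the original code appears to work for them.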
Accepted answer by Bk Santiago
I was able to make it work by using Flux#collect and SequenceInputStream:
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
    Flux<DataBuffer> body = response.getBody();
    // collect every DataBuffer of the body into one chained InputStream
    return body.collect(InputStreamCollector::new,
                    (collector, dataBuffer) -> collector.collectInputStream(dataBuffer.asInputStream()))
            .map(collector -> {
                try {
                    JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
                    Unmarshaller unmarshaller = jc.createUnmarshaller();
                    return (T) unmarshaller.unmarshal(collector.getInputStream());
                } catch (Exception e) {
                    // map() must not return null in Reactor; signal an error instead
                    throw new IllegalStateException(e);
                }
            });
}
InputStreamCollector.java
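import java.io.InputStream;
import java.io.SequenceInputStream;

public class InputStreamCollector {
    private InputStream is;

    public void collectInputStream(InputStream is) {
        if (this.is == null) {
            // first buffer: use it as-is (wrapping it here would read it twice)
            this.is = is;
        } else {
            // later buffers: append after everything collected so far
            this.is = new SequenceInputStream(this.is, is);
        }
    }

    public InputStream getInputStream() {
        return this.is;
    }
}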
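You can exercise the corrected collector without Spring or Reactor. This JDK-only sketch uses byte[] chunks in place of DataBuffers. java.util.stream.Stream#collect takes the same supplier/accumulator pair as Flux#collect, but it also requires a combiner, so the merge method below is a hypothetical addition that exists only to satisfy that signature.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CollectorDemo {

    // Same idea as InputStreamCollector above, including the fix that keeps
    // the first stream from being chained to itself.
    static class InputStreamCollector {
        private InputStream is;

        void collectInputStream(InputStream next) {
            is = (is == null) ? next : new SequenceInputStream(is, next);
        }

        // Hypothetical: only needed because Stream#collect requires a combiner.
        void merge(InputStreamCollector other) {
            if (other.is != null) {
                collectInputStream(other.is);
            }
        }

        InputStream getInputStream() {
            // An empty body yields an empty stream instead of null.
            return is != null ? is : new ByteArrayInputStream(new byte[0]);
        }
    }

    static String collectToString(List<byte[]> chunks) {
        InputStreamCollector collector = chunks.stream()
                .map(ByteArrayInputStream::new)
                .collect(InputStreamCollector::new,
                         InputStreamCollector::collectInputStream,
                         InputStreamCollector::merge);
        try (InputStream in = collector.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "<root>".getBytes(StandardCharsets.UTF_8),
                "<item>a</item>".getBytes(StandardCharsets.UTF_8),
                "</root>".getBytes(StandardCharsets.UTF_8));
        System.out.println(collectToString(chunks)); // prints: <root><item>a</item></root>
    }
}
```

If the original collectInputStream were kept unchanged, the first chunk would appear twice in the output.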
Answer by samanime
A slightly modified version of Bk Santiago's answer uses reduce() instead of collect(). It's very similar, but doesn't require an extra class:
Java:
body.reduce(new InputStream() {
            public int read() { return -1; }
        }, (InputStream s, DataBuffer d) -> new SequenceInputStream(s, d.asInputStream()))
        .flatMap(inputStream -> /* do something with single InputStream */);
Or Kotlin:
body.reduce(object : InputStream() {
    override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
    .flatMap { inputStream -> /* do something with single InputStream */ }
The benefit of this approach over collect() is simply that you don't need a separate class to gather things up.
I created a new empty InputStream(), but if that syntax is confusing, you can instead use an empty ByteArrayInputStream as the initial value: ByteArrayInputStream("".toByteArray()) in Kotlin, or new ByteArrayInputStream(new byte[0]) in Java.
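The reduce() variant has the same shape in plain Java. Stream#reduce(identity, accumulator) starts from an empty stream and wraps the accumulated stream together with each chunk's stream. This is a JDK-only sketch that uses byte[] chunks in place of DataBuffers and takes the empty ByteArrayInputStream initial value described above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class ReduceDemo {

    // Mirrors body.reduce(initial, accumulator): start from an empty stream
    // and append each chunk's stream with SequenceInputStream.
    static String reduceToString(List<byte[]> chunks) {
        InputStream combined = chunks.stream()
                .<InputStream>map(ByteArrayInputStream::new)
                .reduce(new ByteArrayInputStream(new byte[0]), SequenceInputStream::new);
        try (InputStream in = combined) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "Hello, ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8));
        System.out.println(reduceToString(chunks));             // prints: Hello, world
        System.out.println(reduceToString(List.of()).isEmpty()); // prints: true
    }
}
```

The initial value is an empty stream rather than no value at all. As a result, an empty body still produces a usable (empty) InputStream.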
Answer by user1585916
This is really not as complicated as other answers imply.
The only way to stream the data without buffering it all in memory is to use a pipe, as @jin-kwon suggested. However, this can be done very simply with Spring's BodyExtractors and DataBufferUtils utility classes.
Example:
private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
            .accept(MediaType.APPLICATION_XML)
            .exchange()
            .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
            .doOnError(t -> {
                log.error("Error reading body.", t);
                // close pipe to force InputStream to error,
                // otherwise the returned InputStream will hang forever if an error occurs
                try (isPipe) {
                    //no-op
                } catch (IOException ioe) {
                    log.error("Error closing streams", ioe);
                }
            })
            .doFinally(s -> {
                try (osPipe) {
                    //no-op
                } catch (IOException ioe) {
                    log.error("Error closing streams", ioe);
                }
            });

    DataBufferUtils.write(body, osPipe)
            .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}
If you don't care about checking the response code or throwing an exception for a failure status code, you can skip the block() call and the intermediate ClientResponse variable by using
flatMapMany(r -> r.body(BodyExtractors.toDataBuffers()))
instead.
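You can try the pipe idea with the JDK alone. In this sketch, a writer thread plays the role of DataBufferUtils.write(body, osPipe) and pushes byte[] chunks (stand-ins for DataBuffers) into a PipedOutputStream, while the calling thread reads the PipedInputStream. The chunk data and the single-thread executor are assumptions made for illustration. The writer and reader must run on different threads. PipedInputStream has a small fixed buffer (1024 bytes by default), so the writer blocks until the reader catches up, and that is what keeps memory use bounded.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipeDemo {

    // Returns an InputStream that yields the chunks as another thread writes them.
    static InputStream streamChunks(List<byte[]> chunks, ExecutorService writerPool) throws IOException {
        PipedOutputStream osPipe = new PipedOutputStream();
        PipedInputStream isPipe = new PipedInputStream(osPipe);
        writerPool.execute(() -> {
            // try-with-resources closes osPipe, which signals end-of-stream to the reader
            try (osPipe) {
                for (byte[] chunk : chunks) {
                    osPipe.write(chunk); // blocks while the pipe buffer is full
                }
            } catch (IOException e) {
                // e.g. the reader closed early; osPipe is still closed by try-with-resources
            }
        });
        return isPipe;
    }

    static String readAll(List<byte[]> chunks) {
        ExecutorService writerPool = Executors.newSingleThreadExecutor();
        try (InputStream in = streamChunks(chunks, writerPool)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            writerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8)))); // prints: hello world
    }
}
```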
Answer by Jin Kwon
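Here is yet another variant of the other answers. It's still not memory-friendly: reduce() only emits once the whole body has been received, so every buffer is held in memory until then.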
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
    return response.bodyToFlux(DataBuffer.class)
            .map(b -> b.asInputStream(true))
            .reduce(SequenceInputStream::new);
}

static void doSome(WebClient.ResponseSpec response) {
    asStream(response)
            .doOnNext(stream -> {
                // do something with the stream
            })
            .block();
}
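reduce(SequenceInputStream::new) builds a left-nested chain. Each new stream wraps the previous one, so reading the first bytes passes through as many nested SequenceInputStream layers as there were buffers. With a very large number of buffers, that depth could exhaust the stack (an assumption worth checking against your buffer counts). A flatter variant, which still buffers everything, first collects the streams into a list and then uses the SequenceInputStream(Enumeration) constructor. In Reactor terms that would be collectList() followed by map. The JDK-only sketch below shows only the combining step, with byte[] chunks standing in for DataBuffers.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class FlatSequenceDemo {

    // Wraps all chunk streams in ONE SequenceInputStream via its Enumeration
    // constructor, so there is no nesting however many chunks there are.
    static InputStream concat(List<byte[]> chunks) {
        List<InputStream> streams = chunks.stream()
                .<InputStream>map(ByteArrayInputStream::new)
                .collect(Collectors.toList());
        return new SequenceInputStream(Collections.enumeration(streams));
    }

    static String readToString(List<byte[]> chunks) {
        try (InputStream in = concat(chunks)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> many = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            many.add(new byte[] {'x'}); // one tiny "buffer" per element
        }
        System.out.println(readToString(many).length()); // prints: 100000
    }
}
```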
Answer by Jin Kwon
You can use pipes.
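import static org.springframework.core.io.buffer.DataBufferUtils.releaseConsumer;
import static org.springframework.core.io.buffer.DataBufferUtils.write;
import static reactor.core.publisher.Mono.just;
import static reactor.core.publisher.Mono.using;

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
            p -> {
                // write the body into the pipe's sink on another thread
                executor.execute(() -> write(source, p.sink())
                        .doFinally(s -> {
                            try {
                                p.sink().close();
                            } catch (final IOException ioe) {
                                log.error("failed to close pipe.sink", ioe);
                                throw new RuntimeException(ioe);
                            }
                        })
                        .subscribe(releaseConsumer()));
                // read from the pipe's source on the subscribing thread
                return just(function.apply(p.source()));
            },
            p -> {
                try {
                    p.source().close();
                } catch (final IOException ioe) {
                    log.error("failed to close pipe.source", ioe);
                    throw new RuntimeException(ioe);
                }
            });
}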
Or, using CompletableFuture:
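import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.springframework.core.io.buffer.DataBufferUtils.releaseConsumer;
import static org.springframework.core.io.buffer.DataBufferUtils.write;
import static reactor.core.publisher.Mono.fromFuture;
import static reactor.core.publisher.Mono.using;

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
            // the function reads from the pipe's source asynchronously
            p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                    // start writing the body into the pipe's sink
                    .doFirst(() -> write(source, p.sink())
                            .doFinally(s -> {
                                try {
                                    p.sink().close();
                                } catch (final IOException ioe) {
                                    log.error("failed to close pipe.sink", ioe);
                                    throw new RuntimeException(ioe);
                                }
                            })
                            .subscribe(releaseConsumer())),
            p -> {
                try {
                    p.source().close();
                } catch (final IOException ioe) {
                    log.error("failed to close pipe.source", ioe);
                    throw new RuntimeException(ioe);
                }
            });
}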
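The same pattern also works with java.nio.channels.Pipe, the channel type these answers use. This minimal JDK-only sketch substitutes byte[] chunks for the Publisher<DataBuffer> and a plain write loop for DataBufferUtils.write. An executor thread fills the sink and closes it when done, while the function is applied to the source channel on the calling thread. The pipeAndApply name is reused for illustration only: this hypothetical version returns a plain value rather than a Mono.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class NioPipeDemo {

    // Hypothetical JDK-only counterpart of pipeAndApply above.
    static <R> R pipeAndApply(List<byte[]> chunks, ExecutorService executor,
                              Function<? super ReadableByteChannel, ? extends R> function) throws IOException {
        Pipe pipe = Pipe.open();
        executor.execute(() -> {
            // Closing the sink (done by try-with-resources) signals EOF to the reader.
            try (Pipe.SinkChannel sink = pipe.sink()) {
                for (byte[] chunk : chunks) {
                    ByteBuffer buffer = ByteBuffer.wrap(chunk);
                    while (buffer.hasRemaining()) {
                        sink.write(buffer); // blocks while the pipe is full
                    }
                }
            } catch (IOException e) {
                // e.g. the reader closed the source early; nothing else to do here
            }
        });
        // The function consumes the source channel on the calling thread.
        try (Pipe.SourceChannel source = pipe.source()) {
            return function.apply(source);
        }
    }

    static String readAll(List<byte[]> chunks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return pipeAndApply(chunks, executor, channel -> {
                try {
                    return new String(Channels.newInputStream(channel).readAllBytes(),
                            StandardCharsets.UTF_8);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8)))); // prints: hello world
    }
}
```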
Answer by Abhijit Sarkar
Reconstructing the InputStream defeats the purpose of using WebClient in the first place, because nothing will be emitted until the collect operation completes. For a large stream, that can take a very long time. The reactive model doesn't deal with individual bytes but with blocks of bytes (like Spring's DataBuffer). See my answer here for a more elegant solution: https://stackoverflow.com/a/48054615/839733
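Here is a small illustration of working with blocks of bytes instead of a reconstructed InputStream. Many computations can be updated chunk by chunk as each buffer arrives, so the whole body never has to sit in memory at once. This JDK-only sketch uses java.util.zip.CRC32 as an example of such a computation. That is an illustrative choice, not what the linked answer does, and byte[] chunks again stand in for DataBuffers. In a Flux, the per-chunk update would presumably happen as each buffer is emitted, with the buffer released afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

public class IncrementalDemo {

    // Updates the checksum one chunk at a time; only the current chunk
    // needs to be in memory.
    static long crcOfChunks(List<byte[]> chunks) {
        CRC32 crc = new CRC32();
        for (byte[] chunk : chunks) {
            crc.update(chunk, 0, chunk.length);
        }
        return crc.getValue();
    }

    // Reference: checksum of the whole payload at once.
    static long crcOfWhole(byte[] whole) {
        CRC32 crc = new CRC32();
        crc.update(whole, 0, whole.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "1234".getBytes(StandardCharsets.US_ASCII),
                "56789".getBytes(StandardCharsets.US_ASCII));
        long chunked = crcOfChunks(chunks);
        long whole = crcOfWhole("123456789".getBytes(StandardCharsets.US_ASCII));
        System.out.println(chunked == whole);             // prints: true
        System.out.println(Long.toHexString(chunked));    // prints: cbf43926
    }
}
```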

