java 如何使用 AsyncRestTemplate 同时进行多个调用?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/35214384/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to use AsyncRestTemplate to make multiple calls simultaneously?
提问by Glide
I don't understand how to use AsyncRestTemplate
effectively for making external service calls. For the code below:
我不明白如何AsyncRestTemplate
有效地进行外部服务调用。对于下面的代码:
class Foo {
public void doStuff() {
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
url1, String.class);
String response1 = future1.get();
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
url2, String.class);
String response2 = future2.get();
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
url3, String.class);
String response3 = future3.get();
}
}
Ideally I want to execute all 3 calls simultaneously and process the results once they're all done. Howevereach external service call is notfetched until get()
is called but get()
is blocked. So doesn't that defeat the purpose of AsyncRestTemplate
? I might as well use RestTemplate
.
理想情况下,我想同时执行所有 3 个调用,并在它们全部完成后处理结果。然而,每个外部服务调用在被调用之前不会被获取,get()
而是get()
被阻止。那么这不是违背了目的AsyncRestTemplate
吗?我还不如用RestTemplate
.
So I don't understaand how I can get them to execute simultaneously?
所以我不明白如何让它们同时执行?
回答by diginoise
Simply don't call blocking get()
before dispatching all of your asynchronous calls:
get()
在调度所有异步调用之前不要调用阻塞:
class Foo {
public void doStuff() {
ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate
.getForEntity(url1, String.class);
ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate
.getForEntity(url2, String.class);
ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate
.getForEntity(url3, String.class);
String response1 = future1.get();
String response2 = future2.get();
String response3 = future3.get();
}
}
You can do both dispatchand getin loops, but note that current results gathering is inefficient as it would get stuck on the next unfinished future.
您可以同时执行dispatch和getin 循环,但请注意,当前的结果收集效率低下,因为它会卡在下一个未完成的未来。
You could add all the futures to a collection, and iterate through it testing each future for non blocking isDone()
. When that call returns true, you can then call get()
.
您可以将所有期货添加到一个集合中,并通过它迭代测试每个期货是否为非阻塞isDone()
。当该调用返回 true 时,您可以调用get()
.
This way your en masse results gathering will be optimised rather than waiting on the next slow future result in the order of calling get()
s.
这样,您的整体结果收集将得到优化,而不是按照调用get()
s的顺序等待下一个缓慢的未来结果。
Better still you can register callbacks (runtimes) within each ListenableFuture
returned by AccyncRestTemplate
and you don't have to worry about cyclically inspecting the potential results.
更好的是,您可以在每个ListenableFuture
返回的内部注册回调(运行时),AccyncRestTemplate
而不必担心循环检查潜在结果。
回答by lkz
If you don't have to use 'AsyncRestTemplate' I would suggest to use RxJava instead. RxJava zipoperator is what you are looking for. Check code below:
如果您不必使用“AsyncRestTemplate”,我建议您改用 RxJava。RxJava zip运算符正是您要找的。检查下面的代码:
private rx.Observable<String> externalCall(String url, int delayMilliseconds) {
return rx.Observable.create(
subscriber -> {
try {
Thread.sleep(delayMilliseconds); //simulate long operation
subscriber.onNext("response(" + url + ") ");
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
);
}
public void callServices() {
rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread());
rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread());
rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread());
rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3)
.subscribeOn(Schedulers.newThread())
.subscribe(response -> System.out.println("done with: " + response));
}
All requests to external services will be executed in separate threads, when last call will be finished transformation function( in example simple string concatenation) will be applied and result (concatenated string) will be emmited from 'zip' observable.
对外部服务的所有请求都将在单独的线程中执行,当最后一次调用完成时,将应用转换函数(在示例中为简单字符串连接)并且结果(连接字符串)将从 'zip' observable 中发出。
回答by Vikrant Kashyap
What I Understand by Your question is You have a predefined asynchronous method and you try to do is call this method asynchoronously using RestTemplate Class.
我对你的问题的理解是你有一个预定义的异步方法,你尝试做的是使用 RestTemplate 类异步调用这个方法。
I have wrote a method that will help you out to call Your method asynchoronously.
我写了一个方法,可以帮助你异步调用你的方法。
public void testMyAsynchronousMethod(String... args) throws Exception {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
Future<String> future1 = asyncRestTemplate
.getForEntity(url1, String.class);;
Future<String> future2 = asyncRestTemplate
.getForEntity(url2, String.class);
Future<String> future3 = asyncRestTemplate
.getForEntity(url3, String.class);
// Wait until they are all done
while (!(future1 .isDone() && future2.isDone() && future3.isDone())) {
Thread.sleep(10); //10-millisecond pause between each check
}
// Print results, including elapsed time
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
回答by Kuvaldis
You might want to use CompletableFuture
class (javadoc).
您可能想要使用CompletableFuture
类 ( javadoc)。
Transform your calls into
CompletableFuture
. For instance.final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } });
Next call
CompletableFuture::allOf
method with your 3 newly created completable futures.Call
join()
method on the result. After the resulting completable future is resolved you can get the results from each separate completable future you've created on step 3.
将您的电话转换为
CompletableFuture
. 例如。final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } });
Next call
CompletableFuture::allOf
方法与您新创建的 3 个可完成期货。join()
对结果调用方法。在生成的可完成未来得到解决后,您可以从您在第 3 步中创建的每个单独的可完成未来获得结果。
回答by Rowanto
I think you are misunderstanding a few things here. When you call the getForEntity method, the requests are already fired. When the get() method of the future object is called, you are just waiting for the request to complete. So in order fire all those three requests on the same subsecond, you just have to do:
我认为你在这里误解了一些事情。当您调用 getForEntity 方法时,请求已被触发。当future对象的get()方法被调用时,你只是在等待请求完成。因此,为了在同一亚秒内触发所有这三个请求,您只需要执行以下操作:
// Each of the lines below will fire an http request when it's executed
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(url1, String.class);
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(url2, String.class);
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(url3, String.class);
After all these codes are run, all the requests are already fired (most probably in the same subsecond). Then you can do whatever you want in the meanwhile. As soon as you call any of the get() method, you are waiting for each request to complete. If they are already completed, then it will just return immediately.
运行所有这些代码后,所有请求都已被触发(很可能在同一亚秒内)。然后你可以在此期间做任何你想做的事情。只要您调用任何 get() 方法,您就在等待每个请求完成。如果它们已经完成,那么它会立即返回。
// do whatever you want in the meantime
// get the response of the http call and wait if it's not completed
String response1 = future1.get();
String response2 = future2.get();
String response3 = future3.get();
回答by Abhijit Sarkar
I don't think any of the previous answers actually achieve parallelism. The problem with @diginoise response is that it doesn't actually achieve parallelism. As soon as we call get
, we're blocked. Consider that the calls are really slow such that future1
takes 3 seconds to complete, future2
2 seconds and future3
3 seconds again. With 3 get
calls one after another, we end up waiting 3 + 2 + 3 = 8 seconds.
@Vikrant Kashyap answer blocks as well on while (!(future1 .isDone() && future2.isDone() && future3.isDone()))
. Besides the while
loop is a pretty ugly looking piece of code for 3 futures, what if you have more? @lkz answer uses a different technology than you asked for, and even then, I'm not sure if zip
is going to do the job. From ObservableJavadoc:
我认为以前的任何答案实际上都没有实现并行性。@diginoise 响应的问题在于它实际上并没有实现并行性。我们一打电话get
,就被屏蔽了。考虑到调用真的很慢,future1
需要 3 秒才能完成,又需要future2
2 秒和future3
3 秒。随着 3 个get
接一个的调用,我们最终等待了 3 + 2 + 3 = 8 秒。@Vikrant Kashyap 在while (!(future1 .isDone() && future2.isDone() && future3.isDone()))
. 除了while
循环是一段非常难看的代码,用于 3 个期货,如果你有更多呢?@lkz 回答使用的技术与您要求的不同,即使那样,我也不确定是否zip
会完成这项工作。来自ObservableJavadoc:
zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by each of the source Observables; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by each of those Observables; and so forth.
zip 以严格的顺序应用此函数,因此新 Observable 发出的第一项将是应用于每个源 Observable 发出的第一项的函数的结果;新 Observable 发出的第二项将是应用于每个 Observable 发出的第二项的函数的结果;等等。
Due to Spring's widespread popularity, they try very hard to maintain backward compatibility and in doing so, sometimes make compromises with the API. AsyncRestTemplate
methods returning ListenableFuture
is one such case. If they committed to Java 8+, CompletableFuture
could be used instead. Why? Since we won't be dealing with thread pools directly, we don't have a good way to know when all the ListenableFutures have completed. CompletableFuture
has an allOf
method that creates a new CompletableFuture
that is completed when all of the given CompletableFutures complete. Since we don't have that in ListenableFuture
, we will have to improvise.
I've not compiled the following code but it should be clear what I'm trying to do. I'm using Java 8 because it's end of 2016.
由于 Spring 的广泛流行,他们非常努力地保持向后兼容性,并且在这样做时有时会与 API 妥协。AsyncRestTemplate
方法返回ListenableFuture
就是这样一种情况。如果他们致力于 Java 8+,则CompletableFuture
可以改用。为什么?由于我们不会直接处理线程池,因此我们无法知道所有的 ListenableFuture 何时完成。CompletableFuture
有一个allOf
方法来创建一个新的CompletableFuture
,当所有给定的 CompletableFutures 完成时完成。由于我们没有那个ListenableFuture
,我们将不得不即兴发挥。我没有编译以下代码,但应该很清楚我要做什么。我正在使用 Java 8,因为它是 2016 年底。
// Lombok FTW
@RequiredArgsConstructor
public final class CounterCallback implements ListenableFutureCallback<ResponseEntity<String>> {
private final LongAdder adder;
public void onFailure(Throwable ex) {
adder.increment();
}
public void onSuccess(ResponseEntity<String> result) {
adder.increment();
}
}
ListenableFuture<ResponseEntity<String>> f1 = asyncRestTemplate
.getForEntity(url1, String.class);
f1.addCallback(//);
// more futures
LongAdder adder = new LongAdder();
ListenableFutureCallback<ResponseEntity<String>> callback = new CounterCallback(adder);
Stream.of(f1, f2, f3)
.forEach {f -> f.addCallback(callback)}
for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) {
Thread.sleep(1000);
}
// either all futures are done or we're done waiting
Map<Boolean, ResponseEntity<String>> futures = Stream.of(f1, f2, f3)
.collect(Collectors.partitioningBy(Future::isDone));
Now we've a Map
for which futures.get(Boolean.TRUE)
will give us all the futures that have completed and futures.get(Boolean.FALSE)
will give us the ones that didn't. We will want to cancel the ones that didn't complete.
现在我们有一个Map
for whichfutures.get(Boolean.TRUE)
将为我们提供所有已完成的期货,futures.get(Boolean.FALSE)
并将为我们提供未完成的期货。我们将要取消那些没有完成的。
This code does a few things that are important with parallel programming:
这段代码做了一些对并行编程很重要的事情:
- It doesn't block.
- It limits the operation to some maximum allowed time.
- It clearly separates successful and failure cases.
- 它不会阻塞。
- 它将操作限制在某个最大允许时间。
- 它清楚地区分了成功和失败的案例。