Original question: http://stackoverflow.com/questions/28402376/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
How to compose Observables to avoid the given nested and dependent callbacks?
Asked by Aravind Yarram
In this blog post, the author gives the following example of callback hell (code copied and pasted below). However, the post does not mention how the problem can be eliminated by using Reactive Extensions.
So here F3 depends upon F1 completion and F4 and F5 depend upon F2 completion.
- Wondering what would be the functional equivalent in Rx.
- How to represent in Rx that F1, F2, F3, F4 and F5 should all be pulled asynchronously?
NOTE: I am currently trying to wrap my head around Rx, so I didn't try solving this example before asking this question.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to compose their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}
Answered by benjchristensen
I'm the original author of the referenced blog post about callbacks and Java Futures. Here is an example of using flatMap, zip and merge to do service composition asynchronously.
It fetches a User object, then concurrently fetches Social and PersonalizedCatalog data, then for each Video from the PersonalizedCatalog concurrently fetches a Bookmark, Rating and Metadata, zips those together, and merges all of the responses into a progressive stream output as Server-Sent Events.
return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});
This example can be seen in context of a functioning application at https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33
Since I can't possibly provide all of the information here, you can also find an explanation in presentation form (with a link to the video) at https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.
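The fan-out described in this answer (three lookups per video, zipped together, then gathered) can be approximated with the JDK alone. The sketch below is not RxJava and not the ReactiveLab code. It swaps in java.util.concurrent.CompletableFuture, and the class VideoZipSketch, the lookups bookmark, rating and metadata, and the values they return are all hypothetical stand-ins.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class VideoZipSketch {

    // Hypothetical lookups; in a real gateway each would be a remote call.
    static CompletableFuture<Integer> bookmark(String video) {
        return CompletableFuture.supplyAsync(() -> video.length());
    }

    static CompletableFuture<Double> rating(String video) {
        return CompletableFuture.supplyAsync(() -> 4.5);
    }

    static CompletableFuture<String> metadata(String video) {
        return CompletableFuture.supplyAsync(() -> "title:" + video);
    }

    // Rough analogue of Observable.zip over three sources: all three lookups
    // are started first, then combined once every one of them has completed.
    static CompletableFuture<Map<String, Object>> videoData(String video) {
        CompletableFuture<Integer> b = bookmark(video);
        CompletableFuture<Double> r = rating(video);
        CompletableFuture<String> m = metadata(video);
        return CompletableFuture.allOf(b, r, m).thenApply(ignored ->
                Map.<String, Object>of(
                        "video", video,
                        "bookmark", b.join(),   // join() does not block here: b is already done
                        "rating", r.join(),
                        "metadata", m.join()));
    }

    // Starts the per-video work for every video, then gathers the results in input order.
    static CompletableFuture<List<Map<String, Object>>> catalogData(List<String> videos) {
        List<CompletableFuture<Map<String, Object>>> perVideo =
                videos.stream().map(VideoZipSketch::videoData).collect(Collectors.toList());
        return CompletableFuture.allOf(perVideo.toArray(new CompletableFuture[0]))
                .thenApply(ignored ->
                        perVideo.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(catalogData(List.of("a", "bb")).join());
    }
}
```

One difference worth keeping in mind: Observable.merge can emit each result as soon as it is ready, while allOf only completes once every future has finished, so this sketch does not reproduce the progressive streaming of the original.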
Answered by dwursteisen
Based on your code, suppose that the remote calls are made using Observable.
Observable<Integer> callRemoveServiceA() { /* async call */ }
/* .... */
Observable<Integer> callRemoveServiceE(Integer f2) { /* async call */ }
What you want:
- call serviceA, then call serviceB with the result of serviceA
- call serviceC, then call serviceD and serviceE with the result of serviceC
- with the results of serviceE and serviceD, build a new value
- display the new value with the result of serviceB
With RxJava, you can achieve this with the following code:
Observable<Integer> f3 = callRemoveServiceA() // call serviceA
        // call serviceB with the result of serviceA
        .flatMap((f1) -> callRemoveServiceB(f1));

Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
        // call serviceD and serviceE then build a new value
        .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
        // display the value
        .subscribe(System.out::println);
The important part here is the use of flatMap and zip (or zipWith).
- flatMap will transform a value into another Observable. This Observable will be your new asynchronous call. (http://reactivex.io/documentation/operators/flatmap.html)
- zip will compose a new value from two different Observables, so you can build a new value from the results of two (or more) Observables. (http://reactivex.io/documentation/operators/zip.html)
You can get more info on flatMap here: When do you use map vs flatMap in RxJava?
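For readers without RxJava on the classpath, the dependency graph from the question (F3 depends on F1; F4 and F5 depend on F2) can also be sketched with the JDK's CompletableFuture. This is an analogy rather than Rx: thenCompose plays the role of flatMap and thenCombine the role of zip. The class name CallbackGraphSketch, the helper delayed and the serviceA to serviceE methods are hypothetical; their delays and return values are copied from the question's CallToRemoteService classes. The delayedExecutor call assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class CallbackGraphSketch {

    // Hypothetical stand-ins for the question's five simulated remote services.
    static CompletableFuture<String> serviceA() {
        return delayed("responseA", 100);
    }

    static CompletableFuture<Integer> serviceB() {
        return delayed(100, 40);
    }

    static CompletableFuture<String> serviceC(String f1) {
        return delayed("responseB_" + f1, 60);
    }

    static CompletableFuture<Integer> serviceD(int f2) {
        return delayed(40 + f2, 140);
    }

    static CompletableFuture<Integer> serviceE(int f2) {
        return delayed(5000 + f2, 55);
    }

    // Completes with the given value after roughly `millis` ms, off the caller's thread.
    static <T> CompletableFuture<T> delayed(T value, long millis) {
        return CompletableFuture.supplyAsync(
                () -> value,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS));
    }

    static CompletableFuture<String> compose() {
        // F3 depends on F1: chain the dependent call (flatMap-like).
        CompletableFuture<String> f3 = serviceA().thenCompose(CallbackGraphSketch::serviceC);

        // F4 and F5 both depend on F2 and run concurrently; combine them (zip-like).
        CompletableFuture<Integer> f4TimesF5 = serviceB().thenCompose(f2 ->
                serviceD(f2).thenCombine(serviceE(f2), (f4, f5) -> f4 * f5));

        // Join the two independent branches into the final string.
        return f3.thenCombine(f4TimesF5, (s, product) -> s + " => " + product);
    }

    public static void main(String[] args) {
        // prints "responseB_responseA => 714000"
        System.out.println(compose().join());
    }
}
```

No CountDownLatch or AtomicReference is needed: the two branches start as soon as compose() is called, and the only blocking call is the final join(). Note that a CompletableFuture carries exactly one value, whereas an Observable can emit many, so this maps onto the question's single-result services but not onto streams in general.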