java RxJava 中的任务取消是如何工作的?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/25344088/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 07:51:41  来源:igfitidea点击:

How does task cancellation work in RxJava?

javarx-java

提问by user3452758

I'm unclear on how to implement task cancellation in RXJava.

我不清楚如何在 RXJava 中实现任务取消。

I'm interested in porting an existing API built using Guava's ListenableFuture. My use case is as follows:

我有兴趣移植使用 Guava 的ListenableFuture. 我的用例如下:

  • I have an single operation that's composed of a sequence of futures joined by Futures.transform()
  • Multiple subscribers observe the operation's final future.
  • Each observer can cancel the final future, and all observers witness the cancellation event.
  • Cancellation of the final future results in the cancellation of its dependencies, e.g. in sequence 1->2->3, cancellation of 3is propagated to 2, and so on.
  • 我有一个由一系列期货组成的单一操作 Futures.transform()
  • 多个订阅者观察操作的最终未来。
  • 每个观察者都可以取消最后的未来,所有观察者都见证了取消事件。
  • 取消最终未来会导致取消其依赖项,例如按顺序1-> 2-> 3,取消 of3传播到2,依此类推。

There's very little info in the RxJava wiki about this; the only references I can find to cancellation mention Subscriptionas an equivalent to .NET's Disposable, but as far as I can see, Subscription only offers the ability to unsubscribe from subsequent values in the sequence.

RxJava wiki 中关于此的信息很少;我能找到的唯一引用取消提及Subscription相当于 .NET 的Disposable,但据我所知,订阅仅提供取消订阅序列中后续值的能力。

I'm unclear on how to implement "any subscriber can cancel" semantics through this API. Am I thinking about this in the wrong way?

我不清楚如何通过这个 API 实现“任何订阅者都可以取消”语义。我是否以错误的方式思考这个问题?

Any input would be appreciated.

任何输入将不胜感激。

回答by André Staltz

It's important to learn about Cold vs Hot Observables. If your Observables are cold, then their operations will not execute if you have no subscribers. Hence to "cancel", just make sure all Observers unsubscribe from the source Observable.

了解Cold vs Hot Observables很重要。如果您的 Observable 是冷的,那么如果您没有订阅者,它们的操作将不会执行。因此要“取消”,只需确保所有观察者都取消订阅源 Observable。

However, if only one Observer of the source unsubscribes, and there are other Observers still subscribed to the source, this will not incur a "cancelling". In that case you can use (but it's not the only solution) ConnectableObservables. Also see this link about Rx.NET.

但是,如果只有一个源的观察者取消订阅,并且还有其他观察者仍然订阅了源,则不会导致“取消”。在这种情况下,您可以使用(但它不是唯一的解决方案)ConnectableObservables。另请参阅有关 Rx.NET 的链接

A practical way of using ConnectableObservables is to simply call .publish().refCount()on any cold Observable. What that does is create one single "proxy" Observer which relays the events from the source to the actual Observers. The proxy Observer unsubscribes when the last actual Observer unsubscribes.

使用 ConnectableObservables 的一种实用方法是简单地调用.publish().refCount()任何冷 Observable。这样做是创建一个单一的“代理”观察者,它将事件从源中继到实际的观察者。当最后一个实际观察者取消订阅时,代理观察者取消订阅。

To manually control a ConnectableObservable, call just coldSource.publish()and you will get an instance of ConnectableObservable. Then you can call .connect()which will return you the Subscription of the "proxy" Observer. To manually "cancel" the source, you just unsubscribe the Subscription of the proxy Observer.

要手动控制 ConnectableObservable,只需调用即可coldSource.publish(),您将获得一个 ConnectableObservable 实例。然后你可以调用.connect()它会返回你“代理”观察者的订阅。要手动“取消”源,您只需取消订阅代理观察者的订阅即可。



For your specific problem, you can also use the .takeUntil()operator.

对于您的特定问题,您还可以使用.takeUntil()运算符。

Suppose your "final future" is ported as finalStreamin RxJava, and suppose that "cancel events" are Observables cancelStream1, cancelStream2, etc, then it becomes fairly simple to "cancel" operations resulting from finalStream:

假设你的“最后的未来”被移植为finalStream在RxJava,并假设“取消事件”的观测量cancelStream1cancelStream2等等,然后就变得相当简单的“取消”,从产生的操作finalStream

Observable<FooBar> finalAndCancelableStream = finalStream
    .takeUntil( Observable.merge(cancelStream1, cancelStream2) );

In diagrams, this is how takeUntil works, and this is how merge works.

在图表中,这就是 takeUntil 的工作方式这就是 merge 的工作方式

In plain english, you can read it as "finalAndCancelableStream is the finalStream until either cancelStream1 or cancelStream2 emit an event".

用简单的英语,您可以将其读作“finalAndCancelableStream 是 finalStream,直到 cancelStream1 或 cancelStream2 发出事件”。