How to stop and resume Observable.interval emitting ticks
Source: http://stackoverflow.com/questions/35419062/ (StackOverflow, licensed under CC BY-SA 4.0; if you use or share it, you must attribute it to the original authors)
Asked by Jan Seevers
This will emit a tick every 5 seconds.
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
    .subscribe(tick -> Log.d(TAG, "tick = " + tick));
To stop it you can use
Schedulers.shutdown();
But then all the Schedulers stop and it is not possible to resume the ticking later. How can I stop and resume the emitting "gracefully"?
Accepted answer by AndroidEx
Here's one possible solution:
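import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscription;
import rx.schedulers.Schedulers;

class TickHandler {
    // Kept outside the stream so the count survives stop()/resume().
    private final AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        // Each resume starts a fresh interval; map() swaps its counter for ours.
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .map(tick -> lastTick.getAndIncrement())
            .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}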
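For comparison, the same stop/resume idea can be sketched with the JDK alone. This is an analogue under stated assumptions, not the answer's code: the class name PausableTicker and its methods are hypothetical, and a ScheduledExecutorService stands in for Observable.interval. As in the answer, the counter lives outside the periodic task, so cancelling and rescheduling continues from the last tick instead of restarting at zero.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only analogue of TickHandler (not part of RxJava).
class PausableTicker {
    // Daemon thread so a forgotten ticker does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pausable-ticker");
                t.setDaemon(true);
                return t;
            });
    // Kept outside the periodic task so it survives stop()/resume().
    private final AtomicLong lastTick = new AtomicLong(0L);
    private final long period;
    private final TimeUnit unit;
    private ScheduledFuture<?> task;

    PausableTicker(long period, TimeUnit unit) {
        this.period = period;
        this.unit = unit;
    }

    // Invoked once per period; returns the tick number that was emitted.
    long onTick() {
        return lastTick.getAndIncrement();
    }

    synchronized void resume() {
        if (task == null || task.isCancelled()) {
            task = executor.scheduleAtFixedRate(this::onTick, period, period, unit);
        }
    }

    synchronized void stop() {
        if (task != null) {
            task.cancel(false); // only this task stops; the executor stays usable
        }
    }

    synchronized boolean isRunning() {
        return task != null && !task.isCancelled();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Cancelling the ScheduledFuture plays the role of unsubscribe(): it affects only this ticker, unlike shutting down a shared scheduler.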
Answer by Sheng
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)
val suspendableObservable =
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())
You can set switch to false to suspend the ticking and true to resume it.
Answer by Chris Wong
Here is another way to do this, I think.
When you check the source code, you will find that interval() uses the class OnSubscribeTimerPeriodically. The key code is below.
@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;

        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }
    }, initialDelay, period, unit);
}
So, if you want to cancel the loop, how about throwing an exception in onNext()? Example code below.
Observable.interval(1000, TimeUnit.MILLISECONDS)
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.i("abc", "onNext");
            if (aLong == 5) throw new NullPointerException();
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Log.i("abc", "onError");
        }
    }, new Action0() {
        @Override
        public void call() {
            Log.i("abc", "onCompleted");
        }
    });
Then you will see this:
08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError
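Note that once onError has fired the stream is terminated, so this stops the ticking but does not resume it; a new subscription is needed for that. The JDK's ScheduledExecutorService behaves in a comparable way, which the sketch below illustrates in plain Java rather than RxJava (ThrowToStopDemo and runUntilFailure are hypothetical names): an exception thrown by a periodic task suppresses all of its later runs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; uses only java.util.concurrent, not RxJava.
class ThrowToStopDemo {
    // Runs a 10 ms periodic task that throws on tick number failAt
    // and returns how many ticks were delivered in total.
    static long runUntilFailure(long failAt) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicLong ticks = new AtomicLong(0L);
        try {
            ScheduledFuture<?> task = executor.scheduleAtFixedRate(() -> {
                long tick = ticks.getAndIncrement();
                if (tick == failAt) {
                    throw new IllegalStateException("stop at tick " + tick);
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                task.get(); // blocks until the periodic task fails
            } catch (ExecutionException expected) {
                // The exception ended the task; no further runs are scheduled.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return ticks.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Ticks 0..5 are delivered, mirroring the six onNext lines in the log above.
        System.out.println("ticks delivered = " + runUntilFailure(5)); // prints 6
    }
}
```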
Answer by mikebridge
Sorry this is in RxJS instead of RxJava, but the concept will be the same. I adapted this from learn-rxjs.io, and here it is on CodePen.
The idea is that you start out with two streams of click events, startClick$ and stopClick$. Each click occurring on the stopClick$ stream gets mapped to an empty observable, and each click on startClick$ gets mapped to the interval$ stream. The two resulting streams get merged together into one observable-of-observables. In other words, a new observable of one of the two types will be emitted from merge each time there's a click. The resulting observable will go through switchMap, which starts listening to this new observable and stops listening to whatever it was listening to before. switchMap will also start merging the values from this new observable onto its existing stream.
After the switch, scan only ever sees the "increment" value emitted by interval$, and it doesn't see any values when "stop" has been clicked.
And until the first click occurs, startWith will start emitting values from interval$, just to get things going:
const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);
const timer$ = Rx.Observable
  // a "stop" click will emit an empty observable,
  // and a "start" click will emit the interval$ observable.
  // These two streams are merged into one observable.
  .merge(stopClick$.mapTo(Rx.Observable.empty()),
         startClick$.mapTo(interval$))
  // until the first click occurs, merge will emit nothing, so
  // use the interval$ to start the counter in the meantime
  .startWith(interval$)
  // whenever a new observable starts, stop listening to the previous
  // one and start emitting values from the new one
  .switchMap(val => val)
  // add the increment emitted by the interval$ stream to the accumulator
  .scan((acc, curr) => curr + acc, start)
  // start the observable and send results to the DIV
  .subscribe((x) => setCounter(x));
And here's the HTML:
<html>
  <body>
    <div id="counter"></div>
    <button id="start">
      Start
    </button>
    <button id="stop">
      Stop
    </button>
  </body>
</html>
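Since the question is about RxJava, here is a rough plain-Java model of what the RxJS pipeline above computes. It is only a sketch under simplifying assumptions: the Event enum and the count method are hypothetical, time is replaced by an ordered list of events, and the detail that each "start" click subscribes to a fresh interval$ is ignored. It shows the net effect of switchMap followed by scan: ticks are accumulated only while the most recent click was "start", or before any click at all because of startWith.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model; it mimics the counter logic, not the RxJS operators themselves.
class StartStopModel {
    enum Event { START_CLICK, STOP_CLICK, TICK }

    // Returns the counter values that would be written to the page, in order.
    static List<Integer> count(List<Event> events, int start, int increment) {
        List<Integer> shown = new ArrayList<>();
        boolean listening = true; // startWith(interval$): counting before any click
        int acc = start;
        for (Event e : events) {
            switch (e) {
                case START_CLICK:
                    listening = true;  // switchMap switches to interval$
                    break;
                case STOP_CLICK:
                    listening = false; // switchMap switches to the empty observable
                    break;
                case TICK:
                    if (listening) {   // ticks reach scan only while listening
                        acc += increment;
                        shown.add(acc);
                    }
                    break;
            }
        }
        return shown;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                Event.TICK, Event.TICK, Event.STOP_CLICK,
                Event.TICK, Event.START_CLICK, Event.TICK);
        System.out.println(count(events, 0, 1)); // prints [1, 2, 3]
    }
}
```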
Answer by Artur Szymański
Some time ago, I was also looking for some kind of Rx "timer" solution, but none of them met my expectations. So here is my own solution:
AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { // Creates and starts the timer
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
        .takeWhile(tick -> !stopped.get())
        .filter(tick -> resumed.get())
        .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}
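The difference between pauseTimer and stopTimer follows from the two operators: filter drops ticks but keeps the subscription alive, while takeWhile completes the stream the first time its predicate is false, so a stopped timer needs a new startTimer call and a new subscription. The flags are also only consulted when a tick arrives, so a stop takes effect at the next tick. The sketch below models that per-tick decision in plain Java with no RxJava dependency; TimerModel and onTick are hypothetical names, and ticks are delivered by hand instead of by a scheduler.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical hand-driven model of the takeWhile -> filter -> map chain.
class TimerModel {
    private final AtomicLong elapsedTime = new AtomicLong();
    private final AtomicBoolean resumed = new AtomicBoolean(true);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private boolean completed = false;

    void pauseTimer() { resumed.set(false); }
    void resumeTimer() { resumed.set(true); }
    void stopTimer() { stopped.set(true); }
    boolean isCompleted() { return completed; }

    // Simulates one interval tick; returns the emitted elapsed time, if any.
    OptionalLong onTick() {
        if (completed || stopped.get()) { // takeWhile: first false ends the stream
            completed = true;
            return OptionalLong.empty();
        }
        if (!resumed.get()) {             // filter: tick dropped, stream stays alive
            return OptionalLong.empty();
        }
        return OptionalLong.of(elapsedTime.addAndGet(1000)); // map
    }

    public static void main(String[] args) {
        TimerModel timer = new TimerModel();
        System.out.println(timer.onTick());      // OptionalLong[1000]
        timer.pauseTimer();
        System.out.println(timer.onTick());      // OptionalLong.empty
        timer.resumeTimer();
        System.out.println(timer.onTick());      // OptionalLong[2000]
        timer.stopTimer();
        System.out.println(timer.onTick());      // OptionalLong.empty
        System.out.println(timer.isCompleted()); // true
    }
}
```

In this model, as in the answer, elapsed time does not advance while paused, because map runs only on ticks that pass the filter.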
Answer by Alier
You can use takeWhile, which keeps the ticks flowing for as long as its condition is true:
Observable.interval(1, TimeUnit.SECONDS)
    .takeWhile {
        Log.i(TAG, " time " + it)
        it != 30L
    }
    .subscribe(object : Observer<Long> {
        override fun onComplete() {
            Log.i(TAG, "onComplete " + format.format(System.currentTimeMillis()))
        }

        override fun onSubscribe(d: Disposable) {
            Log.i(TAG, "onSubscribe " + format.format(System.currentTimeMillis()))
        }

        override fun onNext(t: Long) {
            Log.i(TAG, "onNext " + format.format(System.currentTimeMillis()))
        }

        override fun onError(e: Throwable) {
            Log.i(TAG, "onError")
            e.printStackTrace()
        }
    })