Rx Observable emitting values periodically

Source: Stack Overflow, http://stackoverflow.com/questions/24557153/
This page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors and keep the same license.

Tags: android, rx-java

Asked by Haspemulator

I have to poll some RESTful endpoint periodically to refresh my Android app's data. I also have to pause and resume it based on connectivity (if the phone is offline, there's no need to even try). My current solution works, but it uses standard Java's ScheduledExecutorService to perform periodic tasks, and I'd like to stay in the Rx paradigm.

Here's my current code, parts of which are skipped for brevity.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable is basically a broadcast receiver wrapped into an Observable<Boolean>, indicating whether the phone is connected to the network.

As I said, this solution works, but I want to use the Rx approach for periodic polling and emitting new UserProfiles, because scheduling things manually causes numerous problems that I want to avoid. I know about Observable.timer and Observable.interval, but can't figure out how to apply them to this task (and I'm not sure if I need to use those at all).

Answer by Robert Estivill

There are a few approaches in this GitHub issue that you might find helpful:

https://github.com/ReactiveX/RxJava/issues/448

The three implementations are:

Observable.interval

// interval() emits Long ticks, matching Func1<Long, ...>; each tick triggers one request.
Observable.interval(delay, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
            @Override
            public Observable<Notification<AppState>> call(Long tick) {
                return lyftApi.updateAppState(params).materialize();
            }
        });


Scheduler.schedulePeriodically

Observable.create({ observer ->
        Schedulers.newThread().schedulePeriodically({
            observer.onNext("application-state-from-network");
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }).take(10).subscribe({ v -> println(v) });


Manual Recursion

Observable.create(new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> o) {
            return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                @Override
                public Subscription call(Scheduler inner, Long t2) {
                    o.onNext("data-from-polling");
                    return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("output: " + v);
        }
    });

And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.
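
As a rough illustration of why waiting for completion matters, here is a minimal plain-Java sketch (JDK only, no RxJava) of the manual-recursion idea: each run schedules the next one only after it has finished, so runs never overlap even when a request takes longer than the delay. The class SelfReschedulingPoller and its methods poll and simulateRequest are hypothetical names made up for this example; they are not part of any library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of RxJava or the JDK.
class SelfReschedulingPoller {

    // Runs `task` `runs` times on one background thread and returns
    // {startNanos, endNanos} for each run, in execution order.
    static List<long[]> poll(Runnable task, int runs, long delayMillis) {
        if (runs < 1) {
            throw new IllegalArgumentException("runs must be at least 1");
        }
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<long[]> timings = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(runs);

        Runnable step = new Runnable() {
            @Override
            public void run() {
                long start = System.nanoTime();
                try {
                    task.run();
                } finally {
                    timings.add(new long[] {start, System.nanoTime()});
                    done.countDown();
                    // The "recursion": the next run is scheduled only now,
                    // after the current one has finished (or failed).
                    if (done.getCount() > 0) {
                        executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        };

        executor.schedule(step, 0, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block the caller until the last run has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(timings);
    }

    // Stand-in for a slow HTTP request.
    static void simulateRequest(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

For example, SelfReschedulingPoller.poll(() -> SelfReschedulingPoller.simulateRequest(30), 3, 10) performs three runs, and the gap between the end of one run and the start of the next is at least the 10 ms delay, however long each simulated request takes.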

Answer by Marek Hawrylczak

One option is to use Observable.interval and check the user state each time the interval emits:

     Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);

    //pulling the user data
    Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
        Random random = new Random();
        @Override
        public Observable<String> call(Long tick) {
            // Fetch the user data here. Do it asynchronously (the Rx way): interval() runs on
            // Schedulers.computation(), which is not well suited to I/O operations.
            switch(random.nextInt(10)){
                case 0: // stands in for "network is not available" or "an exception happened"
                    return Observable.<String>just(null);
                case 1:
                case 2:
                    return Observable.just("Alice");
                default:
                    return Observable.just("Bob");
            }
        }
    });

    Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
        @Override
        public Observable<? extends String> call(Observable<String> stringObservable) {
            return stringObservable;
        }
    });

    //filter valid data
    Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s != null;
        }
    });

    //publish only changes
    Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
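
To make the effect of the last two steps concrete, here is a small plain-Java model (JDK only, no RxJava) of what filtering out nulls and then applying distinctUntilChanged does to a sequence of values. It only mimics the behaviour on a finished list; ChangeFilter and nonNullDistinctUntilChanged are hypothetical names introduced for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; a list-based model, not an Rx operator.
class ChangeFilter {

    // Drops nulls, then drops every value equal to the previously kept value.
    static <T> List<T> nonNullDistinctUntilChanged(List<T> values) {
        List<T> out = new ArrayList<>();
        T last = null;
        for (T value : values) {
            if (value == null) {
                continue; // models filter(s -> s != null)
            }
            if (out.isEmpty() || !value.equals(last)) {
                out.add(value); // models distinctUntilChanged(): only changes pass
                last = value;
            }
        }
        return out;
    }
}
```

With the input "Bob", "Bob", null, "Bob", "Alice", "Alice", null, "Bob", the result is "Bob", "Alice", "Bob": repeated values are suppressed, but a value that comes back after a change is published again.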

You can make it even simpler if your networkStatusObservable emits events at least as frequently as you need to check the user data:

    networkStatusObservable.sample(1, TimeUnit.SECONDS).filter(/* ideally, keep only the connected state */).map(/* now start pulling the user data */)

Finally, you can create an Observable that uses a scheduler to emit the user states periodically. Refer to the Schedulers documentation to learn which scheduler fits your needs best:

public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
    private final Scheduler scheduler;
    private final long initialDelay;
    private final long period;
    private final TimeUnit unit;

    public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
        this.scheduler = scheduler;
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
    }

    abstract T next() throws Exception;


    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();
        subscriber.add(worker);
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                try {
                    subscriber.onNext(next());
                } catch (Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        worker.unsubscribe();
                    }
                }
            }

        }, initialDelay, period, unit);
    }
}

// And here is the sample usage
Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe<String>(Schedulers.io(), 1, 1, TimeUnit.SECONDS) {
        Random random = new Random();
        @Override
        String next() throws Exception {
            //if you use Schedulers.io, you can call the remote service synchronously
            switch(random.nextInt(10)){
                case 0:
                    return null;
                case 1:
                case 2:
                    return "Alice";
                default:
                    return "Bob";
            }
        }
    });

Answer by Johnny Five

Short answer, for RxJava 2:

Observable.interval(initialDelay, unitAmount, timeUnit)
            .subscribe(value -> {
                // code for periodic execution
            });

Choose initialDelay, unitAmount, and timeUnit according to your needs.

Example: 0, 1, TimeUnit.MINUTES.

Answer by zafar142003

There is a simpler way to do it using interval(). I have tested this code and it works. But first, you should encapsulate the job you want to execute periodically in a class that implements Action1.

class Act<T> implements Action1<T> {
     public Service service;
     public String data;
     public void call(T t){
         service.log(data); //the periodic job
     }
}

(I have kept the fields public for brevity, but that isn't advisable.) Now you can schedule it in the following way:

Act<Long> act = new Act<>();
act.data = "dummy data";
act.service = this;
Observable.interval(0L, period, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .subscribe(act);

This will not block your threads anywhere, unlike the approach given in the other answer. It also lets us pass a variable into the Action as a kind of mutable storage, which could be handy in subsequent invocations. Also, this way you can subscribe your call on your own thread pool.

Answer by Haspemulator

Okay, I'll post my own solution; maybe someone will benefit from it. I'll only post the part related to the question, omitting the HTTP and caching stuff. Here's how I do it:

private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
                                                                          final Observable<Boolean> pauseResumeObservable) {

    final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
            new Func2<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean networkAvailable, Boolean mustPause) {
                    return mustPause && networkAvailable;
                }
            }
    ).distinctUntilChanged();

    final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return networkAvailable;
        }
    });
    final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return !networkAvailable;
        }
    });


    return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
        @Override
        public Observable<Long> call(Boolean shouldResumeRequests) {
            if (shouldResumeRequests) {
                long timeToUpdate;
                final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
                timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
                Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
                return Observable
                        .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
                        .takeUntil(hasToStopObservable);
            } else {
                Log.d(TAG, "Have to pause updates");
                return Observable.<Long>never().takeUntil(hasToResumeObservable);
            }
        }
    }).multicast(PublishSubject.<Long>create());
}

As you can see, the conditions to pause or resume updates become a bit more complicated, with a new Observable added to support pausing when the app goes to the background.
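
The gating logic can be followed without RxJava. Below is a minimal plain-Java model of what combineLatest followed by distinctUntilChanged does to two Boolean inputs. It assumes, as the code above suggests, that the combined value is true only when both inputs are true; the original names its second parameter mustPause, but since the result is a logical AND, this sketch treats it as a "may poll" flag. PollingGate, Update, and transitions are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; it mimics the operators on a finished list.
class PollingGate {

    // One update from either input; fromNetwork tells which input changed.
    static final class Update {
        final boolean fromNetwork;
        final boolean value;

        private Update(boolean fromNetwork, boolean value) {
            this.fromNetwork = fromNetwork;
            this.value = value;
        }

        static Update network(boolean available) {
            return new Update(true, available);
        }

        static Update mayPoll(boolean allowed) {
            return new Update(false, allowed);
        }
    }

    // Returns the sequence of "should poll" values that would reach concatMap.
    static List<Boolean> transitions(List<Update> updates) {
        Boolean network = null;
        Boolean allowed = null;
        Boolean lastEmitted = null;
        List<Boolean> out = new ArrayList<>();
        for (Update update : updates) {
            if (update.fromNetwork) {
                network = update.value;
            } else {
                allowed = update.value;
            }
            if (network == null || allowed == null) {
                continue; // like combineLatest: nothing until both inputs have a value
            }
            boolean shouldPoll = network && allowed;
            if (lastEmitted == null || lastEmitted != shouldPoll) {
                out.add(shouldPoll); // like distinctUntilChanged: only real changes pass
                lastEmitted = shouldPoll;
            }
        }
        return out;
    }
}
```

Feeding it network(true), mayPoll(true), network(true), network(false), mayPoll(false), network(true), mayPoll(true) yields true, false, true: polling starts once both inputs are true, stops when the network drops, and resumes only when both are true again.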

At the core of the solution is the concatMap operation, which emits the Observables sequentially (hence concatMap, not flatMap; see this question: What is the difference between concatMap and flatMap in RxJava). It emits either an interval or a never Observable, depending on whether updates should continue or pause. Each emitted Observable is then cut off with takeUntil as soon as the "opposite" Observable emits a new value.

ConnectableObservable is returned because the created Observable is hot, and all the intended subscribers have to subscribe to it before it starts emitting; otherwise initial events could be lost. I call connect on it later.
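
To see why the order of subscribing and connecting matters, here is a tiny plain-Java model of a hot source (JDK only, not RxJava's ConnectableObservable): values are delivered only to whoever is subscribed at the moment of emission, and nothing is replayed. HotSource is a hypothetical class written for this sketch, and it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of a hot source; past values are never replayed.
class HotSource<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Registers a subscriber; it will only see values emitted from now on.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the value to the subscribers registered at this moment.
    void emit(T value) {
        // Iterate over a copy so a subscriber may subscribe others while being notified.
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }
}
```

If one subscriber registers before emit(1) and another only before emit(2), the first receives 1 and 2 while the second receives only 2. Deferring connect until every subscriber is attached avoids exactly this kind of lost initial event.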

I'll accept either my own answer or another one based on votes, if any.