RxJava 观察调用/订阅线程

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/29355741/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 07:49:33  来源:igfitidea点击:

RxJava Observing on calling/subscribing thread

javamultithreadingreactive-programmingrx-java

提问by Filip Zymek

I have some trouble understandig how subscribeOn/observeOn works in RxJava. I've created simple app with observable that emits solar system planet names, does some mapping and filtering and prints results.

我在理解 subscribeOn/observeOn 在 RxJava 中的工作方式时遇到了一些麻烦。我用 observable 创建了一个简单的应用程序,它发出太阳系行星的名称,做一些映射和过滤并打印结果。

As I understand, scheduling work to background thread is done via subscribeOnoperator (and it seems to work fine).

据我了解,将工作调度到后台线程是通过subscribeOn操作符完成的(它似乎工作正常)。

Observing on background thread also works fine with observeOnoperator.

观察后台线程也适用于observeOn操作员。

But I have trouble in understanding, how to observe on calling thread (either if it is main thread or any other). It is easily done on Android with AndroidSchedulers.mainThread()operator, but I don't know how to achieve this in pure java.

但是我无法理解如何观察调用线程(无论是主线程还是其他线程)。在 Android 上使用AndroidSchedulers.mainThread()operator很容易完成,但我不知道如何在纯 Java 中实现这一点。

Here's my code:

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

Observable in created and work is subscribed on one of three thread from executor. This works as expected. But how to observe results on those dynamically created thread in for loop? Is there a way to create Scheduler from current thread?

在 created 和 work 中的 Observable 在来自执行程序的三个线程之一上订阅。这按预期工作。但是如何在 for 循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?

Also, I've found out that after running this code, it never terminates and I don't know why? :(

另外,我发现在运行此代码后,它永远不会终止,我不知道为什么?:(

采纳答案by Marek Hawrylczak

To answer your question, let me start from beginning, this allows other people to understand what you already know.

为了回答你的问题,让我从头开始,这样其他人才能理解你已经知道的东西。

Schedulers

调度器

Schedulers play the same role as Executors for Java. Briefly - they decide on which thread actions are executed.

调度程序与 Java 的执行程序扮演着相同的角色。简而言之 - 他们决定执行哪些线程操作。

Usually an Observable and operators execute in current thread. Sometimes you can pass Scheduler to Observable or operator as a parameter (e.g. Observable.timer()).

通常一个 Observable 和操作符在当前线程中执行。有时您可以将调度程序作为参数传递给 Observable 或运算符(例如 Observable.timer())。

Additionally RxJava provides 2 operators to specify Scheduler:

此外,RxJava 提供了 2 个操作符来指定调度器:

  • subscribeOn - specify the Scheduler on which an Observable will operate
  • observeOn - specify the Scheduler on which an observer will observe this Observable
  • subscribeOn - 指定 Observable 将在其上运行的调度程序
  • observeOn - 指定观察者将在其上观察这个 Observable 的调度器

To understand them quickly, I use a the example code:

为了快速理解它们,我使用了示例代码:

On all samples, I will use helper createObservable, which emits a name of thread on which the Observable operates:

在所有示例中,我将使用 helper createObservable,它发出 Observable 操作的线程名称:

 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

Without schedulers:

没有调度程序:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

SubscribeOn:

订阅:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

SubscribeOn and ObserveOn:

SubscribeOn 和 ObserveOn:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

ObserveOn:

观察:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

Answer:

回答:

AndroidSchedulers.mainThread() returns a sheduler which delegates work to MessageQueue associated with main thread.
For this purpose it uses android.os.Looper.getMainLooper() and android.os.Handler.

AndroidSchedulers.mainThread() 返回一个调度器,它将工作委托给与主线程关联的 MessageQueue。
为此,它使用 android.os.Looper.getMainLooper() 和 android.os.Handler。

In other words, if you would like to specify particular thread, you must provide means to schedule and perform tasks on thread.

换句话说,如果你想指定特定的线程,你必须提供在线程上调度和执行任务的方法。

Underneath it may use any kind of MQ for storing tasks and logic which loops the Quee and execute tasks.

在它下面可以使用任何类型的 MQ 来存储任务和循环队列并执行任务的逻辑。

In java, we have Executor which is designated for such tasks. RxJava can easily create Scheduler from such Executor.

在java中,我们有专为此类任务指定的Executor。RxJava 可以很容易地从这样的 Executor 创建调度器。

Below is example which shows how you can observe on main thread (not particular useful but show all required parts).

下面的示例显示了如何在主线程上进行观察(不是特别有用,但显示了所有必需的部分)。

public class RunCurrentThread implements Executor {

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    }

    private void observerOnMain() {
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> {
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                });
        ;
    }

    public Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                }
        );
    }

    private void runLoop() throws InterruptedException {
        while(!Thread.interrupted()){
            tasks.take().run();
        }
    }

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }
}

And the last question, why your code does not terminate:

最后一个问题,为什么您的代码不会终止:

ThreadPoolExecutor uses non deamon threads by defult, thus your program does not end until they exist. You should use shutdownmethod to close the threads.

ThreadPoolExecutor 默认使用非守护线程,因此您的程序在它们存在之前不会结束。您应该使用shutdown方法关闭线程。

回答by miguel

Here's a simplified example updated for RxJava 2. It's the same concept as Marek's answer: an Executor that adds the runnables to a BlockingQueue that's being consumed on the caller's thread.

这是为 RxJava 2 更新的简化示例。它与 Marek 的答案具有相同的概念:一个 Executor 将可运行对象添加到调用者线程上正在使用的 BlockingQueue。

public class ThreadTest {

    @Test
    public void test() throws InterruptedException {

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() {
                @Override
                public void execute(@NonNull Runnable runnable) {
                    tasks.add(runnable);
                }
            }))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                }
            });
        tasks.take().run();
    }

}

// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main