java Rxandroid SubscribeOn 和 ObserveOn 有什么区别

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/44984730/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 08:26:52  来源:igfitidea点击:

Rxandroid What's the difference between SubscribeOn and ObserveOn

javaandroidrx-javarx-androidrx-java2

提问by Rathore

I am just learning Rx-java and Rxandroid2 and I am just confused what is the major difference between in SubscribeOn and ObserveOn.

我只是在学习 Rx-java 和 Rxandroid2,我很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。

回答by Kubicko

SubscribeOn specify the Scheduler on which an Observable will operate. ObserveOn specify the Scheduler on which an observer will observe this Observable.

SubscribeOn 指定 Observable 将在其上运行的调度程序。ObserveOn 指定观察者将在其上观察此 Observable 的调度程序。

So basically SubscribeOn is mostly subscribed (executed) on a background thread ( you do not want to block the UI thread while waiting for the observable) and also in ObserveOn you want to observe the result on a main thread...

所以基本上 SubscribeOn 主要是在后台线程上订阅(执行)(您不想在等待可观察对象时阻塞 UI 线程),并且在 ObserveOn 中您想在主线程上观察结果......

If you are familiar with AsyncTask then SubscribeOn is similar to doInBackground method and ObserveOn to onPostExecute...

如果您熟悉 AsyncTask,那么 SubscribeOn 类似于 doInBackground 方法,而 ObserveOn 类似于 onPostExecute...

回答by Daksh Gargas

observeOn()simply changes the thread of all operators further Downstream. People usually have this misconceptionthat observeOnalso acts as upstream, but it doesn't.

observeOn()简单地改变所有运营商的线程进一步下游。人们通常有这种误解,它observeOn也充当上游,但事实并非如此。

The below example will explain it better..

下面的例子将更好地解释它..

Observable.just("Some string")                  // UI
       .map(str -> str.length())               // UI
       .observeOn(Schedulers.computation())   // Changing the thread
       .map(length -> 2 * length)            // Computation
       .subscribe(---)

subscribeOn()only influencesthe thread which is going to be used when Observable is going to get subscribed to and it will stay on it downstream.

subscribeOn()影响当 Observable 将要订阅时将使用的线程,并且它将留在下游。

Observable.just("Some String")              // Computation
  .map(str -> str.length())                // Computation
  .map(length -> 2 * length)              // Computation
  .subscribeOn(Schedulers.computation()) // -- changing the thread
  .subscribe(number -> Log.d("", "Number " + number));// Computation

Position does not matter (subscribeOn())

位置无关紧要 (subscribeOn())

Why? Because it affectsonly the time of subscription.

为什么?因为它只影响订阅的时间。

Methods that obey the contact with subscribeOn

服从接触的方法 subscribeOn

-> Basic example : Observable.create

-> 基本示例: Observable.create

All the work specified inside the createbody will run on the thread specified in subscribeOn.

create正文中指定的所有工作都将在 中指定的线程上运行subscribeOn

Another example: Observable.just,Observable.fromor Observable.range

另一个例子:Observable.just,Observable.fromObservable.range

Note:All those methods accept values, so do not use blocking methods to create those values, as subscribeOn won't affect it.

注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。

If you want to use blocking functions, use

如果要使用阻塞功能,请使用

Observable.defer(() -> Obervable.just(blockingMenthod())));

Observable.defer(() -> Obervable.just(blockingMenthod())));

Important Fact:

重要事实:

subscribeOn does not work with Subjects

subscribeOn 不适用于Subjects

Multiple subscribeOn:

多个subscribeOn

If there are multiple instances of subscribeOnin the stream, only the firstone has a practical effect.

如果流中有多个实例subscribeOn,则只有一个具有实际效果。

Subscribe & subscribeOn

订阅 &subscribeOn

People think that subscribeOnhas something to do with Observable.subscribe, but it doesn't have anything special to do with it. It only affects the subscription phase.

人们认为这subscribeOn与 有关系Observable.subscribe,但与它没有任何特殊关系。 它只影响订阅阶段

tl;drIf none of the above makes any sense, look at this code snippet

tl;dr如果以上都没有任何意义,请查看此代码片段

 Observable.just("Some string")                 
           .map(str -> str.length())              
           .observeOn(Schedulers.computation())   
           .map(length -> 2 * length)   
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.io())
           .subscribe(---)

Observe an observable, perform the map function on the UIthread, now switch to a Computation Threadand perform map(length -> 2 * length)function now make sure you Observe the output on Mainthread but perform all the tasks defined under subscribe()in a IOthread.

观察一个 observable,在UI线程上执行 map 函数,现在切换到一个计算线程和执行map(length -> 2 * length)函数,确保你在线程上观察输出,但执行subscribe()IO线程下定义的所有任务。

Source : Tomek Polański (Medium)

资料来源:Tomek Poański(

回答by user1032613

Summary

概括

  • Use observeOnto set threads for callbacks"further down the stream (below it)", such as code blocks inside doOnNextor map.
  • Use subscribeOnto set threads for initializations"upstream (above it)", such as doOnSubscribe, Observable.justor Observable.create.
  • Both methods can be called multiple times, with each call overwriting previous ones. Position matters.
  • 使用observeOn到线组为回调“进一步往下流(下面它)”,如内部代码块doOnNextmap
  • 使用subscribeOn到组线程初始化“上游(上面它)”,如doOnSubscribeObservable.justObservable.create
  • 这两种方法都可以多次调用,每次调用都会覆盖以前的调用。位置很重要。


Let's walk through this topic with an example: we want to find the length of the string "user1032613". This is not an easy task for computers, so it's only natural that we perform the intense calculation in a background thread, to avoid freezing the app.

让我们通过一个例子来介绍这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,因此我们在后台线程中执行密集计算是很自然的,以避免冻结应用程序。

observeOn

观察

We can call observeOnas many times as we like, and it controls which thread all callbacks below itwill run. It's easy to use, and works just as you'd expect.

我们可以调用任意observeOn多次,它控制着它下面的所有回调将运行哪个线程。它易于使用,并且按您的预期工作。

For example, we will show a progress bar on the main UI thread, then do intensive/blocking operations in another thread, then come back to the main UI thread to update the result:

例如,我们将在主 UI 线程上显示一个进度条,然后在另一个线程中进行密集/阻塞操作,然后回到主 UI 线程更新结果:

    Observable.just("user1032613")

            .observeOn(mainThread) // set thread for operation 1
            .doOnNext {
                /* operation 1 */
                print("display progress bar")
                progressBar.visibility = View.VISIBLE
            }

            .observeOn(backThread) // set thread for operation 2 and 3
            .map {
                /* operation 2 */
                print("calculating")
                Thread.sleep(5000)
                it.length
            }

            .doOnNext {
                /* operation 3 */
                print("finished calculating")
            }

            .observeOn(mainThread) // set thread for operation 4
            .doOnNext {
                /* operation 4 */
                print("hide progress bar and display result")
                progressBar.visibility = View.GONE
                resultTextView.text = "There're $it characters!"
            }

            .subscribe()

In the above example, /* operation 1 */is ran in the mainThreadbecause we set it using observeOn(mainThread)on the line right above it; then we switch to backThreadby calling observeOnagain, so /* operation 2 */will run there. Because we didn't change it before chaining /* operation 3 */, it will run in the back thread as well, just like /* operation 2 */; finally we call observeOn(mainThread)again, to make sure /* operation 4 */updates the UI from the main thread.

在上面的例子中,/* operation 1 */运行在mainThread因为我们在它observeOn(mainThread)上面的行上设置了它;然后我们再次backThread调用切换到observeOn,所以/* operation 2 */会在那里运行。因为我们在链接之前没有改变它/* operation 3 */,它也会在后台线程中运行,就像/* operation 2 */; 最后我们observeOn(mainThread)再次调用,以确保/* operation 4 */从主线程更新 UI。

subscribeOn

订阅

So we've learned observeOnsets threads for subsequent callbacks. What else are we missing? Well, the Observableitself, and its methods such as just(), create(), subscribe()and so on, are also code that needs to be executed. This is how objects are passed along the stream. We use subscribeOnto set threads for code related to Observableitself.

所以我们已经学习了observeOn为后续回调设置线程。我们还缺少什么?好了,Observable本身和它的方法,例如just()create()subscribe()等等,也都是代码被执行的需要。这就是对象沿流传递的方式。我们用来subscribeOn为与Observable自身相关的代码设置线程。

If we remove all the callbacks (controlled by observeOndiscussed earlier), we are left with the "skeleton code" that will, by default, run on whichever thread the code is written in (probably main thread):

如果我们删除所有回调(由observeOn前面讨论的控制),我们将留下“骨架代码”,默认情况下,它将在编写代码的任何线程(可能是主线程)上运行:

    Observable.just("user1032613")
            .observeOn(mainThread)
            .doOnNext {
            }
            .observeOn(backThread)
            .map {
            }
            .doOnNext {
            }
            .observeOn(mainThread)
            .doOnNext {
            }
            .subscribe()

If we aren't happy about this empty skeleton code running on main thread, we can use subscribeOnto change it. For example, maybe the first line Observable.just("user1032613")isn't as simple as creating a stream from my user name - maybe it's a string from the Internet, or perhaps you are using doOnSubscribefor some other intensive operations. In that case, you can call subscribeOn(backThread)to put some of the code in another thread.

如果我们对在主线程上运行的这个空框架代码不满意,我们可以用subscribeOn它来改变它。例如,也许第一行Observable.just("user1032613")不像根据我的用户名创建流那么简单——也许它是来自 Internet 的字符串,或者您正在使用doOnSubscribe其他一些密集型操作。在这种情况下,您可以调用subscribeOn(backThread)将一些代码放在另一个线程中。

Where to put subscribeOn

放在哪里 subscribeOn

At the time of writing this answer, there are some misconceptions saying "only call it once", "position does not matter", and "if you call it multiple times, only the first time counts". After lots of researches and experiments, it turns out subscribeOncan be usefully called multiple times.

在写这个答案的时候,有一些误解说“只调用一次”、“位置无关紧要”和“如果你多次调用它,只有第一次重要”。经过大量的研究和实验,结果证明subscribeOn可以多次调用。

Because Observableuses Builder Pattern(fancy name for "chaining methods one after another"), subscribeOnis applied in reverse order. Therefore, this method sets the thread for code above it, exactly the opposite of observeOn.

因为Observable使用了构建器模式(“一个接一个地链接方法”的奇特名称),subscribeOn所以以相反的顺序应用。因此,此方法为其上方代码设置线程,与 完全相反observeOn

We can experiment this using doOnSubscribemethod. This method is triggered on the subscription event, and it runs on the thread set by subscribeOn:

我们可以用doOnSubscribe方法来试验这个。此方法在订阅事件上触发,并在由subscribeOn以下设置的线程上运行:

    Observable.just("user1032613")
            .doOnSubscribe {
                print("#3 running on main thread")
            }
            .subscribeOn(mainThread) // set thread for #3 and just()
            .doOnNext {
            }
            .map {
            }
            .doOnSubscribe {
                print("#2 running on back thread")
            }
            .doOnNext {
            }
            .subscribeOn(backThread) // set thread for #2 above
            .doOnNext {
            }
            .doOnSubscribe {
                print("#1 running on default thread")
            }
            .subscribe()

It might be easier to follow the logic, if you read the above example from bottom to top, just like how Builder Pattern executes the code.

如果您从下到上阅读上面的示例,那么遵循逻辑可能会更容易,就像 Builder Pattern 执行代码的方式一样。

In this example, the first line Observable.just("user1032613")is run in the same thread as print("#3")because there are no more subscribeOnin-between them. This creates the illusion of "only the first call matters" for people who only care about code inside just()or create(). This quickly falls apart once you start doing more.

在此示例中,第一行在Observable.just("user1032613")同一线程中运行,print("#3")因为它们之间没有更多内容subscribeOn。这会给只关心just()create(). 一旦你开始做更多,很快就会崩溃



Footnote:

脚注:

Threads and print()functions in the examples are defined, for brevity, as follows:

print()为简洁起见,示例中的线程和函数定义如下:

val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")

回答by Lukasz Frankowski

If someone finds rx java description hard to understand (as me for example), here is pure java explanation:

如果有人发现 rx java 描述难以理解(以我为例),这里是纯 java 解释:

subscribeOn()

订阅()

Observable.just("something")
  .subscribeOn(Schedulers.newThread())
  .subscribe(...);

Is equivalent of:

相当于:

Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();

Because Observableemits values on subscribe()and here subscribe()goes in the separate thread, the values are also emitted in the same thread as subscribe(). This is why it works "upstream" (influences the thread for the previous operations) and "downstream".

由于Observable在发射值subscribe(),在这里subscribe()去在单独的线程,该值也发出在同一个线程subscribe()。这就是为什么它在“上游”(影响先前操作的线程)和“下游”工作的原因。

observeOn()

观察()

Observable.just("something")
  .observeOn(Schedulers.newThread())
  .subscribe(...);

Is equivalent of:

相当于:

Observable observable = Observable.just("something")
  .subscribe(it -> new Thread(() -> ...).start());

Here Observableemits values in the main thread, only the listener method is executed in the separate thread.

这里Observable在主线程中发出值,在单独的线程中只执行侦听器方法。