Use cases for RxJava schedulers

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/31276164/

Date: 2020-08-11 10:52:27  Source: igfitidea

Tags: java, multithreading, thread-safety, rx-java, rx-android

Question by bcorso

In RxJava there are 5 different schedulers to choose from:

  1. immediate(): Creates and returns a Scheduler that executes work immediately on the current thread.

  2. trampoline(): Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

  3. newThread(): Creates and returns a Scheduler that creates a new Thread for each unit of work.

  4. computation(): Creates and returns a Scheduler intended for computational work. This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler. Use Schedulers.io() instead.

  5. io(): Creates and returns a Scheduler intended for IO-bound work. The implementation is backed by an Executor thread-pool that will grow as needed. This can be used for asynchronously performing blocking IO. Do not perform computational work on this scheduler. Use Schedulers.computation() instead.
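
Both immediate() and trampoline() run work on the calling thread; the difference is only what happens when a running task schedules more work. The following is a stdlib-only sketch, not RxJava's implementation, and the class and method names (ImmediateVsTrampoline, runImmediate, runTrampoline) are hypothetical. "Immediate" runs nested work right away, while "trampoline" queues it until the current task returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of two "run on the current thread" schedulers; not RxJava code.
class ImmediateVsTrampoline {

    // immediate(): scheduling runs the task right away, nested inside the caller.
    static List<String> runImmediate() {
        List<String> log = new ArrayList<>();
        Consumer<Runnable> schedule = Runnable::run;
        schedule.accept(() -> {
            log.add("outer start");
            schedule.accept(() -> log.add("inner")); // runs before "outer end"
            log.add("outer end");
        });
        return log;
    }

    // trampoline(): scheduling only enqueues; a loop on the same thread runs
    // each task after the current one has returned, in FIFO order.
    static List<String> runTrampoline() {
        List<String> log = new ArrayList<>();
        Deque<Runnable> queue = new ArrayDeque<>();
        Consumer<Runnable> schedule = queue::addLast;
        schedule.accept(() -> {
            log.add("outer start");
            schedule.accept(() -> log.add("inner")); // queued, runs after "outer end"
            log.add("outer end");
        });
        while (!queue.isEmpty()) {
            queue.pollFirst().run();
        }
        return log;
    }
}
```

runImmediate() logs "outer start", "inner", "outer end", whereas runTrampoline() logs "outer start", "outer end", "inner". Because queued work runs only after the current task returns, the call stack does not grow with each nested schedule.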

Questions:

The first 3 schedulers are pretty self-explanatory; however, I'm a little confused about computation and io.

  1. What exactly is "IO-bound work"? Is it used for dealing with streams (java.io) and files (java.nio.files)? Is it used for database queries? Is it used for downloading files or accessing REST APIs?
  2. How is computation() different from newThread()? Is it that all computation() calls are on a single (background) thread instead of a new (background) thread each time?
  3. Why is it bad to call computation() when doing IO work?
  4. Why is it bad to call io() when doing computational work?

Accepted answer by Dave Moten

Great questions, I think the documentation could do with some more detail.

  1. io() is backed by an unbounded thread-pool and is the sort of thing you'd use for non-computationally intensive tasks, that is, stuff that doesn't put much load on the CPU. So yes, interaction with the file system and interaction with databases or services on a different host are good examples.
  2. computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule CPU-intensive work in parallel across more than the available processors (say, using newThread()), then you are up for thread creation overhead and context switching overhead as threads vie for a processor, and it's potentially a big performance hit.
  3. It's best to leave computation() for CPU-intensive work only; otherwise you won't get good CPU utilization.
  4. It's bad to call io() for computational work for the reason discussed in 2: io() is unbounded, and if you schedule a thousand computational tasks on io() in parallel, then each of those thousand tasks will have its own thread and compete for the CPU, incurring context switching costs.
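
The bounded-versus-unbounded distinction can be observed with plain java.util.concurrent executors. This sketch assumes that Executors.newFixedThreadPool(availableProcessors) is a fair stand-in for computation() and Executors.newCachedThreadPool() for io(); RxJava's schedulers are implemented differently internally, and the class and method names are hypothetical. It counts how many distinct threads each pool ends up using.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolSizing {

    // computation()-like: a pool capped at the core count. However many tasks
    // are submitted, no more than `cores` threads are ever created.
    static int threadsUsedByBoundedPool(int tasks) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        awaitShutdown(pool);
        return names.size();
    }

    // io()-like: a cached pool. Every task blocks until all tasks have started
    // (simulated blocking I/O), so the pool must create one thread per task.
    static int threadsUsedByUnboundedPool(int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                allStarted.countDown();
                try {
                    allStarted.await(); // "waiting on the network"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        awaitShutdown(pool);
        return names.size();
    }

    private static void awaitShutdown(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With 50 blocked tasks, the cached pool grows to 50 threads. This is fine for waiting on I/O, but CPU-bound tasks would have all those threads competing for a handful of cores.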

Answer by softjake

The most important point is that both Schedulers.io and Schedulers.computation are backed by thread pools, as opposed to the others mentioned in the question (note that only the io pool is unbounded; the computation pool is capped at the number of available processors). Outside of Schedulers.io, an unbounded pool of this kind is only obtained with Schedulers.from(Executor) when the Executor is created with newCachedThreadPool (an unbounded thread pool that automatically reclaims idle threads).

As abundantly explained in previous responses and multiple articles on the web, Schedulers.io and Schedulers.computation should be used carefully, as they are optimized for the type of work in their name. But, from my point of view, their most important role is to provide real concurrency to reactive streams.

Contrary to what newcomers often believe, reactive streams are not inherently concurrent; they are inherently asynchronous and sequential. For this very reason, Schedulers.io should be used only when the I/O operation is blocking (e.g. a blocking call such as Apache Commons IO's FileUtils.readFileToString(...)) and would therefore freeze the calling thread until the operation is done.

Using an asynchronous API such as Java's AsynchronousFileChannel doesn't block the calling thread during the operation, so there is no point in using a separate thread. In fact, Schedulers.io threads are not really a good fit for asynchronous operations, as they don't run an event loop and the callback would never... be called.
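
The two styles contrasted above can be sketched with the JDK alone: a blocking read (Files.readAllBytes, standing in for a blocking helper like FileUtils.readFileToString) pushed onto a separate pool, versus AsynchronousFileChannel, whose read call returns a Future immediately. The cached executor here is only an assumed stand-in for Schedulers.io(), and the class and method names, including the tempFileWith test helper, are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingVsAsyncRead {

    // Blocking API: the read parks its thread, so it is moved off the caller's
    // thread onto a dedicated pool (the role Schedulers.io() plays in RxJava).
    static String readBlockingOnPool(Path file) {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, ioPool).join();
        } finally {
            ioPool.shutdown();
        }
    }

    // Asynchronous API: read() returns a Future at once, without a pool of our own.
    // The caller could do other work before get(); here it simply waits per chunk.
    static String readAsync(Path file) {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            long position = 0;
            while (buffer.hasRemaining()) {
                int n = channel.read(buffer, position).get();
                if (n < 0) {
                    break; // end of file
                }
                position += n;
            }
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (IOException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical test helper: writes text to a temporary file.
    static Path tempFileWith(String text) {
        try {
            Path file = Files.createTempFile("rx-demo", ".txt");
            file.toFile().deleteOnExit();
            return Files.write(file, text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```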

The same logic applies to database access or remote API calls. Don't use Schedulers.io if you can use an asynchronous or reactive API to make the call.

Back to concurrency. You may not have access to an async or reactive API to do I/O operations asynchronously or concurrently, so your only alternative is to dispatch multiple calls on separate threads. Alas, reactive streams are sequential at their ends, but the good news is that the flatMap() operator can introduce concurrency at their core.

Concurrency must be built into the stream construct, typically using the flatMap() operator. This powerful operator can be configured to internally provide a multi-threaded context to the Function<T, R> embedded in your flatMap(). That context is provided by a multi-threaded Scheduler such as Schedulers.io or Schedulers.computation.
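
The flatMap() idea (map each item to work running on a multi-threaded pool, then merge results as they finish) can be modeled without RxJava using an ExecutorCompletionService. This is a conceptual sketch only: the fixed pool plays the role of a scheduler combined with a maxConcurrency limit, and concurrentMap is a hypothetical name, not an RxJava API. As with flatMap(), results come back in completion order, not input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ConcurrentFlatMapModel {

    // Applies `work` to every input on up to `maxConcurrency` threads and
    // returns the results in the order they finish (like flatMap's merge).
    static <T, R> List<R> concurrentMap(List<T> inputs, Function<T, R> work, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        CompletionService<R> completed = new ExecutorCompletionService<>(pool);
        try {
            for (T input : inputs) {
                completed.submit(() -> work.apply(input)); // "inner stream" per item
            }
            List<R> results = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                results.add(completed.take().get()); // next result to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, concurrentMap(Arrays.asList(1, 2, 3, 4), x -> x * x, 2) yields 1, 4, 9 and 16 in some order. Callers that need the original order would have to re-sort or use an order-preserving variant.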

Find more details in the articles on RxJava 2 Schedulers and Concurrency, where you'll find code samples and detailed explanations on how to use Schedulers sequentially and concurrently.

Hope this helps,

Softjake

Answer by joe

This blog post provides an excellent answer.

From the blog post:

Schedulers.io() is backed by an unbounded thread pool. It is used for non-CPU-intensive I/O work, including interaction with the file system, network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking I/O.

Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processor time.

Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive, as a new thread is spawned every time and no reuse happens.

Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until the pool is explicitly shut down.
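
The queuing behavior of Executors.newFixedThreadPool(n) can be checked with the JDK alone. In RxJava, the same executor would be wrapped with Schedulers.from(...); that step is left out here so the sketch runs without the library. maxObservedConcurrency is a hypothetical helper that records the peak number of tasks running at the same moment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolQueueing {

    // Submits `tasks` short sleeping tasks to a pool of `n` threads and returns
    // the highest number that were ever running simultaneously.
    static int maxObservedConcurrency(int n, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // simulated work; surplus tasks wait in the queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        // The pool's threads stay alive until it is shut down explicitly.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

Calling maxObservedConcurrency(2, 10) never reports more than 2: the other eight tasks sit in the executor's queue until a thread frees up.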

The main thread, via AndroidSchedulers.mainThread(), is provided by RxAndroid, an extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, unresponsive UI or, worse, an "Application Not Responding" (ANR) dialog.

Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
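
A one-thread scheduler like Schedulers.single() behaves much like the JDK's Executors.newSingleThreadExecutor(): tasks run one at a time, in submission order, on the same thread. The sketch below uses that executor as an analogy, not as RxJava's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrdering {

    // Submits `tasks` numbered tasks to one worker thread. Returns the order in
    // which they ran and records every executing thread name in `threadsSeen`.
    static List<Integer> executionOrder(int tasks, Set<String> threadsSeen) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            single.execute(() -> {
                threadsSeen.add(Thread.currentThread().getName());
                order.add(id);
            });
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

Submitting 20 tasks yields the order 0, 1, ..., 19, and exactly one thread name is recorded. This sequential, same-thread guarantee is what makes a single-threaded scheduler useful for work that must not interleave.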

Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner on one of the participating worker threads. It's often used when implementing recursion to avoid growing the call stack.
