RxJava 和观察者代码的并行执行

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/35425832/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 16:39:33  来源:igfitidea点击:

RxJava and parallel execution of observer code

javasystem.reactiverx-java

提问by Pawan Mishra

I am having the following code using RxJava Observable api :

我有以下使用 RxJava Observable api 的代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

My expectation was that the observation code i.e. code inside the subscribe() method will be executed in parallel after I have specified the computation scheduler. Instead the code is still being executed sequentially on single thread. How can make the code run in parallel using RxJava api.

我的期望是在我指定了计算调度程序后,观察代码即 subscribe() 方法中的代码将并行执行。相反,代码仍然在单线程上按顺序执行。如何使用 RxJava api 使代码并行运行。

采纳答案by LordRaydenMK

RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.

当涉及到 RxJava 的异步/多线程方面时,它经常被误解。多线程操作的编码很简单,但理解抽象是另一回事。

A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time.

关于 RxJava 的一个常见问题是如何实现并行化,或者从一个 Observable 并发地发出多个项目。当然,这个定义打破了 Observable 契约,该契约规定 onNext() 必须按顺序调用,并且一次不能被多个线程同时调用。

To achieve parallelism you need multiple Observables.

要实现并行性,您需要多个 Observable。

This runs in a single thread:

这在单个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

This runs in multiple threads:

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

Code and text comes from this blog post.

代码和文字来自这篇博

回答by Geralt_Encore

You have to specify subscribeOn(Schedulers.computation())instead of observeOn(Schedulers.computation())for that purpose. In subscribeOnyou declare in which thread you are going to emit your values. In observeOnyou declare in which thread you are going to handle and observe them.

您必须指定subscribeOn(Schedulers.computation())而不是observeOn(Schedulers.computation())为此目的。在subscribeOn你声明你将在哪个线程中发出你的值。在observeOn你声明你将在哪个线程中处理并观察它们。

回答by Arun

This still comes in the same sequence. Even on new threads

这仍然以相同的顺序出现。即使在新线程上

    Observable<Integer> ob3 = Observable.range(1, 5);

    ob3.flatMap(new Func1<Integer, Observable<Integer>>() {

        @Override
        public Observable<Integer> call(Integer pArg0) {

            return Observable.just(pArg0);
        }

    }).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {

        @Override
        public Integer call(Integer pArg0) {

            try {
                Thread.sleep(1000 - (pArg0 * 100));
                System.out.println(pArg0 + "  ccc   " + Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }

            return pArg0;
        }

    }).subscribe();

Output

输出

1 ccc RxNewThreadScheduler-1

2 ccc RxNewThreadScheduler-1

3 ccc RxNewThreadScheduler-1

4 ccc RxNewThreadScheduler-1

5 ccc RxNewThreadScheduler-1

1 ccc RxNewThreadScheduler-1

2 ccc RxNewThreadScheduler-1

3 ccc RxNewThreadScheduler-1

4 ccc RxNewThreadScheduler-1

5 ccc RxNewThreadScheduler-1

回答by alijandro

Using flatMapand specify to subscribe on Schedulers.computation()will achieve concurrency.

使用flatMap并指定订阅Schedulers.computation()将实现并发。

Here is a more practical example using Callable, from the output, we can see it will take about 2000 milliseconds to finish all the tasks.

这是一个更实际的例子,使用Callable,从输出中,我们可以看到完成所有任务大约需要 2000 毫秒。

static class MyCallable implements Callable<Integer> {

    private static final Object CALLABLE_COUNT_LOCK = new Object();
    private static int callableCount;

    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        synchronized (CALLABLE_COUNT_LOCK) {
            return callableCount++;
        }
    }

    public static int getCallableCount() {
        synchronized (CALLABLE_COUNT_LOCK) {
            return callableCount;
        }
    }
}

private static void runMyCallableConcurrentlyWithRxJava() {
    long startTimeMillis = System.currentTimeMillis();

    final Semaphore semaphore = new Semaphore(1);
    try {
        semaphore.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
            .flatMap(new Function<MyCallable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
                    return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
                }
            })
            .subscribeOn(Schedulers.computation())
            .subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull Object o) {
                    System.out.println("onNext " + o);
                }

                @Override
                public void onError(@NonNull Throwable e) {

                }

                @Override
                public void onComplete() {
                    if (MyCallable.getCallableCount() >= 4) {
                        semaphore.release();
                    }
                }
            });


    try {
        semaphore.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        semaphore.release();
    }
    System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
}

回答by michalbrz

RxJava 2.0.5 introduced paralell flowsand ParallelFlowable, which makes parallel execution simpler and more declarative.

RxJava 2.0.5 引入了并行流ParallelFlowable,这使得并行执行更简单、更具声明性。

You no longer have to create Observable/Flowablewithin flatMap, you can simply call parallel()on Flowableand it returns ParallelFlowable.

你不再需要创建Observable/FlowableflatMap,您也可以叫parallel()Flowable并返回ParallelFlowable

It's not as feature rich as a regular Flowable, because concurrency raises many issues with Rx contracts, but you have basic map(), filter()and many more, which should be enough in most cases.

它不像常规的那样功能丰富Flowable,因为并发会引发 Rx 合约的许多问题,但是你有基本的map()filter()还有更多,在大多数情况下应该足够了。

So instead of this flow from @LordRaydenMK answer

所以而不是来自@LordRaydenMK 的这个流程回答

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));

now you can do:

现在你可以这样做:

Flowable<Integer> vals = Flowable.range(1, 10);

vals.parallel()
        .runOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
        .sequential()
        .subscribe(val -> System.out.println(val));