RxJava and parallel execution of observer code
Source: http://stackoverflow.com/questions/35425832/ (Stack Overflow, licensed under CC BY-SA 4.0; if you reuse this content you must attribute it to the original authors)
Asked by Pawan Mishra
I have the following code using the RxJava Observable API:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
    .buffer(10000)
    .observeOn(Schedulers.computation())
    .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
        for (Info info : recordInfo) {
            // some I/O operation logic
        }
    },
    exception -> {
    },
    () -> {
    });
My expectation was that the observation code, i.e. the code inside the subscribe() method, would be executed in parallel once I had specified the computation scheduler. Instead, the code is still executed sequentially on a single thread. How can I make the code run in parallel using the RxJava API?
Accepted answer by LordRaydenMK
RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.
A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time.
To achieve parallelism you need multiple Observables.
This runs in a single thread:
Observable<Integer> vals = Observable.range(1, 10);
vals.subscribeOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println("Subscriber received "
        + val + " on "
        + Thread.currentThread().getName()));
This runs in multiple threads:
Observable<Integer> vals = Observable.range(1, 10);
vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
Code and text come from this blog post.
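The "one worker versus one worker per item" distinction above can also be illustrated without RxJava. The following is a minimal plain-JDK sketch, not RxJava code: it submits one task per item to a thread pool and records which threads did the work. The class name WorkerThreadsSketch, the helper threadsUsed, and the 50 ms stand-in for intenseCalculation are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerThreadsSketch {

    // Hypothetical stand-in for intenseCalculation(i): sleeps briefly, then squares.
    static int intenseCalculation(int i) throws InterruptedException {
        Thread.sleep(50);
        return i * i;
    }

    // Runs one task per item on a pool of the given size and returns the
    // names of the threads that actually executed the tasks.
    static Set<String> threadsUsed(int poolSize, int itemCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= itemCount; i++) {
                final int item = i;
                futures.add(pool.submit(() -> {
                    names.add(Thread.currentThread().getName());
                    return intenseCalculation(item);
                }));
            }
            for (Future<Integer> f : futures) {
                f.get(); // wait for every task to finish
            }
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("one worker:   " + threadsUsed(1, 4));
        System.out.println("four workers: " + threadsUsed(4, 4));
    }
}
```

With a pool of one worker, all four items are handled by the same thread, which is comparable to the single-stream case. With four workers, each item gets its own thread, which is roughly what giving every item its own Observable inside flatMap achieves.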
Answer by Geralt_Encore
You have to specify subscribeOn(Schedulers.computation()) instead of observeOn(Schedulers.computation()) for that purpose. With subscribeOn you declare on which thread you are going to emit your values. With observeOn you declare on which thread you are going to handle and observe them.
Answer by Arun
This still comes out in the same sequence, even on new threads:
Observable<Integer> ob3 = Observable.range(1, 5);
ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer pArg0) {
        return Observable.just(pArg0);
    }
}).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer pArg0) {
        try {
            Thread.sleep(1000 - (pArg0 * 100));
            System.out.println(pArg0 + " ccc " + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return pArg0;
    }
}).subscribe();
Output
1 ccc RxNewThreadScheduler-1
2 ccc RxNewThreadScheduler-1
3 ccc RxNewThreadScheduler-1
4 ccc RxNewThreadScheduler-1
5 ccc RxNewThreadScheduler-1
Answer by alijandro
Using flatMap and subscribing each inner Observable on Schedulers.computation() will achieve concurrency.
Here is a more practical example using Callable. From the output, we can see that it takes about 2000 milliseconds to finish all the tasks.
// Imports, assuming RxJava 2.x (io.reactivex)
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

static class MyCallable implements Callable<Integer> {
    private static final Object CALLABLE_COUNT_LOCK = new Object();
    private static int callableCount;

    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000); // simulate two seconds of work
        synchronized (CALLABLE_COUNT_LOCK) {
            return callableCount++;
        }
    }

    public static int getCallableCount() {
        synchronized (CALLABLE_COUNT_LOCK) {
            return callableCount;
        }
    }
}

private static void runMyCallableConcurrentlyWithRxJava() {
    long startTimeMillis = System.currentTimeMillis();
    // The semaphore only serves to block the calling thread until onComplete fires.
    final Semaphore semaphore = new Semaphore(1);
    try {
        semaphore.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
        .flatMap(new Function<MyCallable, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
                // Each callable gets its own Observable on the computation scheduler.
                return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
            }
        })
        .subscribeOn(Schedulers.computation())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Object o) {
                System.out.println("onNext " + o);
            }

            @Override
            public void onError(@NonNull Throwable e) {
            }

            @Override
            public void onComplete() {
                if (MyCallable.getCallableCount() >= 4) {
                    semaphore.release();
                }
            }
        });
    try {
        semaphore.acquire(); // wait here until all four callables have completed
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        semaphore.release();
    }
    System.out.println("durationMillis " + (System.currentTimeMillis() - startTimeMillis));
}
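The figure of roughly 2000 ms follows from the four callables sleeping at the same time rather than one after another, which would take about 8000 ms. The same timing argument can be sketched with the plain JDK, assuming an ExecutorService in place of the RxJava schedulers. This is an analogue for illustration, not the answer's code; ConcurrentCallablesSketch and runConcurrently are hypothetical names, and the sleep is shortened to 200 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentCallablesSketch {

    // Submits taskCount callables that each sleep for sleepMillis, waits for
    // all of them, and returns the elapsed wall-clock time in milliseconds.
    static long runConcurrently(int taskCount, long sleepMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            tasks.add(() -> {
                Thread.sleep(sleepMillis); // simulated work
                return id;
            });
        }
        long start = System.nanoTime();
        try {
            // invokeAll blocks until every task has completed.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("durationMillis " + runConcurrently(4, 200));
    }
}
```

Because invokeAll blocks the caller, no semaphore is needed here to keep the calling thread waiting; the elapsed time should stay close to one task's duration instead of the sum of all four.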
Answer by michalbrz
RxJava 2.0.5 introduced parallel flows and ParallelFlowable, which make parallel execution simpler and more declarative.
You no longer have to create an Observable/Flowable within flatMap; you can simply call parallel() on a Flowable and it returns a ParallelFlowable.
It's not as feature-rich as a regular Flowable, because concurrency raises many issues with Rx contracts, but you have the basic map(), filter() and many more, which should be enough in most cases.
So instead of this flow from @LordRaydenMK's answer:
Observable<Integer> vals = Observable.range(1, 10);
vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
Now you can do:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
    .runOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    .sequential()
    .subscribe(val -> System.out.println(val));
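Conceptually, parallel() splits the stream into several independent "rails", runOn() gives each rail its own worker, and sequential() merges the rails back into one stream. A rough plain-JDK sketch of that idea follows. It is a simplification under stated assumptions, not how RxJava is implemented: RailsSketch and parallelMap are hypothetical names, the input is a finite list, and the merge simply concatenates rail by rail, whereas RxJava merges items as they become available.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class RailsSketch {

    // Splits items round-robin into `rails` sub-lists, maps each sub-list on
    // its own worker thread, then concatenates the results rail by rail.
    static <T, R> List<R> parallelMap(List<T> items, int rails, Function<T, R> mapper)
            throws Exception {
        List<List<T>> railItems = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            railItems.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            railItems.get(i % rails).add(items.get(i)); // round-robin dispatch
        }

        ExecutorService pool = Executors.newFixedThreadPool(rails);
        try {
            List<Future<List<R>>> futures = new ArrayList<>();
            for (List<T> rail : railItems) {
                futures.add(pool.submit(() -> {
                    List<R> out = new ArrayList<>();
                    for (T item : rail) {
                        out.add(mapper.apply(item)); // the map() step, per rail
                    }
                    return out;
                }));
            }
            List<R> merged = new ArrayList<>();
            for (Future<List<R>> f : futures) {
                merged.addAll(f.get()); // simplified stand-in for sequential()
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> vals = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(parallelMap(vals, 2, i -> i * i)); // [1, 9, 25, 4, 16, 36]
    }
}
```

Note that the merged result is not in the original input order. The same caveat applies to the RxJava version: after sequential(), downstream code should not rely on items arriving in source order.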