Observable vs Flowable rxJava2
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/40323307/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Observable vs Flowable rxJava2
提问by user2141889
I have been looking at new rx java 2 and I'm not quite sure I understand the idea of backpressure
anymore...
我一直在看新的 rx java 2,但我不太确定我是否理解backpressure
了……
I'm aware that we have Observable
that does not have backpressure
support and Flowable
that has it.
我知道我们有Observable
没有得到backpressure
支持的支持Flowable
。
So based on example, lets say I have flowable
with interval
:
因此,基于例如,可以说我有flowable
有interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
This is going to crash after around 128 values, and thats pretty obvious I am consuming slower than getting items.
这将在大约 128 个值后崩溃,这很明显我消耗的速度比获取项目慢。
But then we have the same with Observable
但后来我们有同样的 Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
This will not crash at all, even when I put some delay on consuming it still works. To make Flowable
work lets say I put onBackpressureDrop
operator, crash is gone but not all values are emitted either.
这根本不会崩溃,即使我延迟使用它仍然有效。为了Flowable
工作,可以说我把onBackpressureDrop
操作符,崩溃消失了,但也不是所有的值都被发出。
So the base question I can not find answer currently in my head is why should I care about backpressure
when I can use plain Observable
still receive all values without managing the buffer
? Or maybe from the other side, what advantages do backpressure
give me in favour of managing and handling the consuming?
因此,我目前在脑海中找不到答案的基本问题是,为什么我应该关心backpressure
何时可以使用 plainObservable
仍然接收所有值而无需管理buffer
?或者从另一方面来说,backpressure
我在管理和处理消费方面有什么优势?
采纳答案by akarnokd
What backpressure manifests in practice is bounded buffers, Flowable.observeOn
has a buffer of 128 elements that gets drained as fast as the dowstream can take it. You can increase this buffer size individually to handle bursty source and all the backpressure-management practices still apply from 1.x. Observable.observeOn
has an unbounded buffer that keeps collecting the elements and your app may run out of memory.
背压在实践中表现出来的是有界缓冲区,它Flowable.observeOn
有一个包含 128 个元素的缓冲区,可以尽可能快地排空。您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍然适用于 1.x。Observable.observeOn
有一个无限的缓冲区,不断收集元素,你的应用程序可能会耗尽内存。
You may use Observable
for example:
Observable
例如,您可以使用:
- handling GUI events
- working with short sequences (less than 1000 elements total)
- 处理 GUI 事件
- 处理短序列(总共少于 1000 个元素)
You may use Flowable
for example:
Flowable
例如,您可以使用:
- cold and non-timed sources
- generator like sources
- network and database accessors
- 冷源和非定时源
- 像源一样的生成器
- 网络和数据库访问器
回答by Egor
The fact that your Flowable
crashed after emitting 128 values without backpressure handling doesn't mean it will always crash after exactly 128 values: sometimes it will crash after 10, and sometimes it will not crash at all. I believe this is what happened when you tried the example with Observable
- there happened to be no backpressure, so your code worked normally, next time it may not. The difference in RxJava 2 is that there is no concept of backpressure in Observable
s anymore, and no way to handle it. If you're designing a reactive sequence that will probably require explicit backpressure handling - then Flowable
is your best choice.
在Flowable
没有背压处理的情况下发出 128 个值后崩溃的事实并不意味着它总是在恰好 128 个值后崩溃:有时它会在 10 个值后崩溃,有时它根本不会崩溃。我相信这就是您尝试示例时Observable
发生的情况 - 碰巧没有背压,因此您的代码正常工作,下次可能不会。RxJava 2 的不同之处在于Observable
s 中不再有背压的概念,也没有办法处理。如果您正在设计一个可能需要显式背压处理的反应式序列 - 那么这Flowable
是您的最佳选择。
回答by j2emanue
Backpressure is when your observable (publisher) is creating more events than your subscriber can handle. So you can get subscribers missing events, or you can get a huge queue of events which just leads to out of memory eventually. Flowable
takes backpressure into consideration. Observable
does not. Thats it.
背压是指您的可观察对象(发布者)创建的事件超出订阅者的处理能力。所以你可以让订阅者丢失事件,或者你可以获得一个巨大的事件队列,最终导致内存不足。 Flowable
考虑到背压。 Observable
才不是。就是这样。
it reminds me of a funnel which when it has too much liquid overflows. Flowable can help with not making that happen:
它让我想起了一个漏斗,当它有太多液体溢出时。Flowable 可以帮助避免这种情况发生:
with tremendous backpressure:
背压巨大:
but with using flowable, there is much less backpressure :
但是使用flowable,背压要小得多:
Rxjava2 has a few backpressure strategies you can use depending on your usecase. by strategy i mean Rxjava2 supplies a way to handle the objects that cannot be processed because of the overflow (backpressure).
Rxjava2 有一些你可以根据你的用例使用的背压策略。通过策略,我的意思是 Rxjava2 提供了一种处理由于溢出(背压)而无法处理的对象的方法。
here are the strategies.I wont go through them all, but for example if you want to not worry about the items that are overflowed you can use a drop strategy like this:
这是策略。我不会全部介绍,但例如,如果您不想担心溢出的项目,您可以使用这样的丢弃策略:
observable.toFlowable(BackpressureStrategy.DROP)
observable.toFlowable(BackpressureStrategy.DROP)
As far as i know there should be a 128 item limit on the queue, after that there can be a overflow (backpressure). Even if its not 128 its close to that number. Hope this helps someone.
据我所知,队列中应该有 128 个项目限制,之后可能会出现溢出(背压)。即使它不是 128 也接近这个数字。希望这可以帮助某人。
if you need to change the buffer size from 128 it looks like it can be done like this (but watch any memory constraints:
如果您需要将缓冲区大小从 128 更改为看起来可以这样做(但请注意任何内存限制:
myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.
in software developement usually back pressure strategy means your telling the emitter to slow down a bit as the consumer cannot handle the velocity your emitting events.
在软件开发中,背压策略通常意味着您告诉发射器放慢一点,因为消费者无法处理发射事件的速度。