javascript Node.js Streams 与 Observables
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/30423413/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Node.js Streams vs. Observables
提问by urish
After learning about Observables, I find them quite similar to Node.js streams. Both have a mechanism of notifying the consumer whenever new data arrives, an error occurs or there is no more data (EOF).
在了解了Observables 之后,我发现它们与Node.js 流非常相似。两者都有一种机制,可以在新数据到达、发生错误或没有更多数据 (EOF) 时通知消费者。
I would love to learn about the conceptual/functional differences between the two. Thanks!
我很想了解两者之间的概念/功能差异。谢谢!
回答by m4ktub
Both Observablesand node.js's Streamsallow you to solve the same underlying problem: asynchronously process a sequence of values. The main difference between the two, I believe, is related to the context that motivated its appearance. That context is reflected in the terminology and API.
无论观测量和node.js中的流让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与促使其出现的背景有关。该上下文反映在术语和 API 中。
On the Observablesside you have an extension to EcmaScript that introduces the reactive programming model. It tries to fill the gap between value generation and asynchronicity with the minimalist and composable concepts of Observer
and Observable
.
在Observables方面,你有一个 EcmaScript 的扩展,它引入了反应式编程模型。它试图填补值生成和异步之间的间隙用的极简和可组合的概念Observer
和Observable
。
On node.js and Streamsside you wanted to create an interface for the asynchronous and performant processing of network streams and local files. The terminology derives from that initial context and you get pipe
, chunk
, encoding
, flush
, Duplex
, Buffer
, etc. By having a pragmatic approach that provides explicit support for particular use cases you lose some ability to compose things because it's not as uniform. For example, you use push
on a Readable
stream and write
on a Writable
although, conceptually, you are doing the same thing: publishing a value.
在 node.js 和Streams方面,您希望为网络流和本地文件的异步和高性能处理创建一个接口。该术语源自最初的上下文,您会得到pipe
, chunk
, encoding
, flush
, Duplex
, Buffer
, 等等。通过使用为特定用例提供明确支持的务实方法,您会失去一些组合事物的能力,因为它不是统一的。例如,您push
在Readable
流write
上使用和Writable
虽然,从概念上讲,您正在做同样的事情:发布一个值。
So, in practice, if you look at the concepts, and if you use the option { objectMode: true }
, you can match Observable
with the Readable
stream and Observer
with the Writable
stream. You can even create some simple adapters between the two models.
因此,在实践中,如果你看的概念,如果你使用的选项{ objectMode: true }
,可以匹配Observable
与Readable
流和Observer
与Writable
流。您甚至可以在两个模型之间创建一些简单的适配器。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
You may have noticed that I changed a few names and used the simpler concepts of Observer
and Subscription
, introduced here, to avoid the overload of reponsibilities done by Observablesin Generator
. Basically, the Subscription
allows you to unsubscribe from the Observable
. Anyway, with the above code you can have a pipe
.
您可能已经注意到,我改变了一些名字和使用的简单的概念Observer
和Subscription
,介绍到这里,以避免做reponsibilities超负荷观测量在Generator
。基本上,Subscription
允许您取消订阅Observable
. 无论如何,使用上面的代码,您可以拥有一个pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
Compared with process.stdin.pipe(process.stdout)
, what you have is a way to combine, filter, and transform streams that also works for any other sequence of data. You can achieve it with Readable
, Transform
, and Writable
streams but the API favors subclassing instead of chaining Readable
s and applying functions. On the Observable
model, For example, transforming values corresponds to applying a transformer function to the stream. It does not require a new subtype of Transform
.
与 相比process.stdin.pipe(process.stdout)
,您拥有的是一种组合、过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用Readable
、Transform
和Writable
流来实现它,但 API 支持子类化而不是链接Readable
s 和应用函数。Observable
例如,在模型上,转换值对应于将转换函数应用于流。它不需要 的新子类型Transform
。
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
The conclusion? It's easy to introduce the reactive model and the Observable
concept anywhere. It's harder to implement an entire library around that concept. All those little functions need to work together consistently. After all, the ReactiveXproject is still going at it. But if you really need to send the file content to the client, deal with encoding, and zip it then the support it's there, in NodeJS, and it works pretty well.
结论?很容易在Observable
任何地方引入反应模型和概念。围绕该概念实现整个库更加困难。所有这些小功能都需要始终如一地协同工作。毕竟,ReactiveX项目仍在进行中。但是如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它就在那里,在 NodeJS 中,它工作得很好。