javascript 如何在事件中使用带有 Socket.IO 的 RxJs

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/29813503/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-28 11:06:44  来源:igfitidea点击:

How to use RxJs with Socket.IO on event

javascriptsocketsrxjs

提问by Mitul

I want to use RxJS inside of my socket.on('sense',function(data){});. I am stuck and confused with very few documentation available and my lack of understanding RxJS. Here is my problem.

我想在我的socket.on('sense',function(data){});. 我对很少的可用文档以及我对 RxJS 的缺乏了解感到困惑和困惑。这是我的问题。

I have a distSensor.jsthat has a function pingEnd()

我有一个distSensor.js函数 pingEnd()

function pingEnd(x){
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated.
}

Inside my App.js I have

在我的 App.js 里面我有

io.on('connection', function (socket) {
    socket.on('sense', function (data) {
        //console.log('sense from App4 was called ' + data);
    });
});

The sense function gets lots of sensor data which I want to filter using RxJS and I don't know what should I do next to use RxJs here. Any pointers to right docs or sample would help.

sense 函数获取了大量传感器数据,我想使用 RxJS 过滤这些数据,但我不知道接下来要在这里使用 RxJs 该怎么做。任何指向正确文档或示例的指针都会有所帮助。

采纳答案by raimohanska

I think you can use Rx.Observable.fromEvent(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md).

我认为你可以使用Rx.Observable.fromEventhttps://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md)。

Here's how I did a similar thing using Bacon.js, which has a very similar API: https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13

这是我如何使用 Bacon.js 做类似的事情,它有一个非常相似的 API:https: //github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13

So in Bacon.js it would go like

所以在 Bacon.js 中它会像

io.on('connection', function(socket){
  Bacon.fromEvent(socket, "sense")
    .filter(function(data) { return true })
    .forEach(function(data) { dealWith(data) })
})

And in RxJs you'd replace Bacon.fromEventwith Rx.Observable.fromEvent.

而在 RxJs 中,你会Bacon.fromEventRx.Observable.fromEvent.

回答by Morten Poulsen

I have experienced some strange issues using the fromEvent method, so I prefer just to create my own Observable:

我在使用 fromEvent 方法时遇到了一些奇怪的问题,所以我更喜欢创建自己的 Observable:

function RxfromIO (io, eventName) {
  return Rx.Observable.create(observer => {
    io.on(eventName, (data) => {
        observer.onNext(data)
    });
    return {
        dispose : io.close
    }
});

I can then use like this:

然后我可以这样使用:

let $connection = RxfromIO(io, 'connection');

回答by Xesued

You can create an Observable like so:

你可以像这样创建一个 Observable:

var senses = Rx.Observable.fromEventPattern(
    function add (h) {
      socket.on('sense',h);
    }
  );

Then use senseslike any other Observable.

然后senses像任何其他 Observable 一样使用。

回答by Mick

Simply use fromEvent(). Here is a full example in Node.js but works the same in browser. Note that i use first()and takeUntil()to prevent a memory leak: first()only listens to one event and then completes. Now use takeUntil()on all other socket-events you listen to so the observables complete on disconnect:

只需使用fromEvent(). 这是 Node.js 中的完整示例,但在浏览器中的工作方式相同。请注意,我使用first()takeUntil()来防止内存泄漏:first()仅侦听一个事件然后完成。现在takeUntil()在您侦听的所有其他套接字事件上使用,以便在断开连接时完成观察:

const app = require('express')();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
const Rx = require('rxjs/Rx');

connection$ = Rx.Observable.fromEvent(io, 'connection');

connection$.subscribe(socket => {
    console.log(`Client connected`);

    // Observables
    const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
    const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);

    // Subscriptions
    message$.subscribe(data => {
        console.log(`Got message from client with data: ${data}`);
        io.emit('message', data); // Emit to all clients
    });

    disconnect$.subscribe(() => {
        console.log(`Client disconnected`);
    })
});

server.listen(3000);

回答by woudsma

ES6 one liner that i use, using ES7 bindsyntax:
(read $as stream)

我使用的 ES6 一个衬垫,使用ES7 绑定语法:(
$stream

import { Observable } from 'rxjs'

// create socket

const message$ = Observable.create($ => socket.on('message', ::$.next))
// translates to: Observable.create($ => socket.on('message', $.next.bind(this)))

// filter example
const subscription = message$
  .filter(message => message.text !== 'spam')
  //or .filter(({ text }) => text !== 'spam')
  .subscribe(::console.log)

回答by serkan

You can use rxjs-dom,

你可以使用 rxjs-dom,

Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver])


// an observer for when the socket is open
var openObserver = Rx.Observer.create(function(e) {
  console.info('socket open');

  // Now it is safe to send a message
  socket.onNext('test');
});

// an observer for when the socket is about to close
var closingObserver = Rx.Observer.create(function() {
  console.log('socket is about to close');
});

// create a web socket subject
socket = Rx.DOM.fromWebSocket(
  'ws://echo.websocket.org',
  null, // no protocol
  openObserver,
  closingObserver);

// subscribing creates the underlying socket and will emit a stream of incoming
// message events
socket.subscribe(
  function(e) {
    console.log('message: %s', e.data);
  },
  function(e) {
    // errors and "unclean" closes land here
    console.error('error: %s', e);
  },
  function() {
    // the socket has been closed
    console.info('socket closed');
  }
);