typescript 在 Angular 和 rxjs 中重新连接 websocket?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/44060315/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-21 04:33:46  来源:igfitidea点击:

Reconnecting a websocket in Angular and rxjs?

angulartypescriptwebsocketrxjsngrx

提问by stwissel

I have a ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I start the application I receive incoming data flawlessly.

我有一个基于 ngrx/store (v2.2.2) 和 rxjs (v5.1.0) 的应用程序,它使用 observable 侦听 Web 套接字以获取传入数据。当我启动应用程序时,我会完美地接收传入的数据。

However after a while (updates are coming in quite infrequently) the connection seem to get lost and I don't get anymore incoming data. My code:

但是过了一段时间(更新很少),连接似乎丢失了,我不再收到传入的数据。我的代码:

The service

服务

import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
      .map(res => res.data)
      .share();
  }
}

The subscriber

订阅者

  export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}

What do I need to do to reconnect the web socket when it times out, so the observable continues to emit incoming values?

我需要做什么才能在超时时重新连接 Web 套接字,以便 observable 继续发出传入值?

I had a look at some Q&A here, hereand hereand it didn't seem to address this question (in a way I could comprehend).

我在这里这里这里查看了一些问答,它似乎没有解决这个问题(以我可以理解的方式)。

Note: the websocket at wss://notessensei.mybluemix.net/ws/timeis live and emits a time stamp once a minute (in case one wants to test that).

注意:websocket atwss://notessensei.mybluemix.net/ws/time是实时的,并且每分钟发出一次时间戳(以防有人想测试)。

Advice is greatly appreciated!

非常感谢您的建议!

回答by Herman

Actually there now is a WebsocketSubject in rxjs!

实际上现在在 rxjs 中有一个 WebsocketSubject!

 import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
 subject.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
 subject.next(JSON.stringify({ op: 'hello' }));

It does handle reconnection when you resubscribe to a broken connection. So for example write this to reconnect:

当您重新订阅断开的连接时,它会处理重新连接。所以例如写这个来重新连接:

subject.retry().subscribe(...)

See the docs for more info. Unfortunately the searchbox doesn't show the method, but you find it here:

有关更多信息,请参阅文档。不幸的是,搜索框没有显示该方法,但您可以在这里找到它:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

that #-navigation is not working in my browser, so search for "webSocket" on that page.

#-navigation 在我的浏览器中不起作用,因此在该页面上搜索“webSocket”。

Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

来源:http: //reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

回答by maxime1992

This might not be the good answer but it's way too much for a comment.

这可能不是一个好的答案,但评论太多了。

The problem might comes from your service :

问题可能来自您的服务:

listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);
  }

  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
      observer.next(evt);
    };
  })
  .map(res => res.data)
  .share();
}

Do you think that you go multiple times in your component into the ngOnInitmethod?
You should try to put a console.loginto ngOnInitto be sure.

你认为你在组件中多次进入ngOnInit方法吗?
你应该试着把 a console.logintongOnInit确定。

Because if you do so, in your service you'll override the this.websocketwith a new one.

因为如果你这样做,在你的服务中你会this.websocket用一个新的覆盖。

You should try something like that instead :

你应该尝试这样的事情:

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private websocketSubject$ = new BehaviorSubject<any>();
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {
    if (this.websocket) {
      return this.websocket$;
    }

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);

    this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
  }
}

The BehaviorSubjectwill send the last value if it receives an event before you subscribe to it. Plus, as it's a subject, no need to use the shareoperator.

BehaviorSubject如果在您订阅事件之前接收到事件,它将发送最后一个值。另外,因为它是一个主题,所以不需要使用share运算符。

回答by carkod

For rxjs 6, websocket implementation.

对于 rxjs 6,websocket 实现。

import { webSocket } from 'rxjs/websocket';
let subject = webSocket('ws://localhost:8081');