Javascript RxJS 序列等价于 promise.then()?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/34523338/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-23 16:30:12  来源:igfitidea点击:

RxJS sequence equivalent to promise.then()?

javascriptrxjs

提问by Haoliang Yu

I used to develop a lot with promise and now I am moving to RxJS. The doc of RxJS doesn't provide a very clear example on how to move from promise chain to observer sequence.

我曾经用 promise 开发了很多东西,现在我正在转向 RxJS。RxJS 的文档没有提供一个非常清晰的例子,说明如何从承诺链转移到观察者序列。

For example, I usually write promise chain with multiple steps, like

例如,我通常用多个步骤编写承诺链,例如

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

How should I rewrite this promise chain in the RxJS style?

我应该如何以 RxJS 风格重写这个承诺链?

采纳答案by user3743222

For data flow (equivalent to then):

对于数据流(相当于then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

A promise can be converted into an observable with Rx.Observable.fromPromise.

一个 promise 可以转换成一个 observable Rx.Observable.fromPromise

Some promise operators have a direct translation. For instance RSVP.all, or jQuery.whencan be replaced by Rx.Observable.forkJoin.

一些承诺运算符有直接翻译。例如RSVP.all, 或jQuery.when可以替换为Rx.Observable.forkJoin

Keep in mind that you have a bunch of operators that allows to transform data asynchronously, and to perform tasks that you cannot or would be very hard to do with promises. Rxjs reveals all its powers with asynchronous sequences of data (sequence i.e. more than 1 asynchronous value).

请记住,您有一堆操作符,它们允许异步转换数据,并执行您不能或很难用 Promise 完成的任务。Rxjs 展示了异步数据序列(即超过 1 个异步值的序列)的所有功能。

For error management, the subject is a little bit more complex.

对于错误管理,这个主题有点复杂。

  • there are catchand finallyoperators too
  • retryWhencan also help to repeat a sequence in case of error
  • you can also deal with errors in the subscriber itself with the onErrorfunction.
  • 也有catchfinally操作符
  • retryWhen还可以帮助在出现错误时重复序列
  • 您还可以使用该onError函数处理订阅者本身的错误。

For precise semantics, have a deeper look at the documentation and examples you can find on the web, or ask specific questions here.

要获得精确的语义,请更深入地查看您可以在网络上找到的文档和示例,或在此处提出具体问题。

This would definitely be a good starting point for going deeper in error management with Rxjs : https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

这绝对是使用 Rxjs 进行更深入错误管理的一个很好的起点:https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

回答by mik01aj

A more modern alternative:

一个更现代的选择:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

Also note that for all this to work, you need to subscribeto this piped Observablesomewhere, but I assume it's handled in some other part of the application.

还要注意,要使所有这些工作,您需要将其subscribe通过管道传输Observable到某处,但我认为它是在应用程序的其他部分处理的。

回答by arcseldon

Update May 2019, using RxJs 6

2019 年 5 月更新,使用 RxJs 6

Agree with the provided answers above, wished to add a concrete example with some toy data & simple promises (with setTimeout) using RxJs v6to add clarity.

同意上面提供的答案,希望使用RxJs v6添加一个带有一些玩具数据和简单承诺(使用 setTimeout)的具体示例以增加清晰度。

Just update the passed id (currently hard-coded as 1) to something that does not exist to execute the error handling logic too. Importantly, also note the use of ofwith catchErrormessage.

只需将传递的 id(当前硬编码为1)更新为不存在的内容即可执行错误处理逻辑。重要的是,还要注意ofwithcatchError消息的使用。

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

Output Data:

输出数据:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

The key part, is equivalent to the following using plain promise control flow:

关键部分,等价于以下使用简单的 promise 控制流:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });

回答by Anand Rockzz

This is how I did it.

我就是这样做的。

Previously

之前

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

After(ly?)

之后(ly?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});

回答by Maksim Romanenko

if getPromisefunction is in a middle of a stream pipe you should simple wrap it into one of functions mergeMap, switchMapor concatMap(usually mergeMap):

如果getPromise函数在流管道的中间,你应该简单地将它包装成函数之一mergeMapswitchMap或者concatMap(通常mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

if you want to start your stream with getPromise()then wrap it into fromfunction:

如果你想开始你的流getPromise()然后把它包装成from函数:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);

回答by Samantha Adrichem

As far as i just found out, if you return a result in a flatMap, it converts it to an Array, even if you returned a string.

据我刚刚发现,如果您在 flatMap 中返回结果,它会将其转换为数组,即使您返回的是一个字符串。

But if you return an Observable, that observable can return a string;

但是如果你返回一个 Observable,那个 observable 可以返回一个字符串;

回答by David Kabii

If I understood correctly, you mean consuming the values, in which case you use sbuscribe i.e.

如果我理解正确,您的意思是使用这些值,在这种情况下,您使用 sbuscribe 即

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

Additionally, you can just turn the observable to a promise using toPromise() as shown:

此外,您可以使用 toPromise() 将 observable 转换为 promise,如下所示:

arrObservable.toPromise().then()