C# 为什么 .NET Reactive Extensions 中不推荐使用 Subjects?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/14396449/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-10 11:42:25  来源:igfitidea点击:

Why are Subjects not recommended in .NET Reactive Extensions?

c#system.reactivereactivex

提问by Anthony

I am currently getting to grips with the Reactive Extensions framework for .NET and I am working my way through the various introduction resources I've found (mainly http://www.introtorx.com)

我目前正在掌握 .NET 的 Reactive Extensions 框架,我正在研究我发现的各种介绍资源(主要是http://www.introtorx.com

Our application involves a number of hardware interfaces that detect network frames, these will be my IObservables, I then have a variety of components that will consume those frames or perform some manner of transform on the data and produce a new type of frame. There will also be other components that need to display every n'th frame for example. I am convinced that Rx is going to be useful for our application, however I am struggling with the implementation details for the IObserver interface.

我们的应用程序涉及许多检测网络帧的硬件接口,这些将是我的 IObservables,然后我有各种组件来使用这些帧或对数据执行某种方式的转换并产生一种新类型的帧。例如,还有其他组件需要显示每第 n 帧。我确信 Rx 对我们的应用程序很有用,但是我正在努力解决 IObserver 接口的实现细节。

Most (if not all) of the resources I have been reading have said that I should not implement the IObservable interface myself but use one of the provided functions or classes. From my research it appears that creating a Subject<IBaseFrame>would provide me what I need, I would have my single thread that reads data from the hardware interface and then calls the OnNext function of my Subject<IBaseFrame>instance. The different IObserver components would then receive their notifications from that Subject.

我读过的大部分(如果不是全部)资源都说我不应该自己实现 IObservable 接口,而是使用提供的函数或类之一。从我的研究看来,创建一个Subject<IBaseFrame>可以提供我需要的东西,我将拥有从硬件接口读取数据的单线程,然后调用我的Subject<IBaseFrame>实例的 OnNext 函数。然后,不同的 IObserver 组件将从该主题接收通知。

My confusion is coming from the advice give in the appendix of this tutorialwhere it says:

我的困惑来自本教程附录中给出的建议,其中说:

Avoid the use of the subject types. Rx is effectively a functional programming paradigm. Using subjects means we are now managing state, which is potentially mutating. Dealing with both mutating state and asynchronous programming at the same time is very hard to get right. Furthermore, many of the operators (extension methods) have been carefully written to ensure that correct and consistent lifetime of subscriptions and sequences is maintained; when you introduce subjects, you can break this. Future releases may also see significant performance degradation if you explicitly use subjects.

避免使用主题类型。Rx 实际上是一种函数式编程范式。使用主题意味着我们现在正在管理状态,这可能会发生变化。同时处理改变状态和异步编程是很难做到的。此外,许多运算符(扩展方法)都经过精心编写,以确保维护订阅和序列的正确和一致的生命周期;当您介绍主题时,您可以打破这一点。如果您明确使用主题,未来的版本也可能会出现显着的性能下降。

My application is quite performance critical, I am obviously going to test the performance of using the Rx patterns before it goes in to production code; however I am worried that I am doing something that is against the spirit of the Rx framework by using the Subject class and that a future version of the framework is going to hurt performance.

我的应用程序对性能非常关键,我显然会在进入生产代码之前测试使用 Rx 模式的性能;但是我担心我正在通过使用 Subject 类做一些违背 Rx 框架精神的事情,并且框架的未来版本会损害性能。

Is there a better way of doing what I want? The hardware polling thread is going to be running continuously whether there are any observers or not (the HW buffer will back up otherwise), so this is a very hot sequence. I need to then pass the received frames out to multiple observers.

有没有更好的方法来做我想做的事?无论是否有任何观察者,硬件轮询线程都将持续运行(否则硬件缓冲区将备份),因此这是一个非常热的序列。然后我需要将接收到的帧传递给多个观察者。

Any advice would be greatly appreciated.

任何建议将不胜感激。

采纳答案by Lee Campbell

Ok, If we ignore my dogmatic ways and ignore "subjects are good/bad" all together. Let us look at the problem space.

好吧,如果我们忽略我的教条方式并一起忽略“主题是好是坏”。让我们看看问题空间。

I bet you either have 1 of 2 styles of system you need to ingrate to.

我敢打赌,您要么拥有 2 种风格的系统中的 1 种,要么需要忘恩负义。

  1. The system raises an event or a call back when a message arrives
  2. You need to poll the system to see if there are any message to process
  1. 系统在消息到达时引发事件或回调
  2. 您需要轮询系统以查看是否有任何消息要处理

For option 1, easy, we just wrap it with the appropriate FromEvent method and we are done. To the Pub!

对于选项 1,简单,我们只需使用适当的 FromEvent 方法包装它,我们就完成了。到酒吧!

For option 2, we now need to consider how we poll this and how to do this effciently. Also when we get the value, how do we publish it?

对于选项 2,我们现在需要考虑如何进行轮询以及如何有效地执行此操作。另外,当我们获得价值时,我们如何发布它?

I would imagine that you would want a dedicated thread for polling. You wouldn't want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Alternatively you don't want the hassle of context switching (I guess). So assume we have our own thread, we will probably have some sort of While/Sleep loop that we sit in to poll. When the check finds some messages we publish them. Well all of this sounds perfect for Observable.Create. Now we probably cant use a While loop as that wont allow us to ever return a Disposable to allow cancellation. Luckily you have read the whole book so are savvy with Recursive scheduling!

我想你会想要一个专门的线程来进行轮询。您不希望其他编码人员敲打 ThreadPool/TaskPool 并让您陷入 ThreadPool 饥饿的境地。或者你不想要上下文切换的麻烦(我猜)。所以假设我们有自己的线程,我们可能会有某种 While/Sleep 循环供我们轮询。当检查发现一些消息时,我们发布它们。好吧,所有这些听起来都非常适合 Observable.Create。现在我们可能不能使用 While 循环,因为它不允许我们返回 Disposable 以允许取消。幸运的是,您已经阅读了整本书,因此对递归调度非常了解!

I imagine something like this could work. #NotTested

我想这样的事情可以奏效。#未测试

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

The reason I really don't like Subjects, is that is usually a case of the developer not really having a clear design on the problem. Hack in a subject, poke it here there and everywhere, and then let the poor support dev guess at WTF was going on. When you use the Create/Generate etc methods you are localizing the effects on the sequence. You can see it all in one method and you know no-one else is throwing in a nasty side effect. If I see a subject fields I now have to go looking for all the places in a class it is being used. If some MFer exposes one publicly, then all bets are off, who knows how this sequence is being used! Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

我真的不喜欢 Subjects 的原因是,这通常是开发人员对问题没有真正明确设计的情况。Hack 一个主题,到处戳它,然后让可怜的支持开发人员猜测 WTF 正在发生。当您使用 Create/Generate 等方法时,您正在本地化对序列的影响。您可以在一种方法中看到这一切,并且您知道没有其他人会产生令人讨厌的副作用。如果我看到一个主题字段,我现在必须去寻找它正在使用的类中的所有位置。如果某个 MFer 公开暴露一个,那么所有的赌注都没有了,谁知道这个序列是如何被使用的!异步/并发/Rx 很难。你不需要让副作用和因果关系编程更让你头晕目眩。

回答by casperOne

The quoted block text pretty much explains why you shouldn't be using Subject<T>, but to put it simpler, you are combining the functions of observer and observable, while injecting some sort of state in between (whether you're encapsulating or extending).

引用的块文本几乎解释了为什么你不应该使用Subject<T>,但简单地说,你结合了观察者和观察者的功能,同时在两者之间注入某种状态(无论是封装还是扩展)。

This is where you run into trouble; these responsibilities should be separate and distinct from each other.

这是你遇到麻烦的地方;这些责任应该是分开的,彼此不同。

That said, in your specificcase, I'd recommend that you break your concerns into smaller parts.

也就是说,在您的具体情况下,我建议您将您的担忧分解为更小的部分。

First, you have your thread that is hot, and always monitoring the hardware for signals to raise notifications for. How would you do this normally? Events. So let's start with that.

首先,您的线程很热,并且始终监视硬件以获取要为其发出通知的信号。你通常会怎么做? 事件。所以让我们从那个开始。

Let's define the EventArgsthat your event will fire.

让我们定义EventArgs您的事件将触发的。

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Now, the class that will fire the event. Note, this could be a static class (since you always have a thread running monitoring the hardware buffer), or something you call on-demand which subscribes to that. You'll have to modify this as appropriate.

现在,将触发事件的类。请注意,这可能是一个静态类(因为您总是有一个线程在运行监视硬件缓冲区),或者您按需调用订阅. 您必须适当地修改它。

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

So now you have a class that exposes an event. Observables work well with events. So much so that there's first-class support for converting streams of events (think of an event stream as multiple firings of an event) into IObservable<T>implementations if you follow the standard event pattern, through the static FromEventPatternmethodon the Observableclass.

所以现在你有一个公开事件的类。Observable 可以很好地处理事件。IObservable<T>如果您遵循标准事件模式,通过class上的静态FromEventPattern方法,那么对于将事件流(将事件流视为事件的多次触发)转换为实现的Observable一流支持

With the source of your events, and the FromEventPatternmethod, we can create an IObservable<EventPattern<BaseFrameEventArgs>>easily (the EventPattern<TEventArgs>classembodies what you'd see in a .NET event, notably, an instance derived from EventArgsand an object representing the sender), like so:

有了您的事件源和FromEventPattern方法,我们可以IObservable<EventPattern<BaseFrameEventArgs>>轻松创建一个(EventPattern<TEventArgs>该类体现了您在 .NET 事件中看到的内容,特别是派生自的实例EventArgs和代表发件人的对象),如下所示:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Of course, you want an IObservable<IBaseFrame>, but that's easy, using the Selectextension methodon the Observableclass to create a projection (just like you would in LINQ, and we can wrap all of this up in an easy-to-use method):

当然,您需要一个IObservable<IBaseFrame>,但这很容易,使用类上的Select扩展方法Observable来创建投影(就像在 LINQ 中一样,我们可以将所有这些都包装在一个易于使用的方法中):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}

回答by Wilka

In general you should avoid using Subject, however for the thing you are doing here I think they work quite well. I asked a similar questionwhen I came across the "avoid subjects" message in Rx tutorials.

一般来说,您应该避免使用Subject,但是对于您在这里所做的事情,我认为它们工作得很好。当我在 Rx 教程中遇到“避免主题”消息时,我问了一个类似的问题

To quote Dave Sexton(of Rxx)

引用Dave Sexton(Rxx 的)

"Subjects are the stateful components of Rx. They are useful for when you need to create an event-like observable as a field or a local variable."

“主题是 Rx 的有状态组件。当您需要创建类似事件的 observable 作为字段或局部变量时,它们非常有用。”

I tend to use them as the entry point into Rx. So if I have some code that needs to say 'something happened' (like you have), I would use a Subjectand call OnNext. Then expose that as an IObservablefor others to subscribe to (you can use AsObservable()on your subject to make sure nobody can cast to a Subject and mess things up).

我倾向于使用它们作为 Rx 的入口点。因此,如果我有一些代码需要说明“发生了一些事情”(就像您一样),我会使用 aSubject并调用OnNext. 然后将其公开IObservable为其他人订阅(您可以AsObservable()在您的主题上使用以确保没有人可以将其转换为主题并将事情搞砸)。

You could also achieve this with a .NET event and use FromEventPattern, but if I'm only going to turn the event into an IObservableanyway, I don't see the benefit of having an event instead of a Subject(which might mean I'm missing something here)

你也可以使用.NET事件和使用做到这一点FromEventPattern,但如果我只打算把事件成IObservable反正,我没有看到有一个事件,而不是一个的好处Subject(这可能意味着我失踪这里有东西)

However, what you should avoid quite strongly is subscribing to an IObservablewith a Subject, i.e. don't pass a Subjectinto the IObservable.Subscribemethod.

不过,你应该避免非常强烈被预订至IObservable一个Subject,即没有传递SubjectIObservable.Subscribe方法。

回答by Niall Connaughton

Often when you're managing a Subject, you're actually just reimplementing features already in Rx, and probably in not as robust, simple and extensible a way.

通常,当您管理一个 Subject 时,您实际上只是重新实现了 Rx 中已有的功能,并且可能没有那么健壮、简单和可扩展的方式。

When you're trying to adapt some asynchronous data flow into Rx (or create an asynchronous data flow from one that's not currently asynchronous), the most common cases are usually:

当您尝试将一些异步数据流适配到 Rx 中(或从当前非异步的数据流创建异步数据流)时,最常见的情况通常是:

  • The source of data is an event: As Lee says, this is the simplest case: use FromEvent and head to the pub.

  • The source of data is from a synchronous operation and you want polled updates, (eg a webservice or database call): In this case you could use Lee's suggested approach, or for simple cases, you could use something like Observable.Interval.Select(_ => <db fetch>). You may want to use DistinctUntilChanged() to prevent publishing updates when nothing has changed in the source data.

  • The source of data is some kind of asynchronous api that calls your callback: In this case, use Observable.Create to hook up your callback to call OnNext/OnError/OnComplete on the observer.

  • The source of data is a call that blocks until new data is available(eg some synchronous socket read operations): In this case, you can use Observable.Create to wrap the imperative code that reads from the socket and publishes to the Observer.OnNext when data is read. This may be similar to what you're doing with the Subject.

  • 数据源是一个事件:正如 Lee 所说,这是最简单的情况:使用 FromEvent 并前往酒吧。

  • 数据源来自同步操作,您需要轮询更新(例如 Web 服务或数据库调用):在这种情况下,您可以使用 Lee 建议的方法,或者对于简单的情况,您可以使用类似Observable.Interval.Select(_ => <db fetch>). 您可能希望使用 DistinctUntilChanged() 来防止在源数据中没有任何更改时发布更新。

  • 数据源是某种调用您的回调的异步 api:在这种情况下,使用 Observable.Create 连接您的回调以在观察者上调用 OnNext/OnError/OnComplete。

  • 数据源是一个调用,它会阻塞直到新数据可用(例如一些同步套接字读取操作):在这种情况下,您可以使用 Observable.Create 来包装从套接字读取并发布到 Observer.OnNext 的命令式代码读取数据时。这可能类似于您对主题所做的事情。

Using Observable.Create vs creating a class that manages a Subject is fairly equivalent to using the yield keyword vs creating a whole class that implements IEnumerator. Of course, you can write an IEnumerator to be as clean and as good a citizen as the yield code, but which one is better encapsulated and feels a neater design? The same is true for Observable.Create vs managing Subjects.

使用 Observable.Create 与创建管理主题的类与使用 yield 关键字与创建实现 IEnumerator 的整个类相当。当然,你可以写一个 IEnumerator 来像 yield 代码一样干净整洁,但是哪个更好封装,感觉更简洁?Observable.Create 与管理主题也是如此。

Observable.Create gives you a clean pattern for lazy setup and clean teardown. How do you achieve this with a class wrapping a Subject? You need some kind of Start method... how do you know when to call it? Or do you just always start it, even when no one is listening? And when you're done, how do you get it to stop reading from the socket/polling the database, etc? You have to have some kind of Stop method, and you have to still have access not just to the IObservable you're subscribed to, but the class that created the Subject in the first place.

Observable.Create 为您提供了一个干净的模式,用于懒惰的设置和干净的拆卸。你如何通过一个包含主题的类来实现这一点?你需要某种 Start 方法……你怎么知道什么时候调用它?或者你总是开始它,即使没有人在听?完成后,如何让它停止从套接字读取/轮询数据库等?您必须拥有某种 Stop 方法,并且您仍然不仅必须能够访问您订阅的 IObservable,还必须能够访问首先创建主题的类。

With Observable.Create, it's all wrapped up in one place. The body of Observable.Create is not run until someone subscribes, so if no one subscribes, you never use your resource. And Observable.Create returns a Disposable that can cleanly shutdown your resource/callbacks, etc - this is called when the Observer unsubscribes. The lifetimes of the resources you're using to generate the Observable are neatly tied to the lifetime of the Observable itself.

使用 Observable.Create,一切都集中在一个地方。Observable.Create 的主体在有人订阅之前不会运行,因此如果没有人订阅,您将永远不会使用您的资源。并且 Observable.Create 返回一个 Disposable 可以干净地关闭您的资源/回调等 - 这在观察者取消订阅时被调用。您用来生成 Observable 的资源的生命周期与 Observable 本身的生命周期紧密相关。

回答by Felix Keil

It is bad to generalize that Subjects are not good to use for a public interface. While it is certainly true, that this is not the way a reactive programming approach should look like, it is definitively a good improvement/refactoring option for your classic code.

概括地说 Subjects 不适用于公共接口是不好的。虽然确实如此,但这不是反应式编程方法应有的样子,但对于您的经典代码来说,它绝对是一个很好的改进/重构选项。

If you have a normal property with an public set accessor and you want to notify about changes, there speaks nothing against replacing it with a BehaviorSubject. INPC or additional other events are just not that clean and it personally wears me off. For this purpose you can and should use BehaviorSubjects as public properties instead of normal properties and ditch INPC or other events.

如果您有一个带有公共集合访问器的普通属性,并且您想通知更改,那么用 BehaviorSubject 替换它是没有问题的。INPC 或其他其他事件并不那么干净,它让我感到厌烦。为此,您可以并且应该使用 BehaviorSubjects 作为公共属性而不是普通属性,并丢弃 INPC 或其他事件。

Additionally the Subject-interface makes the users of your interface more aware about the functionality of your properties and are more likely to subscribe instead of just getting the value.

此外,主题界面使界面的用户更加了解您的属性的功能,并且更有可能订阅而不是仅仅获取价值。

It is the best to use if you want others to listen/subscribe to changes of a property.

如果您希望其他人收听/订阅属性的更改,最好使用它。