Rx.NET基础使用

2023-09-18 17:25:02 浏览数 (1)

1.概要

.NET Rx(Reactive Extensions)它提供了一种强大的数据流操作和组合方式,以便你可以更简单地处理异步数据流,如用户界面事件、异步请求、消息等。在许多情况下,这些数据流可能会很难用常规的迭代技术来表达。

Rx库提供了一种使用可观察序列进行异步编程的模型,它基于观察者设计模式并结合了迭代器模式和功能编程的概念。Rx使开发人员可以对这些数据流进行各种操作,如过滤、选择、转换、合并等。

以下是一些主要的特点:

  • 它将所有数据源视为可观察数据流(或被称为可观察对象)。
  • 它提供了丰富的API允许开发者对这些可观察对象进行转换、过滤、聚合、连接等操作。
  • 它提供了一种统一方式处理同步和异步数据源。
  • 它有助于管理和协调异步操作和事件,降低了代码复杂性。

2.详细内容

安装

代码语言:javascript复制
Install-Package System.Reactive

使用

(1)基础使用

代码语言:javascript复制
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // 创建一个Observable序列
        var observable = Observable.Range(1, 5);

        // 订阅这个Observable序列,并指定对每个元素和结束时应执行的操作
        var subscription = observable.Subscribe(
            value => Console.WriteLine($"OnNext: {value}"),  // 当得到新值时执行的操作
            ex => Console.WriteLine($"OnError: {ex.Message}"),  // 当发生错误时执行的操作
            () => Console.WriteLine("OnCompleted")  // 当序列完成时执行的操作
        );

        Console.ReadKey();
        
        // 取消订阅
        subscription.Dispose();
    }
}

(2)处理实时数据

代码语言:javascript复制
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main()
        {
            IObservable<StockPrice> stockPrices = GetRealTimeStockPrices();

            IDisposable subscription = stockPrices
                .Where(price => price.Value > 100)
                .Subscribe(
                    price => Console.WriteLine($"High stock price detected: {price.Symbol} at {price.Value}"),
                    ex => Console.WriteLine($"OnError: {ex.Message}"),
                    () => Console.WriteLine("OnCompleted"));

            Console.Read( );

            // 在某个时刻,你可能想取消订阅
            subscription.Dispose();
        }

        class StockPrice
        {
            public string Symbol { get; set; }
            public double Value { get; set; }
        }

       static  IObservable<StockPrice> GetRealTimeStockPrices()
        {
            // 创建一个 Subject
            var subject = new Subject<StockPrice>();

            // 在后台线程上生成模拟数据
            Task.Run(() =>
            {
                var random = new Random();
                while (true)
                {
                    var price = new StockPrice
                    {
                        Symbol = "MSFT",
                        Value = 90   (random.NextDouble() * 20)
                    };

                    // 向 Subject 发送新的价格
                    subject.OnNext(price);

                    // 暂停一段时间以模拟实时数据
                    Thread.Sleep(1000);
                }
            });

            // 返回 Observable
            return subject.AsObservable();
        }
    }

Rx还提供了大量的操作符,比如:

  • Filtering: 过滤序列中的元素。比如: Where, Distinct, Skip, Take 等。
  • Transforming: 转换序列中的元素。比如: Select, SelectMany, Scan, Buffer 等。
  • Combining: 组合多个序列。比如: Concat, Merge, Zip, CombineLatest 等。
  • Error Handling: 处理错误。比如: Catch, Retry, OnErrorResumeNext 等。
  • Utility: 其他功能。比如: Using, Delay, TimeInterval, Timeout 答等。

这些操作符可以让你更加方便地处理和操作数据流,满足不同场景需要。

Ref

https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242981(v=vs.103)

https://learn.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=net-7.0&devlangs=csharp&f1url=?appId=Dev16IDEF1&l=EN-US&k=k(System.IObservable%601);k(DevLang-csharp)&rd=true

0 人点赞