1.概要
.NET Rx(Reactive Extensions)它提供了一种强大的数据流操作和组合方式,以便你可以更简单地处理异步数据流,如用户界面事件、异步请求、消息等。在许多情况下,这些数据流可能会很难用常规的迭代技术来表达。
Rx库提供了一种使用可观察序列进行异步编程的模型,它基于观察者设计模式并结合了迭代器模式和功能编程的概念。Rx使开发人员可以对这些数据流进行各种操作,如过滤、选择、转换、合并等。
以下是一些主要的特点:
- 它将所有数据源视为可观察数据流(或被称为可观察对象)。
- 它提供了丰富的API允许开发者对这些可观察对象进行转换、过滤、聚合、连接等操作。
- 它提供了一种统一方式处理同步和异步数据源。
- 它有助于管理和协调异步操作和事件,降低了代码复杂性。
2.详细内容
安装
代码语言:javascript复制Install-Package System.Reactive
使用
(1)基础使用
代码语言:javascript复制using System;
using System.Reactive.Linq;
class Program
{
static void Main()
{
// 创建一个Observable序列
var observable = Observable.Range(1, 5);
// 订阅这个Observable序列,并指定对每个元素和结束时应执行的操作
var subscription = observable.Subscribe(
value => Console.WriteLine($"OnNext: {value}"), // 当得到新值时执行的操作
ex => Console.WriteLine($"OnError: {ex.Message}"), // 当发生错误时执行的操作
() => Console.WriteLine("OnCompleted") // 当序列完成时执行的操作
);
Console.ReadKey();
// 取消订阅
subscription.Dispose();
}
}
(2)处理实时数据
代码语言:javascript复制 using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
class Program
{
static void Main()
{
IObservable<StockPrice> stockPrices = GetRealTimeStockPrices();
IDisposable subscription = stockPrices
.Where(price => price.Value > 100)
.Subscribe(
price => Console.WriteLine($"High stock price detected: {price.Symbol} at {price.Value}"),
ex => Console.WriteLine($"OnError: {ex.Message}"),
() => Console.WriteLine("OnCompleted"));
Console.Read( );
// 在某个时刻,你可能想取消订阅
subscription.Dispose();
}
class StockPrice
{
public string Symbol { get; set; }
public double Value { get; set; }
}
static IObservable<StockPrice> GetRealTimeStockPrices()
{
// 创建一个 Subject
var subject = new Subject<StockPrice>();
// 在后台线程上生成模拟数据
Task.Run(() =>
{
var random = new Random();
while (true)
{
var price = new StockPrice
{
Symbol = "MSFT",
Value = 90 (random.NextDouble() * 20)
};
// 向 Subject 发送新的价格
subject.OnNext(price);
// 暂停一段时间以模拟实时数据
Thread.Sleep(1000);
}
});
// 返回 Observable
return subject.AsObservable();
}
}
Rx还提供了大量的操作符,比如:
- Filtering: 过滤序列中的元素。比如:
Where
,Distinct
,Skip
,Take
等。 - Transforming: 转换序列中的元素。比如:
Select
,SelectMany
,Scan
,Buffer
等。 - Combining: 组合多个序列。比如:
Concat
,Merge
,Zip
,CombineLatest
等。 - Error Handling: 处理错误。比如:
Catch
,Retry
,OnErrorResumeNext
等。 - Utility: 其他功能。比如:
Using
,Delay
,TimeInterval
,Timeout
答等。
这些操作符可以让你更加方便地处理和操作数据流,满足不同场景需要。
Ref
https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242981(v=vs.103)
https://learn.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=net-7.0&devlangs=csharp&f1url=?appId=Dev16IDEF1&l=EN-US&k=k(System.IObservable%601);k(DevLang-csharp)&rd=true