1. 引言
最近为了解决ABP集成CAP时无法通过拦截器启用工作单元的问题,从小伙伴那里学了一招。借助DiagnossticSource
,可以最小改动完成需求。关于DiagnosticSource晓东大佬18年在文章 在 .NET Core 中使用 Diagnostics (Diagnostic Source) 记录跟踪信息就有介绍,文章开头就说明了Diagnostics 一直是一个被大多数开发者忽视的东西。是的,我也忽略了,这个好东西,有必要学习一下,下面就和大家简单聊一聊System.Diagnostics.DiagnosticSource在.NET上的应用。
2. System.Diagnostics.DiagnosticSource
Diagnostics
位于System
命名空间下,由此可见Diagnostics
在.NET 运行时中的地位不可小觑。其中System.Diagnostics命名空间下又包含不同类库,提供了允许与系统进程,事件日志和性能计数器进行交互的类。如下图所示:
其中System.Diagnostics.DiagnosticSource模块,它允许对代码进行检测,以在生产时记录丰富的数据负载(可以传递不可序列化的数据类型),以便在进程内进行消耗。消费者可以在运行时动态发现数据源并订阅感兴趣的数据源。
在展开之前,有必要先梳理下涉及的以下核心概念:
- IObservable:可观测对象
- IObserver:观察者
- DiagnosticSource :诊断来源
- DiagnosticListener:诊断监听器
- Activity:活动
3. 观察者模式(IObservable & IObserver)
IObservable
和 IObserver
位于System
命名空间下,是.NET中对观察者模式的抽象。
观察者设计模式使观察者能够从可观察对象订阅并接收通知。 它适用于需要基于推送通知的任何方案。 此模式定义可观察对象,以及零个、一个或多个观察者。 观察者订阅可观察对象,并且每当预定义的条件、事件或状态发生更改时,该可观察对象会通过调用其方法之一来自动通知所有观察者。 在此方法调用中,该可观察对象还可向观察者提供当前状态信息。 在 .NET Framework 中,通过实现泛型 System.IObservable 和 System.IObserver 接口来应用观察者设计模式。 泛型类型参数表示提供通知信息的类型。 泛型类型参数表示提供通知信息的类型。
第一次学习观察者模式,应该是大学课本中基于事件烧水的例子,咱们就基于此实现个简单的Demo吧。首先执行dotnet new web -n Dotnet.Diagnostic.Demo
创建示例项目。
3.1. 定义可观察对象(实现IObservable接口)
对于烧水的示例,主要关注水温的变化,因此先定义Temperature
来表示温度变化:
public class Temperature
{
public Temperature(decimal temperature, DateTime date)
{
Degree = temperature;
Date = date;
}
public decimal Degree { get; }
public DateTime Date { get; }
}
接下来通过实现IObservable<T>
接口来定义可观察对象。
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
从接口申明来看,只定义了一个Subscribe
方法,从观察者模式讲,观察者应该既能订阅又能取消订阅消息。为什么没有定义一个UnSubscribe
方法呢?其实这里方法申明已经说明,期望通过返回IDisposable
对象的Dispose
方法来达到这个目的。
/// <summary>
/// 热水壶
/// </summary>
public class Kettle : IObservable<Temperature>
{
private List<IObserver<Temperature>> observers;
private decimal temperature = 0;
public Kettle()
{
observers = new List<IObserver<Temperature>>();
}
public decimal Temperature
{
get => temperature;
private set
{
temperature = value;
observers.ForEach(observer => observer.OnNext(new Temperature(temperature, DateTime.Now)));
if (temperature == 100)
observers.ForEach(observer => observer.OnCompleted());
}
}
public IDisposable Subscribe(IObserver<Temperature> observer)
{
if (!observers.Contains(observer))
{
Console.WriteLine("Subscribed!");
observers.Add(observer);
}
//使用UnSubscriber包装,返回IDisposable对象,用于观察者取消订阅
return new UnSubscriber<Temperature>(observers, observer);
}
/// <summary>
/// 烧水方法
/// </summary>
public async Task StartBoilWaterAsync()
{
var random = new Random(DateTime.Now.Millisecond);
while (Temperature < 100)
{
Temperature = 10;
await Task.Delay(random.Next(5000));
}
}
}
//定义泛型取消订阅对象,用于取消订阅
internal class UnSubscriber<T> : IDisposable
{
private List<IObserver<T>> _observers;
private IObserver<T> _observer;
internal UnSubscriber(List<IObserver<T>> observers, IObserver<T> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observers.Contains(_observer))
{
Console.WriteLine("Unsubscribed!");
_observers.Remove(_observer);
}
}
}
以上代码中List<IObserver>存在线程安全问题,因为简单Demo,就不予优化了。
3.2. 定义观察者(实现IObserver接口)
比如定义一个报警器,实时播报温度。
代码语言:javascript复制public class Alter : IObserver<Temperature>
{
public void OnCompleted()
{
Console.WriteLine("du du du !!!");
}
public void OnError(Exception error)
{
//Nothing to do
}
public void OnNext(Temperature value)
{
Console.WriteLine($"{value.Date.ToString()}: Current temperature is {value.Degree}.");
}
}
添加测试代码,访问localhost:5000/subscriber
控制台输出结果如下:
endpoints.MapGet("/subscriber", async context =>
{
var kettle = new Kettle();//初始化热水壶
var subscribeRef = kettle.Subscribe(new Alter());//订阅
var boilTask = kettle.StartBoilWaterAsync();//启动开始烧水任务
var timoutTask = Task.Delay(TimeSpan.FromSeconds(15));//定义15s超时任务
//等待,如果超时任务先返回则取消订阅
var firstReturnTask = await Task.WhenAny(boilTask, timoutTask);
if (firstReturnTask == timoutTask)
subscribeRef.Dispose();
await context.Response.WriteAsync("Hello subscriber!");
});
------------------------------------------------------------------
Subscribed!
10/2/2020 4:53:20 PM: Current temperature is 10.
10/2/2020 4:53:20 PM: Current temperature is 20.
10/2/2020 4:53:21 PM: Current temperature is 30.
10/2/2020 4:53:21 PM: Current temperature is 40.
10/2/2020 4:53:24 PM: Current temperature is 50.
10/2/2020 4:53:25 PM: Current temperature is 60.
10/2/2020 4:53:26 PM: Current temperature is 70.
10/2/2020 4:53:30 PM: Current temperature is 80.
Unsubscribed!
4. DiagnosticSource & DiagnosticListener
4.1. 概念讲解
DiagnosticSource
直译就是诊断源,也就是它是诊断日志的来源入口。DiagnosticSource其是一个抽象类主要定义了以下方法:
//Provides a generic way of logging complex payloads
public abstract void Write(string name, object value);
//Verifies if the notification event is enabled.
public abstract bool IsEnabled(string name);
DiagnosticListener
直译就是诊断监听器,继承自DiagnosticSource
,同时实现了IObservable<KeyValuePair<string, object>>
接口,因此其本质是一个可观察对象。小结以下:
DiagnosticSource
作为诊断日志来源,提供接口,用于写入诊断日志。- 诊断日志的可观察数据类型为
KeyValuePair<string, object>
。 DiagnosticListener
继承自DiagnosticSource
,作为可观察对象,可由其他观察者订阅,以获取诊断日志。
DiagnosticListener
其构造函数接收一个name
参数。
private static DiagnosticSource httpLogger = new DiagnosticListener("System.Net.Http");
可以通过下面这种方式记录诊断日志:
代码语言:javascript复制if (httpLogger.IsEnabled("RequestStart"))
httpLogger.Write("RequestStart", new { Url="http://clr", Request=aRequest });
然后需要实现IObserver<KeyValuePair<string, object>>
接口,以便消费诊断数据。定义DiagnosticObserver
,进行诊断日志消费:
public class DiagnosticObserver : IObserver<KeyValuePair<string, object>>
{
public void OnCompleted()
{
//Noting to do
}
public void OnError(Exception error)
{
Console.WriteLine($"{error.Message}");
}
public void OnNext(KeyValuePair<string, object> pair)
{
// 这里消费诊断数据
Console.WriteLine($"{pair.Key}-{pair.Value}");
}
}
ASP.NET Core 项目中默认就依赖了System.Diagnostics.DiagnosticSource
Nuget包,同时在构建通用Web主机时,就注入了名为Microsoft.AspNetCore
的DiagnosticListener
。
//GenericWebHostBuilder.cs
DiagnosticListener instance = new DiagnosticListener("Microsoft.AspNetCore");
services.TryAddSingleton<DiagnosticListener>(instance);
services.TryAddSingleton<DiagnosticSource>((DiagnosticSource) instance);
因此我们可以直接通过注入DiagnosticListener
进行诊断日志的订阅:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, DiagnosticListener diagnosticListener)
{
diagnosticListener.Subscribe(new DiagnosticObserver());//订阅诊断日志
}
当然也可以直接使用DiagnosticListener.AllListeners.Subscribe(IObserver<DiagnosticListener> observer);
进行订阅,不过区别是,接收的参数类型为IObserver<DiagnosticListener>
。
运行项目输出:
代码语言:javascript复制Microsoft.AspNetCore.Hosting.HttpRequestIn.Start-Microsoft.AspNetCore.Http.DefaultHttpContext
Microsoft.AspNetCore.Hosting.BeginRequest-{ httpContext = Microsoft.AspNetCore.Http.DefaultHttpContext, timestamp = 7526300014352 }
Microsoft.AspNetCore.Routing.EndpointMatched-Microsoft.AspNetCore.Http.DefaultHttpContext
Microsoft.AspNetCore.Hosting.EndRequest-{ httpContext = Microsoft.AspNetCore.Http.DefaultHttpContext, timestamp = 7526300319214 }
Microsoft.AspNetCore.Hosting.HttpRequestIn.Stop-Microsoft.AspNetCore.Http.DefaultHttpContext
从中可以看出,ASP.NET Core Empty Web Project在一次正常的Http请求过程中分别在请求进入、请求处理、路由匹配都埋了点,除此之外还有请求异常、Action处理都有埋点。因此,根据需要,可以实现比如请求拦截、耗时统计等系列操作。
4.2. 耗时统计
基于以上知识,下面尝试完成一个简单的耗时统计。从上面的内容可知,ASP.NET Core在BeginRequest和EndRequest返回的诊断数据类型如下所示:
代码语言:javascript复制Microsoft.AspNetCore.Hosting.BeginRequest-{ httpContext = Microsoft.AspNetCore.Http.DefaultHttpContext, timestamp = 7526300014352 }
Microsoft.AspNetCore.Hosting.EndRequest-{ httpContext = Microsoft.AspNetCore.Http.DefaultHttpContext, timestamp = 7526300319214 }
因此只要拿到两个timestamp就可以直接计算耗时,修改DiagnosticObserver
的OnNext
方法如下:
private ConcurrentDictionary<string, long> startTimes = new ConcurrentDictionary<string, long>();
public void OnNext(KeyValuePair<string, object> pair)
{
//Console.WriteLine($"{pair.Key}-{pair.Value}");
//获取httpContext
var context = pair.Value.GetType().GetTypeInfo().GetDeclaredProperty("httpContext")
?.GetValue(pair.Value) as DefaultHttpContext;
//获取timestamp
var timestamp = pair.Value.GetType().GetTypeInfo().GetDeclaredProperty("timestamp")
?.GetValue(pair.Value) as long?;
switch (pair.Key)
{
case "Microsoft.AspNetCore.Hosting.BeginRequest":
Console.WriteLine($"Request {context.TraceIdentifier} Begin:{context.Request.GetUri()}");
startTimes.TryAdd(context.TraceIdentifier, timestamp.Value);//记录请求开始时间
break;
case "Microsoft.AspNetCore.Hosting.EndRequest":
startTimes.TryGetValue(context.TraceIdentifier, out long startTime);
var elapsedMs = (timestamp - startTime) / TimeSpan.TicksPerMillisecond;//计算耗时
Console.WriteLine(
$"Request {context.TraceIdentifier} End: Status Code is {context.Response.StatusCode},Elapsed {elapsedMs}ms");
startTimes.TryRemove(context.TraceIdentifier, out _);
break;
}
}
输出如下,大功告成:
代码语言:javascript复制Request 0HM37UNERKGF0:00000001 Begin:https://localhost:44330
Request 0HM37UNERKGF0:00000001 End: Status Code is 200,Elapsed 38ms
上面有通过反射去获取诊断数据属性的代码(var timestamp = pair.Value.GetType().GetTypeInfo().GetDeclaredProperty("timestamp") ?.GetValue(pair.Value) as long?;
),非常不优雅。但我们可以安装Microsoft.Extensions.DiagnosticAdapter
包来简化诊断数据的消费。安装后,添加HttpContextDiagnosticObserver
,通过添加DiagnosticName
指定监听的诊断名称,即可进行诊断数据消费。
public sealed class HttpContextDiagnosticObserver
{
private ConcurrentDictionary<string, long> startTimes = new ConcurrentDictionary<string, long>();
[DiagnosticName("Microsoft.AspNetCore.Hosting.BeginRequest")]
public void BeginRequest(HttpContext httpContext,long timestamp)
{
Console.WriteLine($"Request {httpContext.TraceIdentifier} Begin:{httpContext.Request.GetUri()}");
startTimes.TryAdd(httpContext.TraceIdentifier, timestamp);//记录请求开始时间
}
[DiagnosticName("Microsoft.AspNetCore.Hosting.EndRequest")]
public void EndRequest(HttpContext httpContext,long timestamp)
{
startTimes.TryGetValue(httpContext.TraceIdentifier, out long startTime);
var elapsedMs = (timestamp - startTime) / TimeSpan.TicksPerMillisecond;//计算耗时
Console.WriteLine(
$"Request {httpContext.TraceIdentifier} End: Status Code is {httpContext.Response.StatusCode},Elapsed {elapsedMs}ms");
startTimes.TryRemove(httpContext.TraceIdentifier, out _);
}
}
然后使用SubscribeWithAdapter
进行订阅即可。
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, DiagnosticListener diagnosticListener)
{
// diagnosticListener.Subscribe(new DiagnosticObserver());
diagnosticListener.SubscribeWithAdapter(new HttpContextDiagnosticObserver());
}
到这里可能也有小伙伴说,我用ActionFilter
也可以实现,没错,但这两种方式是完全不同的,DiagnosticSource
是完全异步的。
4.3. 应用场景思考
根据DiagnosticSource的特性,可以运用于以下场景 :
1. AOP:因为Diagnostics命名事件一般是成对出现的,因此可以做些拦截操作。比如在Abp集成Cap时,若想默认启用Uow,就可以消费DotNetCore.CAP.WriteSubscriberInvokeBefore
命名事件,创建Uow,再在命名事件DotNetCore.CAP.WriteSubscriberInvokeAfter
中提交事务,并Dispose。
2. APM:SkyAPM-dotnet的实现就是通过消费诊断日志,进行链路跟踪。
3. EventBus:充分利用其发布订阅模式,可将其用于进程内事件的发布与消费。
5. Activity(活动)
5.1. Activity 概述
那Activity又是何方神圣,用于解决什么问题呢?关于Activity官方只有一句简要介绍:Represents an operation with context to be used for logging。(表示包含上下文的操作,用于日志记录。)
Activity用来存储和访问诊断上下文,并由日志系统进行消费。当应用程序开始处理操作时,例如HTTP请求或队列中的任务,它会在处理请求时创建Activity以在系统中跟踪该Activity。Activity中存储的上下文可以是HTTP请求路径,方法,用户代理或关联ID:所有重要信息都应与每个跟踪一起记录。当应用程序调用外部依赖关系以完成操作时,它可能需要传递一些上下文(例如,关联ID)以及依赖关系调用,以便能够关联来自多个服务的日志。
先来看下Activity主要以下核心属性:
- Tags(标签)
IEnumerable<KeyValuePair<string, string>> Tags { get; }
- 表示与活动一起记录的信息。标签的好例子是实例/机器名称,传入请求HTTP方法,路径,用户/用户代理等。标签不传递给子活动。 典型的标签用法包括添加一些自定义标签,并通过它们进行枚举以填充日志事件的有效负载。可通过Activity AddTag(string key, string value)
添加Tag,但不支持通过Key检索标签。 - Baggage(行李)
IEnumerable<KeyValuePair<string, string>> Baggage { get; }
- 表示要与活动一起记录并传递给其子项的信息。行李的例子包括相关ID,采样和特征标记。 Baggage被序列化并与外部依赖项请求一起传递。 典型的Baggage用法包括添加一些Baggage属性,并通过它们进行枚举以填充日志事件的有效负载。 可通过Activity AddBaggage(string key, string value)
添加Baggage。并通过string GetBaggageItem(string key)
获取指定Key的Baggage。 - OperationName(操作名称)
string OperationName { get; }
- 活动名称,必须在构造函数中指定。 - StartTimeUtc
DateTime StartTimeUtc { get; private set; }
- UTC格式的启动时间,如果不指定,则在启动时默认指定为DateTime.UtcNow
。可通过Activity SetStartTime(DateTime startTimeUtc)
指定。 - Duration
TimeSpan Duration { get; private set; }
- 如果活动已停止,则代表活动持续时间,否则为0。 - Id
string Id { get; private set; }
- 表示特定的活动标识符。过滤特定ID可确保您仅获得与操作中特定请求相关的日志记录。该Id在活动开始时生成。Id传递给外部依赖项,并被视为新的外部活动的[ParentId]。 - ParentId
string ParentId { get; private set; }
- 如果活动是根据请求反序列化的,则该活动可能具有进程中的[Parent]或外部Parent。 ParentId和Id代表日志中的父子关系,并允许您关联传出和传入请求。 - RootId
string RootId { get; private set; }
- 代表根Id - Current
static Activity Current { get; }
- 返回在异步调用之间流动的当前Activity。 - Parent
Activity Parent { get; private set; }
- 如果活动是在同一过程中从另一个活动创建的,则可以使用Partent
获得该活动。但是,如果“活动”是根活动或父项来自流程外部,则此字段可能为null。 - Start()
Activity Start()
- 启动活动:设置活动的Activity.Current和Parent,生成唯一的ID并设置StartTimeUtc(如果尚未设置)。 - Stop()
void Stop()
- 停止活动:设置活动的Activity.Current,并使用Activity SetEndTime(DateTime endTimeUtc)
或DateTime.UtcNow中提供的时间戳计算Duration。
另外DiagnosticSource
中也定义了两个相关方法:
- StartActivity
Activity StartActivity(Activity activity, object args)
- 启动给定的Activity,并将DiagnosticSource
事件消息写入OperationName.Start
格式的命名事件中。 - StopActivity
void StopActivity(Activity activity, object args)
- 停止给定的Activity,并将DiagnosticSource
事件消息写入{OperationName}.Stop
格式的命名事件中。
5.2. Activity在ASP.NET Core中的应用
要想弄懂Activity,我们还是得向源码学习,看一下HostingApplicationDiagnostics的实现。首先来看下BeginRequst
中的StartActivity
方法。
private Activity StartActivity(HttpContext httpContext, out bool hasDiagnosticListener)
{
Activity activity = new Activity("Microsoft.AspNetCore.Hosting.HttpRequestIn");
hasDiagnosticListener = false;
IHeaderDictionary headers = httpContext.Request.Headers;
StringValues stringValues1;
if (!headers.TryGetValue(HeaderNames.TraceParent, out stringValues1))
headers.TryGetValue(HeaderNames.RequestId, out stringValues1);
if (!StringValues.IsNullOrEmpty(stringValues1))
{
activity.SetParentId((string) stringValues1);
StringValues stringValues2;
if (headers.TryGetValue(HeaderNames.TraceState, out stringValues2))
activity.TraceStateString = (string) stringValues2;
string[] commaSeparatedValues = headers.GetCommaSeparatedValues(HeaderNames.CorrelationContext);
if (commaSeparatedValues.Length != 0)
{
foreach (string str in commaSeparatedValues)
{
NameValueHeaderValue parsedValue;
if (NameValueHeaderValue.TryParse((StringSegment) str, out parsedValue))
activity.AddBaggage(parsedValue.Name.ToString(), parsedValue.Value.ToString());
}
}
}
this._diagnosticListener.OnActivityImport(activity, (object) httpContext);
if (this._diagnosticListener.IsEnabled("Microsoft.AspNetCore.Hosting.HttpRequestIn.Start"))
{
hasDiagnosticListener = true;
this.StartActivity(activity, httpContext);
}
else
activity.Start();
return activity;
}
从中可以看出,在ASP.NET Core 开始处理请求之前:
- 首先,创建了名为
Microsoft.AspNetCore.Hosting.HttpRequestIn
的Activity,该Activity首先尝试从HTTP请求头中获取TraceParent/euqstId作为当前Activity的ParentId,这个很显然,是用来链路跟踪的。 - 其次,尝试从
CorrelationContext
中获取关联上下文信息,然后将其添加到创建的Activity的Baggage中,进行关联上下文的继续传递。 - 然后,启动Activity,然后向Name为
Microsoft.AspNetCore.Hosting.HttpRequestIn.Start
中写入诊断日志。
这里大家可能有个疑问,这个关联上下文信息CorrelationContext
又是何时添加到Http请求头中的呢?在System.Net.Http
中的DiagnosticsHandler中添加的。
因此我们应该明白了,整个关联上下文的传递机制。
紧接着再来看一看RequestEnd
中的StopActivity
方法。
private void StopActivity(Activity activity, HttpContext httpContext)
{
if (activity.Duration == TimeSpan.Zero)
activity.SetEndTime(DateTime.UtcNow);
this._diagnosticListener.Write("Microsoft.AspNetCore.Hosting.HttpRequestIn.Stop", (object) httpContext);
activity.Stop();
}
从中可以看出主要是先SetEndTime
,再写入Microsoft.AspNetCore.Hosting.HttpRequestIn.Stop
命名事件;最后调用Stop
方法停止当前Activity。
简单总结一下,借助Activity中附加的Baggage信息可以实现请求链路上上下文数据的共享。
5.3. 应用场景思考
从上面的命名事件中可以看出,其封送的数据类型是特定的,因此可以借助Activity的Tags或Baggage添加自定义的数据进行共享。
按照上面我们的耗时统计,只能统计到整个http请求的耗时,但对于我们定位问题来说还是有困难,比如,某个api即有调用redis,又操作了消息队列,同时又访问了数据库,那到底是那一段超时了呢?显然不好直接定位,借助activity,我们就可以很好的实现细粒度的链路跟踪。通过activity携带的信息,可以将一系列的操作关联起来,记录日志,再借助AMP进行可视化快速定位跟踪。
6. 参考资料
- 在 .NET Core 中使用 Diagnostics (Diagnostic Source) 记录跟踪信息
- Logging using DiagnosticSource in ASP.NET Core
- .Net Core中的诊断日志DiagnosticSource讲解
- Observer Design Pattern
- DiagnosticSource User Guide
- Activity User Guide
- DiagnosticSourcery 101 - Mark Rendle
- Improvements in .NET Core 3.0 for troubleshooting and monitoring distributed apps