前言
.NET 中使用Task可以方便地编写异步程序,为了更好地理解Task及其调度机制,接下来模拟Task的实现,目的是搞清楚:
1、Task是什么
2、Task是如何被调度的
基本的Task模拟实现
从最基本的Task用法开始
代码语言:javascript复制Task.Run(Action action)
这个命令的作用是将action作为一项任务提交给调度器,调度器会安排空闲线程来处理。我们使用Job来模拟Task
代码语言:javascript复制public class Job
{
private readonly Action _work;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
}
public enum JobStatus
{
Created,
Scheduled,
Running,
Completed
}
这里也定义了同Task一样的静态Run方法,使用方式也与Task类似
代码语言:javascript复制Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));
作为对比,使用Task时的写法如下,多了await关键字,后文会讨论。
代码语言:javascript复制await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));
代码语言:javascript复制public abstract class JobScheduler
{
public abstract void QueueJob(Job job);
public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
}
ThreadPoolJobScheduler实现的QueueJob如下:
代码语言:javascript复制public class ThreadPoolJobScheduler : JobScheduler
{
public override void QueueJob(Job job)
{
job.Status = JobStatus.Scheduled;
var executionContext = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,
_ => job.Invoke(), null));
}
}
代码语言:javascript复制public class DedicatedThreadJobScheduler : JobScheduler
{
private readonly BlockingCollection<Job> _queues=new();
private readonly Thread[] _threads;
public DedicatedThreadJobScheduler(int threadCount)
{
_threads=new Thread[threadCount];
for(int index=0; index< threadCount; index )
{
_threads[index] =new Thread(Invoke);
}
Array.ForEach(_threads, thread=>thread.Start());
void Invoke(object? state){
while(true){
_queues.Take().Invoke();
}
}
}
public override void QueueJob(Job job)
{
_queues.Add(job);
}
}
代码语言:javascript复制await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);
static void LongRunningMethod()
{
// Simulate a long-running operation
Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10000);
Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
}
代码语言:javascript复制var taskA = Task.Run(() => DateTime.Now);
var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
await taskB;
模仿Task,我们给Job也添加ContinueWith方法。
代码语言:javascript复制public class Job
{
private readonly Action _work;
private Job? _continue;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
_continue?.Start();
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
public Job ContinueWith(Action<Job> tobeContinued)
{
if (_continue == null)
{
var job = new Job(() => tobeContinued(this));
_continue = job;
}
else
{
_continue.ContinueWith(tobeContinued);
}
return this;
}
}
代码语言:javascript复制Job.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
}).ContinueWith(_ =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
代码语言:javascript复制public TaskAwaiter GetAwaiter();
然后TaskAwaiter继承了ICriticalNotifyCompletion接口
代码语言:javascript复制public readonly struct TaskAwaiter<TResult> : System.Runtime.CompilerServices.ICriticalNotifyCompletion
照猫画虎,也为Job添加一个最简单的JobAwaiter
代码语言:javascript复制public class Job
{
...
public JobAwaiter GetAwaiter() => new(this);
}
JobAwaiter的定义如下:
代码语言:javascript复制public struct JobAwaiter : ICriticalNotifyCompletion
{
private readonly Job _job;
public readonly bool IsCompleted => _job.Status == JobStatus.Completed;
public JobAwaiter(Job job)
{
_job = job;
if (job.Status == JobStatus.Created)
{
job.Start();
}
}
public void GetResult() { }
public void OnCompleted(Action continuation)
{
_job.ContinueWith(_ => continuation());
}
public void UnsafeOnCompleted(Action continuation)
=> OnCompleted(continuation);
}
添加了await后,前面的代码也可以这样写:
代码语言:javascript复制await F1();
await F2();
static Job F1() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
});
static Job F2() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
总结
回顾开头的两个问题,现在可以尝试给出答案了。
1、Task是什么,Task是一种有状态的操作(Created,Scheduled,Running,Completed),是对耗时操作的抽象,就像现实中的一项任务一样,它的执行需要相对较长的时间,它也有创建(Created),安排(Scheduled),执行(Running),完成(Completed)的基本过程。任务完成当然需要拿到结果的,这里的Job比较简单,没有模拟具体的结果;
2、Task是如何被调度的,默认采用基于线程池的调度,即创建好Task后,由线程池中的空闲线程执行,具体什么时候执行、由哪个线程执行,开发者是不用关心的,在具体执行过程中,但由于.NET全局线程池的局限,对于一些特殊场景无法满足时(比如需要立即执行Task),此时可以通过TaskCreationOptions更改调度行为;
另外,await是语法糖,它背后的实现是基于GetAwaiter,由其返回ICriticalNotifyCompletion接口的实现,并对ContinueWith做了封装。
推荐阅读:
使用 C# 开发的开源 SSO 单点登录认证框架
EasyCaching:简单高效的.NET缓存包
基于.NET、Uni-App开发支持多平台的小程序商城系统 - CoreShop
遥遥领先,开源一个 .NET 构建的个人网盘
.NET中的数组在内存中如何布局?
.NET Web新人入门必学项目EarthChat