C#异步编程之async/await详解
目录
- 概述
- C#异步编程用法
- async/await和Task简介
- async
- await
- Task
- 其他
- 实现原理剖析
- 实现原理示例
概述
异步这个概念在不同语境下有不同的解释,比如在一个单核CPU里开启两个线程执行两个函数,通常认为这种调用是异步的,但对于CPU来说它是单核不可能同时运行两个函编程客栈数,不过是由系统调度在不同的时间分片中执行。一般来说,如果两个工作能同时进行,就认为是异步的。在编程中,它通常代表函数的调用可以在不执行完的情况下返回,必要时在完成时回调。
有一个概念常常被混淆,多线程和异步。很多人认为异步就是多线程的,但是多线程只是实现异步的其中一种方式,除此之外还有系统中断,定时器,甚至可以自己写一个状态机实现异步(C# 的异步实现类似状态机)。
不同的编程语言有不同异步编程方法,在C#语言中,常常使用async/await等关键字,和Task等类来实现异步编程。
C#异步编程用法
class Program { static void Main(string[] args) { var task = IntTask(); Console.WriteLine("等待中..."); Console.WriteLine($"算完了? 让我康康! result = {task.Result}"); } static async Task<int> IntTask() { Console.WriteLine("等3秒吧"); await Task.Delay(3000); return 1; } }
Main函数异步调用IntTask,打印"等三秒吧",随后返回到Main函数打印“等待中”,在task.Result取值时阻塞,三秒后IntTask返回(此时Task.Result被赋值)打印“result = 1”。看一下用法:
- async: 异步函数使用async关键字修饰
- await: 等待异步函数返回
- Task:异步函数有返回值,且返回值为int类型
上述只是一个极简的用法,忽略了大量的细节,可以建立一个初步的印象。
async/await和Task简介
async
用async修饰一个方法,表明这个方法可以异步执行,其返回值必须是void/Task/Task<T>(T是返回值类型)其中一个,方法内的语句至少包含一个await关键字,否则会被同步的方式执行。
await
await只能修饰(返回值是)Task类型变量,此时会返回Task.Result或void而不是Task本身,在上述示例中,Main没有被async修饰,不能使用await,其返回值就是Task<int>, 而IntTask调用Task.Delay就是直接返回void。await也只能在被async修饰的函数的语句中使用。
Task
源于基于任务的异步模式(Task-based Asynchronous Pattern,TAP),被作为异步函数的返回值。异步函数的返回值有三种:
- void:"fire and forget"(触发并忘记)不需要知道状态(是否完成),比如抛出异常、打印日志时可以使用
- Task:需要知道是否完成(或失败)等状态,但是不需要返回值
- Task<T>:在Task的基础上还想要返回值
其他
异步函数不能使用ref/out修饰参数
实现原理剖析
如果使用反汇编等手段,可以看到上述示例代码的编译:
在返回1之前,好像有什么“奇怪的东西”被调用,编译器又背着开发者偷偷干了什么呢?
实现原理示例
在微软的开发博客里有一个叫谢尔盖·杰普利亚科夫(Sergey Tepliakov)的毛子曾提到这部分,来看一下他的示例:
源码
class StockPrices { private Dictionary<string, decimal> _stockPrices; public async Task<decimal> GetStockPriceForAsync(string companyId) { await InitializeMapIfNeededAsync(); _stockPrices.TryGetValue(companyId, out var result); return result; } private async Task InitialiphpzeMapIfNeededAsync() { if (_stockPrices != null) return; await Task.Delay(42); // Getting the stock prices from the external source and cache in memory. _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } }; } }
这是他的源代码,这个类叫做StockPrices(股票价格),其核心业务是根据公司ID查询股票价格GetStockPriceForAsync,这是一个异步调用,首先它先异步调用InitializeMapIfNeededAsync对数据库进行初始化,初始化完成尝试从数据库中获取该公司的股票价格返回。
上述提到编译器偷偷自己生成了代码,如果手动实现大概是怎样的呢?来看谢尔盖给出的解:
手动实现
class GetStockPriceForAsync_StateMAChine { enum State { Start, Step1, } private readonly StockPrices @this; private readonly string _companyId; private readonly TaskCompletionSource<decimal> _tcs; private Task _initializeMapIfNeededTask; private State _state = State.Start; public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId) { this.@this = @this; _companyId = companyId; } public void Start() { try { if (_state == State.Start) { // The code from the start of the method to the first 'await'. if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException(); _initializeMapIfNeededTask = @this.InitializeMapIfNeeded(); // Update state and schedule continuation _state = State.Step1; _initializeMapIfNeededTask.ContinueWith(_ => Start()); } else if (_state == State.Step1) { // Need to check the error and the cancel case first if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled) _tcs.SetCanceled(); else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted) _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException); else { // The code between first await and the rest of the method @this._store.TryGetValue(_companyId, out var result); _tcs.SetResult(result); } } } catch (Exception e) { _tcs.SetException(e); } } public Task<decimal> Task => _tcs.Task; } public Task<decimal> GetStockPriceForAsync(string companyId) { var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId); stateMachine.Start(); return stateMachine.Task; }
从类名GetStockPriceForAsync_StateMachine可以看到,他为这个异步调用生成了一个状态机来实现异步,先来看下成员变量:
- StockPrices: 原来那个“股票价格”类的引用
- _companyId: 调用方法时的参数公司ID
- _tcs:TaskCompletionSource 创建并完成该任务的来源。
- _initializeMapIfNeededTask:调用初始化数据的异步任务
- _state:状态枚举
- Task:直接就是_tcs.Task,即该任务创建并完成的来源
现在看来这段代码的逻辑就比较清楚了,在调用异步查询股票的接口时,创建了一个状态机并调用状态机的Start函数,第一次进入start函数时状态机的状态是Start状态,它给_initializeMapIfNeededTask赋值,把状态机状态流转到Step1,并让_initializeMapIfNeededTask执行结束末尾再次调用Start函数(ContinueWith)。
_initializeMapIfNeededTask任务在等待了42毫秒后(Task.Delay(42)),末尾时再次调用了Start函数,此时状态为Step1。首先检查了Task状态,符合要求调用_tcs.SetResult(其实是给Task的Result赋值),此时异步任务完成。
TaskCompletionSource
看官方文档给的定义:
表示未绑定到委托的 Task<TResult> 的制造者方,并通过 Task 属性提供对使用者方的访问
简单的示例:
static void Main() { TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>(); Task<int> t1 = tcs1.Task; // Start a background task that will complete tcs1.Task Task.Factory.StartNew(() => { Thread.Sleep(1000); tcs1.SetResult(15); }); }
看的出来这个类就是对Task的包装,方便创建分发给使用者的任务。其核心就是包装Task并方便外面设置其属性和状态
Task.ContinueWith
创建一个在目标 Task 完成时异步执行的延续任务
可以传入一个委托,在Task完成的末尾调用。这是一个典型的续体传递风格(continuation-pass style)。
续体传递风格
续体传递风格(continuation-pass style, CPS),来看维基百科的描述:
A function written in continuation-passing style takes an extra argument: an explicit "continuation"; i.e., a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this vhttp://www.devze.comalue as the argument. That means that when invoking a CPS function, the calling function is required to supply a procedure to be invoked with the subroutine's "return" value. Expressing code in this form makes a number of things explicit which are implicit in direct style. These include: procedure returns, which become apparent as calls to a continuation; intermediate values, which are all given names; order of argument evaLuation, which is made explicit; and tail calls, which simply call a procedure with the same continuation, unmodified, that was passed to the caller
大概的意思是,这种风格的函数比起普通的有一个额外的函数指针参数,调用结束(即将return)调用或者函数参数(替代直接return到调用者Caller)。还有一些其他细节,就不多说了,感兴趣自行翻译查看。
来看一个极简的例子:
int a = b + c + d;
这是一个链式运算,是有顺序的,在C++中,上述运算其实是:
int a = (b + c) + d; 先计算tmp = b + c(tmp是寄存器上一个临时的值,也称将亡值),然后计算 int a = tmp + c
使用续体传递来模拟这一过程:
class Program { public class Result<T> { public T V; } static void Main(string[] args) { int a = 1; int b = 2; int c = 3; int d = 4; Result<int> ar = new() { V = a}; Calc3(ar, b, c, d, Calc2); Console.WriteLine($"a = {ar.V}"); } static void Calc3(Result<int> ar, int b, int c, int d, Action<Result<int>, int, int> continues) { int tmp = b + c; continues(ar, tmp, d); } static void Calc2(Result<int> ar, int tmp, int d) { ar.V = tmp + d; } }
上述代码应该很清楚了,稍微看下应该能看明白。
C#编译器的实现
struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine { public StockPrices __this; public string companyId; public AsyncTaskMethodBuilder<decimal> __builder; public int __state; private TaskAwaiter __task1Awaiter; public void MoveNext() { decimal result; try { TaskAwaiter awaiter; if (__state != 0) { // State 1 of the generated state machine: if (string.IsNullOrEmpty(companyId)) throw new ArgumentNullException(); awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter(); // Hot path optimization: if the task is completed, // the state machine automatically moves to the next step if (!awaiter.IsCompleted) { __state = 0; __task1Awaiter = awaiter; // The following call will eventually cause boxing of the state machine. __builder.AwaitUnsafeOnComplet开发者_JAVAed(ref awaiter, ref this); return; } } else { awaiter = __task1Awaiter; __task1Awaiter = default(TaskAwaiter); __state = -1; } // GetResult returns void, but it'll throw if the awaited task failed. // This exception is catched later and changes the resulting task. awaiter.GetResult(); __this._stocks.TryGetValue(companyId, out result); } catch (Exception exception) { // Final state: failure __state = -2; __builder.SetException(exception); return; } // Final state: success __state = -2; __builder.SetResult(result); } void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { __builder.SetStateMachine(stateMachine); } } [AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))] public Task<decimal> GetStockPriceFor(string companyId) { _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__; _GetStockPriceFor_d__.__this = this; _GetStockPriceFor_d__.companyId = companyId; _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create(); _GetStockPriceFor_d__.__state = -1; var __t__builder = _GetStockPriceFor_d__.__builder; __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__); return _GetStockPriceFor_d__.__builder.Task; }
比较一下C#编译器生成的状态机:
- __this:St编程ockPrices“股票价格”类的引用
- companyId:公司ID参数
- __builder:AsyncTaskMethodBuilder类型的表示返回任务的异步方法生成器
- __state:状态索引
- __task1Awaiter:TaskAwaiter类型,提供等待异步任务完成的对象
上述成员有一些和之前手撸的状态机不太一样,等下面会介绍,先来这一套的逻辑:
首先创建了一个_GetStockPriceForAsync_d__1状态机_GetStockPriceFor_d__并初始化赋值,随后调用了这个状态机的__builder的Start函数并把该状态机作为引用参数传入。__builder.Start函数会调到该状态机的MoveNext函数(下面会介绍),这和手撸代码状态机Start函数调用类似。MoveNext与Start函数的处理过程也类似:第一次进来__state == -1,__builder.AwaitUnsafeOnCompleted切换上下文执行InitializeLocalStoreIfNeededAsync异步任务,并指定在完成后切换到当前上下文调用该状态机的MoveNext函数,类似手撸代码的Task.ContinueWith。第二次进入时,执行到__builder.SetResult(result),异步任务基本完成。
上述描述也是忽略了一些细节,下面是调用的时序图,会更清楚些,有些不太清楚的点后面会详细介绍。
TaskAwaiter
来看下官方定义:
public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion 提供等待异步任务完成的对象
结构:
可以看到,这个所谓“等待异步任务完成的对象”,主要是保证实现ICriticalNotifyCompletion的接口OnCompleted等。
AsyncTaskMethodBuilder<TAwaiter,TStateMachine>(TAwaiter, TStateMachine)
官方定义:
个人认为可以视为异步任务的“门面”,它负责启动状态机,传递一些中间状态,并在最终SetResult时表示它和其子例程的异步任务结束。其中有一个方法AwaitUnsafeOnCompleted,值得研究一下。
AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted
这个方法在上述中一笔带过,被描述为类似Task.ContinueWith,确实如此,但执行过程相当复杂,在这里也只是简单介绍下过程。
AwaitUnsafeOnCompleted首先会调用GetCompletionAction,GetCompletionAction创建了一个保存了上下文 context = ExecuteContext.FastCapture()的MoveNextRunner,并返回了指向的MoveNextRunner.Run函数的委托。http://www.devze.com
接着调用参数awaiter的UnsafeOnCompleted(completionAction)函数,这里completionAction就是上述的那个委托,内部调用了成员Task.SetContinuationForAwait函数来初始化续体,SetContinuationForAwait又调用AddTaskContinuation把延续方法添加到Task中,当上述示例源码中的InitializeMapIfNeededAsync函数执行完调用Runner.Run:
[SecuritySafeCritical] internal void Run() { if (this.m_context != null) { try { // 我们并未给 s_invokeMoveNext 赋值,所以 callback == null ContextCallback callback = s_invokeMoveNext; if (callback == null) { // 将回调设置为下方的 InvokeMoveNext 方法 s_invokeMoveNext = callback = new ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext); } ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true); return; } finally { this.m_context.Dispose(); } } this.m_stateMachine.MoveNext(); } [SecurityCritical] private static void InvokeMoveNext(object stateMachine) { ((IAsyncStateMachine) stateMachine).MoveNext(); }
((IAsyncStateMachine) stateMachine).MoveNext() 重新调用了状态机的MoveNext()
以上就是C#异步编程之async/await详解的详细内容,更多关于C#异步编程async await的资料请关注我们其它相关文章!
精彩评论