开发者

C#异步编程之async/await详解

目录
  • 概述
  • C#异步编程用法
  • async/await和Task简介
    • async
    • await
    • Task
    • 其他
  • 实现原理剖析
    • 实现原理示例

概述

异步这个概念在不同语境下有不同的解释,比如在一个单核CPU里开启两个线程执行两个函数,通常认为这种调用是异步的,但对于CPU来说它是单核不可能同时运行两个函编程客栈数,不过是由系统调度在不同的时间分片中执行。一般来说,如果两个工作能同时进行,就认为是异步的。在编程中,它通常代表函数的调用可以在不执行完的情况下返回,必要时在完成时回调。

有一个概念常常被混淆,多线程和异步。很多人认为异步就是多线程的,但是多线程只是实现异步的其中一种方式,除此之外还有系统中断,定时器,甚至可以自己写一个状态机实现异步(C# 的异步实现类似状态机)。

不同的编程语言有不同异步编程方法,在C#语言中,常常使用async/await等关键字,和Task等类来实现异步编程。

C#异步编程用法

class Program
{
    static void Main(string[] args)
    {
        var task =  IntTask();
		Console.WriteLine("等待中...");
        Console.WriteLine($"算完了? 让我康康! result = {task.Result}");
    }

    static async Task<int> IntTask()
    {
        Console.WriteLine("等3秒吧");
        await Task.Delay(3000);
        return 1;
    }
}

Main函数异步调用IntTask,打印"等三秒吧",随后返回到Main函数打印“等待中”,在task.Result取值时阻塞,三秒后IntTask返回(此时Task.Result被赋值)打印“result = 1”。看一下用法:

  • async: 异步函数使用async关键字修饰
  • await: 等待异步函数返回
  • Task:异步函数有返回值,且返回值为int类型

上述只是一个极简的用法,忽略了大量的细节,可以建立一个初步的印象。

async/await和Task简介

async

用async修饰一个方法,表明这个方法可以异步执行,其返回值必须是void/Task/Task<T>(T是返回值类型)其中一个,方法内的语句至少包含一个await关键字,否则会被同步的方式执行。

await

await只能修饰(返回值是)Task类型变量,此时会返回Task.Result或void而不是Task本身,在上述示例中,Main没有被async修饰,不能使用await,其返回值就是Task<int>, 而IntTask调用Task.Delay就是直接返回void。await也只能在被async修饰的函数的语句中使用。

Task

源于基于任务的异步模式(Task-based Asynchronous Pattern,TAP),被作为异步函数的返回值。异步函数的返回值有三种:

  • void:"fire and forget"(触发并忘记)不需要知道状态(是否完成),比如抛出异常、打印日志时可以使用
  • Task:需要知道是否完成(或失败)等状态,但是不需要返回值
  • Task<T>:在Task的基础上还想要返回值

其他

异步函数不能使用ref/out修饰参数

实现原理剖析

如果使用反汇编等手段,可以看到上述示例代码的编译:

C#异步编程之async/await详解

在返回1之前,好像有什么“奇怪的东西”被调用,编译器又背着开发者偷偷干了什么呢?

实现原理示例

在微软的开发博客里有一个叫谢尔盖·杰普利亚科夫(Sergey Tepliakov)的毛子曾提到这部分,来看一下他的示例:

源码

class StockPrices
{
    private Dictionary<string, decimal> _stockPrices;
    public async Task<decimal> GetStockPriceForAsync(string companyId)
    {
        await InitializeMapIfNeededAsync();
        _stockPrices.TryGetValue(companyId, out var result);
        return result;
    }
 
    private async Task InitialiphpzeMapIfNeededAsync()
    {
        if (_stockPrices != null)
            return;
 
        await Task.Delay(42);
        // Getting the stock prices from the external source and cache in memory.
        _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
    }
}

这是他的源代码,这个类叫做StockPrices(股票价格),其核心业务是根据公司ID查询股票价格GetStockPriceForAsync,这是一个异步调用,首先它先异步调用InitializeMapIfNeededAsync对数据库进行初始化,初始化完成尝试从数据库中获取该公司的股票价格返回。

上述提到编译器偷偷自己生成了代码,如果手动实现大概是怎样的呢?来看谢尔盖给出的解:

手动实现

class GetStockPriceForAsync_StateMAChine
{
    enum State { Start, Step1, }
    private readonly StockPrices @this;
    private readonly string _companyId;
    private readonly TaskCompletionSource<decimal> _tcs;
    private Task _initializeMapIfNeededTask;
    private State _state = State.Start;

    public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
    {
        this.@this = @this;
        _companyId = companyId;
    }

    public void Start()
    {
        try
        {
            if (_state == State.Start)
            {
                // The code from the start of the method to the first 'await'.

                if (string.IsNullOrEmpty(_companyId))
                    throw new ArgumentNullException();

                _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

                // Update state and schedule continuation
                _state = State.Step1;
                _initializeMapIfNeededTask.ContinueWith(_ => Start());
            }
            else if (_state == State.Step1)
            {
                // Need to check the error and the cancel case first
                if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
                    _tcs.SetCanceled();
                else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
                    _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
                else
                {
                    // The code between first await and the rest of the method

                    @this._store.TryGetValue(_companyId, out var result);
                    _tcs.SetResult(result);
                }
            }
        }
        catch (Exception e)
        {
            _tcs.SetException(e);
        }
    }

    public Task<decimal> Task => _tcs.Task;
}

public Task<decimal> GetStockPriceForAsync(string companyId)
{
    var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
    stateMachine.Start();
    return stateMachine.Task;
}

从类名GetStockPriceForAsync_StateMachine可以看到,他为这个异步调用生成了一个状态机来实现异步,先来看下成员变量:

  • StockPrices: 原来那个“股票价格”类的引用
  • _companyId: 调用方法时的参数公司ID
  • _tcs:TaskCompletionSource 创建并完成该任务的来源。
  • _initializeMapIfNeededTask:调用初始化数据的异步任务
  • _state:状态枚举
  • Task:直接就是_tcs.Task,即该任务创建并完成的来源

现在看来这段代码的逻辑就比较清楚了,在调用异步查询股票的接口时,创建了一个状态机并调用状态机的Start函数,第一次进入start函数时状态机的状态是Start状态,它给_initializeMapIfNeededTask赋值,把状态机状态流转到Step1,并让_initializeMapIfNeededTask执行结束末尾再次调用Start函数(ContinueWith)。

_initializeMapIfNeededTask任务在等待了42毫秒后(Task.Delay(42)),末尾时再次调用了Start函数,此时状态为Step1。首先检查了Task状态,符合要求调用_tcs.SetResult(其实是给Task的Result赋值),此时异步任务完成。

TaskCompletionSource

看官方文档给的定义:

表示未绑定到委托的 Task<TResult> 的制造者方,并通过 Task 属性提供对使用者方的访问

简单的示例:

static void Main()
{
	TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>();
	Task<int> t1 = tcs1.Task;

	// Start a background task that will complete tcs1.Task
	Task.Factory.StartNew(() =>
	{
		Thread.Sleep(1000);
		tcs1.SetResult(15);
	});
}

看的出来这个类就是对Task的包装,方便创建分发给使用者的任务。其核心就是包装Task并方便外面设置其属性和状态

Task.ContinueWith

创建一个在目标 Task 完成时异步执行的延续任务

可以传入一个委托,在Task完成的末尾调用。这是一个典型的续体传递风格(continuation-pass style)。

续体传递风格

续体传递风格(continuation-pass style, CPS),来看维基百科的描述:

A function written in continuation-passing style takes an extra argument: an explicit "continuation"; i.e., a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this vhttp://www.devze.comalue as the argument. That means that when invoking a CPS function, the calling function is required to supply a procedure to be invoked with the subroutine's "return" value. Expressing code in this form makes a number of things explicit which are implicit in direct style. These include: procedure returns, which become apparent as calls to a continuation; intermediate values, which are all given names; order of argument evaLuation, which is made explicit; and tail calls, which simply call a procedure with the same continuation, unmodified, that was passed to the caller

大概的意思是,这种风格的函数比起普通的有一个额外的函数指针参数,调用结束(即将return)调用或者函数参数(替代直接return到调用者Caller)。还有一些其他细节,就不多说了,感兴趣自行翻译查看。

来看一个极简的例子:

int a = b + c + d;

这是一个链式运算,是有顺序的,在C++中,上述运算其实是:

int a = (b + c) + d; 先计算tmp = b + c(tmp是寄存器上一个临时的值,也称将亡值),然后计算 int a = tmp + c

使用续体传递来模拟这一过程:

class Program
{
    public class Result<T>
    {
        public T V;
    }

    static void Main(string[] args)
    {
        int a = 1;
        int b = 2;
        int c = 3;
        int d = 4;

        Result<int> ar = new() { V = a};
        Calc3(ar, b, c, d, Calc2);

        Console.WriteLine($"a = {ar.V}");
    }

    static void Calc3(Result<int> ar, int b, int c, int d, Action<Result<int>, int, int> continues)
    {
        int tmp = b + c;
        continues(ar, tmp, d);
    }

    static void Calc2(Result<int> ar, int tmp, int d)
    {
        ar.V = tmp + d;
    }
}

上述代码应该很清楚了,稍微看下应该能看明白。

C#编译器的实现

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
    public StockPrices __this;
    public string companyId;
    public AsyncTaskMethodBuilder<decimal> __builder;
    public int __state;
    private TaskAwaiter __task1Awaiter;

    public void MoveNext()
    {
        decimal result;
        try
        {
            TaskAwaiter awaiter;
            if (__state != 0)
            {
                // State 1 of the generated state machine:
                if (string.IsNullOrEmpty(companyId))
                    throw new ArgumentNullException();

                awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();

                // Hot path optimization: if the task is completed,
                // the state machine automatically moves to the next step
                if (!awaiter.IsCompleted)
                {
                    __state = 0;
                    __task1Awaiter = awaiter;

                    // The following call will eventually cause boxing of the state machine.
                    __builder.AwaitUnsafeOnComplet开发者_JAVAed(ref awaiter, ref this);
                    return;
                }
            }
            else
            {
                awaiter = __task1Awaiter;
                __task1Awaiter = default(TaskAwaiter);
                __state = -1;
            }

            // GetResult returns void, but it'll throw if the awaited task failed.
            // This exception is catched later and changes the resulting task.
            awaiter.GetResult();
            __this._stocks.TryGetValue(companyId, out result);
        }
        catch (Exception exception)
        {
            // Final state: failure
            __state = -2;
            __builder.SetException(exception);
            return;
        }

        // Final state: success
        __state = -2;
        __builder.SetResult(result);
    }

    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        __builder.SetStateMachine(stateMachine);
    }
}

[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
    _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
    _GetStockPriceFor_d__.__this = this;
    _GetStockPriceFor_d__.companyId = companyId;
    _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
    _GetStockPriceFor_d__.__state = -1;
    var __t__builder = _GetStockPriceFor_d__.__builder;
    __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
    return _GetStockPriceFor_d__.__builder.Task;
}

比较一下C#编译器生成的状态机:

  • __this:St编程ockPrices“股票价格”类的引用
  • companyId:公司ID参数
  • __builder:AsyncTaskMethodBuilder类型的表示返回任务的异步方法生成器
  • __state:状态索引
  • __task1Awaiter:TaskAwaiter类型,提供等待异步任务完成的对象

上述成员有一些和之前手撸的状态机不太一样,等下面会介绍,先来这一套的逻辑:

首先创建了一个_GetStockPriceForAsync_d__1状态机_GetStockPriceFor_d__并初始化赋值,随后调用了这个状态机的__builder的Start函数并把该状态机作为引用参数传入。__builder.Start函数会调到该状态机的MoveNext函数(下面会介绍),这和手撸代码状态机Start函数调用类似。

MoveNext与Start函数的处理过程也类似:第一次进来__state == -1,__builder.AwaitUnsafeOnCompleted切换上下文执行InitializeLocalStoreIfNeededAsync异步任务,并指定在完成后切换到当前上下文调用该状态机的MoveNext函数,类似手撸代码的Task.ContinueWith。第二次进入时,执行到__builder.SetResult(result),异步任务基本完成。

上述描述也是忽略了一些细节,下面是调用的时序图,会更清楚些,有些不太清楚的点后面会详细介绍。

C#异步编程之async/await详解

TaskAwaiter

来看下官方定义:

public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion 提供等待异步任务完成的对象

结构:

C#异步编程之async/await详解

可以看到,这个所谓“等待异步任务完成的对象”,主要是保证实现ICriticalNotifyCompletion的接口OnCompleted等。

AsyncTaskMethodBuilder<TAwaiter,TStateMachine>(TAwaiter, TStateMachine)

官方定义:

C#异步编程之async/await详解

个人认为可以视为异步任务的“门面”,它负责启动状态机,传递一些中间状态,并在最终SetResult时表示它和其子例程的异步任务结束。其中有一个方法AwaitUnsafeOnCompleted,值得研究一下。

AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted

这个方法在上述中一笔带过,被描述为类似Task.ContinueWith,确实如此,但执行过程相当复杂,在这里也只是简单介绍下过程。

AwaitUnsafeOnCompleted首先会调用GetCompletionAction,GetCompletionAction创建了一个保存了上下文 context = ExecuteContext.FastCapture()的MoveNextRunner,并返回了指向的MoveNextRunner.Run函数的委托。http://www.devze.com

接着调用参数awaiter的UnsafeOnCompleted(completionAction)函数,这里completionAction就是上述的那个委托,内部调用了成员Task.SetContinuationForAwait函数来初始化续体,SetContinuationForAwait又调用AddTaskContinuation把延续方法添加到Task中,当上述示例源码中的InitializeMapIfNeededAsync函数执行完调用Runner.Run:

[SecuritySafeCritical]
internal void Run()
{
    if (this.m_context != null)
    {
        try
        {
            // 我们并未给 s_invokeMoveNext 赋值,所以 callback == null
            ContextCallback callback = s_invokeMoveNext;
            if (callback == null)
            {
                // 将回调设置为下方的 InvokeMoveNext 方法
                s_invokeMoveNext = callback = new
                ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext);
            }
            ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);
            return;
        }
        finally
        {
            this.m_context.Dispose();
        }
    }
    this.m_stateMachine.MoveNext();
}

[SecurityCritical]
private static void InvokeMoveNext(object stateMachine)
{
    ((IAsyncStateMachine) stateMachine).MoveNext();
}

((IAsyncStateMachine) stateMachine).MoveNext() 重新调用了状态机的MoveNext()

以上就是C#异步编程之async/await详解的详细内容,更多关于C#异步编程async await的资料请关注我们其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜