开发者

Kotlin协程launch启动流程原理详解

目录
  • 1.launch启动流程
    • 反编译后的Java代码
  • 2.协程是如何被启动的

    1.launch启动流程

    已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的呢,直接进入launch的源码开始看起。

    fun main() {
        coroutineTest()
        Thread.sleep(2000L)
    }
    val block = suspend {
        println("Hello")
        delay(1000L)
        println("Kotlin")
    }
    private fun coroutineTest() {
        CoroutineScope(Job()).launch {
            withContext(Dispatchers.IO) {
                block.invoke()
            }
        }
    }
    

    反编译后的Java代码

    public final class CoroutineDemoKt {
       @NotNull
       private static final Function1 block;
       public static final void main() {
          coroutineTest();
          Thread.sleep(2000L);
       }
       // $FF: synthetic method
       public static void main(String[] var0) {
          main();
       }
       @NotNull
       public static final Function1 getBlock() {
          return block;
       }
       private static final void coroutineTest() {
          BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
             int label;
             @Nullable
             public final Object invokeSuspend(@NotNull Object $result) {
                Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(this.label) {
                case 0:
                   ResultKt.throwOnFailure($result);
                   CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
                   Function2 var10001 = (Function2)(new Function2((Continuation)null) {
                      int label;
                      @Nullable
                      public final Object invokeSuspend(@NotNull Object $result) {
                         Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                         switch(this.label) {
                         case 0:
                            ResultKt.throwOnFailure($result);
                            Function1 var10000 = CoroutineDemoKt.getBlock();
                            this.label = 1;
                            if (var10000.invoke(this) == var2) {
                               return var2;
                            }
                            break;
                         case 1:
                            ResultKt.throwOnFailure($result);
                            break;
                         default:
                   http://www.devze.com         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                         }
                         return Unit.INSTANCE;
                      }
                      @NotNull
                      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                         Intrinsics.checkNotNullParameter(completion, "completion");
                         Function2 var3 = new <anonymous constructor>(completion);
                         return var3;
                      }
                      public final Object invoke(Object var1, Object var2) {
                         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                      }
                   });
                   this.label = 1;
                   if (BuildersKt.withContext(var10000, var10001, this) == var2) {
                      return var2;
                   }
                   break;
                case 1:
                   ResultKt.throwOnFailure($result);
                   break;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
                return Unit.INSTANCE;
             }
             @NotNull
             public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function2 var3 = new <anonymous constructor>(completion);
                return var3;
             }
             public final Object invoke(Object var1, Object var2) {
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
             }
          }), 3, (Object)null);
       }
       static {
          Function1 var0 = (Function1)(new Function1((Continuation)null) {
             int label;
             @Nullable
             public final Object invokeSuspend(@NotNull Object $result) {
                Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                String var2;
                swandroiditch(this.label) {
                case 0:
                   ResultKt.throwOnFailure($result);
                   var2 = "Hello";
                   System.out.println(var2);
                   this.label = 1;
                   if (DelayKt.delay(1000L, this) == var3) {
                      return var3;
                   }
                   break;
                case 1:
                   ResultKt.throwOnFailure($result);
                   break;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
                var2 = "Kotlin";
                System.out.println(var2);
                return Unit.INSTANCE;
             }
             @NotNull
             public final Continuation create(@NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function1 var2 = new <anonymous constructor>(completion);
                return var2;
             }
             public final Object invoke(Object var1) {
                return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
             }
          });
          block = var0;
       }
    }
    

    先分析一下上面代码的流程:

    • 首先声明了一个Function1类型的block变量,这个变量就是demo中的block,然后会在static函数中会被赋值。
    • 接下来就是coroutineTest函数的调用。这个函数中的第一行代码就是CoroutineScope的传参和一些默认值
    • 然后通过89行的invoke进入到了外层状态机流转的过程
    • 95行的static表示的是内部的挂起函数就是demo中的block.invoke,它是以匿名内部类的方式实现php,然后执行内部的状态机流转过程,最后给block赋值。
    • block被赋值后最终在Function1 var10000 = CoroutineDemoKt.getBlock();被调用

    那么这个过程又是如何实现的,进入launch源码进行查看:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    这里的block指的就是demo中的block代码段

    再来看一下里面的几行代码的含义:

    • newCoroutineContext: 通过默认的或者传入的context创建一个新的Context;
    • coroutine: launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。
    • coroutine.start: 尝试启动协程

    2.协程是如何被启动的

    通过launch的源码可知协程的启动是通过coroutine.start启动的,那么协程的启动流程又是怎样的?

    public abstract class AbstractCoroutine<in T>(
        parentContext: CoroutineContext,
        initParentJob: Boolean,
        active: Boolean
    ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
        ...
        /**
         * 用给定的代码块启动这个协程编程并启动策略。这个函数在这个协程上最多调用一次。
         */
        public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
            start(block, receiver, this)
        }
    }
    

    start函数中传入了三个参数,只需要关注第一个参数即可。

    public enum class CoroutineStart {
        ...
        /**
         * 用这个协程的启动策略启动相应的块作为协程。
         */
        public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }
    }
    

    启动策略的具体实现有三种方式,这里只需要分析startCoroutine,另外两个其实就是它的基础上增加了一些功能,其中前者代表启动协程以后可以在等待调度时取消,后者表示协程启动后不会被分发。

    /**
     * 创建没有接收方且结果类型为T的协程,这个函数每次调用时都会创建一个新的可挂起的实例。
     */ 
    public fun <T> (suspend () -> T).startCoroutine(
        completion: Continuation<T>
    ) {
        createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    }
    

    createCoroutineUnintercepted在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。

    //IntrinsicsJvm.kt#createCoroutineUnintercepted
    /**
     * 创建没有接收方且结果类型为T的非拦截协程。这个函数每次调用时都会创建一个新的可挂起的实例。
     */
    public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
        completion: Continuation<T>
    ): Continuation<Unit> {
        val probeCompletion = probeCoroutineCreated(completion)
        return if (this is BaseContinuationImpl)
            create(probeCompletion)
        else
            createCoroutineFromSuspendFunction(probeCompletion) {
                (this as Function1<Continuation<T>, Any?>).invoke(it)
            }
    }
    

    actual代表了 createCoroutineUnintercepted() 在 JVM 平台的实现。

    createCoroutineUnintercepted是一个扩展函数,接收者类型是一个无参数,返回值为 T 的挂起函数或者 Lambda。

    第9行代码中的this代表的是(suspend () -> T)也就是invoke函数中的block变量,这个block变量就是demo中的block代码段。

    第9行的BaseContinuationImpl是一个抽象类它实现了Continuation

    关于if (this is BaseContinuationImpl)的结果暂且不分析,先分析两种情况下的create函数:

    • create(probeCompletion):
    //ContinuationImpl.kt#create
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
    

    这个create函数抛出一个异常,意思就是这个create()没有被重写,而这个create()的重写就是在反编译后的Java代码中的create函数

    @NotNull
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkhttp://www.devze.comNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
    }
    
    • createCoroutineFromSuspendFunction(probeCompletion):
    //IntrinsicsJvm.kt#createCoroutineFromSuspendFunction
    /**
     * 当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。
     *
     * 它发生在两种情况下:
     * 1.lambda表达式中调用了其他的挂起方法
     * 2.挂起方法是通过Java实现的
     *
     * 必须将它封装到一个扩展[BaseContinuationImpl]的实例中,因为这是所有协程机制的期望。 
     */
    private inline fun <T> createCorouti开发者_JS开发neFromSuspendFunction(
    	completion: Continuation<T>,
    	crossinline block: (Continuation<T>) -> Any?
    		): Continuation<Unit> {
    	val context = completion.context
    	// context为空创建一个受限协程
    	return if (context === EmptyCoroutineContext)
    	//受限协程:只能调用协程作用域中提供的挂起方式挂起,其他挂起方法不能调用
    	object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
    		private var label = 0
    		override fun invokeSuspend(result: Result<Any?>): Any? =
    		when (label) {
    			0 -> {
    				label = 1
    				result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
    				block(this) // 运行块,可以返回或挂起
    			}
    			1 -> {
    				label = 2
    				result.getOrThrow() // 这是block挂起的结果
    			}
    			else -> error("This coroutine had already completed")
    		}
    	}
    	else
    	//创建一个正常的协程
    	object : ContinuationImpl(completion as Continuation<Any?>, context) {
    		private var label = 0
    		override fun invokeSuspend(result: Result<Any?>): Any? =
    		when (label) {
    			0 -> {
    				label = 1
    				result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
    				block(this) // 运行块,可以返回或挂起
    			}
    			1 -> {
    				label = 2
    				result.getOrThrow() // 这是block挂起的结果
    			}
    			else -> error("This coroutine had already completed")
    		}
    	}
    }
    

    createCoroutineFromSuspendFunction就是当一个被suspend修饰的Lambda表达式没有继承BaseContinuationImpl是才会被调用,然后根据上下文是否为空创建不同类型的协程。

    两种情况都已经分析完了,那么现在if (this is BaseContinuationImpl)会执行哪一个呢,首先这里的this所指的就是demo中的block代码段,Kotlin编译器编译后会自动生成一个类就是上面的static,它会继承SuspendLambda类,而这个SuspendLambda类继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,因此可以得到判断结果为true,

    createCoroutineUnintercepted的过程就是协程创建的过程。

    然后就是intercepted函数,这个函数的具体实现也在IntrinsicsJvm.kt中,那么intercepted又做了什么呢

    public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
    //具体实现
    //IntrinsicsJvm.kt#intercepted
    public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
        (this as? ContinuationImpl)?.intercepted() ?: this
    

    首先有个强转,通过上面的分析这个强转是一定会成功的,到这里intercepted就进入到了ContinuationImpl中了

    internal abstract class ContinuationImpl(
        completion: Continuation<Any?>?,
        private val _context: CoroutineContext?
    ) : BaseContinuationImpl(completion) {
    	...
        @Transient
        private var intercepted: Continuation<Any?>? = null
    	//如果没有缓存,则从上下文获取拦截器,调用interceptContinuation进行拦截
    	//将获取到的内容保存到全局变量
        public fun intercepted(): Continuation<Any?> =
            intercepted
                ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
    }
    

    这里的ContinuationInterceptor指的就是Demo中传输的Dispatcher.IO,默认值时Dispatcher.Default

    再回到startContinue中还剩最后一个resume

    /**
     * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
     */
    public inline fun <T> Continuation<T>.resume(value: T): Unit =
    	resumeWith(Result.success(value))
    public interface Continuation<in T> {
    	/**
         * 与此延续相对应的协程的上下文。
         */
    	public val context: CoroutineContext
    	/**
         * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
         */
    	public fun resumeWith(result: Result<T>)
    }
    

    这里的resume(Unit)作用就相当与启动了一个协程。

    上面的启动流程中为了方便分析的是CoroutineStart.ATOMIC,而默认的是CoroutineStart.DEFAULT,下面分析一下DEFAULT的流程

    //Cancellable.kt#startCoroutineCancellable
    public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
        createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }
    

    startCoroutineCancellable对于协程的创建和拦截与ATOMIC是一样的,区别就在于resumeCancellableWith

    //DispatchedContinuation#resumeCancellableWith
    public fun <T> Continuation<T>.resumeCancellableWith(
    	result: Result<T>,
    	onCancellation: ((cause: Throwable) -> Unit)? = null
    ): Unit = when (this) {
    	is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    	else -> resumeWith(result)
    }
    // 我们内联它来保存堆栈上的一个条目,在它显示的情况下(无限制调度程序)
    // 它只在Continuation<T>.resumeCancellableWith中使用
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(
    	result: Result<T>,
    	noinline onCancellation: ((cause: Throwable) -> Unit)?
    		) {
    	val state = result.toState(onCancellation)
    	//是否需要分发
    	if (dispatcher.isDispatchNeeded(context)) {
    		_state = state
    		resumeMode = MODE_CANCELLABLE
    		//将可运行块的执行分派给给定上下文中的另一个线程
    		dispatcher.dispatch(context, this)
    	} else {
    		executeUnconfined(state, MODE_CANCELLABLE) {
    			//协程未被取消
    			if (!resumeCancelled(state)) {
    				// 恢复执行
    				resumeUndispatchedwith(result)
    			}
    		}
    	}
    }
    //恢复执行前判断协程是否已经取消执行
    inline fun resumeCancelled(state: Any?): Boolean {
    	//获取当前协程任务
    	val job = context[Job]
    	//如果不为空且不活跃
    	if (job != null && !job.isActive) {
    		val cause = job.getCancellationException()
    		cancelCompletedResult(state, cause)
    		//抛出异常
    		resumeWithException(cause)
    		return true
    	}
    	return false
    }
    //我们需要内联它来在堆栈中保存一个条目
    inline fun resumeUndispatchedWith(result: Result<T>) {
    	withContinuationContext(continuation, countOrElement) {
    		continuation.resumeWith(result)
    	}
    }

    以上就是Kotlin协程launch启动流程原理详解的详细内容,更多关于Kotlin协程launch启动流程的资料请关注我们其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜