这篇文章主要和大家探讨下关于rxjava的Scheduler和协程的Dispatcher。
这两个东西的用处都是处理线程调度用的。
Rxjava Scheduler
释义
Scheduler 与 Worker 在 RxJava2 中是一个非常重要的概念,他们是 RxJava 线程调度的核心与基石。Scheduler主要负责的就是一件事情,定义好每个流模块的执行线程。
源码追溯
源码分析我们先从subscribeOn方法开始。
代码语言:javascript复制@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
复制代码
上述代码可以看出,subscribeOn的核心代码是ObservableSubscribeOn,这个类只做了一件事情,它会把上一个流用装饰者模式包装了一下,当上一个流被执行的时候会将流执行到scheduler的线程上去。
代码语言:javascript复制public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observersuper T> s) {
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
复制代码
ObservableSubscribeOn的subscribeActual方法执行之后,scheduler.scheduleDirect(new SubscribeTask(parent))通过这段代码将当前流运行到Scheduler的线程内。
之后我们可以看下Scheduler的实现累,RxAndroid的HandlerScheduler,看看对于安卓的调度器,RxJava是怎么写的。
代码语言:javascript复制@Overridepublic
Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
复制代码
scheduler.scheduleDirect执行的时候就会调用scheduleDirect方法。看得出来,方法被触发之后调用了handler的postdelay方法,将当前的Runnable运行到该handler的线程上去。
代码语言:javascript复制 static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
复制代码
接下来我们要说另外一个Worker,上面是IoScheduler里面的Worker代码。Work在初始化的时候会去生成一个线程池,这个线程池就是我们后续schedule执行的地方,当一个Runnnable被调度到这个work上的时,会调用schedule方法,然后将这个Runnnable运行到这个线程池上去。
协程 Dispatcher
释义
协程上下文(coroutine context)包含一个协程调度器(参阅 CoroutineDispatcher),协程调度器 用于确定执行协程的目标载体,即运行于哪个线程,包含一个还是多个线程。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派到线程池中,或者让它无限制地运行。
源码追溯
代码语言:javascript复制public suspend fun withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
复制代码
这是协程的线程调度的代码,当发现当前的调度器和目标调度器不是同一个的情况下,会new一个DispatchedCoroutine,开始进行线程的调度操作。
coroutine.initParentJob()初始化父任务,这个方法需要一开始就被初始化调用的。 然后就是核心关键将block.startCoroutineCancellable(coroutine, coroutine),该方法会创建一个新的可挂起线程。 进行异步等待操作,当有值的情况下会回调将当前挂起结束,进行下一步获取值操作,然后将当前的线程返回。
在调用withContext方法的时候因为我们传入的是Dispatchers.Main
代码语言:javascript复制 @JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
而DispatcherMain是可以有外部的fatroy构造的,由安卓的kotlin支持库中可以发现,其实现类是AndroidDispatcherFactory。
代码语言:javascript复制internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
复制代码
这次真的转过来了把,没有干扰了把,各位老哥,我要给你们跪了啊。
接下来我们看下重头戏HandlerContext,这个类就是和rxjava的HandlerScheduler基本一模一样的线程调度器。
代码语言:javascript复制internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
handler.removeCallbacks(block)
}
}
}
override fun toString(): String =
if (name != null) {
if (invokeImmediately) "$name [immediate]" else name
} else {
handler.toString()
}
override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
override fun hashCode(): Int = System.identityHashCode(handler)
}
复制代码
DispatchedCoroutine在上面的这个挂起函数的父类CoroutineDispatcher,会调用dispatch方法,进行线程切换操作,然后是不是和上面的rxjava 有点似曾相似的感觉。
没错,各位看官,这次调用了handler.post(block)。所以我这次我真的下结论了,上篇文章是有点小微妙,但是这次应该没事清楚了。
结论
如果当你基本了解rxjava的调度器的实现的情况下。大胆点以后面试问你kotlin协程是如何实现调度的逻辑,你就把逻辑copy一遍告诉他就好了。
感谢
写这篇文章还收收集了一些资料的,谢谢各位大佬。
理解RxJava(三)线程调度原理分析
【译】kotlin 协程官方文档(4)-协程上下文和调度器(Coroutine Context and Dispatchers)