定时器算法

2022-06-28 18:49:24 浏览数 (1)

概述

在日常开发中, 定时任务是一个比较关键的功能。 Java 中一般使用 JDK 中 TimerScheduledExecutorService 和调度框架 Quartz等。 通常用于实现延时任务, 周期性任务等, 一般会有两种需求:

  1. 轮询定时任务:给定周期内扫描所有记录,查看是否有满足要求的数据。
  2. 延时消息:如常见的订单业务, 订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消

轮询的方式在数据量大的时候性能会比较差, 通常我们会选择第二种方式。

Timer

Timer 调度任务有一次性调度和循环调度,循环调度有分为:

固定速率调度(fixRate)

固定时延调度(fixDelay):

内部原理

Timer 类里包含一个任务队列和一个异步轮训线程。

  1. 任务队列是一个以下次执行时间比较的最小堆
  2. 任务队列里容纳了所有待执行的任务,所有的任务将会在这一个异步线程里执行,任务执行中代码不能抛出异常,否则会导致 Timer 线程挂掉,所有的任务都无法执行了。
  3. 单个任务也不易执行时间太长,否则会影响任务调度在时间上的精准性。比如你一个任务跑了太久,其它等着调度的任务就一直处于饥饿状态得不到调度。所有任务的执行都是这单一的 TimerThread 线程。

Timer 类:

代码语言:javascript复制
class Timer {
  // 任务队列, 任务通过 Timer.schedule 方法将任务加入 TaskQueue
  // taskQueue 是数组实现的一个最小堆
  TaskQueue queue = new TaskQueue();
  // 执行任务的 timer 线程
  TimerThread thread = new TimerThread(queue);
}

class TaskQueue {
  TimerTask[] queue = new TimerTask[128];
  int size;
}

任务调度过程:

代码语言:javascript复制
// 调度任务
private void sched(TimerTask task, long time, long period) {
  synchronized(task.lock) {
        if (task.state != TimerTask.VIRGIN)
                 throw new IllegalStateException(
                    "Task already scheduled or cancelled");
         task.nextExecutionTime = time;
         task.period = period;
         task.state = TimerTask.SCHEDULED;
    }
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
}


// new Timer 的时候 内部的 TimerThread 已经开始 run 调用 mainLoop 了
// 运行任务
private void mainLoop() {
   while (true) {
        try {
            TimerTask task;
            boolean taskFired;
            synchronized(queue) {
                // 队列是空并且还有其他任务, 则 wait
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                // 如果队列没有任务, 直接结束
                if (queue.isEmpty())
                    break;
                // taskQueue 按照 nextExecutionTime 进行堆排序, 取出最小的(应该执行的)任务
                task = queue.getMin();
                synchronized(task.lock) {
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    if (taskFired = (executionTime<=currentTime)) {
                        // 如果没有设置执行间隔, 就移除该任务
                        if (task.period == 0) {
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else {
                            // 否则加入队列等待下一次执行(这里是修改下一次任务的执行时间)
                            // 计算下一次调度时间
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime   task.period);
                        }
                    }
                }
                if (!taskFired) // Task hasn't yet fired; wait
                    queue.wait(executionTime - currentTime);
            }
            if (taskFired)  // Task fired; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {
   }
 }

注意:

  1. Timer 只能单线程调度
  2. TimerTask 中出现的异常会影响到 Timer 的执行

ScheduledThreadPoolExecutor

  • schedule 提交一个一次性触发的任务, 在给定延时后执行
  • ScheduleAtFixedRate 是基于固定时间间隔进行任务调度
  • ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔的任务调度

使用:

代码语言:javascript复制
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

//提交一个一次性触发的Runnable任务,在给定时延后执行。
public ScheduledFuture<?> schedule(
    Runnable command, 
    long delay, 
    TimeUnit unit
);

//提交一个定期重复的Runnable任务,在initialDelay时间后第一次执行,此后每隔period时间执行一次。如果某次执行抛出异常,之后所有任务取消执行。
// 如果一次执行时间过长,完成时已经超过下次执行开始时间,下一次执行会等待上一次执行结束后马上执行
public ScheduledFuture<?> scheduleAtFixedRate(
    Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit
);


// 跟 scheduleAtFixRate 类似
// 区别是在上一次执行结束之后,再等待delay时间,然后再执行下一次任务。
public ScheduledFuture<?> scheduleWithFixedDelay(
    Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit
);
ScheduledThreadPoolExecutor 执行原理

无论是 scheduleAtFixedRate 还是 scheduleWithFixedDelay 都会把任务包装成 ScheduledFutureTask, 然后调用delayedExecute(RunnableScheduledFuture<?> task 处理, 这里以scheduleAtFixedRate为例:

代码语言:javascript复制
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
     // 包装成 ScheduledFutureTask
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));                               
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 处理任务
    delayedExecute(t);
    return t;
}

delayedExecute 逻辑如下:

代码语言:javascript复制
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        //如果线程池已经关闭,拒绝该任务
       reject(task);
    else {
        //将任务加入队列并再次检查线程池状态,不可运行则取消任务
        super.getQueue().add(task);
        //如果任务刚入队后线程池关闭了且不允许执行任务,
        //从任务队列移除该任务并取消之
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保证核心线程运行任务
            ensurePrestart();
    }
}
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        //如果worker数量小于corePoolSize,新建Worker
        //但是不设置firstTask而是让Worker从延迟队列获取任务
        addWorker(null, true);
    else if (wc == 0)
        //保证至少有一个Worker在运行
        addWorker(null, false);
}

可以看到以上代码并没有包含任何控制或响应延时的代码,因此这些逻辑应该是由延迟队列本身来控制的,这样就可以直接使用继承自ThreadPoolExecutor的方法完成其他相同的部分, 构造函数显示队列类型是DelayedWorkQueue

那我们回到加入队列的任务 ScheduledFutureTask 的 run 方法:

代码语言:javascript复制
public void run() {
    //根据periodic是否为0判断是否是周期性任务
    boolean periodic = isPeriodic();
    //判断为非可执行任务状态时,取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //不是周期性任务,直接执行    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //是周期性任务runAndReset方法会执行在执行结束时将任务的状态重置为NEW,便于下次再次执行    
    else if (ScheduledFutureTask.super.runAndReset()) {
        //设置下周执行的时间
        setNextRunTime();
        //再次执行周期行任务
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    // 这里就是 fixRate 和 fixDelay 的区别, fixRate preiod > 0, 以固定频率执行(上次任务时间 执行周期)
    if (p > 0)
        time  = p;
    // 固定延迟, 则是当前系统时间 执行周期
    else
        time = triggerTime(-p);
}

long triggerTime(long delay) {
    // 返回当前系统时间加执行周期
    return now()  
        // 这里是为了防止溢出
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}


void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        //将任务重新入队
        super.getQueue().add(task);
        //如果线程池已关闭,将任务取消
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();//再次执行任务
    }
}

TimeWheel 时间轮算法

TimeWheel时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在Netty,Zookeeper,Kafka等各种框架中, 应用场景广泛。

比如在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。

在Dubbo最开始的实现中,是将所有的返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的future,逐个判断是否超时。

这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判断操作。

对于以上问题, 目的就是要减少额外扫描的次数,这样能减少 CPU 的开销, 时间轮可以很好的解决这个问题

时间轮介绍
单时间轮

单时间轮只有一个由bucket串起来的轮子,每个bucket下链接着未来对应时刻到期的节点。

假设相邻bucket到期时间的间隔为slot=1s,从当前时刻0s开始计时,1s时到期的定时器节点挂在bucket[1]下,2s时到期的定时器节点挂在bucket[2]下……

当tick检查到时间过去了1s时,bucket[1]下所有节点执行超时动作,当时间到了2s时,bucket[2]下所有节点执行超时动作……

上图只有 8 个 bucket, 如果按照 slot=expire 来算, 只能挂 8s 的定时任务, 超过 8s 可以使用 slot = expire % N, 这里需要引入 rotation 的概念,定时器中expire表示到期时间,rotation表示节点在时间轮转了几圈后才到期

多时间轮

)

Linux的多时间轮算法,借鉴了日常生活中水表的度量方法,通过低刻度走得快的轮子带动高一级刻度轮子走动的方法,达到了仅使用较少刻度即可表示很大范围度量值的效果

netty HashedWheelTimer 源码分析

HashedWheelTimer 是接口 io.netty.util.Timer 的实现:

代码语言:javascript复制
public interface Timer {

    // 创建一个定时任务
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    // 停止所有的还没有被执行的定时任务
    Set<Timeout> stop();
}

Timeout 是一个接口类, TimerTask 非常简单,就一个 run() 方法:

代码语言:javascript复制
public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

public interface Timeout {
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

构造函数太多,摘要一下几个重要的参数:

  • tickDuration 和 timeUnit 定义了一格的时间长度,默认的就是 100ms。
  • ticksPerWheel 定义了一圈有多少格,默认的就是 512;
  • leakDetection:用于追踪内存泄漏
  • maxPendingTimeouts:最大允许等待的 Timeout 实例数,也就是我们可以设置不允许太多的任务等待。如果未执行任务数达到阈值,那么再次提交任务会抛出 RejectedExecutionException 异常。默认不限制。

HashedWheelTimer 提交任务:

代码语言:javascript复制
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // 校验等待任务数是否达到阈值 maxPendingTimeouts
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
              pendingTimeoutsCount   ") is greater than or equal to maximum allowed pending "
              "timeouts ("   maxPendingTimeouts   ")");
    }

    // 如果工作线程没有启动,这里负责启动(如果是第一个任务, 还会 await 在这里)
    start();

    /** 下面的代码,构建 Timeout 实例,将其放到任务队列中。 **/

    // deadline 是一个相对时间,相对于 HashedWheelTimer 的启动时间
    long deadline = System.nanoTime()   unit.toNanos(delay) - startTime;

    // 防止 deadline 溢出
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // timeout 实例,一个上层依赖 timer,一个下层依赖 task,另一个是任务到期时间
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 放到 timeouts 队列中
    timeouts.add(timeout);
    return timeout;
}

这里的操作很简单:实例化 Timeout,然后放到任务队列中。需要注意的是,任务队列是 MPSC(Multiple Producer Single Consumer)队列, 刚好适用于这里的多生产线程,单消费线程的场景(这个队列是 JCTools 提供的一个并发数据结构)

Worker 线程工作原理:

代码语言:javascript复制
private final class Worker implements Runnable {

    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // tick 过的次数,时针每 100ms tick 一次
    private long tick;

    @Override
    public void run() {
        // 在 HashedWheelTimer 中,用的都是相对时间,所以需要启动时间作为基准,并且要用 volatile 修饰
        // 这里跟 newTimeout 中 start 的部分结合理解一下, start 希望工作线程真正启动之后, 并赋值完 startTime 才返回, 因为后面的逻辑需要一个正确的 startTime
        // 所以在 start 方法中使用  while(startTime ==0 )  startTimeInitialized.awit 这样做保证
        startTime = System.nanoTime();
        if (startTime == 0) {
            startTime = 1;
        }

        // 唤醒工作线程启动的时候提交的任务(在提交任务的start里面 await 的)
        startTimeInitialized.countDown();

        // 执行逻辑的地方
        do {
            // 等待并获取下个 tick 相对于开始时间的纳秒数
            final long deadline = waitForNextTick();

            if (deadline > 0) {
                // 该次 tick,bucket 数组对应的 index
                int idx = (int) (tick & mask);

                // 处理一下已经取消的任务
                processCancelledTasks();

                // 取得当前 bucket
                HashedWheelBucket bucket = wheel[idx];

                // 将队列中所有的任务转移到相应的 buckets 中
                transferTimeoutsToBuckets();

                // 执行进入到这个 bucket 中的任务
                bucket.expireTimeouts(deadline);

                tick  ;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        /* 到这里,说明这个 timer 要关闭了,做一些清理工作 */

        // 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,
        // 主要目的是用于 stop() 方法返回
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        // 将任务队列中的任务也添加到 unprocessedTimeouts 中
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }

在细看一下 run 流程中几个重要的方法, waitForNextTick 在每个 tick 到期返回:

代码语言:javascript复制
/**
 * 前面说过,这里用的都是相对时间,所以:
 *   第一次进来的时候,工作线程会在 100ms 的时候返回,返回值是 100*10^6 (纳秒)
 *   第二次进来的时候,工作线程会在 200ms 的时候返回,依次类推
 * 另外就是注意极端情况,比如第二次进来的时候,由于被前面的任务阻塞,导致进来的时候就已经是 250ms,
 *   那么,一进入这个方法就要立即返回,返回值是 250ms,而不是 200ms
 */
private long waitForNextTick() {
    // 下一个 tick 的毫秒数
    long deadline = tickDuration * (tick   1);

    // 嵌套在一个死循环里面
    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime   999999) / 1000000;

        if (sleepTimeMs <= 0) {
        // 这里可能是 HashedWheelTimer 启动到现在经过了太久,  currentTime 已经溢出了
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                // 这里是出口,所以返回值是当前时间(相对时间)
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            // 如果 timer 已经 shutdown,那么返回 Long.MIN_VALUE
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

接着看看转移队列到 bucket 中的方法 transferTimeoutsToBuckets:

代码语言:javascript复制
private void transferTimeoutsToBuckets() {
    // 这里一个 for 循环,还特地限制了 10 万次, 怕一直往队列里面仍任务, netty 的源码还是很细节的
    for (int i = 0; i < 100000; i  ) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // 没有任务了
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // 该任务刚刚被取消了(在transfer之前其实已经做过一次了)
            continue;
        }

        
        // 计算还需要等待几个 tick
        long calculated = timeout.deadline / tickDuration;
        // 计算出轮次
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // 单个 bucket,是由 HashedWheelTimeout 实例组成的一个链表,
        // 放入 bucket 链表
        bucket.addTimeout(timeout);
    }
}

bucket 中任务的执行:

代码语言:javascript复制
/**
 * 这里会执行这个 bucket 中,轮次为 0 的任务,也就是到期的任务。
 * 这个方法的入参 deadline 其实没什么用,因为轮次为 0 的都是应该被执行的。
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 处理链表上的所有 timeout 实例
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // 这行代码负责执行具体的任务
                timeout.expire();
            } else {
                // 这里的代码注释也说,不可能进入到这个分支
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // 轮次减 1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

总的来说 HashedWheelTimer 工作原理如下:

参考资料

  • 延时消息之时间轮
  • kafka 时间轮设计
  • HashedWheelTimer 源码分析

0 人点赞