聊聊PowerJob的HashedWheelTimer

2024-01-22 10:08:37 浏览数 (2)

本文主要研究一下PowerJob的HashedWheelTimer

Timer

tech/powerjob/server/common/timewheel/Timer.java

代码语言:javascript复制
public interface Timer {

    /**
     * 调度定时任务
     */
    TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);

    /**
     * 停止所有调度任务
     */
    Set<TimerTask> stop();
}

Timer接口定义了schedule方法,用于在指定时间之后调度TimerTask,它返回TimerFuture;stop方法返回未处理的TimerTask

TimerTask

代码语言:javascript复制
@FunctionalInterface
public interface TimerTask extends Runnable {
}

TimerTask继承了Runnable

TimerFuture

tech/powerjob/server/common/timewheel/TimerFuture.java

代码语言:javascript复制
public interface TimerFuture {

    TimerTask getTask();

    boolean cancel();

    boolean isCancelled();

    boolean isDone();
}            

TimerFuture定于了getTask、cancel、isCancelled、isDone方法

HashedWheelTimer

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

代码语言:javascript复制
@Slf4j
public class HashedWheelTimer implements Timer {

    private final long tickDuration;
    private final HashedWheelBucket[] wheel;
    private final int mask;

    private final Indicator indicator;

    private final long startTime;

    private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
    private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();

    private final ExecutorService taskProcessPool;

    public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
        this(tickDuration, ticksPerWheel, 0);
    }

    /**
     * 新建时间轮定时器
     * @param tickDuration 时间间隔,单位毫秒(ms)
     * @param ticksPerWheel 轮盘个数
     * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
     */
    public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {

        this.tickDuration = tickDuration;

        // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余
        int ticksNum = CommonUtils.formatSize(ticksPerWheel);
        wheel = new HashedWheelBucket[ticksNum];
        for (int i = 0; i < ticksNum; i  ) {
            wheel[i] = new HashedWheelBucket();
        }
        mask = wheel.length - 1;

        // 初始化执行线程池
        if (processThreadNum <= 0) {
            taskProcessPool = null;
        }else {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
            // 这里需要调整一下队列大小
            BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
            int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
            // 基本都是 io 密集型任务
            taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
                    60, TimeUnit.SECONDS,
                    queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
        }

        startTime = System.currentTimeMillis();

        // 启动后台线程
        indicator = new Indicator();
        new Thread(indicator, "HashedWheelTimer-Indicator").start();
    }

    //......
}    

HashedWheelTimer实现了Timer接口,其构造器要求输入tickDuration、ticksPerWheel,它会将ticksPerWheel转换为2的N次,然后创建对应的HashedWheelBucket,若processThreadNum大于0则同时创建ThreadPoolExecutor用于处理任务,最后启动异步线程执行Indicator

schedule

代码语言:javascript复制
    @Override
    public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {

        long targetTime = System.currentTimeMillis()   unit.toMillis(delay);
        HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);

        // 直接运行到期、过期任务
        if (delay <= 0) {
            runTask(timerFuture);
            return timerFuture;
        }

        // 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列)
        waitingTasks.add(timerFuture);
        return timerFuture;
    }

schedule方法先计算目标时间,然后创建对应的HashedWheelTimerFuture,若delay小于等于0则执行runTask,否则添加到waitingTasks

stop

代码语言:javascript复制
    @Override
    public Set<TimerTask> stop() {
        indicator.stop.set(true);
        taskProcessPool.shutdown();
        while (!taskProcessPool.isTerminated()) {
            try {
                Thread.sleep(100);
            }catch (Exception ignore) {
            }
        }
        return indicator.getUnprocessedTasks();
    }

stop方法先设置indicator的stop为true,然后执行taskProcessPool.shutdown(),等待关闭,最后返回indicator.getUnprocessedTasks()

HashedWheelBucket

代码语言:javascript复制
    private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {

        public void expireTimerTasks(long currentTick) {

            removeIf(timerFuture -> {

                // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
                if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
                    return true;
                }

                if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
                    log.warn("[HashedWheelTimer] impossible, please fix the bug");
                    return true;
                }

                // 本轮直接调度
                if (timerFuture.totalTicks <= currentTick) {

                    if (timerFuture.totalTicks < currentTick) {
                        log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
                    }

                    try {
                        // 提交执行
                        runTask(timerFuture);
                    }catch (Exception ignore) {
                    } finally {
                        timerFuture.status = HashedWheelTimerFuture.FINISHED;
                    }
                    return true;
                }

                return false;
            });

        }
    }

    private void runTask(HashedWheelTimerFuture timerFuture) {
        timerFuture.status = HashedWheelTimerFuture.RUNNING;
        if (taskProcessPool == null) {
            timerFuture.timerTask.run();
        }else {
            taskProcessPool.submit(timerFuture.timerTask);
        }
    }    

HashedWheelBucket继承了LinkedList,其泛型为HashedWheelTimerFuture,它提供了expireTimerTasks方法,通过removeIf删除status为CANCELED、status不为WAITING,以及执行runTask(注意这里忽略了异常)之后标记status为FINISHED的元素;runTask先标记为RUNNING,对于taskProcessPool为null则直接执行,否则提交到taskProcessPool

HashedWheelTimerFuture

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

代码语言:javascript复制
    private final class HashedWheelTimerFuture implements TimerFuture {

        // 预期执行时间
        private final long targetTime;
        private final TimerTask timerTask;

        // 所属的时间格,用于快速删除该任务
        private HashedWheelBucket bucket;
        // 总圈数
        private long totalTicks;
        // 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消
        private int status;

        // 状态枚举值
        private static final int WAITING = 0;
        private static final int RUNNING = 1;
        private static final int FINISHED = 2;
        private static final int CANCELED = 3;

        public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {

            this.targetTime = targetTime;
            this.timerTask = timerTask;
            this.status = WAITING;
        }

        @Override
        public TimerTask getTask() {
            return timerTask;
        }

        @Override
        public boolean cancel() {
            if (status == WAITING) {
                status = CANCELED;
                canceledTasks.add(this);
                return true;
            }
            return false;
        }

        @Override
        public boolean isCancelled() {
            return status == CANCELED;
        }

        @Override
        public boolean isDone() {
            return status == FINISHED;
        }
    }

HashedWheelTimerFuture实现了TimerFuture接口,它定义了WAITING、RUNNING、FINISHED、CANCELED状态;初始状态为WAITING,对于WAITING状态的可以设置为CANCELED,并添加到canceledTasks;isCancelled判断状态是不是CANCELED,isDone判断状态是不是FINISHED

getUnprocessedTasks

代码语言:javascript复制
        public Set<TimerTask> getUnprocessedTasks() {
            try {
                latch.await();
            }catch (Exception ignore) {
            }

            Set<TimerTask> tasks = Sets.newHashSet();

            Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
                if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                    tasks.add(timerFuture.timerTask);
                }
            };

            waitingTasks.forEach(consumer);
            for (HashedWheelBucket bucket : wheel) {
                bucket.forEach(consumer);
            }
            return tasks;
        }

getUnprocessedTasks会等待Indicator的while循环结束,然后遍历所有的HashedWheelBucket找出状态还是WAITING的任务

Indicator

代码语言:javascript复制
    private class Indicator implements Runnable {

        private long tick = 0;

        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void run() {

            while (!stop.get()) {

                // 1. 将任务从队列推入时间轮
                pushTaskToBucket();
                // 2. 处理取消的任务
                processCanceledTasks();
                // 3. 等待指针跳向下一刻
                tickTack();
                // 4. 执行定时任务
                int currentIndex = (int) (tick & mask);
                HashedWheelBucket bucket = wheel[currentIndex];
                bucket.expireTimerTasks(tick);

                tick   ;
            }
            latch.countDown();
        }

        /**
         * 模拟指针转动,当返回时指针已经转到了下一个刻度
         */
        private void tickTack() {

            // 下一次调度的绝对时间
            long nextTime = startTime   (tick   1) * tickDuration;
            long sleepTime = nextTime - System.currentTimeMillis();

            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                }catch (Exception ignore) {
                }
            }
        }

        /**
         * 处理被取消的任务
         */
        private void processCanceledTasks() {
            while (true) {
                HashedWheelTimerFuture canceledTask = canceledTasks.poll();
                if (canceledTask == null) {
                    return;
                }
                // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)
                if (canceledTask.bucket != null) {
                    canceledTask.bucket.remove(canceledTask);
                }
            }
        }

        /**
         * 将队列中的任务推入时间轮中
         */
        private void pushTaskToBucket() {

            while (true) {
                HashedWheelTimerFuture timerTask = waitingTasks.poll();
                if (timerTask == null) {
                    return;
                }

                // 总共的偏移量
                long offset = timerTask.targetTime - startTime;
                // 总共需要走的指针步数
                timerTask.totalTicks = offset / tickDuration;
                // 取余计算 bucket index
                int index = (int) (timerTask.totalTicks & mask);
                HashedWheelBucket bucket = wheel[index];

                // TimerTask 维护 Bucket 引用,用于删除该任务
                timerTask.bucket = bucket;

                if (timerTask.status == HashedWheelTimerFuture.WAITING) {
                    bucket.add(timerTask);
                }
            }
        }

        public Set<TimerTask> getUnprocessedTasks() {
            try {
                latch.await();
            }catch (Exception ignore) {
            }

            Set<TimerTask> tasks = Sets.newHashSet();

            Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
                if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                    tasks.add(timerFuture.timerTask);
                }
            };

            waitingTasks.forEach(consumer);
            for (HashedWheelBucket bucket : wheel) {
                bucket.forEach(consumer);
            }
            return tasks;
        }
    }

Indicator实现了Runnable接口,其run方法在stop为false的时候循环执行,pushTaskToBucket、processCanceledTasks、tickTack、expireTimerTasks

pushTaskToBucket

代码语言:javascript复制
        private void pushTaskToBucket() {

            while (true) {
                HashedWheelTimerFuture timerTask = waitingTasks.poll();
                if (timerTask == null) {
                    return;
                }

                // 总共的偏移量
                long offset = timerTask.targetTime - startTime;
                // 总共需要走的指针步数
                timerTask.totalTicks = offset / tickDuration;
                // 取余计算 bucket index
                int index = (int) (timerTask.totalTicks & mask);
                HashedWheelBucket bucket = wheel[index];

                // TimerTask 维护 Bucket 引用,用于删除该任务
                timerTask.bucket = bucket;

                if (timerTask.status == HashedWheelTimerFuture.WAITING) {
                    bucket.add(timerTask);
                }
            }
        }

pushTaskToBucket通过waitingTasks.poll()拉取任务,若为null直接返回,否则通过timerTask.targetTime与startTime计算offset,再根据tickDuration计算需要走的步数,然后计算并获取目标HashedWheelBucket,然后将timerTask添加到bucket中

processCanceledTasks

代码语言:javascript复制
        private void processCanceledTasks() {
            while (true) {
                HashedWheelTimerFuture canceledTask = canceledTasks.poll();
                if (canceledTask == null) {
                    return;
                }
                // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)
                if (canceledTask.bucket != null) {
                    canceledTask.bucket.remove(canceledTask);
                }
            }
        }

processCanceledTasks会执行canceledTasks.poll()拉取canceledTask,若canceledTask.bucket不为null则将canceledTask从该bucket中移除

tickTack

代码语言:javascript复制
        private void tickTack() {

            // 下一次调度的绝对时间
            long nextTime = startTime   (tick   1) * tickDuration;
            long sleepTime = nextTime - System.currentTimeMillis();

            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                }catch (Exception ignore) {
                }
            }
        }

tickTack模拟指针移动,它先计算nextTime,再计算需要sleep多久,然后执行Thread.sleep(sleepTime)

小结

PowerJob定义了Timer接口,并提供了HashedWheelTimer的实现,它定义了waitingTasks、canceledTasks两个LinkedBlockingQueue(无界队列),同时还支持定义任务处理线程池的core线程数;它通过Indicator线程来处理时间轮的转动及任务处理,Indicator循环将waitingTasks的任务放入到对应的bucket,然后模拟时间轮等待,然后通过bucket.expireTimerTasks(tick)处理到期任务,最后再递增tick。

0 人点赞