Netty中的时间轮(v3.10.7)

2022-06-02 14:04:19 浏览数 (1)

在上一篇Netty中的时间轮(v3.2.5)中,讲解的版本是v3.2.5,它在MAVEN仓库中是可以找到的.这篇文章讲解的是3.x系列中目前最高的版本v3.10.7,它在MAVEN仓库中不存在,这个版本只在Netty源码中可以找到.讲解这个v3.10.7版本的目的是要和v3.2.5版本做个对比,看它们各自在时间轮上的实现差异.

在时间轮创建的底层,v3.10.7使用的是普通对象数组,而v3.2.5使用的是集合数组.

代码语言:javascript复制
// v3.2.5版本中的时间轮底层存放任务的结构
final Set<HashedWheelTimeout>[] wheel;
// v3.10.7版本中的时间轮底层存放任务的结构
private final HashedWheelBucket[] wheel;      //Bucket表示桶的意思

在v3.2.5版本中,当提交一个任务的时候,任务是直接放入到时间轮上面去的.每个格子都指向一个HashMap,HashMap中存放着提交的任务.如下图

在v3.10.7版本中,当提交一个任务的时候,并不是立刻就放到时间轮上面,而是先放到一个队列中,之后每次执行任务的时候,从队列中最多取出100000个任务放到时间轮上.

接下来从源码的角度看下v3.10.7版本的实现.代码做了删减,只体现重点.

代码语言:javascript复制
public HashedWheelTimer(
            ThreadFactory threadFactory,
            ThreadNameDeterminer determiner,
            long tickDuration, TimeUnit unit, int ticksPerWheel) {

  // 创建时间轮底层存储任务的数据结构
  wheel = createWheel(ticksPerWheel);
  mask = wheel.length - 1;

  // 每个格子的时间
  this.tickDuration = unit.toNanos(tickDuration);
  
  // 时间轮处理任务的线程
  workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
          worker, "Hashed wheel timer #"   id.incrementAndGet(),
          determiner));

}
// 时间轮
private final HashedWheelBucket[] wheel;
// 存放任务的队列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();

在构造方法中,会创建时间轮,它的底层就是一个对象数组.有一个timeouts的队列,用于存储外界提交的任务.

代码语言:javascript复制
private static final class HashedWheelBucket {

    // 时间轮上每个格子都是通过链表的方式,将每个任务'链'起来
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;
    
    ...
}

外界提交任务的时候,代码如下

代码语言:javascript复制
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

  long deadline = System.nanoTime()   unit.toNanos(delay) - startTime;
  HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  // 将任务放入到队列中,并没有一开始就放到时间轮上
  timeouts.add(timeout);
  return timeout;
}

时间轮执行任务,代码如下

代码语言:javascript复制
public void run() {
  
  startTime = System.nanoTime();

  do {
    final long deadline = waitForNextTick();
    if (deadline > 0) {
      // 将队列中的任务最多取100000放到时间轮上
      transferTimeoutsToBuckets();
      HashedWheelBucket bucket = wheel[(int) (tick & mask)];
      // 执行时间轮上当前格子上的任务
      bucket.expireTimeouts(deadline);
      tick  ;
    }
  } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

}

上一篇文章介绍过关于startTime,deadline的含义.这里再说一下

当时间轮启动的时候,虽然startTime = System.nanoTime(). 其实我们可以换个角度,在时间轮的世界里,startTime=0,因为世界才刚刚开始启动.

当一个任务要延时delay执行的时候,它在时间轮的世界里就已经被固定好位置了.随着时间轮的'转动',当时间'走'到相应的位置,就会执行符合条件的任务.

再看下任务提交时的关于deadline的概念

代码语言:javascript复制
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

  // 每个任务的deadline都是相对时间,相对startTime而言的
  long deadline = System.nanoTime()   unit.toNanos(delay) - startTime;
  HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  timeouts.add(timeout);
  return timeout;
}

long deadline = System.nanoTime() unit.toNanos(delay) - startTime 这行代码,它使用当前时间减去startTime时间,再加上延时时间delay. 表示的含义就是任务相对于起点(startTime那个时刻点)的位置.

关于时间轮,大家可以想象下卷尺.

时间轮在执行任务的时候,上面的代码有个waitForNextTick方法

代码语言:javascript复制
private long waitForNextTick() {
  long deadline = tickDuration * (tick   1);

  for (;;) {
    final long currentTime = System.nanoTime() - startTime;
    // 当当前时间等于deadline的时候,就会跳出循环
    long sleepTimeMs = (deadline - currentTime   999999) / 1000000;

    if (sleepTimeMs <= 0) {
      if (currentTime == Long.MIN_VALUE) {
        return -Long.MAX_VALUE;
      } else {
        return currentTime;
      }
    }

    try {
      Thread.sleep(sleepTimeMs);
    } catch (InterruptedException e) {
    }
  }
}

用图形表示下上面这段代码的含义

只有当前时间currentTime等于deadline的时候,才会跳出循环.

跳出循环之后,接下来就会从任务队列中取出任务放到时间轮上.

代码语言:javascript复制
private void transferTimeoutsToBuckets() {
  for (int i = 0; i < 100000; i  ) {
      // 从队列中取出任务
    HashedWheelTimeout timeout = timeouts.poll();
    if (timeout == null) {
      break;
    }
    if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
        || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
      timeout.remove();
      continue;
    }
    long calculated = timeout.deadline / tickDuration;
    long remainingRounds = (calculated - tick) / wheel.length;
    timeout.remainingRounds = remainingRounds;

    final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
    int stopIndex = (int) (ticks & mask);
    
    // 将任务放到时间轮的当前格子中
    HashedWheelBucket bucket = wheel[stopIndex];
    bucket.addTimeout(timeout);
  }
}

最多取出100000个任务放到时间轮上.

接下来就是执行过期的任务

代码语言:javascript复制
public void expireTimeouts(long deadline) {
  HashedWheelTimeout timeout = head;

  while (timeout != null) {
    boolean remove = false;
    if (timeout.remainingRounds <= 0) {
        // 如果时间已经到了,则执行任务
      if (timeout.deadline <= deadline) {
        timeout.expire();
      }
      remove = true;
    } else if (timeout.isCancelled()) {
      remove = true;
    } else {
      timeout.remainingRounds --;
    }
    HashedWheelTimeout next = timeout.next;
    if (remove) {
      remove(timeout);
    }
    timeout = next;
  }
}

会遍历每个格子中的任务,如果任务超期了,则执行它.

0 人点赞