Netty时间轮延时任务

2022-07-25 16:28:57 浏览数 (1)

  • HashedWheelTimer概念

这个类用来计划执行非精准的I/O超时。可以通过指定每一格的时间间隔来改变执行时间的精确度。在大多数网络应用中,I/O超时不需要十分准确,因此,默认的时间间隔是100 毫秒,这个值适用于大多数场合。HashedWheelTimer内部结构可以看做是个车轮,简单来说,就是TimerTask的hashTable的车轮。车轮的size默认是512,可以通过构造函数自己设置这个值。注意,当HashedWheelTimer被实例化启动后,会创建一个新的线程,因此,你的项目里应该只创建它的唯一一个实例。

  • 参数解释

首先创建时间轮,因为项目中只能出现一个实例所以直接用final修饰;

代码语言:javascript复制
public  final HashedWheelTimer timer_wheel = new HashedWheelTimer(1L, TimeUnit.SECONDS, 60);

参数解释:

代码语言:javascript复制
/**long tickDuration:滴答时间,刻度之间的持续时间;
    TimeUnit unit: 滴答时间单位
    int ticksPerWheel:  多少个刻度一圈
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

理解起来就像钟表,tickDuration 1s滴答一下,ticksPerWheel 刻度盘为60 ;

连起来就是创建一个时间论,一秒滴答一次,刻度盘为60,也就是60S 后重新开始

代码语言:javascript复制
/**
TimerTask task:延时执行的任务,需要实现接口TimerTask
long delay:延时时间的时间
TimeUnit unit:延时的单位
*/
timer_wheel.newTimeout(TimerTask task, long delay, TimeUnit unit);

代码语言:java复制

//设置10S后执行myTimerTask
Object object = new Object;
MyDelayTask myDelayTask = new MyDelayTask(object);
timer_wheel.newTimeout(myTimerTask, 10, TimeUnit.SECONDS);
代码语言:javascript复制
//实现TimerTask
 class MyDelayTask implements TimerTask {
    private Object object;

    public MyDelayTask(Object object) {
        this.object = object;
    }

    @Override
    public void run(Timeout timeout) {
    //使用异步线程
        CompletableFuture.runAsync(() -> {
            //Do SomeThing
        }, executorService);

    }

}

附带上线程池的类

代码语言:javascript复制
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.log.Log;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.concurrent.*;


@Import({SpringUtil.class})
@Configuration
@Slf4j
public class ThreadPoolConfig {
   

    @Bean(value = "executorService")
    public ThreadPoolExecutor threadPoolExecutor() {
        int nThreads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new TraceThreadPoolExecutor(
                nThreads,
                nThreads * 2   1,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue(100000),
                Executors.defaultThreadFactory(),
                new BusinessAbortPolicy()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                log.info("Thread ready to execute:{}", t.getName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                if (t == null) {
                    log.info("Thread execution complete:{}", new Thread(r).getName());
                } else {
                    log.error("Thread execution exception{}--->{}", new Thread(r).getName(), t.getMessage());
                }
            }

            @Override
            protected void terminated() {
                log.info("Thread pool exit");
            }
        };
        return executor;
    }
}

class BusinessAbortPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        Log log = Log.get();
        String message = "任务 "   r.toString()   " 被 "   executor.toString()   "拒绝!!";
        log.error("The_thread_pool_is_full and cannot continue processing tasks>>>>{}", message);
    }
}

class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("============Client_thread_exception=================");
    }

    private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception ex) {
                    clientStack.printStackTrace();
                    throw ex;
                }
            }
        };
    }

}

当然HashedWheelTimer这个类属于全内存任务计算,通常在我们真正的业务中,是不会把这些任务直接放到jvm内存中的,要不然重启之后任务不都会消失了么,这样我们需要重写HashedWheelTimer,只需要对它任务的添加和获取进行重写到相应的持久化中间件中即可;

代码语言:javascript复制
@Slf4j
@Component
public class DelayTimeTask {
    @Autowired
    private RaceMatchPeopleService raceMatchPeopleService;

    @Autowired
    private ExecutorService executorService;

    public void initTask(MatchTeamRedis matchTeamRedis, int timeWheelTime) {
        MatchDelayTask matchTimerTask = new MatchDelayTask(matchTeamRedis);
        MatchConfig.TIMER_WHEEL.newTimeout(matchTimerTask, timeWheelTime, TimeUnit.SECONDS);
    }

    private class MatchDelayTask implements TimerTask {
        private MatchTeamRedis matchTeamRedis;

        public MatchDelayTask(MatchTeamRedis matchTeamRedis) {
            this.matchTeamRedis = matchTeamRedis;
        }

        @Override
        public void run(Timeout timeout) {
            CompletableFuture.runAsync(() -> {
                raceMatchPeopleService.matchPeople(this.matchTeamRedis, Boolean.TRUE);
            }, executorService);

        }

    }
}

0 人点赞