自定义线程池

2022-01-10 16:48:03 浏览数 (1)

ThreadPoolExecutor

代码语言:javascript复制
public class ThreadPoll {
    private BlockedQueue<Runnable> blockedQueue;
    //堵塞队列最大数量
    private int capacity;
    //核心线程的最大数量
    private int coreSize;
    private TimeUnit unit;
    //任务集合
    private Set<Work> works = new HashSet<>();
    //超时时间
    private long timeout;
    public ThreadPoll(int capacity, long timeout, int coreSize, TimeUnit unit){
        this.timeout = timeout;
        this.coreSize = coreSize;
        this.blockedQueue = new BlockedQueue<>(timeout, capacity, unit);
    }
    public void exec(Runnable task){
        synchronized (works) {
            if(works.size() < coreSize){
                Work work = new Work(task);
                works.add(work);
                work.start();
            }else{
                blockedQueue.put(task);
            }
        }
    }
    @Slf4j
    class Work extends Thread{
        private Runnable task;
        public Work(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task == null || (task = blockedQueue.take()) == null){
                log.debug("我再执行中");
                task.run();
            }
        }
    }
    static class BlockedQueue<T>{
        private Deque<T> queue = new LinkedList<>();
        private ReentrantLock lock = new ReentrantLock();
        private Condition consumerSet = lock.newCondition();
        private Condition producerSet = lock.newCondition();
        private long timeout;
        private int capacity;
        private TimeUnit unit;
        public BlockedQueue(long timeout, int capacity, TimeUnit unit){
            this.timeout = timeout;
            this.capacity = capacity;
            this.unit = unit;
        }
        public T take(){
            lock.lock();
            try{
                while(queue.isEmpty()){
                    try {
                        consumerSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                producerSet.signal();
                return queue.removeFirst();
            }finally {
                lock.unlock();
            }
        }
        public void put(T t){
            lock.lock();
            try{
                while(queue.size() == capacity){
                    try {
                        producerSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.add(t);
                consumerSet.signal();
            }finally {
                lock.unlock();
            }
        }
        public Object poll(long timeout){
            lock.lock();
            try{
                long nanos = unit.toNanos(timeout);
                while(queue.isEmpty()){
                    try {
                        nanos = consumerSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                producerSet.signal();
                return queue.removeFirst();
            }finally {
                lock.unlock();
            }
        }

    }
}

本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名,转载请标明出处 最后编辑时间为: 2021/11/24 15:58

0 人点赞