Java之线程池源码浅析

2022-03-04 10:26:47 浏览数 (2)

一、前言

线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:重用存在的线程、可有效控制最大并发线程数

二、怎么做

线程池简单用法如下

代码语言:javascript复制
ExecutorService executorService = new ThreadPoolExecutor(3, 5,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i  ) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread id is: "   Thread.currentThread().getId());
        }
    });
}
executorService.shutdown();

一,ThreadPoolExecutor

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:线程池中的核心线程数,核心线程会一直存活,即使没有任务需要执行。

maximumPoolSize: 线程池中允许的最大线程数 核心线程数 临时线程数

keepAliveTime:临时线程空闲时的存活时间

workQueue:保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。

我们直接看executorService.execute方法。

代码语言:javascript复制
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    } else {
        int c = this.ctl.get();//获取当前线程状态相关值
        if (workerCountOf(c) < this.corePoolSize) {  //1
            if (this.addWorker(command, true)) {
                return;
            }

            c = this.ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) { //2
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {//3
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {//4
                this.addWorker((Runnable)null, false);
            }
        } else if (!this.addWorker(command, false)) {//5
            this.reject(command);
        }

    }
}

1.workerCountOf根据ctl的低29位,得到线程池的当前线程数,如果当前线程数小于核心线程数,那就直接添加任务addWork

2.如果上面addWork失败,则继续。先判断当前线程池处于RUNNING状态,同时把提交的任务成功放入阻塞队列workQueue

中,添加失败offer返回false,否则执行reject方法处理任务。

3.再次判断判断当前线程池状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;

4.如果当前线程数等于0的极端情况下,可能出现一个任务刚被插入队列的同时,所有的线程都结束任务然后被销毁了,则添加一个非核心线程。

5.如果workQueue队列已满,尝试创建非核心线程处理任务.

这里面主要有两个行为,一个就是this.workQueue.offer(command) 添加任务队列,另一个就是this.addWorker(command,true)

来看下addWork方法源码

代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
    int c = this.ctl.get();

    label247:
    //当前线程池不是SHUTDOWN或者不是STOP状态时,且firstTask==null workQueue不是空,则继续进行
    while(!runStateAtLeast(c, SHUTDOWN) || !runStateAtLeast(c, STOP) && firstTask == null 
    && !this.workQueue.isEmpty()) { 
        //2检查线程总数是否超过容量。
        while(workerCountOf(c) <((core ? this.corePoolSize:this.maximumPoolSize)&COUNT_MASK)) {
            if (this.compareAndIncrementWorkerCount(c)) {// 线程数加1
                boolean workerStarted = false;
                boolean workerAdded = false;
                ThreadPoolExecutor.Worker w = null;

                try {
                    //开始创建新的线程,添加任务firstTask
                    w = new ThreadPoolExecutor.Worker(firstTask);
                    Thread t = w.thread;
                    if (t != null) {
                        //开始加锁 做到了在最小的范围内加锁,尽量减少锁竞争
                        ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();

                        try {
                            int c = this.ctl.get();
                            ///检查线程状态,只有当线程池处于RUNNING,或者处于SHUTDOWN并且firstTask==null的时候,
                            
                            if (isRunning(c) || runStateLessThan(c, STOP) && firstTask == null) {
                                if (t.getState() != State.NEW) {
                                    throw new IllegalThreadStateException();
                                }
                                //workers是一个HashSet,添加我们新增的Worker
                                this.workers.add(w);
                                workerAdded = true;
                                //每次增加worker的时候,都会判断当前workers.size()是否大于largestPoolSize,
                                //如果大于,则将当前最大值赋予largestPoolSize.
                                int s = this.workers.size();
                                if (s > this.largestPoolSize) {
                                    //记录workers历史以来的最大值,
                                    this.largestPoolSize = s;
                                }
                            }
                        } finally {
                            mainLock.unlock();
                        }

                        if (workerAdded) {
                            t.start();//开始工作
                            workerStarted = true;
                        }
                    }
                } finally {
                    if (!workerStarted) {
                        this.addWorkerFailed(w);
                    }

                }

                return workerStarted;
            }

            c = this.ctl.get();
            if (runStateAtLeast(c, 0)) {
                continue label247;
            }
        }

        return false;
    }

    return false;
}

关于线程池的状态,有5种,

  • RUNNING: 接收新的任务,并能继续处理 workQueue 中的任务
  • SHUTDOWN: 不再接收新的任务,不过能继续处理 workQueue 中的任务
  • STOP: 不再接收新的任务,也不再处理 workQueue 中的任务,并且会中断正在处理任务的线程
  • TIDYING: 所有的任务都完结了,并且线程数量(workCount)为 0 时即为此状态,进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态
  • TERMINATED: 调用 terminated() 方法后即为此状态

addwork过程是,检查线程池状态和线程总数是否符合条件,符合的话就创建新的Worker,添加任务firstTask,并把Worker

添加到workers hashset中,最后开始启动Worker中的线程t.start(),源码中 this.thread = getThreadFactory().newThread(this);this是当前Runnable,那this.thread里面Runnable就是当前Worker,执行t.start()

就是执行当前Worker中run方法

Worker 继承AbstractQueuedSynchronizer 实现Runnable

代码语言:javascript复制
public void run() {
    ThreadPoolExecutor.this.runWorker(this);
}

代码语言:javascript复制
final void runWorker(ThreadPoolExecutor.Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;//获取当前线程任务Runnable
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;

    try {
        while(task != null || (task = this.getTask()) != null) {
            //当前线程枷锁
            w.lock();
          //如果(线程池的状态>=STOP或者(线程已中断并且线程状态>=STOP))并且当前线程没有被中断。
          // 两种情况:
          //1)如果当前线程池的状态是>=Stop的,并且当前线程没有被中断,那么就要执行中断。
          //2)或者当前线程目前是已中断的状态并且线程池的状态也是>=Stop的(注意Thread.interrupted是会擦除中断标识符的),
          //那么因为中断标识符已经被擦除了,那么!wt.isInterrupted()一定返回true,这个时候还是要将当前线程中断。
          //第二次执行runStateAtLeast(ctl.get(), STOP)相当于一个二次检查。
            if ((runStateAtLeast(this.ctl.get(), STOP) || Thread.interrupted() && 
            runStateAtLeast(this.ctl.get(), STOP)) && !wt.isInterrupted()) {
                wt.interrupt();//中断当前线程
            }

            try {
                this.beforeExecute(wt, task);//前置操作,空方法,可以业务自己实现

                try {
                    task.run();//执行run,也就是下面方法
                    //new Runnable() {
                    //   @Override
                    //  public void run() {
                    //      System.out.println("thread id is: "   Thread.currentThread().getId());
                    //   }
                    //}
                    this.afterExecute(task, (Throwable)null);//后置操作,空方法,可以业务自己实现
                } catch (Throwable var14) {
                    this.afterExecute(task, var14);
                    throw var14;
                }
            } finally {
                task = null;//最后将task置为null
                  w.completedTasks;//已完成的任务计数器 1
                w.unlock();//释放当前线程的独占锁
            }
        }

        completedAbruptly = false;
    } finally {
        this.processWorkerExit(w, completedAbruptly);
    }

}

runWorker是开启了一个线程,就一直循环执行getTask,知道task==null才结束。

来看下getTask源码

代码语言:javascript复制
private Runnable getTask() {
    boolean timedOut = false;

    while(true) {
        int c = this.ctl.get();
        // 如果当前状态是>=SHOTDOWN状态&&(运行状态是STOP或者队列是空的).
        // 1)如果线程池的状态是>=STOP状态,这个时候不再处理队列中的任务,并且减少worker记录数量,
        //返回的任务为null,这个时候在runRWorker方法中会执行processWorkerExit进行worker的退出操作.
        // 2)如果线程池的状态是>=SHUTDOWN并且workQueue为空,就说明处于SHOTdown以上的状态下,
        //且没有任务在等待,那么也属于获取不到任务,getTask返回null.
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || this.workQueue.isEmpty())) {
            this.decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        //如果允许超时切线程数大于核心线程容量,则开启超时机制timed=true
        boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
        //如果当前线程数小于等于最大线程容量 且 不允许超时或者当前没有超时 或者当前线程数小于等于1 且当前任务队列不是空
        if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
            try {
                //判断是否允许超时,允许超时用poll设置超时时间,不允许就使用take依赖超时机制
                Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) 
                : (Runnable)this.workQueue.take();
                if (r != null) {
                    return r;
                }

                timedOut = true;
            } catch (InterruptedException var6) {
                timedOut = false;
            }
        } else if (this.compareAndDecrementWorkerCount(c)) {
            return null;
        }
    }
}

getTask主要就是从workQueue队列中不断的取Runnable任务。

继续看processWorkerExit方法源码

代码语言:javascript复制
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) {
        //任务完成,将工作线程数量-1
        this.decrementWorkerCount();
    }

    ReentrantLock mainLock = this.mainLock;
    mainLock.lock();

    //
    try {
        this.completedTaskCount  = w.completedTasks;
        //从worker中移除任务
        this.workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试关闭线程池,但如果是正常运行状态,就不会关闭
    this.tryTerminate();
    int c = this.ctl.get();
    if (runStateLessThan(c, 536870912)) {
        if (!completedAbruptly) {
           //如果允许核心线程超时并且当前队列里面还有任务,那就必须留一个线程.
            int min = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize;
            if (min == 0 && !this.workQueue.isEmpty()) {
                min = 1;
            }

            if (workerCountOf(c) >= min) {
                return;
            }
        }
        
        this.addWorker((Runnable)null, false);
    }

}

三、问题

1.如何复用线程?

回顾代码,好像也没有直接体现复用线程池的,Worker每次都是new。

我们先看下executorService.execute代码,里面有这样的逻辑,

如果当前线程数小于核心线程数,则执行this.addWorker(command,true)操作,这个方法主要就是创建Worker对象,并且运行Worker里面thread直接start,开启线程,执行firstTask任务。假如核心线程数是4,那就开启4个线程。

如果当前线程大于核心线程数,就走this.workQueue.offer(command)这一步,把任务添加到workQueue队列中。

我们看下runWorker方法,这个就是执行线程任务的主要方法,看下while(task !=null||(task =this.getTask())!=null)这个条件,当我们调用this.addWorker(command,true)方法是,task就是command 不等于null,那就执行内容,执行完成后finally{ task =null; w.completedTasks;w.unlock();//}task=null ,那任务就应该完成,线程就要销毁。

但是我们还要注意while条件中有个getTask方法,里面就是从阻塞队列中获取Runnable任务,也就是workQueue取Runnable。workQueue有内容,则在当前Worker线程中执行Runnable,没有内容的话,就阻塞。此时线程就不会销毁。

2.如何并发?

线程池是如何做到高效并发的。

看整个线程池的工作流程,有以下几个需要特别关注的并发点.

线程池状态和工作线程数量的变更。这个由一个AtomicInteger变量 ctl来解决原子性问题。

向工作Worker容器workers中添加新的Worker的时候线程池加锁。

执行具体任务的时候,线程枷锁。

工作线程Worker从等待队列中取任务的时候。这个由工作队列本身来保证线程安全,比如LinkedBlockingQueue等。

3.非核心线程什么时候销毁?

代码语言:javascript复制
boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
    try {
        Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
        if (r != null) {
            return r;
        }

        timedOut = true;
    } catch (InterruptedException var6) {
        timedOut = false;
    }
}

看下这段代码,当wc >this.corePoolSize时,应该有产生非核心线程了,timed=true,Runnable r=(Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS),在队列超过keepAliveTime时长后,返回null,直接跳出 while(task !=null||(task =this.getTask())!=null)代码,走processWorkerExit函数。

0 人点赞