CH21-ThreadPoolExecutor

线程池的作用

  • 降低资源消耗:线程无限制的创建,使用完毕后消耗
  • 提高响应速度:无需频繁新建线程
  • 提高线程的可管理性

应用详解

  • 线程池即一个线程集合 workerSet 和一个阻塞队列 workQueue。

  • 当用户向线程池提交一个任务时,线程池会先将任务放入 workQueue 中。

  • workerSet 中的线程会不断的从 workQueue 中获取任务并执行。

  • 当 workQueue 中没有任务时,worker 则会阻塞,直到队列中有任务了再开始执行。

NAME

Executor 原理

当一个线程提交至线程池后:

  1. 线程池首先判断当前运行的线程数量是否少于与 corePoolSize。如果是则新建工作线程来执行任务,否则进入 2。
  2. 判断 BlockingQueue 是否已满,如果没满,则将任务放入 BlockingQueue,否则进入 3。
  3. 如果新建线程会使当前线程珊瑚粮超过 maximumPoolSize,则交给 RejectedExecutionHandler 来处理。

当 ThreadPoolExecutor 新建线程时,通过 CAS 来更新线程池的状态 ctl。

参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数。
    • 当提交一个任务时,线程池新建线程来执行任务,直到线程数量等于 corePoolSize,即使存在空闲线程。
    • 如果当前线程数量等于 corePoolSize,提交的任务将会被保存到阻塞队列,等待执行。
    • 如果执行了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并开启所有核心线程。
  • workQueue:用于保存需要被执行的任务,可选的队列类型有:
    • ArrayBlockingQueue:基于数组结构,按 FIFO 排序任务。
    • LinkedBlockingQueue:基于链表结构,按 FIFO 排序任务,吞吐量高于 ArrayBlockingQueue。
      • 比 ArrayBlockingQueue 在插入、删除元素时性能更优,但 put、take 时均需加锁。
    • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待另一个线程调用移除操作,否则插入操作将一直阻塞,吞吐量高于 LinkedBlockingQueue。
      • 使用无锁算法,基于节点状态执行判断,无需使用锁,核心是 Transfer.transfer。
    • PriorityBlockingQueue:具有优先级的无界阻塞队列。
  • maximumPoolSize:允许的最大线程数量。
    • 如果阻塞队列已满后继续提交任务,则需创建新的线程来执行任务,前提是线程数小于最大允许数量。
    • 当阻塞队列是无界队列时,则最大允许数量不起作用。
  • keepAliveTime:线程空闲存活时间。
    • 即当线程没有执行任务时,该线程继续存活的时间。
    • 默认情况下,该参数只有在线程数量大于 corePoolSize 时才起效。
    • 超过空闲存活时间的现场将被终止。
  • unit:线程空闲存活时间的时间单位。
  • threadFactory:创建线程的工厂,通过自定义工厂可以设置线程的属性,如名称、demaon。
  • handler:线程池饱和策略。如果队列已满且没有空闲线程,如果继续提交任务,必须采取一种策略来处理该任务,共有四种策略:
    • AbortPolicy:直接抛出异常,默认策略。
    • CallerRunPolicy:用调用者线程来执行任务。
    • DiscardOldestPolicy:丢弃队列中较靠前的任务,以执行当前任务。
    • DiscardPolicy:直接丢弃任务。
    • 支持自定义饱和策略,比如记录日志会持久化存储任务信息。

类型

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
  • 固定线程数量(corePoolSize)。
  • 即使线程池没有可执行的任务,也不会终止线程。
  • 采用无界队列 LinkedBlockingQueue(Integer.MAX_VALUE),潜在问题:
    • 线程数量不会超过 corePoolSize,导致 maximumPoolSize 和 keepAliveTIme 参数失效。
    • 采用无界队列导致永远不会拒绝提交的任务,导致饱和策略失效。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 初始化的线程池中仅一个线程,如果该线程异常结束,会新建线程以继续执行任务。
  • 该唯一线程可以保证顺序处理队列中的任务。
  • 基于无界队列,因此饱和策略失效。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  • 线程数最多可达 Integer.MAX_VALUE。
  • 内部使用 SynchronousQueue 作为阻塞队列。
  • 线程空间时间超过最大空闲时长会终止线程。
  • 如果提交任务没有可用线程,则新建线程。
  • 执行过程与前两个线程池不同:
    • 主线程调用 SynchronousQueue.offer 添加 task,如果此时线程池中有空闲线程尝试读取队列中的任务,即调用 SynchronousQueue.poll,则主线程将该 task 交给空闲线程。否则进入下一步。
    • 当线程池为空或没有空闲线程,则新建线程。
    • 执行完任务的线程如果在 60 秒内空间,则被终止,因此长时间空闲的线程池不会持有任何线程资源。

关闭线程池

遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程。

关闭方式:shutdown

将线程池里的线程状态设置成SHUTDOWN状态, 然后中断所有没有正在执行任务的线程。

关闭方式:shutdownNow

将线程池里的线程状态设置成STOP状态, 然后停止所有正在执行或暂停任务的线程。

只要调用这两个关闭方法中的任意一个, isShutDown() 返回true. 当所有任务都成功关闭了, isTerminated()返回true。

ThreadPoolExecutor

关键属性

//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;

内部状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中AtomicInteger变量ctl的功能非常强大: 利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

  • RUNNING: -1 « COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN: 0 « COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP : 1 « COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • TIDYING : 2 « COUNT_BITS,即高3位为010, 所有的任务都已经终止;
  • TERMINATED: 3 « COUNT_BITS,即高3位为011, terminated()方法已经执行完成
NAME

执行过程

execute –> addWorker –> runworker(getTask)

  • 线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
  • 从Woker类的构造方法实现可以发现: 线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
  • firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

execute 方法

ThreadPoolExecutor.execute(task)实现了Executor.execute(task)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  
    //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
       if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // double check: c, recheck
    // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
        //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 往线程池中创建新的线程失败,则reject任务
    else if (!addWorker(command, false))
        reject(command);
}
  • 为什么需要double check线程池的状态?

在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

addWorker 方法

从方法execute的实现可以看出: addWorker主要负责创建新的线程并执行任务。

线程池创建新线程执行任务时,需要获取全局锁:

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
    // CAS更新线程池数量
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 线程池重入锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();  // 线程启动,执行任务(Worker.thread(firstTask).start());
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker.runWorker 方法

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
     Worker(Runnable firstTask) {
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         this.thread = getThreadFactory().newThread(this); // 创建线程
     }
     /** Delegates main run loop to outer runWorker  */
     public void run() {
         runWorker(this);
     }
     // ...
 }
  • 继承了AQS类,可以方便的实现工作线程的中止操作;
  • 实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
  • 当前提交的任务firstTask作为参数传入Worker的构造方法;

一些属性还有构造方法:

//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
    this.thread = getThreadFactory().newThread(this);
}   

runWorker方法是线程池的核心:

  • 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
  • Worker执行firstTask或从workQueue中获取任务
    • 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
    • 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
    • 执行beforeExecute
    • 执行任务的run方法
    • 执行afterExecute方法
    • 解锁操作

通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 先执行firstTask,再从workerQueue中取task(getTask())

        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask 方法

下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

注意这里一段代码是keepAliveTime起作用的关键:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
  • allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;
  • 倘若为ture,在keepAliveTime内仍空闲则会被销毁。

如果线程允许空闲等待而不被销毁timed == false,workQueue.take任务:

如果阻塞队列为空,当前线程会被挂起等待;

当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;

如果线程不允许无休止空闲timed == true, workQueue.poll任务: 如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

提交过程

NAME
  1. submit任务,等待线程池execute
  2. 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
  3. FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

  1. Callable接口类似于Runnable,只是Runnable没有返回值。
  2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
  3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

submit 方法

AbstractExecutorService.submit()实现了ExecutorService.submit() 可以获取执行完的返回值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor的方法。

// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

关闭过程

shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检查是否可以关闭线程
        checkShutdownAccess();
        //设置线程池状态
        advanceRunState(SHUTDOWN);
        //尝试中断worker
        interruptIdleWorkers();
            //预留方法,留给子类实现
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //遍历所有的worker
        for (Worker w : workers) {
            Thread t = w.thread;
            //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
            //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
            //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //检测权限
        advanceRunState(STOP);
        //中断所有的worker
        interruptWorkers();
        //清空任务队列
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //遍历所有worker,然后调用中断方法
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

配置线程池需要考虑因素

从任务的优先级,任务的执行时间长短,任务的性质(CPU密集/ IO密集),任务的依赖关系这四个角度来分析。并且近可能地使用有界的工作队列。

性质不同的任务可用使用不同规模的线程池分开处理:

  • CPU密集型: 尽可能少的线程,Ncpu+1
  • IO密集型: 尽可能多的线程, Ncpu*2,比如数据库连接池
  • 混合型: CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。

监控线程池的状态

可以使用ThreadPoolExecutor以下方法:

  • getTaskCount
  • getCompletedTaskCount
  • getLargestPoolSize
  • getPoolSize
  • getActiveCount

参考