2025-06-09
内卷九股文
0

目录

线程池
JDK自带的构建线程池的方式 Executors
固定大小的线程池 (newFixedThreadPool)
单线程的线程池 (newSingleThreadExecutor)
可缓存的线程池 (newCachedThreadPool)
定时或周期性任务的线程池 (newScheduledThreadPool)
ThreadPoolExecutor应用&源码剖析
为什么要自定义线程池
ThreadPoolExecutor应用
ThreadPoolExecutor源码剖析
核心属性
有参构造
execute方法
addWorker方法
Worker工作线程
ThreadPoolExecutor的runWorker方法
getTask方法
shutdownNow关闭方法
线程池的核心参数设计规则
线程池参数示例
队列选型指南
线程池处理任务的核心流程
ScheduleThreadPoolExecutor
ScheduleThreadPoolExecutor应用
ScheduleThreadPoolExecutor源码剖析
核心属性
schedule方法
At和With方法&任务的run方法
总结

在开始以前,先思考下什么是线程创建有哪几种方式,为了避免频繁创建和销毁线程造成不必要的性能,一般在使用线程时,会采用线程池,那什么是线程池呢?

线程池

Java线程池是并发编程的核心组件,而 ThreadPoolExecutor 作为其最核心的实现,通过池化技术和精细化任务调度,解决了频繁创建/销毁线程的资源消耗问题。

本文将从设计思想、工作原理到源码实现,全方位解析 ThreadPoolExecutor 的底层机制。

JDK自带的构建线程池的方式 Executors

JDK中基于Executors提供了很多种线程池

固定大小的线程池 (newFixedThreadPool)

创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中等待。

java
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

构建时,需要给newFixedThreadPool方法提供一个nThreads的属性,而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用ThreadPoolExecutor。

构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着人任务的提交才会将线程在线程池中国构建出来)。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。

java
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.execute(() -> { System.out.println("1号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); threadPool.execute(() -> { System.out.println("2号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); threadPool.execute(() -> { System.out.println("3号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); }

单线程的线程池 (newSingleThreadExecutor)

这个线程池看名字就知道是单例线程池,线程池中只有一个工作线程在处理任务。当你希望所有的任务都在一个线程中顺序执行时使用。这可以保证任务的顺序执行,适用于需要串行处理任务的场景。

java
// 当前这里就是构建单例线程池的方式 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService // 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1 // 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中 // 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序 // 当前这种线程池可以用作顺序处理的一些业务中 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

示例代码:

java
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "111"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "222"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "333"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "444"); }); }

可缓存的线程池 (newCachedThreadPool)

适用于执行大量短期异步任务的情况。线程池会根据需要创建新线程,并且在长时间闲置后自动终止线程。适用于执行大量独立且耗时短的任务。

java
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理

  • 当第一次提交任务到线程池时,会直接构建一个工作线程
  • 这个工作线程带执行完人后,60秒没有任务可以执行后,会结束
  • 如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行
  • 如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。

代码示例:

java
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 1; i <= 200; i++) { final int j = i; executorService.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + j); }); } }

定时或周期性任务的线程池 (newScheduledThreadPool)

用于需要定时或定期执行任务的场景。你可以使用schedule或scheduleAtFixedRate方法来安排任务的执行。

查看一下如何构建的。

java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

基于这个方法可以看到,构建的是ScheduledThreadPoolExecutor线程池

java
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{ //.... }

所以本质上还是正常线程池,只不过在原来的线程池基础上实现了定时任务的功能

原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。

代码示例:

java
public static void main(String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // 正常执行 // pool.execute(() -> { // System.out.println(Thread.currentThread().getName() + ":1"); // }); // 延迟执行,执行当前任务延迟5s后再执行 // pool.schedule(() -> { // System.out.println(Thread.currentThread().getName() + ":2"); // },5,TimeUnit.SECONDS); // 周期执行,当前任务第一次延迟5s执行,然后没3s执行一次 // 这个方法在计算下次执行时间时,是从任务刚刚开始时就计算。 // pool.scheduleAtFixedRate(() -> { // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(System.currentTimeMillis() + ":3"); // },2,1,TimeUnit.SECONDS); // 周期执行,当前任务第一次延迟5s执行,然后没3s执行一次 // 这个方法在计算下次执行时间时,会等待任务结束后,再计算时间 pool.scheduleWithFixedDelay(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + ":3"); },2,1,TimeUnit.SECONDS); }

至于Executors提供的newSingleThreadScheduledExecutor单例的定时任务线程池就不说了。

一个线程的线程池可以延迟或者以一定的周期执行一个任务。

ThreadPoolExecutor应用&源码剖析

前面讲到的Executors中的构建线程池的方式,大多数还是基于ThreadPoolExecutor去new出来的。

为什么要自定义线程池

首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。

但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去new ThreadPoolExecutor设置他的一些核心属性。

自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。

查看一下ThreadPoolExecutor提供的七个核心参数

java
public ThreadPoolExecutor( int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁) int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程) long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间 TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位 BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中 ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息 RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略 // 初始化线程池的操作 }

ThreadPoolExecutor应用

手动new一下,处理的方式还是执行execute或者submit方法。

JDK提供的几种拒绝策略:

  • AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常
  • CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理
  • DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉
  • DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理
  • 自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操作。

代码构建线程池,并处理有无返回结果的任务

java
public static void main(String[] args) throws ExecutionException, InterruptedException { //1. 构建线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("test-ThreadPoolExecutor"); return thread; } }, new MyRejectedExecution() ); //2. 让线程池处理任务,没返回结果 threadPool.execute(() -> { System.out.println("没有返回结果的任务"); }); //3. 让线程池处理有返回结果的任务 Future<Object> future = threadPool.submit(new Callable<Object>() { @Override public Object call() throws Exception { System.out.println("我有返回结果!"); return "返回结果"; } }); Object result = future.get(); System.out.println(result); //4. 如果是局部变量的线程池,记得用完要shutdown threadPool.shutdown(); } private static class MyRejectedExecution implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("根据自己的业务情况,决定编写的代码!"); } }

ThreadPoolExecutor源码剖析

线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。

核心属性

核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数

在整个线程池的执行流程中,会基于ctl判断上述两个内容

java
// 当前是线程池的核心属性 // 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。 // ctl表示着线程池中的2个核心状态: // 线程池的状态:ctl的高3位,表示线程池状态 // 工作线程的数量:ctl的低29位,表示工作线程的个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer.SIZE:在获取Integer的bit位个数 // 声明了一个常量:COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; 00000000 00000000 00000000 00000001 00100000 00000000 00000000 00000000 00011111 11111111 11111111 11111111 // CAPACITY就是当前工作线程能记录的工作线程的最大个数 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池状态的表示 // 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理 // 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。 private static final int RUNNING = -1 << COUNT_BITS; // 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。 private static final int STOP = 1 << COUNT_BITS; // 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。 private static final int TIDYING = 2 << COUNT_BITS; // 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。 private static final int TERMINATED = 3 << COUNT_BITS; // 在使用下面这几个方法时,需要传递ctl进来 // 基于&运算的特点,保证只会拿到ctl高三位的值。 private static int runStateOf(int c) { return c & ~CAPACITY; } // 基于&运算的特点,保证只会拿到ctl低29位的值。 private static int workerCountOf(int c) { return c & CAPACITY; }

线程池状态的特点以及转换的方式

image.png

有参构造

有参构造没啥说的,记住核心线程个数是允许为0的。

java
// 有参构造。无论调用哪个有参构造,最终都会执行当前的有参构造 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 健壮性校验 // 核心线程个数是允许为0个的。 // 最大线程数必须大于0,最大线程数要大于等于核心线程数 // 非核心线程的最大空闲时间,可以等于0 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) // 不满足要求就抛出参数异常 throw new IllegalArgumentException(); // 阻塞队列,线程工厂,拒绝策略都不允许为null,为null就扔空指针异常 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 不要关注当前内容,系统资源访问决策,和线程池核心业务关系不大。 this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); // 各种赋值,JUC包下,几乎所有涉及到线程挂起的操作,单位都用纳秒。 // 有参构造的值,都赋值给成员变量。 // Doug Lea的习惯就是将成员变量作为局部变量单独操作。 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

execute方法

execute方法是提交任务到线程池的核心方法,很重要

线程池的执行流程其实就是在说execute方法内部做了哪些判断

execute源码的分析

java
// 提交任务到线程池的核心方法 // command就是提交过来的任务 public void execute(Runnable command) { // 提交的任务不能为null if (command == null) throw new NullPointerException(); // 获取核心属性ctl,用于后面的判断 int c = ctl.get(); // 如果工作线程个数,小于核心线程数。 // 满足要求,添加核心工作线程 if (workerCountOf(c) < corePoolSize) { // addWorker(任务,是核心线程吗) // addWorker返回true:代表添加工作线程成功 // addWorker返回false:代表添加工作线程失败 // addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程 if (addWorker(command, true)) // 工作线程构建出来了,任务也交给command去处理了。 return; // 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl c = ctl.get(); } // 添加核心工作线程失败,往这走 // 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 如果任务添加到阻塞队列成功,走if内部 // 如果任务在扔到阻塞队列之前,线程池状态突然改变了。 // 重新获取ctl int recheck = ctl.get(); // 如果线程池的状态不是RUNNING,将任务从阻塞队列移除, if (!isRunning(recheck) && remove(command)) // 并且直接拒绝策略 reject(command); // 在这,说明阻塞队列有我刚刚放进去的任务 // 查看一下工作线程数是不是0个 // 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务 // 发生这种情况有两种: // 1. 构建线程池时,核心线程数是0个。 // 2. 即便有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut为true,代表核心线程也可以超时 else if (workerCountOf(recheck) == 0) // 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理 addWorker(null, false); } // 任务添加到阻塞队列失败 // 构建一个非核心工作线程 // 如果添加非核心工作线程成功,直接完事,告辞 else if (!addWorker(command, false)) // 添加失败,执行决绝策略 reject(command); }

execute方法的完整执行流程图

image.png

addWorker方法

addWorker中主要分成两大块去看

  • 第一块:校验线程池的状态以及工作线程个数
  • 第二块:添加工作线程并且启动工作线程

校验线程池的状态以及工作线程个数

java
// 添加工作线程之校验源码 private boolean addWorker(Runnable firstTask, boolean core) { // 外层for循环在校验线程池的状态 // 内层for循环是在校验工作线程的个数 // retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环 retry: for (;;) { // 获取ctl int c = ctl.get(); // 拿到ctl的高3位的值 int rs = runStateOf(c); //==========================线程池状态判断================================================== // 如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程个数为0,添加一个工作线程去处理阻塞队列的任务 // 判断线程池的状态是否大于等于SHUTDOWN,如果满足,说明线程池不是RUNNING if (rs >= SHUTDOWN && // 如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务 // 如果三个条件有一个没满足,返回false,配合!,就代表不需要添加 !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 不需要添加工作线程 return false; for (;;) { //==========================工作线程个数判断================================================== // 基于ctl拿到低29位的值,代表当前工作线程个数 int wc = workerCountOf(c); // 如果工作线程个数大于最大值了,不可以添加了,返回false if (wc >= CAPACITY || // 基于core来判断添加的是否是核心工作线程 // 如果是核心:基于corePoolSize去判断 // 如果是非核心:基于maximumPoolSize去判断 wc >= (core ? corePoolSize : maximumPoolSize)) // 代表不能添加,工作线程个数不满足要求 return false; // 针对ctl进行 + 1,采用CAS的方式 if (compareAndIncrementWorkerCount(c)) // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。 break retry; // 重新获取一次ctl的值 c = ctl.get(); // 判断重新获取到的ctl中,表示的线程池状态跟之前的是否有区别 // 如果状态不一样,说明有变化,重新的去判断线程池状态 if (runStateOf(c) != rs) // 跳出一次外层for循环 continue retry; } } // 省略添加工作线程以及启动的过程 }

添加工作线程并且启动工作线程

java
private boolean addWorker(Runnable firstTask, boolean core) { // 省略校验部分的代码 // 添加工作线程以及启动工作线程~~~ // 声明了三个变量 // 工作线程启动了没,默认false boolean workerStarted = false; // 工作线程添加了没,默认false boolean workerAdded = false; // 工作线程,默认为null Worker w = null; try { // 构建工作线程,并且将任务传递进去 w = new Worker(firstTask); // 获取了Worker中的Thread对象 final Thread t = w.thread; // 判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker // 一般如果为null,代表ThreadFactory有问题。 if (t != null) { // 加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次获取线程池状态。 int rs = runStateOf(ctl.get()); // 再次判断 // 如果满足 rs < SHUTDOWN 说明线程池是RUNNING,状态正常,执行if代码块 // 如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 到这,可以添加工作线程。 // 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); // private final HashSet<Worker> workers = new HashSet<Worker>(); // 将new好的Worker添加到HashSet中。 workers.add(w); // 获取了HashSet的size,拿到工作线程个数 int s = workers.size(); // largestPoolSize在记录最大线程个数的记录 // 如果当前工作线程个数,大于最大线程个数的记录,就赋值 if (s > largestPoolSize) largestPoolSize = s; // 添加工作线程成功 workerAdded = true; } } finally { mainLock.unlock(); } // 如果工作线程添加成功, if (workerAdded) { // 直接启动Worker中的线程 t.start(); // 启动工作线程成功 workerStarted = true; } } } finally { // 做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉 if (!workerStarted) addWorkerFailed(w); } // 返回工作线程是否启动成功 return workerStarted; } // 工作线程启动失败,需要不的步长操作 private void addWorkerFailed(Worker w) { // 因为操作了workers,需要加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 如果w不为null,之前Worker已经new出来了。 if (w != null) // 从HashSet中移除 workers.remove(w); // 同时对ctl进行 - 1,代表去掉了一个工作线程个数 decrementWorkerCount(); // 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了。 tryTerminate(); } finally { // 释放锁 mainLock.unlock(); } }

Worker工作线程

Worker对象主要包含了两个内容

  • 工作线程要执行任务
  • 工作线程可能会被中断,控制中断
java
// Worker继承了AQS,目的就是为了控制工作线程的中断。 // Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // =======================Worker管理任务================================ // 线程工厂构建的线程 final Thread thread; // 当前Worker要执行的任务 Runnable firstTask; // 记录当前工作线程处理了多少个任务。 volatile long completedTasks; // 有参构造 Worker(Runnable firstTask) { // 将State设置为-1,代表当前不允许中断线程 setState(-1); // 任务赋值 this.firstTask = firstTask; // 基于线程工作构建Thread,并且传入的Runnable是Worker this.thread = getThreadFactory().newThread(this); } // 当thread执行start方法时,调用的是Worker的run方法, public void run() { // 任务执行时,执行的是runWorker方法 runWorker(this); } // =======================Worker管理中断================================ // 当前方法是中断工作线程时,执行的方法 void interruptIfStarted() { Thread t; // 只有Worker中的state >= 0的时候,可以中断工作线程 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 如果状态正常,并且线程未中断,这边就中断线程 t.interrupt(); } catch (SecurityException ignore) { } } } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }

ThreadPoolExecutor的runWorker方法

runWorker就是让工作线程拿到任务去执行即可。

并且在内部也处理了在工作线程正常结束和异常结束时的处理方案

java
// 工作线程启动后执行的任务。 final void runWorker(Worker w) { // 拿到当前线程 Thread wt = Thread.currentThread(); // 从worker对象中拿到任务 Runnable task = w.firstTask; // 将Worker中的firstTask置位空 w.firstTask = null; // 将Worker中的state置位0,代表当前线程可以中断的 w.unlock(); // allow interrupts // 判断工作线程是否是异常结束,默认就是异常结束 boolean completedAbruptly = true; try { // 获取任务 // 直接拿到第一个任务去执行 // 如果第一个任务为null,去阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { // 执行了Worker的lock方法,当前在lock时,shutdown操作不能中断当前线程,因为当前线程正在处理任务 w.lock(); // 比较ctl >= STOP,如果满足找个状态,说明线程池已经到了STOP状态甚至已经要凉凉了 // 线程池到STOP状态,并且当前线程还没有中断,确保线程是中断的,进到if内部执行中断方法 // if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中断线程} // 如果线程池状态不是STOP,确保线程不是中断的。 // 如果发现线程中断标记位是true了,再次查看线程池状态是大于STOP了,再次中断线程 // 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了。 if ( ( runStateAtLeast(ctl.get(), STOP) || ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) ) ) && !wt.isInterrupted()) wt.interrupt(); try { // 勾子函数在线程池中没有做任何的实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写勾子函数 // 前置勾子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务。 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 前后置勾子函数 afterExecute(task, thrown); } } finally { // 任务执行完,丢掉任务 task = null; // 当前工作线程处理的任务数+1 w.completedTasks++; // 执行unlock方法,此时shutdown方法才可以中断当前线程 w.unlock(); } } // 如果while循环结束,正常走到这,说明是正常结束 // 正常结束的话,在getTask中就会做一个额外的处理,将ctl - 1,代表工作线程没一个。 completedAbruptly = false; } finally { // 考虑干掉工作线程 processWorkerExit(w, completedAbruptly); } } // 工作线程结束前,要执行当前方法 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果是异常结束 if (completedAbruptly) // 将ctl - 1,扣掉一个工作线程 decrementWorkerCount(); // 操作Worker,为了线程安全,加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中 completedTaskCount += w.completedTasks; // 将工作线程从hashSet中移除 workers.remove(w); } finally { // 释放锁 mainLock.unlock(); } // 只要工作线程凉了,查看是不是线程池状态改变了。 tryTerminate(); // 获取ctl int c = ctl.get(); // 判断线程池状态,当前线程池要么是RUNNING,要么是SHUTDOWN if (runStateLessThan(c, STOP)) { // 如果正常结束工作线程 if (!completedAbruptly) { // 如果核心线程允许超时,min = 0,否则就是核心线程个数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0,可能会出现没有工作线程,并且阻塞队列有任务没有线程处理 if (min == 0 && ! workQueue.isEmpty()) // 至少要有一个工作线程处理阻塞队列任务 min = 1; // 如果工作线程个数 大于等于1,不怕没线程处理,正常return if (workerCountOf(c) >= min) return; } // 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程 addWorker(null, false); } }

getTask方法

工作线程在去阻塞队列获取任务前,要先查看线程池状态

如果状态没问题,去阻塞队列take或者是poll任务

第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉

java
// 当前方法就在阻塞队列中获取任务 // 前面半部分是判断当前工作线程是否可以返回null,结束。 // 后半部分就是从阻塞队列中拿任务 private Runnable getTask() { // timeOut默认值是false。 boolean timedOut = false; // 死循环 for (;;) { // 拿到ctl int c = ctl.get(); // 拿到线程池的状态 int rs = runStateOf(c); // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 如果可以返回null,先扣减工作线程个数 decrementWorkerCount(); // 返回null,结束runWorker的while循环 return null; } // 基于ctl拿到工作线程个数 int wc = workerCountOf(c); // 核心线程允许超时,timed为true // 工作线程个数大于核心线程数,timed为true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ( // 如果工作线程个数,大于最大线程数。(一般情况不会满足),把他看成false // 第二个判断代表,只要工作线程数小于等于核心线程数,必然为false // 即便工作线程个数大于核心线程数了,此时第一次循环也不会为true,因为timedOut默认值是false // 考虑第二次循环了,因为循环内部必然有修改timeOut的位置 (wc > maximumPoolSize || (timed && timedOut)) && // 要么工作线程还有,要么阻塞队列为空,并且满足上述条件后,工作线程才会走到if内部,结束工作线程 (wc > 1 || workQueue.isEmpty()) ) { // 第二次循环才有可能到这。 // 正常结束,工作线程 - 1,因为是CAS操作,如果失败了,重新走for循环 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 工作线程从阻塞队列拿任务 try { // 如果是核心线程,timed是false,如果是非核心线程,timed就是true Runnable r = timed ? // 如果是非核心,走poll方法,拿任务,等待一会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果是核心,走take方法,死等。 workQueue.take(); // 从阻塞队列拿到的任务不为null,这边就正常返回任务,去执行 if (r != null) return r; // 说明当前线程没拿到任务,将timeOut设置为true,在上面就可以返回null退出了。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

shutdownNow关闭方法

首先查看shutdownNow方法,可以从RUNNING状态转变为STOP

java
// shutDownNow方法,shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了。 public List<Runnable> shutdownNow() { // 声明返回结果 List<Runnable> tasks; // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 不关注这个方法…… checkShutdownAccess(); // 将线程池状态修改为STOP advanceRunState(STOP); // 无论怎么,直接中断工作线程。 interruptWorkers(); // 将阻塞队列的任务全部扔到List集合中。 tasks = drainQueue(); } finally { // 释放锁 mainLock.unlock(); } tryTerminate(); return tasks; } // 将线程池状态修改为STOP private void advanceRunState(int STOP) { // 死循环。 for (;;) { // 获取ctl属性的值 int c = ctl.get(); // 第一个判断:如果当前线程池状态已经大于等于STOP了,不管了,告辞。 if (runStateAtLeast(c, STOP) || // 基于CAS,将ctl从c修改为STOP状态,不修改工作线程个数,但是状态变为了STOP // 如果修改成功结束 ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c)))) break; } } // 无论怎么,直接中断工作线程。 private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历HashSet,拿到所有的工作线程,直接中断。 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } // 移除阻塞队列,内容全部扔到List集合中 private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 阻塞队列自带的,直接清空阻塞队列,内容扔到List集合 q.drainTo(taskList); // 为了避免任务丢失,重新判断,是否需要编辑阻塞队列,重新扔到List if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } // 查看当前线程池是否可以变为TERMINATED状态 final void tryTerminate() { // 死循环。 for (;;) { // 拿到ctl int c = ctl.get(); // 如果是RUNNING,直接告辞。 // 如果状态已经大于等于TIDYING,马上就要凉凉,直接告辞。 // 如果状态是SHUTDOWN,但是阻塞队列还有任务,直接告辞。 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果还有工作线程 if (workerCountOf(c) != 0) { // 再次中断工作线程 interruptIdleWorkers(ONLY_ONE); // 告辞,等你工作线程全完事,我这再尝试进入到TERMINATED状态 return; } // 加锁,为了可以执行Condition的释放操作 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将线程池状态修改为TIDYING状态,如果成功,继续往下走 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 这个方法是空的,如果你需要在线程池关闭后做一些额外操作,这里你可以自行实现 terminated(); } finally { // 最终修改为TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。 // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作 // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

再次shutdown方法,可以从RUNNING状态转变为SHUTDOWN

shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务

java
public void shutdown() { // 加锁。。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 不看。 checkShutdownAccess(); // 里面是一个死循环,将线程池状态修改为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); // 说了,这个是为了ScheduleThreadPoolExecutor准备的,不管 onShutdown(); } finally { mainLock.unlock(); } // 尝试结束线程 tryTerminate(); } // 中断空闲线程 private void interruptIdleWorkers(boolean onlyOne) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 如果线程没有中断,那么就去获取Worker的锁,基于tryLock可知,不会中断正在干活的线程 if (!t.isInterrupted() && w.tryLock()) { try { // 会中断空闲线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

线程池的核心参数设计规则

线程池的使用难度不大,难度在于线程池的参数并不好配置。

主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,甚至还有混合型的。因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。

想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。需要将项目部署到测试环境或者是沙箱环境中,结果各种压测得到一个相对符合的参数。如果每次修改项目都需要重新部署,成本太高了。此时咱们可以实现一个动态监控以及修改线程池的方案。

因为线程池的核心参数无非就是:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue:工作队列

线程池中提供了获取核心信息的get方法,同时也提供了动态修改核心属性的set方法。

image.png

结合以上思路,可以采用动态线程池的思想进行调整,采用一些开源项目提供的方式去做监控和修改,比如 hippo4jdynamic-tp 就可以对线程池进行监控,而且可以和SpringBoot整合,。

线程池参数示例

参数设置准则典型值示例
corePoolSizeCPU密集型:CPU核心数;IO密集型:CPU核心数 * (1 + 平均等待时间/计算时间)8核CPU:IO密集型可设32
maximumPoolSize核心线程数 * 2 ~ 3倍core=8 → max=24
keepAliveTime非核心线程存活时间,建议30~60秒30, TimeUnit.SECONDS

队列选型指南

队列类型特点适用场景
SynchronousQueue零容量,直接传递任务高吞吐量短任务
LinkedBlockingQueue无界队列(默认Integer.MAX_VALUE)需积压任务的批处理系统
ArrayBlockingQueue有界队列,FIFO流量可控的实时系统
PriorityBlockingQueue优先级队列任务有优先级的调度系统

线程池处理任务的核心流程

基于addWorker添加工作线程的流程切入到整体处理任务的位置

image.png

ScheduleThreadPoolExecutor

从名字上就可以看出,当前线程池是用于执行定时任务的线程池。

Java比较早的定时任务工具是Timer类。但是Timer问题很多,串行的,不靠谱,会影响到其他的任务执行。

其实除了Timer以及ScheduleThreadPoolExecutor之外,正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。

ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。

ScheduleThreadPoolExecutor应用

定时任务线程池的有参构造

java
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }

发现ScheduleThreadPoolExecutor在构建时,直接调用了父类的构造方法

ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor

首先ScheduleThreadPoolExecutor最多允许设置3个参数:

  • 核心线程数
  • 线程工厂
  • 拒绝策略

首先没有设置阻塞队列,以及最大线程数和空闲时间以及单位

阻塞队列设置的是DelayedWorkQueue,其实本质就是DelayQueue,一个延迟队列。DelayQueue是一个无界队列。所以最大线程数以及非核心线程的空闲时间是不需要设置的。

代码落地使用

java
public static void main(String[] args) { //1. 构建定时任务线程池 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor( 5, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); return t; } }, new ThreadPoolExecutor.AbortPolicy() ); //2. 应用ScheduledThreadPoolExecutor // 跟直接执行线程池的execute没啥区别 pool.execute(() -> { System.out.println("execute"); }); // 指定延迟时间执行 System.out.println(System.currentTimeMillis()); pool.schedule(() -> { System.out.println("schedule"); System.out.println(System.currentTimeMillis()); },2, TimeUnit.SECONDS); // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务开始时就计算 // 周期性执行就是将执行完毕的任务再次社会好延迟时间,并且重新扔到阻塞队列 // 计算的周期执行,也是在原有的时间上做累加,不关注任务的执行时长。 System.out.println(System.currentTimeMillis()); pool.scheduleAtFixedRate(() -> { System.out.println("scheduleAtFixedRate"); System.out.println(System.currentTimeMillis()); },2,3,TimeUnit.SECONDS); // // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务结束后再计算下次的延迟时间 System.out.println(System.currentTimeMillis()); pool.scheduleWithFixedDelay(() -> { System.out.println("scheduleWithFixedDelay"); System.out.println(System.currentTimeMillis()); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } },2,3,TimeUnit.SECONDS); }

ScheduleThreadPoolExecutor源码剖析

核心属性

后面的方法业务流程会涉及到这些属性。

java
// 这里是针对任务取消时的一些业务判断会用到的标记 private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true; private volatile boolean removeOnCancel = false; // 计数器,如果两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行 private static final AtomicLong sequencer = new AtomicLong(); // 这个方法是获取当前系统时间的毫秒值 final long now() { return System.nanoTime(); } // 内部类。核心类之一。 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { // 全局唯一的序列,如果两个任务时间一直,基于当前属性判断 private final long sequenceNumber; // 任务执行的时间,单位纳秒 private long time; /** * period == 0:执行一次的延迟任务 * period > 0:代表是At * period < 0:代表是With */ private final long period; // 周期性执行时,需要将任务重新扔回阻塞队列,基础当前属性拿到任务,方便扔回阻塞队列 RunnableScheduledFuture<V> outerTask = this; /** * 构建schedule方法的任务 */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * 构建At和With任务的有参构造 */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } } // 内部类。核心类之一。 static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { // 这个类就是DelayQueue,不用过分关注,如果没看过,看阻塞队列中的优先级队列和延迟队列

schedule方法

execute方法也是调用的schedule方法,只不过传入的延迟时间是0纳秒

schedule方法就是将任务和延迟时间封装到一起,并且将任务扔到阻塞队列中,再去创建工作线程去take阻塞队列。

java
// 延迟任务执行的方法。 // command:任务 // delay:延迟时间 // unit:延迟时间的单位 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 健壮性校验。 if (command == null || unit == null) throw new NullPointerException(); // 将任务和延迟时间封装到一起,最终组成ScheduledFutureTask // 要分成三个方法去看 // triggerTime:计算延迟时间。最终返回的是当前系统时间 + 延迟时间 // triggerTime就是将延迟时间转换为纳秒,并且+当前系统时间,再做一些健壮性校验 // ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并且设置任务执行的方式 // decorateTask:当前方式是让用户基于自身情况可以动态修改任务的一个扩展口 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 任务封装好,执行delayedExecute方法,去执行任务 delayedExecute(t); // 返回FutureTask return t; } // triggerTime做的事情 // 外部方法,对延迟时间做校验,如果小于0,就直接设置为0 // 并且转换为纳秒单位 private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 将延迟时间+当前系统时间 // 后面的校验是为了避免延迟时间超过Long的取值范围 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } // ScheduledFutureTask有参构造 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // time就是任务要执行的时间 this.time = ns; // period,为0,代表任务是延迟执行,不是周期执行 this.period = 0; // 基于AtmoicLong生成的序列 this.sequenceNumber = sequencer.getAndIncrement(); } // delayedExecute 执行延迟任务的操作 private void delayedExecute(RunnableScheduledFuture<?> task) { // 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,进到if if (isShutdown()) // 不是RUNNING。 // 执行拒绝策略。 reject(task); else { // 线程池状态是RUNNING // 直接让任务扔到延迟的阻塞队列中 super.getQueue().add(task); // DCL的操作,再次查看线程池状态 // 如果线程池在添加任务到阻塞队列后,状态不是RUNNING if (isShutdown() && // task.isPeriodic():现在反回的是false,因为任务是延迟执行,不是周期执行 // 默认情况,延迟队列中的延迟任务,可以执行 !canRunInCurrentRunState(task.isPeriodic()) && // 从阻塞队列中移除任务。 remove(task)) task.cancel(false); else // 线程池状态正常,任务可以执行 ensurePrestart(); } } // 线程池状态不为RUNNING,查看任务是否可以执行 // 延迟执行:periodic==false // 周期执行:periodic==true // continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false // executeExistingDelayedTasksAfterShutdown:延迟执行任务,默认为true boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); } // 当前情况,shutdownOK为true final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); // 如果状态是RUNNING,正常可以执行,返回true // 如果状态是SHUTDOWN,根据shutdownOK来决定 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } // 任务可以正常执行后,做的操作 void ensurePrestart() { // 拿到工作线程个数 int wc = workerCountOf(ctl.get()); // 如果工作线程个数小于核心线程数 if (wc < corePoolSize) // 添加核心线程去处理阻塞队列中的任务 addWorker(null, true); else if (wc == 0) // 如果工作线程数为0,核心线程数也为0,这是添加一个非核心线程去处理阻塞队列任务 addWorker(null, false); }

At和With方法&任务的run方法

这两个方法在源码层面上的第一个区别,就是在计算周期时间时,需要将这个值传递给period,基于正负数在区别At和With

所以查看一个方法就ok,查看At方法

java
// At方法, // command:任务 // initialDelay:第一次执行的延迟时间 // period:任务的周期执行时间 // unit:上面两个时间的单位 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // 健壮性校验 if (command == null || unit == null) throw new NullPointerException(); // 周期时间不能小于等于0. if (period <= 0) throw new IllegalArgumentException(); // 将任务以及第一次的延迟时间,和后续的周期时间封装好。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 扩展口,可以对任务做修改。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 周期性任务,需要在任务执行完毕后,重新扔会到阻塞队列,为了方便拿任务,将任务设置到outerTask成员变量中 sft.outerTask = t; // 和schedule方法一样的方式 // 如果任务刚刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况,当前任务不执行 delayedExecute(t); return t; } // 延迟任务以及周期任务在执行时,都会调用当前任务的run方法。 public void run() { // periodic == false:一次性延迟任务 // periodic == true:周期任务 boolean periodic = isPeriodic(); // 任务执行前,会再次判断状态,能否执行任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 判断是周期执行还是一次性任务 else if (!periodic) // 一次性任务,让工作线程直接执行command的逻辑 ScheduledFutureTask.super.run(); // 到这个else if,说明任务是周期执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下次任务执行的时间 setNextRunTime(); // 将任务重新扔回线程池做处理 reExecutePeriodic(outerTask); } } // 设置下次任务执行的时间 private void setNextRunTime() { // 拿到period值,正数:At,负数:With long p = period; if (p > 0) // 拿着之前的执行时间,直接追加上周期时间 time += p; else // 如果走到else,代表任务是With方式,这种方式要重新计算延迟时间 // 拿到当前系统时间,追加上延迟时间, time = triggerTime(-p); } // 将任务重新扔回线程池做处理 void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 如果状态ok,可以执行 if (canRunInCurrentRunState(true)) { // 将任务扔到延迟队列 super.getQueue().add(task); // DCL,判断线程池状态 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else // 添加工作线程 ensurePrestart(); } }

总结

ThreadPoolExecutor的核心价值在于通过三层缓冲机制实现资源的最优调度:

  1. 核心线程池:常驻线程处理稳态流量
  2. 任务队列:缓冲突发流量
  3. 非核心线程:应对队列满载的极端场景

配合状态机和CAS无锁操作,实现了高并发环境下的线程安全调度。其源码中展现的位运算技巧(如ctl设计)和Worker线程复用机制,是Java并发编程的经典范例。

性能数据参考:在8核服务器上,合理配置的ThreadPoolExecutor可支撑20,000+ QPS的任务调度,相比单线程执行效率提升8-10倍,同时保持95%的CPU利用率。

掌握ThreadPoolExecutor的原理与实现,是构建高并发、高可靠Java应用的基石。建议结合ForkJoinPool(分治场景)和ScheduledThreadPoolExecutor(定时任务)形成完整的线程池技术体系。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!