ThreadPoolExecutor线程池源码分析
2025-01-24 09:28:43 0 举报
`ThreadPoolExecutor` 是 Java 中用于创建线程池的核心类,定义在 `java.util.concurrent` 包下,提供了一种在多线程执行中管理线程生命周期、任务队列以及线程容量的机制。它允许开发者控制线程池的启动、停止以及等待中任务的队列处理方式。 在其源码实现中,`ThreadPoolExecutor` 继承自 `AbstractExecutorService`,并在构造器中初始化线程池的基本参数,包括核心线程数、最大线程数、存活时间、时间单位以及工作队列等。它通过任务的提交、执行和资源的回收等生命周期管理,增强了线程复用,提高了程序性能,同时支持任务延迟执行和周期性执行等高级特性。 核心方法包括 `execute`(提交任务)和 `shutdown`/`shutdownNow`(关闭线程池),并通过状态标志 `ctl` 控制线程池的运行状态和线程的创建。内部的线程管理采用装饰者设计模式,包装了 `Runnable` 任务为 `RunnableFuture`,再进一步到 `FutureTask`,实现任务执行后能够提供结果和状态。 除了原子类 `AtomicInteger`,`ThreadPoolExecutor` 还利用同步锁 `ReentrantLock` 和条件变量 `Condition` 保证对线程资源的竞争控制和协调,确保线程池状态的正确切换和线程任务执行的稳定性。源码提供了灵活可调的结构,通过丰富的抽象,使得开发者可以根据应用场景的不同需求进行定制化配置。
作者其他创作
大纲/内容
c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs){ continue retry;}
添加阻塞队列失败
int wc = workerCountOf(c);//获取工作线程数量boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
int wc = workerCountOf(c)
跳出外层循环,重新流程判断
对核心属性ctl进行+1操作
线程状态不是RUNNING
for (;;) {}
线程池状态检查
不满足条件
满足条件判断
非运行状态
线程是否添加成功的标识判断
若线程池状态为STOP,则需要将线程进行中断操作
创建核心线程
rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)检查是否满足任务执行的条件:1.当前状态为 RUNNING2.当前状态为 SHUTDOWN 且 线程任务为空(?)
将工作任务添加到workers集合中
更新线程池中工作线程的数量
当前活动线程数量判断
加入任务队列失败(说明队列已满)
从Worker中取出线程
执行拒绝策略
将线程任务加入到阻塞队列
1.int s = workers.size();获取HashSet<Worker>集合的数量,得到当前线程池中工作线程的数量2.if (s > largestPoolSize){ largestPoolSize = s;}如果当前线程池中工作线程的数量大于记录的最大线程数量时。对largestPoolSize进行重新赋值largestPoolSize成员变量:用于记录当前线程池中的最大线程个数
启动失败
ctl+1结果判断
workers.add(w)workers对象的本质是HashSet<Worker>集合
任务执行结束继续循环,直到工作线程任务集合中的任务执行完
开始执行Worker对象中的run()方法的逻辑
ThreadPoolExecutor.runWorker(Worker w)
addWorkerFailed(w);
线程池状态判断
可以创建核心线程
final Thread t = w.thread;
1.final ReentrantLock mainLock = this.mainLock; 获取锁对象2.mainLock.lock(); 进行加锁3.int rs = runStateOf(ctl.get()); 获取线程池当前状态
workerCountOf(c) < corePoolSize
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null;}
跳出外层循环,开始添加工作线程并启动工作线程逻辑
1.检查线程池的状态是否大于等于SHUTDOWN,如果满足则说明线程池的状态不是RUNNING2.状态不是SHUTDOWN,且线程任务不是空 且工作队列为空
+1操作成功
!isRunning(c)
重新获取核心属性ctl中高3位的值
workQueue.offer(command)
+1操作失败
添加工作任务失败
while (task != null || (task = getTask()) != null) {}
工作线程启动结果判断
在循环中进行任务对象判断
int rs = runStateOf(c)用于判断线程池状态
获取当前活动的线程数
获取是否允许超时或当前工作线程数是否大于核心线程数
从Worker中获取工作线程对象
Running状态
创建非核心线程
添加失败
线程状态判断
retry:用于标记外层for循环,目的是为了方便内层for循环跳出时,直接跳出到标记的地方
Runnable task = w.firstTask; w.firstTask = null; //任务对象取出后,需要将其位置置为null w.unlock();//调用该方法的目的是执行Worker类中的unlock方法,将当前线程任务设置为可中断状态boolean completedAbruptly = true;//用来标识当前工作线程是否异常结束,默认就是异常结束
将工作线程是否添加成功的标识更新为true
workerAdded = true
1.启动线程2.将工作线程是否启动的标识更新为true
线程池数据量判断
创建一个无工作任务的非核心线程
开始执行线程工作任务逻辑
int recheck = ctl.get()
调用execute/submit方法
isRunning(c)
满足条件
需要将添加的工作任务从集合中移除
启动成功
remove(command)
task.run();
true
将添加的线程任务从队列中移除
无论调用execute()还是submit()方法,其底层都是执行的调用execute()方法
添加队列成功
内层for (;;)死循环调用
retry
工作线程对象不存在或执行时发生异常需要进行一些清理操作
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue;}1.当前工作线程数已经大于线程池允许的最大线程数 或 允许超时2.当前工作线程数大于1 或 工作队列为空3.compareAndDecrementWorkerCount(c):尝试减少工作线程数,执行成功则返回null,否则跳出当前循环,再重新执行循环
if (workerAdded) { t.start(); workerStarted = true; }
从woeker中获取需要执行的任务
wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)1.当前活动线程数已经大于等于允许的最大线程数(2^29-1)2.若创建的是核心线程,则检查当前活动线程数是否已 大于设置的核心线程数量 若创建的是非核心线程,则检查当前活动线程数是否已大于设置的最大线程数量
判断结果为false不满足创建工作线程的条件则将线程任务添加到队列中
ThreadPoolExecutor
不满足条件判断
正式获取任务
开始执行死循环
核心线程数判断
boolean workerStarted = false; 标识工作线程是否启动 默认为falseboolean workerAdded = false; 标识工作线程是否添加成功 默认为falseWorker w = new Worker(firstTask); 创建工作任务对象
创建线程工作对象Worker
compareAndIncrementWorkerCount(c)
getTask()
添加工作任务的前置处理
操作结束
false
外层for (;;)死循环调用
wt.interrupt();
返回获取到的工作任务
reject(command);
创建非核心线程是否成功
开始执行任务
重新获取线程池的核心属性AtomicInteger ctl
CAS操作失败流程重新开始
0 条评论
下一页