线程池源码
2022-10-20 16:01:07 0 举报
线程池源码流程图
作者其他创作
大纲/内容
否-新增非核心线程
都是核心线程--基于链表的阻塞队列--“国企,都有编制”
获取方法workQueue.take();
mainLock.lock();
调用 ReentrantLock解锁
加锁
Executors
设置worker任务为空 task = null;调用worker AQS解锁
从阻塞队列的获取
入队失败了 判断 新增非核心线程是否成功
4.execute(ftask)返回提交的RunnableFuture
调用 ReentrantLock 加锁
newTaskFor(task)直接传参赋值转换new FutureTask<T>(callable)
线程数 < 核心线程数
offer()offer方法队列已满生产者不会阻塞,返回false使用ReentrantLock保证线程安全
addWorkerFailed(w)
解锁
线程添加到队列中
Future<T> submit(Callable<T> task)
Worker.start()
有异常抛出异常
workers.add(w);设置添加成功workerAdded = true;
阻塞队列 继承了BlockingQueue,AbstractQueue基于链表和ReentrantLock队列为空,消费者阻塞;队列满,生产者阻塞读写锁分离,效率更高ReentrantLock takeLock = new ReentrantLock()Condition notEmpty = takeLock.newCondition()ReentrantLock putLock = new ReentrantLock()Condition notFull = putLock.newCondition()
如果线程运行失败调用添加失败方法
执行工作程序退出1.一般是抛出异常2.获取的任务为空
如果 线程数>最大线程数或者 允许核心线程是超时回收或者 线程数> 核心线程数或者 从队列中获取任务超时且 工作线程数>1 或者队列为空返回null,并减少线程工作数量
以LinkedBlockingQueue为例
根据任务新建线程
工作的线程数是否等于最大线程数
Worker.run()
finally
newCachedThreadPool
基于优先级队列的线程池,数据结构是二叉堆,堆顶元素设置过期时间,为0则出栈,循环任务,操作完成后添加回队列
主要成员
死循环获取
是
循环 task = getTask() 从阻塞队列中获取任务没有获取到会阻塞
workQueue.take()调用的是阻塞队列的阻塞获取方法如果没有会阻塞
新增是唤醒线程获取任务执行由阻塞队列实现
new ThreadPoolExecutor()
返回
如果没有终止,没有中断XXXXX
newSingleThreadExecutor
Future<?> submit(Runnable task)
移除工作线程workers.remove(w)CAS 线程数减一decrementWorkerCount();尝试终止tryTerminate
添加成功则调用操作系统创建线程然后系统回调worker的run方法
线程退出逻辑:runWorker 是一个while循环,只要获取任务不为空就会一直执行,同时获取任务的方法getTask也是一个while循环,所以线程退出的触发条件就是,getTask的方法返回null或者任务的run方法抛出异常的情况
阻塞队列:存放提交的任务BlockingQueue<Runnable> workQueue保存线程的容器 HashSet<Worker> workers = new HashSet<Worker>()保证线程安全 :ReentrantLock mainLock = new ReentrantLock()阻塞唤醒:Condition termination = mainLock.newCondition()线程工厂:生成线程的工厂:ThreadFactory threadFactory拒绝策略处理器:RejectedExecutionHandler handler核心线程数,最大线程数,存活时间是否允许核心线程超时回收 allowCoreThreadTimeOut
newFixedThreadPool
执行run方法法
验证线程池是否要终止
拒绝该任务reject(command)
ScheduledThreadPoolExecutor
4种提交任务的方式
是-入队成功 直接返回
mainLock.unlock()
线程数 > 核心线程数 --入队池正在运行 且入队成功
如果池不是运行状态 且 任务移除成功
ReentrantLock 加锁工作线程移除workers.remove(w)ReentrantLock 解锁
没有核心线程,非核心线程没有上限 --\"外包公司\"--基于同步阻塞阻塞队列,队列:核心线程为0,阻塞队列数量为0,每次新增都会阻塞,并创建一个非核心线程来消费,消费完生产者结束阻塞
Worker.lock()
workQueue.take()
都会包装成 RunnableFuture提交给execute()执行
线程池保证线程安全:1.生产线程--addWorker 通过ReentrantLock保证线程安全2.新增任务--使用阻塞队列的特性保证线程安全 底层是使用ReentrantLock cas添加3.获取任务--使用阻塞队列的特性保证线程安全 底层是使用ReentrantLock,链表双锁,数组单锁4.执行任务 -- 执行任务基于AQS自定义的并发锁Worker保证线程安全
AQS加锁
new Worker(firstTask)
1.线程已经关闭或者2.停止,或者3.队列为空返回null,并减少线程工作数量
入队方法workQueue.offer(command)
新增非核心线程
否
Runnable task = w.firstTasktask != null || (task = getTask()) != null
是:新增核心线程
执行前的扩展接口
返回null
队列为空会阻塞线程,直到队列中新增任务,会唤醒等待中的线程
是,拒绝任务
新增线程
this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
task = getTask() 从阻塞队列中获取任务
减小线程数并返回空
task.run();
tryTerminate();
wile 循环获取任务先从worker中取为空就从队列中取
runWorker(this)
死循环--自旋检查状态
如果设置核心线程回收或者线程数大于核心线程数且存在线程或者队列为空
”个体户“-只有一个线程,没有非核心线程--基于链表的阻塞队列
0 条评论
下一页