ThreadPoolExecutor
2023-05-14 14:26:44 0 举报
threadPool
作者其他创作
大纲/内容
//继承了AQS,并且 Worker 就是要创建的线程class Worker extends AbstractQueuedSynchronizerimplements Runnable//将 firstTask 需要执行的任务作为其 Runnable firstTask; 成员变量// this.thread = getThreadFactory().newThread(this); 调用线程工厂,将 Worker 创建成所需的线程,赋值给 Worker 的成员变量 thread//其 run()方法直接调用外部类Executor的runWorker(this)方法,参数是 Worker 本身
workerCount>= CAPACITY || workerCount>= (core ? corePoolSize : maximumPoolSize)
判断当前线城数是否小于核心线程数
循环结束,线程也结束
Running 运行中
new 新建
线程数<核心线程数
再判断一次档当前Executor到是否满足任务执行条件:当前状态是RUNNING或者(当前状态时SHUTDOWN且firstTask不为空)
w = new Worker(firstTask);
Object.wait()LockSupport.park()Object.join()
workers.add(w);,将worker线程放入一个HashSet集合中保存
线程数<最大线程数
开始
是
添加核心工作线程
workQueue.take();
workQueue.offer(command)
否
需要淘汰线程,设置超时时间
TEMINATED终止
执行任务
是,创建核心线程,core = true
int rs = runStateOf(c);获取Executor的运行状态,如果不满足直接return
ThreadPoolExecutor.runWorker(Worker w)
//工作线程自加1,使用cas方式解决多线程自加(无锁)compareAndIncrementWorkerCount(c)
获取到任务
task.run();
for (;;)死循环调用
return task
系统调度
否,尝试加入阻塞队列
//再判断一次Executor是否正常运行! isRunning(recheck)
整体死循环结束,确定可以去创建线程continue retry;
Runnable
whilen循环,只要task不为空那么一直执行
加入阻塞队列,失败(说明阻塞队列已满,尝试创建非核心线程)
yield()
添加任务到i阻塞队列
添加非核心工作线程
WAITING 和 BLOCKED 的区别是:WAIRING 需要等待其他线程唤醒,期间不参与CPU调度BLOCKED 只能等待获取锁,期间参与CPU调度,但是不执行任何指令
取出woker线程
阻塞队列是否已满
执行拒绝策略
thread.start()
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
retry:
firstTask,是当前提交的任务。线程创建完成时直接调用,处理任务,不需要通过阻塞队列
判断是否需要淘汰线程 allowCoreThreadTimeOut 默认为false,是配置核心线程是否需要被淘汰的参数,需要淘汰设置为 true。如果核心线程也需要淘汰,那么说明只要有线程超时未获取到任务,就需要结束当前线程。wc > corePoolSize:活动线程数大于核心线程数,说明一旦有线程超时未获取到任务,就需要结束当前线程
超时为获取到任务,说明当前超过了没有任务执行的保活时间,应该结束线程
true
提交任务
Ready 就绪
notify()notifyAll()LockSupport.unPark()
如果是调用的事submit方法,那么直接就会先调用FutureTask.run()方法。因为submit()会对提交的任务包装成FutureTask
不满足创建线程的条件,直接尝试加入阻塞队列
是,创建非核心线程,core = false
执行拒绝策略:reject(command);
task = getTask()
TIMED_WAITING超时等待
Synchronized
//将任务从阻塞队列中移除remove(command)
t.start();//开启worker线程,执行worker的run()方法
返回任务
ThreadPoolExecutor
final Thread t = w.thread;
submit/execute提交任务(Runnable/Callable)
submit
while (task != null || (task = getTask()) != null)
workerCount 当前活动线程数大于等于最大的线程总数(2^29-1),不能再容纳线程了,直接 return false 或者(core true 表示核心线程,false 表示非核心线程)创建核线程时:活动线程数大于等于核心线程数,说明不该再创建核心线程了,直接 return false;创建非核心线程时:活动线程数>最大线程数,说明不应该再创建线程了,直接 return false
不需要淘汰线程,阻塞读取,直到从阻塞队列中获取到任务
活得锁
set(result);
唤醒submitResul.get()的线程,unpark()
达到最大线程数并且阻塞队列也满了
BLOCKED 阻塞
for (;;)死循环调用
Object.wait(time)Thread.sleep(time)Object.join(time)LockSupport.parkNanos()
WAITING等待
可以正常去创建线程
//取出创建线程时的第一个taskRunnable task = w.firstTask;
*******线程的保活时间就是,阻塞队列的超时时间*******,一旦指定时间内仍未获取到任务,说明已经没有新的任务过来的了,当前线程已经空闲。wc > corePoolSize,说明存在非核心线程,那么当前线程一旦空闲就可以结束。所谓线程池核心线程的保活,就是 workQueue.take(); 直接放到阻塞队列的BlockingQueue里面,不让核心线程执行的run方法结束!!!!!!
workerCountOf(c) < corePoolSize
cas失败,表示有另一个线程去创建了,返回重新判断
结束
收藏
0 条评论
下一页