07_concurrent包2
2024-05-23 21:38:19 0 举报
Java ThreadPool源码剖析
作者其他创作
大纲/内容
3位:状态
优点:创建线程数可控,通过有限的线程数量处理任务。缺点:由于用了无界队列,导致任务无限怼到队列中,吃掉内存 导致OOM队列:LinkedBlockingQueue
POLL任务160s
HashSet<Worker> workers 任务
PUT任务1
ThreadPoolExecutor
Worker.run() { runWorker(this); }
HashSet<Worker> workers (core线程池)
当前线程数< corePoolSize
Woker
执行任务后进入while循环,从队列中poll
maxsimumpoolsize 临时线程超过keepalivetime后销毁
N
ReentrantLock mainLock
当前worker< corePoolSize?
ctl
压入队列cachedThreadPool线程池 使用同步队列SynchronousQueue.offer()所以在没有POLL任务的线程时,入队失败
return;
当前线程总数<= maxsimumpoolsize
FixedThreadPool
参数:corepoolsize=5 (自定义大小)maxsimumpoolsize=5 (=core数量)keepaliveTime=0
提交一个线程
参数:corepoolsize=0maximumpoolsize=Integer.MAX_VALUEkeepaliveTime=60
如果采用newfixed线程池,这里达到的效果是,当线程池workers的数量==core时,所有的worker都在队列上阻塞take(),一旦有任务放到队列中时,就会有一个worker被唤醒执行任务。
因为corePoolSize = 0,所以肯定不成立
等待执行
w.lock()其实是通过AQS的lock实现
ThreadPoolExecutor.runWorker(Worker w)
执行拒绝策略RejectedExecutionHandler
29位:数量
配对成功标志着getTask()能拿到任务,worker就开始处理,之后继续进入while循环getTask
线程池是否允许?是否有存活worker?
Y
优点:不断新增线程,快速处理任务;线程空闲等待60秒,如果时间段内有任务就处理,没有就过期。缺点:毫无限制的线程增加使得CPU消耗巨大,导致系统崩溃队列:SynchronousQueue,且默认队列是非公平的,采用TransferStack栈的方式LIFO,如果是公平的采用TransferQueue队列方式FIFO。
释放锁w.unlock();
run
当worker执行的任务抛异常,就会跳出while在finally中进行清理
压入队列newFixedThreadPool线程池 使用无界队列LinkedBlockingQueue.offer()不阻塞,直接挂到队尾
入队成功?
cachedThreadPool弹性线程池
HashSet<Worker> workers (临时线程池)
通过while循环来执行任务,不断通过getTask获取队列中的任务while (task != null || (task = getTask()) != null) {
执行任务之前ThreadPoolExecutor.beforeExecute()
当临时worker处理完自己的任务后,会对synchronousQueue进行拉取任务,并设定超时时间60s。 根据同步队列的特点,会先在栈顶压入poll任务,并等待60s,有两种情况:1、期间有PUT任务压入,会将put和poll任务进行match,匹配成功后交换数据并将两个线程出队(如:PUT1和POLL2)。2、如果没有PUT任务压入,POLL任务就会超时,park在此的线程超实惠后悔进行unpark,并且会拿到一个null。
RejectedExecutionHandler 默认拒绝策略
firstTask
执行我们的线程task.run();
ThreadFactory 默认的
POLL任务260s
执行任务之后ThreadPoolExecutor.afterExecute()
TransferState
corepoolsize 线程池线程数
BlockingQueue 线程等待队列有界/无界
从线程池workers中移除掉异常worker。
创建 Worker
清理不工作的workerThreadPoolExecutor.processWorkerExit()
标记线程池为TERMINATED
keepaliveTime 线程闲置阈值
getTask()拿不到任务,说明60s内没有新的PUT任务压到栈,while循环就退出。之后会销毁,之后继续重新创建临时线程。
创建 Workerextends AQS implements Runnable用AQS的state标识task运行状态
在构造的时候,将传入的线程替换为自己,以便在之后运行自己的run方法Worker(Runnable firstTask) { setState(-1); // 利用AQS的state标记线程状态 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
尝试再次创建线程放入临时线程池
CAS递增线程数量compareAndIncrementWorkerCount()
加锁mainLock.lock();
match成功
阻塞获取任务getTask()workQueue.take()
0 条评论
下一页