线程池源码分析
2022-09-08 23:37:17 11 举报
线程池源码分析
作者其他创作
大纲/内容
从workers中移除workerfinal ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); }
返回futuretaskreturn ftask;
变量allowCoreThreadTimeOut的官方介绍是:如果为 false(默认),核心线程即使在空闲时也保持活动状态。 如果为真,核心线程使用 keepAliveTime 超时等待工作也就是说此时的min等于collPoolSize判断当前存活的线程数是不是大于等于min如果是则什么不用做如果不是则会调用addWorker()去创建线程上文分析过了,调用这个api会创建一个线程,然后worker被添加到了workers中那么结论是:默认情况下,核心线程不会被销毁可以调用public void allowCoreThreadTimeOut(boolean value) 这个api去更改此值
但是将任务放到队列了为什么还要DCL重新检查呢?摘自官方描述:如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程 (因为现有的线程自上次检查后就死了)或者池在进入此方法后关闭。所以我们重新检查状态,如果停止则回滚入队,如果没有,则启动一个新线程。我解释一下,这个李老哥的意思就是说,你把任务放到队列了,好像没啥毛病,但是如果,你刚放进去,任务还没处理,线程池就关闭了,那么要检查一下,如果真的是线程池关闭了,则需要执行拒绝策略,否则,任务放进队列了,检查一下有没有存活的线程,没有则进行addWorker()进行添加线程(非核心员工)去处理任务,此处的addworker真的会添加吗?未必,总不能一直添加对吧,继续往下看源码分析
workCountOf方法其实就是计算ctl.get() & CAPACITY而再之后的源码分析中,当线程增加,ctl的值使用cas会不断递增也就是说ctl.get() & CAPACITY会从一开始的0不断的随着线程数的增加而递增
public Future<?> submit(Runnable task) {
ThreadPoolExecutor.shutdown();中有解释,原来,在这个方法中执行的是线程池的停止命令,在这里面,会把每一个worker从workers中拿出来遍历的去中断,中断前就会尝试拿worker的锁,拿到了再中断,此处就是保证了线程在执行任务的时候,不允许被中断呗
这几行的意思就是从worker中拿我们的任务,拿到的不是null则直接进入下面的处理逻辑,如果是null则调用getTask()去阻塞队列拿,这边看一些博客说的执行优先级,分配优先级,我想可能就是此处,大家还记得上面代码中,任务在corepoolsize满了之后,会放到阻塞队列中去吗,但是此处却是先处理worker内的任务Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {
public void run() { runWorker(this); }
执行的任务为空抛异常if (task == null) throw new NullPointerException();
这是worker的构造,可以看到他是调用我们创建的线程工厂去生产线程,并把任务加进去了Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }而worker类我们看一下他的继承关系private final class Worker extends AbstractQueuedSynchronizer implements Runnable发现worker本身就是一个可执行的任务,而且worker自身维护了刚创建的线程的引用 this.thread = getThreadFactory().newThread(this);盲猜:会使用这个引用去执行自身这个worker真会玩。。。。
getTask()
必须来一个总结了,span style=\"font-size: inherit;\
可以直观的看到submit是有返回值的,而且他其实调用的就是execute
reject(command);
这边有李大神标记的线程的几个状态,我依旧摘自官方描述:RUNNING:接受新任务并处理排队的任务SHUTDOWN:不接受新任务,但处理排队的任务STOP:不接受新任务,不处理排队的任务,*并中断正在进行的任务TIDYING:所有任务都已终止,workerCount 为零,* 转换到状态 TIDYING 的线程 * 将运行 terminate() 钩子方法TERMINATED:终止()已完成
private static boolean isRunning(int c) { return c < SHUTDOWN; }
那我们就从submit开始分析,走起!!!!
执行任务execute(ftask);
最终解锁,如果刚才添加成功了,则调用start开启任务,注意看是t去调用的,而t就是我们在创建worker的时候调用我们自己的线程工厂去生产出来的线程,这个线程维护在了worker内,而worker本身就是一个runnable,所以此处其实就是运行的worker内的run方法,盲猜正确!finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; }
果不其然拿到了引用,这里还上了把锁这把锁的作用是,在锁的范围内对workers这样的集合做了一些操作,必须是线程安全的final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();
首先是workerCountOf方法private static int workerCountOf(int c) { return c & CAPACITY; }private static final int CAPACITY = (1 << COUNT_BITS) - 1;private static final int COUNT_BITS = Integer.SIZE - 3;
下面的操作要是线程安全的,所以上了一把mainLock锁又是DCL重复检查,这边是检查线程池状态然后使用workers进行添加,看看官方对workers的描述:包含池中所有工作线程的集合。仅在 * 持有 mainLock 时访问还有一个变量:largestPoolSize官方描述:跟踪获得的最大池大小。只能在 * mainLock 下访问。然后把此次的添加状态改为了true// Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; }
使用cas把ctl原子加1,注意break的是retry if (compareAndIncrementWorkerCount(c)) break retry;
这个方法没啥好说的吧,在文章起始我就介绍了李哥定义的线程池的几个状态,判断是不是running中只要判断值大于shuttdown即可
分析之前先说一下创建线程池的几个参数,我们举一个公司职员的例子方便理解:1.corePoolSize:核心线程数(公司的正式职工)2.maximumPoolSize:(公司的总职工数,正式职工+临时工)3.keepAliveTime:临时工的存活时间4.临时工存活时间的时间单位5.阻塞队列,理解为公司仓库,公司的活干不完的先在仓库放一放6.拒绝策略:活在仓库都放的发霉了,来的新活没法干了,只能拒绝了,怎么拒绝呢?就是这个拒绝策略
c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop
线程池在执行任务的时候可以使用submit去执行也可以使用execute去执行,那么这两个api有何区别呢?
break retry后执行w = new Worker(firstTask);
ThreadPoolExecutor源码分析
不符合条件不会添加workerint c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
int wc = workerCountOf(c);超出了创建线程数的数值则不创建直接return false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
为什么我会猜测这个方法得到的是当前线程池共计拥有的线程数呢?
0 条评论
下一页