线程池源码
2022-11-16 13:45:08 0 举报
线程池源码
作者其他创作
大纲/内容
if (compareAndIncrementWorkerCount(c))
c & CAPACITY
定时线程池执行
int recheck = ctl.get();
ensurePrestart();
if (t.getPriority() != Thread.NORM_PRIORITY)
min = 1;
如果可以在给定当前运行状态和关机后运行参数的情况下运行任务,则返回 true。形参:periodic – 如果此任务是周期性的则为 true,如果延迟则为 false
if (t != null)
workerAdded = true;
cancel(false);
如果(SHUTDOWN 和池和队列为空)或(STOP 和池为空),则转换为 TERMINATED 状态。如果符合终止条件但 workerCount 不为零,则中断空闲工作程序以确保关闭信号传播。必须在任何可能导致终止的操作之后调用此方法——减少工作人员数量或在关闭期间从队列中删除任务。该方法是非私有的,以允许从 ScheduledThreadPoolExecutor 进行访问。
w.unlock();
w.lock()
DelayedWorkQueue.add
w = new Worker(firstTask);
return t;
isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
reExecutePeriodic(outerTask);
执行FutureTask的run方法去执行设置的callable的call方法去执行定时线程池传入的Runnable的run方法
true
reject(command);
canRunInCurrentRunState(task.isPeriodic())
false
setNextRunTime();
if (workerAdded)
runAndReset()
threadPoolExecutor.execute
tryTerminate();
插入堆尾。
t.start();
Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;
workers.add(w);
int rs = runStateOf(ctl.get());
}
容量扩增50%。
wt.interrupt();
cas设置工作线程数成功
task.run();
if (wc < corePoolSize)
高位记录线程池状态,低位记录核心线程数
core调用方法时传递,true核心线程
if (completedAbruptly)
if (min == 0 && ! workQueue.isEmpty())
判断completedAbruptly是否突然完成
Worker.run()
if (workerCountOf(c) >= min)
final Thread t = w.thread;
不是突然完成
else if (!periodic)
int c = ctl.get();
mainLock.lock();
int s = workers.size();
ScheduledFutureTask.super.run();
if (s > largestPoolSize)
c = ctl.get();
if (!completedAbruptly)
if (! workerStarted)
grow()
运行中断
添加Worker,从队列获取任务
if (! isRunning(recheck) && remove(command))
int wc = workerCountOf(ctl.get());
if (t.isAlive())
放入延迟队列,等待Worker的getTask()获取任务
移除出workers
return;
c = ctl.get();
leader = null;
task.cancel(false);
Worker 类主要维护线程运行任务的中断控制状态,以及其他一些次要的簿记。此类机会性地扩展 AbstractQueuedSynchronizer 以简化获取和释放围绕每个任务执行的锁。这可以防止旨在唤醒等待任务的工作线程的中断,而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作任务在调用 setCorePoolSize 等池控制方法时能够重新获取锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)
分3步进行:** 1。如果运行的线程少于corePoolSize,请尝试*以给定的命令为第一个启动一个新线程*任务。对addWorker的调用会自动检查runState和* workerCount,这样可以防止误报警*在不应该的时候线程,返回false。** 2。如果一个任务可以成功排队,那么我们仍然需要*来再次检查我们是否应该添加线程*(因为自上次检查以来已有的已经死亡)*自进入此方法以来池已关闭。所以我们*重新检查状态,如果有必要回滚排队if*停止,或启动一个新的线程,如果没有。** 3。如果不能将任务排队,则尝试添加新的*线程。如果它失败了,我们知道我们已经关闭或饱和了*,因此拒绝任务。
runWorker(this);
w.completedTasks++;
else if (ScheduledFutureTask.super.runAndReset())
预先检查 t 是否可启动
boolean periodic = isPeriodic();
int wc = workerCountOf(c);
completedAbruptly = false;
定时线程池,稳定定时器
在不设置结果的情况下执行计算,然后将此未来重置为初始状态,如果计算遇到异常或被取消,则不会这样做。这是为与本质上执行多次的任务一起使用而设计的。返回值:如果成功运行并重置则为true
if (i = queue.length)
completedTaskCount += w.completedTasks;
主要工人运行循环。重复从队列中获取任务并执行它们,同时处理一些问题: 1. 我们可能从一个初始任务开始,在这种情况下我们不需要获取第一个任务。否则,只要池在运行,我们就会从 getTask 获取任务。如果它返回 null,那么 worker 会由于池状态或配置参数的改变而退出。其他退出是由外部代码中的异常抛出引起的,在这种情况下 completedAbruptly 成立,这通常会导致 processWorkerExit 替换此线程。 2. 在运行任何任务之前,获取锁以防止在执行任务时其他池中断,然后我们确保除非池正在停止,否则该线程不会设置其中断。 3. 每个任务运行之前都会调用 beforeExecute,这可能会引发异常,在这种情况下,我们会导致线程死亡(使用 completedAbruptly true 中断循环)而不处理任务。 4. 假设 beforeExecute 正常完成,我们运行任务,收集任何抛出的异常以发送到 afterExecute。我们分别处理 RuntimeException、Error(规范保证我们捕获)和任意 Throwable。因为我们不能在 Runnable.run 中重新抛出 Throwable,所以我们在出路(到线程的 UncaughtExceptionHandler)时将它们包装在 Errors 中。任何抛出的异常也会保守地导致线程死亡。 5、task.run完成后,我们调用afterExecute,也可能会抛出异常,同样会导致thread死掉。根据 JLS Sec 14.20,这个异常即使 task.run 抛出也会生效。异常机制的最终效果是 afterExecute 和线程的 UncaughtExceptionHandler 拥有我们可以提供的关于用户代码遇到的任何问题的准确信息。
available.signal();
if (workerCountOf(recheck) == 0)
final ReentrantLock mainLock = this.mainLock;mainLock.lock();
ScheduledFutureTask.run()
executor.scheduleAtFixedRate
while (task != null || (task = getTask()) != null)
计算核心线程数
mainLock.unlock();
if (rs >= SHUTDOWN &&! (rs ==SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
final ReentrantLock mainLock = this.mainLock;
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) &&remove(task))
delayedExecute(t);
if (queue[0] == e)
else if (wc == 0)
设置callable
setState(-1); //禁止中断直到runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
保持锁定状态重新检查。在 ThreadFactory 失败或在获取锁之前关闭时退出。
if (!canRunInCurrentRunState(periodic))
DefaultThreadFactory.newThread
super.getQueue().add(task);
判断运行状态是否改变
创建ScheduledFutureTask
largestPoolSize = s;
在给定线程中执行给定的 Runnable 之前调用的方法。此方法由将执行任务r的线程t调用,可用于重新初始化 ThreadLocals,或执行日志记录。这个实现什么都不做,但可以在子类中定制。注意:为了正确地嵌套多个覆盖,子类通常应该在此方法结束时调用super.beforeExecute
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
workerStarted = true;
for (;;)
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;
getTask()
addWorkerFailed(w)
size = i + 1;
如果池正在停止,确保线程被中断;如果不是,确保线程不被中断。这需要在第二种情况下重新检查以在清除中断时处理 shutdownNow 竞争
decrementWorkerCount();
执行阻塞或定时等待任务,具体取决于当前配置设置,或者如果此工作人员由于以下任何原因必须退出,则返回空值: 1. 工作人员数量超过 maximumPoolSize(由于调用 setMaximumPoolSize)。 2.游泳池停止。 3.池关闭,队列为空。 4、本worker等待任务超时,超时的worker在定时等待前后都会被终止(即allowCoreThreadTimeOut || workerCount > corePoolSize ),如果队列非空,则本worker不是池中的最后一个线程。返回值:task,如果 worker 必须退出则为 null,在这种情况下 workerCount 递减
不是RUNNING状态或者队列已满
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
工作数为0,添加一个空的保活
for (;;) {
boolean completedAbruptly = true;
task = null;
工作线程数减1
if (isShutdown())
判断是否超过允许的最大线程数,或自定义设置的最大线程数或核心线程数
判断是否是守护线程
if (i == 0)
向队列中添加任务
突然完成
执行设置的拒绝策略
return;
主池控制状态ctl是一个原子整型封装了两个概念字段workerCount,表示有效线程数runState,表示是否运行,关闭等为了将它们封装成一个int,我们将workerCount限制为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。如果这在未来成为一个问题,可以将变量更改为 AtomicLong,并调整下面的移位/掩码常量。但在需要出现之前,这段代码使用 int 会更快更简单一些。 workerCount 是允许启动但不允许停止的 worker 数量。该值可能暂时不同于实际的活动线程数,例如,当 ThreadFactory 在请求时无法创建线程,以及退出线程在终止前仍在执行簿记时。用户可见的池大小报告为工作程序集的当前大小。 runState 提供主要的生命周期控制,取值: RUNNING:接受新任务并处理排队任务 SHUTDOWN:不接受新任务,但处理排队任务 STOP:不接受新任务,不处理排队任务,并中断正在进行的任务 TIDYING:所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminated() 挂钩方法 TERMINATED:terminated() 已完成 这些值之间的数字顺序很重要,以允许有序比较. runState 随时间单调增加,但不需要达到每个状态。转换是: RUNNING -> SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中(RUNNING 或 SHUTDOWN) -> STOP 在调用 shutdownNow() 时 SHUTDOWN -> TIDYING 当队列和池都为空时 STOP -> TIDYING当池为空时 TIDYING -> TERMINATED 当 terminated() 钩子方法完成时 在 awaitTermination() 中等待的线程将在状态达到 TERMINATED 时返回。检测从 SHUTDOWN 到 TIDYING 的转换没有你想的那么简单,因为在 SHUTDOWN 状态期间队列可能在非空之后变为空,反之亦然,但我们只能在看到它为空后看到 workerCount 时终止为 0(有时需要重新检查)
if (workerCountOf(c) < corePoolSize)
t.setDaemon(false);
根据执行线程池时period的不同,执行不同延迟方案
执行execute传入的Runnable的run方法
if (t.isDaemon())
再次check
t.setPriority(Thread.NORM_PRIORITY);
判断是否为停止状态
if (isRunning(c) && workQueue.offer(command))
throw new IllegalThreadStateException();
reject(task);
workers.remove(w);
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null))
判断是否有task,没有就从线程池设置的队列里获取
int rs = runStateOf(c);
设置第一个元素
if (runStateOf(c) != rs)
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
0 条评论
回复 删除
下一页