AQS之CyclicBarrier源码流程
2024-09-11 14:18:11 0 举报
CyclicBarrier是AQS框架下的一种同步机制,用于控制多个线程同时到达某个屏障点。它的核心内容包括:首先,通过构造函数设置屏障数,每次调用await()方法时,计数器减一,当计数器减至0时,唤醒所有等待线程。其次,CyclicBarrier支持重置操作,使得屏障可重复使用。此外,CyclicBarrier还提供了getNumberWaiting()方法来获取当前在屏障处等待的线程数量。在源码中,CyclicBarrier使用了AbstractQueuedSynchronizer(AQS)框架,底层通过volatile修饰的state变量和双向队列实现线程同步。
作者其他创作
大纲/内容
t
trail = firstwaiter
lastwaiter
nextWaiter
conditionnode
同步等待队列
returnfindNodeFromTail(node)
当前线程为中断状态,去破坏屏障
线程被唤醒后去尝试获取锁
设置当前代的 broken 属性为 true
重置count 计数器为下一代屏障做准备
Y
trail
lock.unlock()
fullyRelease(node)
firstwaiter
trip.signalAll()
AbstractQueuedSynchronizer.ConditionObject#await()
条件队列:node 的 nextWaiter 属性指向下一个 node同步队列:node 的 next 属性指向下一个 node
trail.nextwaiter = t.next
记录最新的尾节点
node 有 next node
条件队列转同步队列
finally
return 0
将当前 condition node 转到同步队列中
创建一个新的 Generation 实例,并将其赋值给 generation表示进入下一代屏障
cancellednode
isHeldExclusively()
interruptMode == REINTERRUPT
t.nextWaiter = node
CyclicBarrier#await()
尾节点不是 cancelled node
return true
unlock释放重新获取到的独占锁
AbstractQueuedSynchronizer.ConditionObject#doSignalAll
interruptMode != THROW_IE
Barrier 核心
记录条件等待队列的尾节点
所有参与的线程都已经到达了屏障点,此时屏障被触发
唤醒node对应的线程的目的是为了重新同步,设置正确的prev nodeprev node 如果是 cancelled node 则指向新的prev node,从同步队列中剔除 cancelled nodeprev node 的 waitStaus 更新为 SIGNAL
ws = p.waitStatus
release(savedState)
加锁
first
breakBarrier()
tail
如果非 condition 节点的 prev 不为 null,但还没有在队列中(可能是 CAS tail node 入队失败),则从队列尾部开始遍历,确认节点是否在队列中
唤醒同步等待队列中阻塞的线程
next
(interruptMode = checkInterruptWhileWaiting(node)) != 0
lastWaiter = firstWaiter = null
入队条件队列
校验当前代的屏障是否被破坏
尾节点是 cancelled node
不是最后到达屏障点的线程
lock加独占锁
BrokenBarrierException
CAS 成功说明线程在条件等待队列中等待期间被中断唤醒
IllegalMonitorStateException
获取锁失败
firstWaiter = node
AQS 维护了同步等待队列
AbstractQueuedSynchronizer#isOnSyncQueue
独占锁
unlinkCancelledWaiters()
重置count 计数器
first = next
LockSupport.unpark(node.thread)
Thread.interrupted()
这里都是 AQS 的一些通用方法,看下https://www.processon.com/diagraming/66cec2deb03fbb660e59a136 即可
线程在对应的node 被 Condition.signal() 方法转到同步等待队列中后才被中断唤醒则设置线程的中断标记
generation.broken = true
Node t = lastWaiter
如果 command.run() 执行失败则去破坏屏障
使用了超时的方法但是传入的超时时间小于等于0则抛出超时异常
条件等待队列中所有节点转换到同步等待队列
入队到条件等待队列末尾
node.next != null
runnable action触发屏障时执行 action
N
唤醒所有在当前屏障上等待的线程(条件等待队列转成同步等待队列)并进入下一代屏障
AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
g != generation
ReentrantLock.Sync
将条件队列中所有的节点转到同步队列中
解锁线程唤醒后获取到的锁
所以这里 ConditionObject 调用 AQS 的方法时实际是走的 ReentrantLock.Sync 对象的实现
ReentrantLock.Sync#tryRelease
释放的是这把锁
selfInterrupt()
判断当前节点是否在同步等待队列中
lock.lock()
这里重新加锁
设计两个指针:t - 当前处理的 node(初始值为 firstWaiter)trail - 记录上一个未取消的 node(初始值为 null)当 t 为 cancelled node 时需要将该 node 从队列中剔除- trail 为 null 时 firstwaiter 需要替换为 t.next- trail 非 null 时直接 trail.nextwaiter = t.next最终的队列就是由 trail 记录过的 node 组成的
Node p = enq(node)
unlock
ReentrantLock lock = this.lock
Y抛出异常
extends
当前屏障的代每次屏障被触发或重置时,代会改变
Node first = firstWaiter
count = parties
开始阻塞
判断当前线程是否是锁的持有者
trip.await()
释放 ReentrantLock锁
trail = null
如果node的waitStatus是CONDITION或者prev node为null
Runnable command = barrierCommand
unpark 唤醒线程或是线程因中断而唤醒检查线程中断情况
解锁最外面的lock
LockSupport.park(this)
如果 CyclicBarrier 创建时设置了Runnable action 那么执行该action
ranAction = true
记录条件队列的头节点
head
timed && nanos <= 0L
N创建 Condition Node
尾节点为空则初始化同步等待队列node 既是头节点也是尾节点
继续去唤醒同步队列中的下个节点
acquire lock重新获取独占锁
unlinkCancelledWaiters
while (!isOnSyncQueue(node))
ws 0(说明 prev node has bean cancelled)或者 CAS 失败则 unpark 当前唤醒当前线程
指针指到 next condition node执行转换操作
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0
signal(条件队列转到同步队列)
Thread.yield()
return false
enq(node)
AbstractQueuedSynchronizer#transferAfterCancelledWait
node.nextWaiter != null
抛出异常
t = lastWaiter
park
lastWaiter = node
doSignalAll(first)
interruptMode = REINTERRUPT
AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
!ranAction
清除条件队列中的 cancelled node 的设计原理流程就不画了,主要还是这个设计真的很巧妙
addConditionWaiter()
唤醒条件等待队列中所有的节点(条件等待队列中所有节点转换到同步等待队列)
return index
删除条件队列
command.run()
prev
next generation开启下一代
node 不在同步队列park 阻塞当前线程
index == 0
Node next = first.nextWaiter
线程在条件等待队列中等待期间被中断唤醒则抛出中断异常
g.broken
判断线程中断情况
获取锁成功
release释放独占锁
AbstractQueuedSynchronizer#transferForSignal
返回前还会执行最外面的finally的lock.unlock()
interruptMode != 0如果线程因中断而被唤醒则需要进行一些操作
1. 线程在条件等待队列中等待期间被中断唤醒则返回 THROW_IE,代表后续需要抛出中断异常2. 线程在对应的node 被 Condition.signal() 方法转到同步等待队列中后才被中断唤醒则返回 REINTERRUPT,代表后续需要设置中断标记3. 如果线程是 unpark 唤醒的(不是中断唤醒),那么返回 0,代表线程没有中断
1. Condition.await() 会在入队条件队列完成后先 release 释放锁2. 所有参与线程到达屏障点触发屏障执行signal 条件队列转到同步队列后,等待 unpark 唤醒后会再次获取锁,最后该锁由 lock.unlock 释放
AbstractQueuedSynchronizer
N校验线程状态
当前线程是否是最后一个到达屏障点
node01
条件等待队列是单向链表结构
for (;;) { // 循环直到屏障触发、屏障被破坏、参与线程中断、等待超时}
从条件等待队列的头节点开始一个个的转到同步等待队列的末尾
ConditionObject
设计原理
CAS失败说明线程在对应的node 被 Condition.signal() 方法转到同步等待队列中后才被中断唤醒
如果 CAS 失败,说明该 node 已 cancelled
node.waitStatus == Node.CONDITION || node.prev == null
InterruptedException
是最后到达屏障点的线程
interruptMode == THROW_IE
nextGeneration()
条件队列是 ConditionObject 维护的,通过将头节点和尾节点设置为空来删除条件队列后续的转换操作,都是由提前记录的条件队列(first)来转换
入队同步队列并返回 prev node
final Generation g = generation
阻塞
维护了条件等待队列
入队条件等待队列并park 等待直到屏障触发执行signal操作 - 条件等待队列转成同步等待队列在同步队列中 unpark 唤醒线程
transferForSignal(first)
t == null
int index = --count
AbstractQueuedSynchronizer.ConditionObject#signalAll
非静态内部类
由最后一个到达屏障点的线程去唤醒同步等待队列中阻塞的线程
reportInterruptAfterWait(interruptMode)
捕获InterruptedException当前代没变且当前代没被破坏
判断尾节点是否为空
N计数器减一
尾节点不为空则node 添加到末尾
清除条件等待队列中 cancelled node
参与的线程没有全部到达屏障点则添加到条件队列进行阻塞,直到最后一个线程到达屏障点触发屏障执行 signal 将条件队列转到同步队列中等待锁释放后 unpark 唤醒线程
条件等待队列
TimeoutException
do { // 循环直到条件队列全都转到了同步队列} while (first != null)
generation = new Generation()
t != null && t.waitStatus != Node.CONDITION
清除条件等待队列中的 cancelled node
ws > 0 || span style=\"font-size: inherit;\
记录 next contidion node
收藏
0 条评论
下一页