AQS:并发多线程抢锁源码分析(debug跟踪)-条件队列栅栏 以CyclicBarrier为例
2021-03-08 22:14:18 0 举报
在同步队列的基础上,对条件队列进行debug源码跟踪学习
作者其他创作
大纲/内容
构造方法
创建非公平锁sync = new NonfairSync();
同步队列
第一次循环
第一次当前是空队列,进入if代码
waitstate=0prev=nullnextWaiter=nullthread=Thread-1(当前节点)
waitStatus = -2prev = nullnext = nullthread = Thread-0nextWaiter = null
CyclicBarrier#await()
进入循环for (;;)
Node.CONDITION=-2Thread-0到这里:参与量还不满足,还是在条件队列中,因此状态为-2,返回falseThread-1到这里:参与量还不满足,还是在条件队列中,因此状态为-2,返回false
if (t != null && t.waitStatus != Node.CONDITION) {\tunlinkCancelledWaiters();\tt = lastWaiter;}
进来一个,就将待参与量减一Thread-0: index=2Thread-1: index=1Thread-2: index=0
waitstate=0prev=前面的空节点nextWaiter=nullthread=Thread-1(当前节点)
待
具体逻辑
ReentrantLock.Sync#tryRelease
AbstractQueuedSynchronizer.ConditionObject#await()
将当前线程放到队列中,并将当前线程的前节点指向新队列的节点
Sync(int count) { setState(count);}
System.out.println(Thread.currentThread().getName() + \" counts = \" +countDownLatch.getCount());
//用于finally部分ranAction = true;
如果释放锁失败,将waitStatus重置为-2,待条件触发
mian线程
boolean ranAction = false;
waitstate=0prev=nullnextWaiter=nullthread=null
ReentrantLock$NonfairSync
CountDownLatch countDownLatch = new CountDownLatch(3);
Head,tail
测试代码
AbstractQueuedSynchronizer#fullyRelease
public static void main(String[] args) { // 闭锁 CountDownLatch countDownLatch = new CountDownLatch(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { //减1 state=0 countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + \" counts = \" +countDownLatch.getCount()); // 阻塞 countDownLatch.await(); System.out.println(Thread.currentThread().getName() + \" end\"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
Condition队列Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
cyclicBarrier对象
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + \"开始等待其他线程\"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + \"开始执行\"); // 模拟业务处理 Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + \"执行完毕\"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
第二次循环,进入else代码
由start调用Thread的run方法
//获取条件队列的首节点Node first = firstWaiter;
if (g.broken)//初始化时是false throw new BrokenBarrierException();if (Thread.interrupted()) {//判断当前线程已标记挂起 breakBarrier(); throw new InterruptedException();}
Y
AbstractQueuedSynchronizer.ConditionObject#doSignalAll不为空,将条件队列的节点全部移除并转移到同步节点
N
//核心逻辑,条件队列到同步队列trip.signalAll();//当前三个节点都移到同步队列后,重置参与量并创建新的一代 count = parties;generation = new Generation();return 0;
获取barrierCommand,如果被赋值了就执行线程对应的代码逻辑final Runnable command = barrierCommand;if (command != null) command.run();
//设置当前待参与量为3this.parties = parties;//设置初始化值为3.待凑够3个线程为一组执行后,需要将待参与量重置为count值(即3)。this.count = parties;this.barrierCommand = barrierAction;
打印当前的count值
Node node = addConditionWaiter();
适用于时间条件的
无论哪个线程走到这,都要将当前节点放到队列的lastWaiter
判断最后一个节点不为空,且或最后一个waiter的状态为等待条件触发状态。Thread-0到这里,队列还是空的,因此t为null
创建一个新节点,并作为新队列的头部和尾部节点
mian线程挂起
//条件队列的首位节点置空lastWaiter = firstWaiter = null;do {//获取首节点的下一个nextWaiter Node next = first.nextWaiter;//将首节点的nextWaiter属性置为空 first.nextWaiter = null; transferForSignal(first);//具体逻辑下面详解 first = next; } while (first != null);
int index = --count;
Thread-2能进到这里,因为Thread-0到2已满足参与量。第一次进来时,barrierCommand未赋值为null
for只有出现异常才会调用解锁
加入到初始化时,创建的条件队列中trip.await();
count必须大于0,否则构造函数会直接抛出异常
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException(\"count < 0\"); this.sync = new Sync(count); }
图解队列
CountDownLatch.Sync#tryReleaseShared
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; }}
nanos = trip.awaitNanos(nanos);
使用当前线程创建节点,并指定waitStatus为-2(等待条件触发)
lock.lock();
finally:lock.unlock();
将前面的ReentranLock解锁,并返回占有锁的节点数
1、获取到条件队列的第二个节点,赋值给next。2、将首节点放到同步队列中,3、将nest赋值给first(就是将已放到同步队列的首节点从条件队列移除,再循环,依次将条件队列的每一个节点都放到同步队列。)。当前三个节点都移到同步队列后,
countDownLatch.await();
boolean broken = false;
firstWaiter = nulllastWaiter = nullReentrantLock$NonfairSync
LockSupport.park(this);
构造方法,创建Sync,并将count作为state的值
如果不在队列中,调用park方法挂起线程
创建线程,并调用start
Thread-0:c=2Thread-1:c=1Thread-2:c=0Thread-3:c=0
cyclicBarrier.await();
将当前的屏障生成设置为已打破并唤醒所有人。仅在保持锁定时调用。private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
private final ReentrantLock lock = new ReentrantLock();
切换到Thread-0
Thread-0到这里,尾部节点为空,即t为nullThread-1到这里,尾部节点为Thread-0节点
创建CyclicBarrier栅栏对象,设定参与量为3
将当前线程的节点作为条件队列的尾部节点
if (t == null) firstWaiter = node;else t.nextWaiter = node;
if (!isHeldExclusively())//判断lock的独占者是否是当前线程throw new IllegalMonitorStateException();
sync.releaseShared(1);
将当前线程加到条件队列做尾部节点。
CountDownLatch闭锁模拟并发,多个线程在某一时刻同时开始执行通过规定阈值n,每进一个线程减1,阈值减为0后,关闭锁,线程就可以任意执行了
只有一个私有属性Sync,并实现了AQS的方法
this.sync = new Sync(count);
判断是否要阻塞当前线程
System.out.println(Thread.currentThread().getName() + \"开始执行\");// 模拟业务处理Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + \"执行完毕\");
main线程
通过自旋;没有同步队列就创建同步队列,将当前节点放到同步队列
else if (nanos > 0L)
当条件队列不为空,且尾部节点的状态不在是-2时(即参与量已满足条件触发,修改了状态)
//用于锁住屏障入口private final ReentrantLock lock = new ReentrantLock();跳过的条件private final Condition trip = lock.newCondition();参与量private final int parties;跳过条件后要运行的代码private final Runnable barrierCommand;//当前代private Generation generation = new Generation(); //仍在等待的参与方数量。每代从参与方最大量减到0。每到新一代或是被打破的时候,它都会被重新设置。private int count;
初始化一个条件队列
if (!timed)
对象初始化时,初始化的私有变量
int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;}
protected final boolean tryRelease(int releases) { int c = getState() - releases;//释放一次就对state进行减一//此时独占线程肯定是当前线程,如果不是肯定是异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//Thread0是锁的拥有者,state为1,因此减1后为0,代表锁已经被释放完 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}
Node p = enq(node);
当nextc=0时
enq(node);通过自旋将node加入到队列中
这里就是之前的ReentrantLock获取锁
//ConditionObject implements Conditionnew ConditionObject()
try {//获取AQS的state。因前面ReentrantLock已经获取到锁,state值为1 int savedState = getState();//释放锁,修改state的值 1-->0 if (release(savedState)) {//修改failed的值为false failed = false;//返回修改前的state的值 1 return savedState; } else { throw new IllegalMonitorStateException(); } }
private Generation generation = new Generation();
已满
切换到Thread-2
切换到Thread-0,debug线程的run方法体
判断当前节点是否在 中isOnSyncQueue(Node node)
lastWaiter = node;
nextGeneration();
条件实体类
更新屏障行程的状态并唤醒所有人。仅在保持锁定时调用。private void nextGeneration() {\t// 上一代信号完成\ttrip.signalAll();\t//建立下一代\tcount = parties;\tgeneration = new Generation();}
切换到Thread-1
Thread-0到这里,尾部节点为空,所以将Thread-1设置为条件队列的firstWaiterThread-1到这里,尾部节点为Thread-0节点,所以将Thread-0节点的nextWaiter指向Thread-1
int savedState = fullyRelease(node)
AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
finally { if (failed) node.waitStatus = Node.CANCELLED;}
//获取锁屏障的独占锁,这个锁是创建CyclicBarrier栅栏对象时初始化好的。final ReentrantLock lock = this.lock;
条件队列与同步队列:条件队列与同步队列是两个独立的队列,同步队列作为等待队列,条件队列的节点释放后就要加到同步等待队列中,才能往下走
创建五次new Thread,并调用start方法
prev
countDownLatch.countDown();
并发多线程抢锁源码分析(debug跟踪)-条件队列栅栏以CyclicBarrier为例,多线程获取锁模拟
AbstractQueuedSynchronizer.ConditionObject#signalAll
源码分析CountDownLatch
如果条件队列尾部节点为null,就将新创建的节点作为头部节点。否则就将新创建的当前线程的节点放到尾部节点后面
当前线程堵塞在这个for循环中,切换下一个线程
Thread-0:nextc =1Thread-1:nextc =0
判断条件队列的首节点是否为空,这里一般不会为空。为空代表条件队列的节点已经转到同步队列了,不再做其它处理
切到Thread-3再次执行上面的逻辑
AbstractQueuedSynchronizer#transferForSignal
判断参与量是否已满if (index == 0)
返回的p是传进来的节点node的前一个节点
对应的解锁
head = null tail = null state = 0 exclusiveOwnerThread = null
if (tryReleaseShared(arg)) { doReleaseShared(); return true;}return false;
CyclicBarrier
//获取最后一个waiter,初始化时为nullNode t = lastWaiter;
Thread-0、Thread-1不会进到这里,Thread-2进来,获取的首节点是Thread-0节点
颜色相同的为一个方法级别的,建议debug代码跟踪学习
waitStatus = 0prev = nullnext = nullthread = Thread-0nextWaiter = null
if (first != null)doSignalAll(first);
Thread-2,Thread-3返回false
private final Condition trip = lock.newCondition();
CountDownLatch
CyclicBarrier(栅栏)允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)闭锁用于等待countDown事件,而栅栏用于等待其他线程。根据设定的值n,规定每次执行需要的线程数,每n个线程一组执行,后面的等到够n个了再次执行下一组。
lock = ReentrantLocktrip = AbstractQueuedSynchronizer$ConditionObjectparties = 3barrierCommand = nullgeneration = CyclicBarrier$Generationcount = 3
CountDownLatch.Sync#doReleaseShared();
0 条评论
下一页