04-ArrayBlockingQueue原理分析await
2023-03-04 22:27:24 0 举报
aqs
作者其他创作
大纲/内容
next
否
notEmpty = lock.newCondition()
在条件队列
不是CLH中第一个节点
入队线程
条件队列 producer
waitStatus=-2
pre
唤醒通过队列head中的下个节点unparkSuccessor(h)
1、put操作:arrayblockingqueue对象的items对象满了以后,对当前生产者线程进行阻塞,并释放锁,唤醒CLH队列中的线程;不满则直接放入items数组,并通知消费者把消费者条件等待队列加入CLH队列。2、take操作:arrayblockingqueue对象的items对象空了以后,对当前消费者线程进行阻塞,并释放锁,唤醒CLH队列中的线程;不空则直接从items数组取,并通知生产者condition对象把生产者条件等待队列加入CLH队列。3、put、take操作完成,也会调用unlock方法,唤醒CLH队列中的线程。
当前线程自我了结selfInterrupt()
常用方法:https://www.processon.com/view/link/5e43cd17e4b03e660721ef14
lastWaiter
unlock
成功
获得锁
当前线程对应node从队列中移除
队列是否已满
put(E e)
条件队列 consumer
阻塞当前线程,并判断当前线程是否在中断parkAndCheckInterrupt()
应该阻塞
lock
3、阻塞当前线程
CLH队列中尝试获取锁
外围程序员拿到线程的中断状态,由程序员根据线程的状态去做其他处理
当前节点是否同步队列节点isOnSyncQueue
在CLH同步队列
是
是否释放锁成功tryRelease
进入等待队列,并阻塞当前线程notFull.await()
notEmpty.signal()
firstWaiter
返回当前线程是否中断
thread
notFull = lock.newCondition()
同步队列中被唤醒
tryAcquire
执行fullyRelease
acquireQueued
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); // 调用此方法的时候,唤醒CLH队列阻塞线程 } }
没有获取到锁
向外抛出异常
释放锁唤醒线程
// 在进行take时,出队发送sinal信号。 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings(\"unchecked\") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 通过此方法将条件队列同步到CLH队列 return x; }
执行入队操作enqueue(e)
当前节点是否是队列中的第一个节点node.predecessor()==head?
new ConditionObject
1.进入条件队列
ArrayBlockingQueue
2、释放锁并唤醒CLH队列中的节点
阻塞当前线程LockSupport.park(this)
通过 transferForSignal(first) 完成将条件队列的头节点移动到clh队列
失败
当前线程进入条件等待队列addConditionWaiter()
被唤醒等情况下,重复执行for
ReentrantLock
0 条评论
下一页