java并发
2022-06-23 11:54:25 18 举报
java并发工具原理
作者其他创作
大纲/内容
Node enq(final Node node)
初始化变量
lock.lock();
获取读锁个数
writeLock.unlock()
如果线程被中断了就抛出异常并重置count,唤醒等待线程
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>()
如果数组有元素就获取元素
如果还剩余资源,或头节点为空,或后续节点可被唤醒
调用codition等待超时时间
t != null && t.waitStatus != Node.CONDITION
当前剩余资源
获取上一个节点
先获取当前的线程与当前锁的状态
cancelAcquire(node)
获取当前状态
新建标记
notEmpty.await();
Thread.currentThread().interrupt();
如果数组有空间就入队
if (t.waitStatus <= 0)->s = t;
readHolds.remove();
调用内部释放的方法
ws == Node.SIGNAL
rh.count == 0
tryAcquire
timed && nanos <= 0
如果链表中已经空了,消费者应该阻塞了
readLock.lock();
循环执行里面的操作
notFull.signal();return x;
成功释放了锁并且队列头节点不为空,等待状态不为0就尝试唤醒队列中的节点
如果以上操作被中断了,先判断当前年代是否被刷新了,如果没有,并且被broken了就直接转换队列抛出异常
cas获取锁
t == null
获取当前线程与等待状态
for (;;)
去除取消节点
如果状态小于0就重置等待状态为0(小于0表明后面节点可被唤醒)
count == 0
getExclusiveOwnerThread() != current
否则直接中断
count == items.length说明队列没有数组已满,生产者需要等待
current == getExclusiveOwnerThread()
获取锁后续步骤
获取当前节点的等待状态
如果被中断抛出
setExclusiveOwnerThread(Thread.currentThread());
如果获取失败就尝试等待队列的入队操作
-*- Node.WaitStatus -*- /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
return 1;
enq(node);
t = next;
final ReentrantLock lock = this.lock;->lock.lockInterruptibly();
if (g.broken)->throw new BrokenBarrierException();-----------------------------------------------if (g != generation)->return index;
如果获取的指针已经在队尾了重置指针指向开始并设置数组内容剩余个数
Node t = tail; t != null && t != node; t = t.prev
Thread current = Thread.currentThread();->int c = getState();
tryAcquire(arg)
void signalAll()
如果头节点为空或者模式相同
for (;;)
Node addConditionWaiter()
int count = rh.count;
int index = --count
如果记录节点是空就设置头节点为下一个节点说明头节点状态为取消
如果尾节点为空,创建队列尝试创建与设置头节点并设置头尾节点为同一元素
查看非公平实现
setHead(node);p.next = null; failed = false;return interrupted;
semaphore.acquire(1);
return node;
doSignalAll(Node first)
如果当前读锁已经被创建多次,并且不是头节点,先获取重入次数记录
setState(c + acquires);return true;
否则就去获取
return -1;
等于0,说明当前锁为空闲先检测同步队列中是否存在有比当前线程更前的元素
如果没有比当前更优先的就出获取锁成功的话就设置当前线程为独占状态
如果执行自己的方法抛出异常将直接转换等待线程队列
如果没有设置超时时间调用codition等待
唤醒后面的阻塞节点
doReleaseShared唤醒head后继节点后h.waitStatus从-1到0,还没来得及更新head,即被唤醒的共享节点还没有setHeadAndPropagate,又有其他线程doReleaseShared唤醒head后继节点h.waitStatus从0到-3。
如果获取锁失败就先设置当前节点状态,如果被取消的了就出队,最后park节点并设置中断标志
unparkSuccessor(Node node)
for(;;)执行以下逻辑先获取队列头
return true;
sync.release(1);
如果当前队列没有节点,并且等待状态不是-2
nonfairTryAcquire(...)
semaphore.release(1)
while循环检测当前链表是否存在元素
ranAction = true;nextGeneration();return 0;
最后减次数
notFull.await();
如果减去要获取的资源数后0就直接返回还有资源就尝试执行cas获取资源成功就直接放回
如果没有创建,或者获取的不是当前线程的
h == head
if (g == generation && ! g.broken)->breakBarrier();throw ie;
int interruptMode = 0;
throw new IllegalMonitorStateException();
firstReaderHoldCount++;
获取等待队列的最后节点
头节点状态为-1,后续有节点可以唤醒
尝试修改状态为0失败就继续循环
linkedBlockingQueue.put(e);
setState(c);return free;
当前读锁个数为0,设置第一个读锁节点为当前线程设置第一个读锁节点的重入次数为1
清空
void unlinkCancelledWaiters()
doSignalAll(first);
if (rh.count == 0)return -1;
如果前面都没放回说明加锁成功并且是非重入的,设置写锁为现在的线程独占
获取下一个节点
转到同步队列
do while循环执行条件:
构建数据大小,构建非公平锁设置等待条件
如果已经遍历到了最后就把尾节点指向记录节点
公平锁实现
设置重入记录
sync.releaseShared(permits)
tryAcquireShared(int acquires)
如果当前执行的线程不是之前获取锁的独占线程就抛出异常
尾节点可能被中断取消了,所以要剔除再获取一次最后节点
p.next = null; failed = false;return;
再次尝试获取锁
获取当前线程与状态
final Generation g = generation;
当前state减去是否的次数1再查看写锁state是否为0
如果锁次数=1
SNode h = head;
trail == null
如果没有同步队列将会创建如果存在直接入队尾返回之前同步队列的尾节点
尝试唤醒,获取当前状态
if (c + 1 < capacity)->notFull.signal();
尾节点不为空,表示队列已经存在使用尾插法
void await() throws InterruptedException
!isHeldExclusively()
Node next = first.nextWaiter;->first.nextWaiter = null;
transferForSignal(first)
唤醒获取队列最后返回获取的元素
rh = readHolds.get();
int w = exclusiveCount(c);
如果上一个节点为头节点并且再次尝试获取锁成功
循环执行操作
获取当前头节点的等待状态
return true;
如果释放资源小于当前资源就抛异常
boolean acquireQueued(final font color=\"#e65100\
NonfairSync非公平锁实现
获取头节点
先获取下一个节点
尝试从读锁中获取锁
存在队列并且还有等待节点
readLock.unlock()
firstReader == current
Node node = addWaiter(Node.SHARED);
propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0
当前的锁是否是空闲的
不为空则进行入队操作
int c = getState();
while (first != null)
int fullTryAcquireShared(Thread current)
c != 0
blockingQueue.take();
else if (nanos > 0L)->nanos = trip.awaitNanos(nanos);
就从集合中移除当前记录器如果锁次数=0 就抛出异常(小于等于0应该不能被再次释放)
当前线程执行一次锁
trail = t;
parkAndCheckInterrupt()
boolean tryAcquire(int acquires)
pred != null)
s=null-> forNode t = tail; t != null && t != node; t = t.prev
如果获取失败后可以park节点
doAcquireInterruptibly
return dequeue()
默认是非公平的公平是使用队列不公平使用栈
hasQueuedPredecessors()
查看是存入数据还是取数据
while!isOnSyncQueue(node)
如果写锁存在
lastWaiter = firstWaiter = null;
h == null || h.mode == mode
h != null && h != tail
重入次数的记录如果没有创建,或根据当前tid获取的记录与记录的tid不一致
int c = getState();int nextc = c - SHARED_UNIT;
如果链表个数等于0唤醒消费者(前面已经加1了所以至至少有一个元素存在)
Lock lock = new ReentrantLock();
生产者阻塞
c = count.getAndIncrement();
return null;
头节点没有超时取消把e设置头节点
Node p = enq(node);
queue.take()
if (itrs != null)->itrs.elementDequeued();
if (r == 0)
rh == null
从threadlocal中获取
int ws = pred.waitStatus;
readHolds.set(rh)
Node node = addConditionWaiter();
sync.acquire(1);
if (failed)->node.waitStatus = Node.CANCELLED;
写锁没有添加(存在读锁)或当前线程不是独占的(其他线程拥有写锁)
setExclusiveOwnerThread(null);
ws > 0 do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);->pred.next = node;
如果第一个读锁为当前线程
final Node p = node.predecessor();
可唤醒状态,更改当前节点等待状态,如果失败就跳过
最后拿到以上遍历后的节点后进行判断,不为空就唤醒它
并且当前重入次数为1
将新建节点的上一个节点设置为尾节点并且尝试设置新建节点为尾节点
从尾部开始向前遍历,找到最前的一个处于正常阻塞状态的结点,直到节点重合
s == null || s.isShared()->doReleaseShared();
boolean failed = true;
写锁
release(savedState)
transferForSignal(Node node)
获取数组中的元素,并清空对应槽位
获取上一个节点的等待状态
如果当前节点等于head就跳出循环
unparkSuccessor(h);
当前重入记录数为0
尝试获取锁
链表个数还有空间就插到尾部
获取当前的状态与读锁减1的值
unlock();
如果被中断了设置标志
HoldCounter rh = cachedHoldCounter;
rh == null || rh.tid != getThreadId(current)
int c = getState() - releases;
linkedBlockingQueue.take()
获取批次年代其中break表示当前屏障有没有被破坏(例如调用await方法设置的超时时间到了)
获取当前节点状态,如果小于0就更新成0
否则就唤醒后续节点
如果尾节点被取消了或者修改状态失败了就唤醒当前最新同步队列的尾节点(就是当前要转换的节点)
查看写锁state
如果成功获取锁就
如果后面还有元素,就唤醒消费者
return false;
cb.await();
执行尝试释放锁的方法
如果队列存在就把新节点插入尾
enqueue(e);
!hasQueuedPredecessors()
writeLock.lock();
boolean free = false;->if(c == 0)
如果释放完成就直接把独占清空
获取当前线程
!isHeldExclusively()->throw new IllegalMonitorStateException()
获取第一个等待队列的节点,如果不为空就执行转换队列方法
如果当前节点的状态不是-2说明下一个节点被中断或取消了
HoldCounter rh = cachedHoldCounter;
出队第一个节点获取当前元素个数,然后将count-1
Node s = node.next;
while循环 判断如果链表个数已经达到最大值
Node h = head;
非公平锁实现
readerShouldBlock()
int fullyRelease(Node node)
最后解开ReentrantLock
c == 0
int ws = h.waitStatus;
更改当前节点的等待状态为0
Node addWaiter(Node mode)
如果当前线程获取锁成功,那么就将独占的属性设置为当前的线程
boolean failed = true;boolean interrupted = false;->for (;;)
如果线程被中断就抛出中断异常
获取个数后进行原子增加
interrupted = true
循环执行
定义变量然后执行以下循环操作
lock.unlock();
firstReader = current;firstReaderHoldCount = 1;
设置尾节点的下一个节点尾当前节点
Node p = node.predecessor()->p == head
否则先尝试获取锁
acquireQueued(font color=\"#b71c1c\
if (Thread.interrupted())->throw new InterruptedException();
this.capacity = capacity; last = head = new Node<E>(null);
如果被中断了就抛出异常
blockingQueue.put(i);
int r = sharedCount(c);
检查当前节点是不是在同步队列等待唤醒是就park
否则就去唤醒后续节点
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
lastWaiter = node;->return node;
如果上一个节点被取消,再次循环检查,并执行跳过上一个节点
permits < 0
获取当前线程的读锁重入记录
if (!timed)-> trip.await();
tryAcquire(...)
elseif如果同步队列的(第一个是头)第二个节点是写节点
count <= 1
以上条件都不满足直接设置上一个节点的为请求释放状态
判断条件
FairSync
E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock;
从队列中移除当前节点
如果当前节点状态没有被取消就把记录节点指向当前节点
if (timed && nanos <= 0L) {breakBarrier();->throw new TimeoutException();}
否则将上一个记录节点的下一个节点指向当前t的下一个节点
Thread.interrupted()->breakBarrier();->throw new InterruptedException();
BlockingQueue queue = new ArrayBlockingQueue(1024);
count.get() == 0
exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current
获取写锁state
Node first = firstWaiter;if (first != null)
first = next;
如果获取锁记录器为空,或记录器拥有者不是当前线程就从集合中获取
public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
Node next = t.nextWaiter;
将当前节点设置为头节点并且设置prev与thread属性为空清除后面的节点让gc回收最后返回false
compareAndSetHead(new Node())->tail = head;
tryRelease(arg)
检测当前等待状态为可唤醒状态
int nextc = c + acquires; setState(nextc);
doAcquireSharedInterruptibly(arg)
linkedBlockingQueue = new LinkedBlockingQueue<String>(20);
int ws = node.waitStatus;
tryReleaseShared(arg)
g.broken->throw new BrokenBarrierException();
void acquireShared(int arg)
获取头节点(这里执行相当于拿到当前节点)
获取锁成功循环while判断count是否等于0
获取state(当前加锁状态)然后进行减操作获取一个值(可能当前为重入锁)
unparkSuccessor(h)
Node t = tail;
Node s = node.next;
Lock readLock = readWriteLock.readLock();-> private Lock writeLock = readWriteLock.writeLock();
if (c > 1)->notEmpty.signal();
rh.count++;
if (!ranAction)->breakBarrier();
enqueue(node);
this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();
setExclusiveOwnerThread(current);
新建添加的节点获取写锁与链表个数
如果下一个节点为空或者是共享节点就唤醒
循环执行以下操作
cachedHoldCounter = rh = readHolds.get();
Thread.interrupted()
获取锁次数
设置链表个数创建链表头尾节点
此节点已经设置了请求释放的状态以向其发送信号,因此它可以安全的park
int savedState = fullyRelease(node);
修改状态为0,从后往前拿到最靠近头节点并且可以唤醒的节点,然后进行unpark
如果无超时参数
从集合中获取
firstReader = null;
sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this);
int r = tryAcquireShared(arg);
NonfairSync
已经没有资源,到达屏障
类似逻辑,除了不用判断同步队列中是否有比当前节点更优先的
复位中断标志位
如果是写节点或者读锁个数达到上限
尾节点是否为空
failed = false;->return savedState;
获取尾节点
count=0说明队列没有数组为null,消费者需要等待
firstReaderHoldCount == 1
while循环查看当前数组是否已经满了
s == null || s.waitStatus > 0
t.nextWaiter = null;
return t;
setState(nextc);->return free;
int current = getState();int next = current + releases;
当前存在写锁并且写锁的的独占线程不是当前线程
SNode s = null;->int mode = (e == null) ? REQUEST : DATA;
Semaphore semaphore = new Semaphore(5);
类似操作
if (failed)->cancelAcquire(node);
state为0则表示当前锁已释放就设置标志成功,并且清空独占线程记录
sharedCount(c) == MAX_COUNT->throw new Error(\"Maximum lock count exceeded\");
E x = (E) items[takeIndex];->items[takeIndex] = null;
当前线程为重入线程,设置状态+1
把尾节点记录变为新节点
while (t != null)
ws == Node.SIGNAL->return true
!tryAcquire(arg)
unlinkCancelledWaiters();->t = lastWaiter;
rh = cachedHoldCounter;
就从末尾开始遍历,往前拿,一直到当前节点的下一个并且等待状态小于的节点
s != null->LockSupport.unpark(s.thread)
doReleaseShared()
Node t = lastWaiter;
如果当前线程就是独占属性的线程
如果下一个节点是空的,或者下一个节点被取消了
this.parties = parties;-> this.count = parties;-> this.barrierCommand = barrierAction;
firstReaderHoldCount--;
if (c == 0)->signalNotEmpty();
doReleaseShared();
先获取剩余资源数减去要获取的资源数
尾节点为空把头节点指向新节点
int available = getState(); int remaining = available - acquires;
最后判断,如果超时了就需要出队
queue.put(i);
将等待队列的节点放入同步队列中
当前线程是否为独占的线程
w == 0 || current != getExclusiveOwnerThread()
如果不是,就直接抛出异常
rh = readHolds.get();
--rh.count;
传入true为公平锁false为非公平锁
if (next == null)->lastWaiter = trail;
获取当前元素个数获取读锁
free = true;setExclusiveOwnerThread(null);
如果下一个节点是空的,并且等待状态大于0
根据构造参数确定是公平还是非公平的创建读锁与写锁的实例
count.get() == capacity
Thread.currentThread() != getExclusiveOwnerThread()
nonfairTryAcquireShared(acquires)
pred.next = node;->return node;
boolean nonfairTryAcquire(int acquires)
h != null && h.isCancelled()
sync.acquireSharedInterruptibly(permits);
t.nextWaiter = node;
s == null || s.waitStatus > 0-> s = null;
acquire(1);
并且独占线程不是当前线程
失败
throw new Error(\"Maximum lock count exceeded\");
firstWaiter = node;
trail.nextWaiter = next;
w + exclusiveCount(acquires) > MAX_COUNT
index == 0->boolean ranAction = false;
创建一个标志记录是否释放成功然后判断当前锁状态是否为0
int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;
selfInterrupt();
for(;;)执行以下步骤计算释放后的资源
是否为头节点,并且cas把头设置为下一个节点是否成功
把当前计数器放入读锁的计算器集合中
先去尝试获取锁
获取等待队列头节点的下一个节点然后把等待队列中的这个节点清空
LockSupport.unpark(node.thread);
if( s != null )->LockSupport.unpark(s.thread);
x = dequeue(); c = count.getAndDecrement();
if (++takeIndex == items.length)->takeIndex = 0;count--;
查看当前线程是否排在等待队列最前面
获取失败就入队阻塞
如果当前增加之后个数小于最大阈值就唤醒生产者
等待状态不为0,说明已经添加了读锁或写锁
第一个读锁的重入次数加1
如果state为0就cas获取锁,并设置当前线程独占,如果不为0且是当前线程独占就设置重入次数加1
已满,抛出异常
先创建当前线程的node然后获取队列的尾节点
if (free)
构建失败标识
如果尾节点没有被取消那就修改状态为-1
int nextc = getState() - releases;->boolean free = exclusiveCount(nextc) == 0;
while循环
Node h = head;->if( h != null && h.waitStatus != 0 )
累加重入后更新状态
int savedState = getState();
判断,如果当前获取锁的不是自己就报错,不能释放别人的锁
尾节点为空,这里可能会创建一个等待队列
boolean tryRelease(int releases)
与独占锁实现相识只看不同点
p == head && tryAcquire(arg)
如果没有获取到锁,那么就去尝试获取锁
减1
如果超时了就打断屏障转换队列等操作并抛出异常
初始化个数,默认是非公平锁
int tryAcquireShared(int unused)
takeLock.lockInterruptibly();
并且头节点不为空或者没有被put标记获取
如果个数已经到达最大值获取写锁,并阻塞生产者
如果读锁队列头是当前线程
成功返回资源状态
获取尾节点状态
count == items.length
finally最后判断
获取队列的头节点,然后把当前节点设置为头节点
boolean failed = true;
设置本节点为头尝试传递唤醒
写锁重入超出最大值
tryAcquireShared(arg) < 0
Semaphore.boolean tryReleaseShared(int releases)
如果成功更改就返回
并且进行park。。。。唤醒后会重置中断标志志
exclusiveCount(c) != 0
HoldCounter rh = null;
先执行传入的方法
cas修改
if (c == capacity)->signalNotFull();
如果当前无锁,c==0那尝试cas加锁,writerShouldBlock非公平锁直接返回false
如果当前头节点不为空,并且队列里面还有节点等待
return fullTryAcquireShared(current);
Thread current = Thread.currentThread();
以上条件不满足就放回false
如果本节点的等待状态已经修改为0,就去尝试修改本节点的状态为传播的状态如果失败,跳过本次
成功修改后进行释放资源,否则return false
generation.broken = true;count = parties;trip.signalAll();
final ReentrantLock lock = this.lock;lock.lock();
void doReleaseShared()
转换所有的等待线程重置资源,新建年代
t.waitStatus != Node.CONDITION
int ws = p.waitStatus;
先查看许可证是否有剩余没有就报错
nonfairTryAcquire
final ReentrantLock lock = this.lock;-> lock.lockInterruptibly();
Node h = head; setHead(node);
final Runnable command = barrierCommand;->if (command != null)command.run();
Node t = firstWaiter;Node trail = null;
构建循环屏障parties记录次数,以后可以恢复count为计数次数command为记录方法
tryAcquireShared(arg)
失败就入队等待
如果同步队列的(第一个是头)第二个节点不是写节点并且锁次数小于最大值并且cas创建成功
如果不为0,则可能是重入锁那么就更新state然后放回标志为false
next < current->throw new Error(\"Maximum permit count exceeded\");
如果下一个节点不为空就唤醒它
0 条评论
下一页