07_concurrent包
2024-05-23 21:57:23 0 举报
JUC并发包下的大部分核心类,AtomicInteger,LongAdder,concurrentHashMap,AtomicReference,ReentractLock,CopyOnWriteArrayList,BlockingQueue
作者其他创作
大纲/内容
semaphore原理
释放锁takeLock.unlock();
释放锁lock.unlock()
开始执行wait()后的代码
state != 0?是否有人上锁
改变对象引用
自旋等待,等不及后,尝试去Cell[1]中去操作
创建双向链表里面是一个个Node
User user
上锁成功
semaphore
LongAdder
尝试唤醒阻塞在“队列为空”的线程notEmpty.signal();
修改School中User
挂起
无限自旋等待
如果当前数量==0就阻塞住
获取锁
head
N
迭代next()
Condition
next
CC
Node-new
barrierAction.run()触发每次执行完任务的方法注意:这里并没有用start()
2、unpark
Node-Thread1
acquire(1)
School
进入while,再次检查waitStatus=CONDITION?
User0
释放锁fullyRelease(node)
阻塞 notFull.await();
offer
tail
Y
将当前node 的前一个node的waitStatus设为-1 (SIGNAL)
唤醒后
Node head
X == 1?
firstWaiter
ReentrantLock putLock
指向新的引用
快照机制,保证无锁迭代,优点是性能高,缺点是,snapshot获取的是构建迭代器那一刻的数据,而不是最新的
Node tail
Class clazz
通过valueOffset获取this(AtomicInteger)的value值 X
累加 1
volatile long value
value = 3
当前Node-Thread2 的prev是否是head?
capacity
AbstractQueuedSynchronizer的等待上锁队列
第一段:从0 拷贝,长度= index第二段:从index+1 拷贝,长度=numMoved
N 获取令牌,允许执行
Thread exclusiveOwnerThread(父类)
返回当前数组
返回下标对应的元素(E) items[i]
设置waitStatus=0
加入队尾
获取当前数组长度len = current.length
如果当前数量==队列数量就阻塞住
AtomicReference修改一坨对象
如果Hashcode一样,key也一样
释放信号量资源semaphore.release();
加锁成功
挂起当前线程LockSupport.park(this);Thread.interrupted();
BB
take
LinkedBlockingQueue
Unsafe.getObjectVolatile获取volatile值,保证可见性
ReentrantLock takeLock
ReentrantReadWriteLock读写锁 原理
加入等待队列后,立刻释放锁
while (count == items.length) notFull.await();
两个对象的引用是否依然指向之前的引用且stamp=1?
上锁lock.lock()
构造双向队列(此处省略)参考reentrantlock
addConditionWaiter
Node
优点:1、运用了unsafe来执行底层的轻量级锁操作,同一时间只允许一个线程进行CAS操作2、通过CAS实现无锁化缺点:1、自旋等待问题,如果CAS失败会无限自旋,直到修改成功。2、ABA问题
lastWaiter
count.get()
size
valueOffset
ReentractReadWriteLock
将新增加的元素放到末尾newElements[len] = e
上锁lock.lockInterruptibly()
遍历链表count++
put
如果是公平锁,不会尝试修改state,而是直接acquire,先判断一下队列里面如果有人在排队,他也得入队
尝试唤醒阻塞在“队列已满”的线程notFull.signal();
读写互斥写写互斥读读不互斥
只要是通过setHead设置head指针的node,里面的指针就会全部指向null,作为一个新的空node作为headhead = node;node.thread = null;node.prev = null;
LongAdder优化无限自旋
替换原数组setArray(newElements);
如果是当前线程
concurrentHashMap原理
thread2
释放锁
挂起当前线程LockSupport.park(this);
Condition notEmpty = lock.newCondition()
上锁synchronized (node)
无界队列优点是:所有操作都是cas无锁操作,在保证线程安全的情况下,提供高性能的读写。缺点是:size和iterator执行的时候,可能其他线程会对一些node进行写(poll或者add)操作,导致数据不是强一致性的。size是遍历整个链表,性能差
获取put锁putLock.lock();
快速处理流程
numMoved == 0
修改元素set()
通过CAS写入
对多个对象整体进行CAS操作,可以聚合在一个对象中,然后包裹在AtomicReference中
lock.lock()要想signal()先得获取锁
state
CAS原子性操作
Object[] items
AA
指向的引用
while (!isOnSyncQueue(node)) { LockSupport.park(this); //挂起 //唤醒后接着执行 //检查下,那个线程是否已经interrupt了,如果没有返回0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
initTable()初始化table默认大小=16
state修改成功?
Node-Thread2
需要移动的数量numMoved = len - index - 1
删除下标元素remove(x)
Condition notEmpty = takeLock.newCondition()
是否等于请求上锁的线程Thread2
获取信号量 semaphore.acquire();
lock.lock()要想await()先得获取锁
队头出队
nextWaiter
唤醒LockSupport.unpark(node.thread);
这里使用分段加锁策略,将锁优化提现的淋漓尽致。concurrentHashMap将锁加到table的每个元素上,如果是默认大小16,那么可以最大支持16的线程同时对concurrentHashMap上锁。妙哉!!!
挂成链表设置node.next = new Node
null
再次获取state如果是0就尝试上锁
设置exclusiveOwnerThread
加入队首
开始挂起操作
重置parties以便下一批执行
上锁putLock.lockInterruptibly()
NonfairSync.lock()
ArrayBlockingQueue
改变对象引用并修改stamp
阻塞 notEmpty.await();
DD
修改value为1
初始化 0 大小的 array
队尾增加一个Node.SHARED节点
while (count.get() == 0) { notEmpty.await(); }
低位写锁 != 0并且不是同一个线程
用notEmpty condition尝试唤醒阻塞在“队列为空”的线程,让他们继续take node
AtomicInteger
唤醒其他等待锁的线程unparkSuccessor(h)
获取当前上锁的线程getExclusiveOwnerThread()
返回
返回count
thread1
Unsafe
获取state
AtomicStampedReference
将第一个wait的线程唤醒doSignal(first)
获取当前数组Object[] elements = getArray()
修改state-1
将请求线程包装为Node
设置exclusiveOwnerThread为当前线程
value = 5
CAS修改为新值
AQS
stamp = 1
User2
--count
释放锁putLock.unlock();
ReentrantLock lock
ReentractLock
barrierAction每批执行完后触发的方法
获取值 8
getArray()
ReentrantLock
CopyOnWriteArrayList
修改失败
Condition notFull = lock.newCondition()
链表转红黑树从o(n) 提升到 o(logn)
Condition notFull = putLock.newCondition()
poll
修改值!=旧值
上锁次数+1
将新元素更新到指定下标newElements[index] = element;
上全锁putLock.lock();takeLock.lock();
volatile int value =0
stamp = 2
获取node
不是写锁?或者不是当前线程?
唤醒
AtomicStampedReference修改一坨对象并解决ABA问题
迭代器构造COWIterator
Cell 1
prev
获取元素get(x)
count任务数
condition.signalAll()唤醒所有等待的子任务
获取take锁takeLock.lock();
加入红黑树节点
Class1
Cell 3
Cell 2
拿到head指向的item
thread1开始尝试获取锁
ReentractLock原理
上写锁state+1(低位+1)
创建Node
挂起当前线程
写不了,我很忙
head指向当前node修改之前队列结构
state - acquires < 0
AtomicInteger count
也就是说,一个thread过来上锁,如果获取不到锁,并且还入队了,那么就挂起。
doAcquireSharedInterruptibly()
Thread2
COWIterator
添加元素add()
每个任务完成后执行await()
cyclicBarrier原理
thread1拿到锁
上锁takeLock.lockInterruptibly()
两段拷贝
AQS(AbstractQueuedSynchronizer)
直接无锁返回array[x]
spread()高低位异或,充分体现数据的hash特征,降低冲突
ConcurrentLinkedQueue单向无界队列
Object[] snapshot
修改state+1
Atomic系列
LinkedBlockingQueue线程安全单向有界队列两把独占锁
重试拿信号量
写锁标识(获取state低16位)
释放全部锁takeLock.unlock();putLock.unlock();
readerLock
是否==0
巧妙的通过state高低位来区分读写锁,低16位写锁,高16位读锁
Node==null
再次尝试获取锁
Thread1
volatile Object[] array
修改state和设置加锁线程成功?
while (count.get() == capacity) { notFull.await(); }
两个对象的引用是否依然指向之前的引用?
cursor
请求加锁reentrantLock.lock()
reentractlock底层是基于AQS(AbstractQueuedSynchronizer 抽象队列同步器)实现的,其中核心是state(加锁状态),currentThread,和Node队列
1、获取队列中挂起的线程
AbstractQueuedSynchronizer中Condition的等待上锁队列
说明thread1已经加了写锁
Condition实现wait和signal
doReleaseShared()
parties每一批任务数(3个)
thread
构造双向队列(参考:reentrantlock)
GC回收掉
ArrayBlockingQueue基于数组线程安全有界队列一把独占锁
设置最大活动线程数量 state = 2
获取高位读锁state >>> SHARED_SHIFT
上写锁
修改一堆指针firstReaderfirstReaderHoldCount
binCount>8链表长度是否大于8
如果 head node是读锁?
tryAcquireShared()
用notFull condition尝试唤醒阻塞在“队列已满”的线程,让他们继续添加node
Cell 0
上写锁state+1(低位+1)实现重入锁
优点是:两把独占所控制出队入队,数据强一致性。缺点是:遍历上锁性能差。
await()
设置state = 2标识同一个线程又加一次锁(用state实现重入锁)
CAS设置这个new Node
while (count == 0) notEmpty.await();
AtomicReference
writerLock
是否已成为红黑树
说明删除的是末尾元素
cyclicbarrier
再次设置state的高位
condition.await()脑补condition原理
获取当前数组长度len = elements.length
上读锁
创建一个新的数组,长度-1Object[] newElements = new Object[len - 1];
0 条评论
下一页