多线程
2022-08-08 15:32:37 11 举报
AI智能生成
多线程相关概念和注意点
作者其他创作
大纲/内容
JAVA内存模型
JAVA对象模型
AQS/AbstractQueuedSynchronizer
AQS/抽象队列式同步器
子类们必须定义改变state变量的protected方法这些方法定义了state是如何被获取或释放的
子类们必须定义改变state变量的protected方法这些方法定义了state是如何被获取或释放的
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
throw new UnsupportedOperationException();
}
Sync
FairSync
final void lock() {
acquire(1);
}
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 获取锁失败则创建节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
if (!tryAcquire(arg) &&
// 获取锁失败则创建节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//无锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//有锁 看是否是自己持有了锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
final Thread current = Thread.currentThread();
int c = getState();
//无锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//有锁 看是否是自己持有了锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
//始终返回tail节点
return node;
}
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
//始终返回tail节点
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//无限循环
for (;;) {
final Node p = node.predecessor();
//如果node获取到了锁return interrupted
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败后
//如果p的node.status是 -1 (执行完毕)
//&& 当前线程isInterrupted
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean failed = true;
try {
boolean interrupted = false;
//无限循环
for (;;) {
final Node p = node.predecessor();
//如果node获取到了锁return interrupted
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败后
//如果p的node.status是 -1 (执行完毕)
//&& 当前线程isInterrupted
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前置节点Node.SIGNAL 那么return true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果前置节点Node status > 0那么 删除这个节点。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//如果前置节点 -2 ,-3 那么尝试改变这个节点状态为Node.SIGNAL。
else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
int ws = pred.waitStatus;
//如果前置节点Node.SIGNAL 那么return true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果前置节点Node status > 0那么 删除这个节点。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//如果前置节点 -2 ,-3 那么尝试改变这个节点状态为Node.SIGNAL。
else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park(this);
return Thread.interrupted();
}
NonfairSync
final void lock() {
//先尝试抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//先尝试抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
CAS
ThreadLocal
Thread
ThreadLocal.ThreadLocalMap
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
Reference
PhantomReference
SoftReference
WeakReference
引用状态
Active: Subject to special treatment by the garbage collector. Some
time after the collector detects that the reachability of the
referent has changed to the appropriate state, it changes the
instance's state to either Pending or Inactive, depending upon
whether or not the instance was registered with a queue when it was
created. In the former case it also adds the instance to the
pending-Reference list. Newly-created instances are Active.
time after the collector detects that the reachability of the
referent has changed to the appropriate state, it changes the
instance's state to either Pending or Inactive, depending upon
whether or not the instance was registered with a queue when it was
created. In the former case it also adds the instance to the
pending-Reference list. Newly-created instances are Active.
Pending: An element of the pending-Reference list, waiting to be
enqueued by the Reference-handler thread. Unregistered instances
are never in this state.
enqueued by the Reference-handler thread. Unregistered instances
are never in this state.
Enqueued: An element of the queue with which the instance was
registered when it was created. When an instance is removed from
its ReferenceQueue, it is made Inactive. Unregistered instances are
never in this state.
registered when it was created. When an instance is removed from
its ReferenceQueue, it is made Inactive. Unregistered instances are
never in this state.
Inactive: Nothing more to do. Once an instance becomes Inactive its
state will never change again.
state will never change again.
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
//计算 hash之后的应该存放的桶位置
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
//如果应该存放的桶位置被占用了就进入循环
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果占用位置的e 的 threadlocal是同一个,那么更新这个threadlocal
if (k == key) {
e.value = value;
return;
}
//如果占用位置的threadlocal弱引用已经被清除掉
//,进行replaceStaleEntry方法
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
//计算 hash之后的应该存放的桶位置
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
//如果应该存放的桶位置被占用了就进入循环
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果占用位置的e 的 threadlocal是同一个,那么更新这个threadlocal
if (k == key) {
e.value = value;
return;
}
//如果占用位置的threadlocal弱引用已经被清除掉
//,进行replaceStaleEntry方法
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
//这个方法的前提是:staleSlot位置key 为null
//占用位置的threadlocal弱引用已经被清除
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
if (k == key) {
e.value = value;
//i 位置 设为 threadlocal为 null 的e
tab[i] = tab[staleSlot];
//staleSlot位置 设为最新的e
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
//占用位置的threadlocal弱引用已经被清除
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
if (k == key) {
e.value = value;
//i 位置 设为 threadlocal为 null 的e
tab[i] = tab[staleSlot];
//staleSlot位置 设为最新的e
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
缓存导致的可见性问题
i++
volatile,synchronized,final
volatile对于可见性的实现,内存屏障也起着至关重要的作用。因为内存屏障相当于一个数据同步点,他要保证在这个同步点之后的读写操作必须在这个点之前的读写操作都执行完之后才可以执行。并且在遇到内存屏障的时候,缓存数据会和主存进行同步,或者把缓存数据写入主存、或者从主存把数据读取到缓存。
被synchronized修饰的代码,在开始执行时会加锁,执行完成后会进行解锁。而为了保证可见性,Happens-Before 规则有一条规则是这样的:对一个变量解锁之前,必须先把此变量同步回主存中。这样解锁后,后续线程就可以访问到被修改后的值。
Happens-Before 规则
有没有什么办法可以不使用synchronized和lock,如何实现一个线程安全的单例?
public class Singleton {
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();
private Singleton() {}
public static Singleton getInstance() {
for (;;) {
Singleton singleton = INSTANCE.get();
if (null != singleton) {
return singleton;
}
singleton = new Singleton();
if (INSTANCE.compareAndSet(null, singleton)) {
return singleton;
}
}
}
}
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();
private Singleton() {}
public static Singleton getInstance() {
for (;;) {
Singleton singleton = INSTANCE.get();
if (null != singleton) {
return singleton;
}
singleton = new Singleton();
if (INSTANCE.compareAndSet(null, singleton)) {
return singleton;
}
}
}
}
原子性
原子性是指在一个操作中就是cpu不可以在中途暂停然后再调度,既不被中断操作,要不执行完成,要不就不执行
在并发编程中,原子性的定义不应该和事务中的原子性一样。他应该定义为:一段代码,或者一个变量的操作,在没有执行完之前,不能被其他线程执行
线程切换带来的原子性问题
i++
synchronized
对于同步方法,JVM采用ACC_SYNCHRONIZED标记符来实现同步
方法级的同步是隐式的。同步方法的常量池中会有一个ACC_SYNCHRONIZED标志。当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如果有设置,则需要先获得监视器锁,然后开始执行方法,方法执行之后再释放监视器锁。这时如果其他线程来请求执行方法,会因为无法获得监视器锁而被阻断住。值得注意的是,如果在方法执行过程中,发生了异常,并且方法内部并没有处理该异常,那么在异常被抛到方法外面之前监视器锁会被自动释放
对于同步代码块。JVM采用monitorenter、monitorexit两个指令来实现同步
同步代码块使用monitorenter和monitorexit两个指令实现。可以把执行monitorenter指令理解为加锁,执行monitorexit理解为释放锁。 每个对象维护着一个记录着被锁次数的计数器。未被锁定的对象的该计数器为0,当一个线程获得锁(执行monitorenter)后,该计数器自增变为 1 ,当同一个线程再次获得该对象的锁的时候,计数器再次自增。当同一个线程释放锁(执行monitorexit指令)的时候,计数器再自减。当计数器为0的时候。锁将被释放,其他线程便可以获得锁
synchronized其实是借助Monitor实现的,在加锁时会调用objectMonitor的enter方法,解锁的时候会调用exit方法。事实上,只有在JDK1.6之前,synchronized的实现才会直接调用ObjectMonitor的enter和exit,这种锁被称之为重量级锁
为什么synchronized可以保证原子性
因为被synchronized修饰的代码片段,在进入之前加了锁,只要他没执行完,其他线程是无法获得锁执行这段代码片段的,就可以保证他内部的代码可以全部被执行。进而保证原子性。
为什么volatile不能保证原子性
因为他不是锁,他没做任何可以保证原子性的处理
只有在JDK1.6之前,synchronized的实现才会直接调用ObjectMonitor的enter和exit,这种锁被称之为重量级锁。为什么说这种方式操作锁很重呢?
Java的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统的帮忙,这就要从用户态转换到核心态,因此状态转换需要花费很多的处理器时间,对于代码简单的同步块(如被synchronized修饰的get 或set方法)状态转换消耗的时间有可能比用户代码执行的时间还要长,所以说synchronized是java语言中一个重量级的操纵。
有序性
有序性即程序执行的顺序按照代码的先后顺序执行。
编译优化带来的有序性问题
双检锁单例
volatile,synchronized
volatile是通过内存屏障来来禁止指令重排的。https://www.hollischuang.com/archives/2673
synchronized关键字保证同一时刻只允许一条线程操作
为什么synchronized无法禁止指令重排却能保证有序性?
Java程序中天然的有序性可以总结为一句话:如果在本线程内观察,所有操作都是天然有序的。如果在一个线程中观察另一个线程,所有操作都是无序的。这其实和as-if-serial语义有关。
as-if-serial语义的意思指:不管怎么重排序(编译器和处理器为了提高并行度),单线程程序的执行结果都不能被改变。编译器和处理器无论如何优化,都必须遵守as-if-serial语义。简单说就是,as-if-serial语义保证了单线程中,指令重排是有一定的限制的,而只要编译器和处理器都遵守了这个语义,那么就可以认为单线程程序是按照顺序执行的。当然,实际上还是有重排的,只不过我们无须关心这种重排的干扰
synchronized通过排他锁的方式就保证了同一时间内,被synchronized修饰的代码是单线程执行的。所以呢,这就满足了as-if-serial语义的一个关键前提,那就是单线程,因为有as-if-serial语义保证,单线程的有序性就天然存在了。
但是!!!我们可以说,synchronized保证的有序性是多个线程之间的有序性,即被加锁的内容要按照顺序被多个线程执行。但是其内部的同步代码还是会发生重排序,只不过由于编译器和处理器都遵循as-if-serial语义,所以我们可以认为这些重排序在单线程内部可忽略。所以在双检锁单例情况下,只用synchronized仍会出现问题。
0 条评论
下一页