Java多线程
2020-07-31 14:04:23 3 举报
AI智能生成
巨全的Java多线程编程知识图谱
作者其他创作
大纲/内容
并发容器及框架
ConcurrentHashMap
jdk7和jdk8之后的实现不同,jdk7使用的是segment继承了ReentrantLock的分段锁,对entry分段来实现的。
jdk8使用的是CAS+synchronized来保证并发安全。底层使用的存储结构是:数组+链表/红黑树的存储结构。
jdk8使用的是CAS+synchronized来保证并发安全。底层使用的存储结构是:数组+链表/红黑树的存储结构。
重要的内部类
node
key-value键值对
TreeNode
红黑树节点
TreeBin
相当于一颗红黑树,其构造过程就是一颗红黑树
ForwardingNode
辅助节点,用于ConcurrentHashMap的扩容操作。
setCtl
控制标识符,用来控制table初始化和扩容操作的
含义
负数表示正在初始化或扩容操作的
-1 表示正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0 代表hash表还没有被初始化,这个数值表示初始化或下一次扩容大小
ReservationNodes
是一个占位在类似computeIfAbsent的方法中。
重要方法
initTable
ConcurrentHashMap初始化方法
只能有一个线程参与初始化,其它线程必须挂起
构造函数不做初始化过程,初始化正在是在put方法中触发的
步骤:
1. sizeCtl < 0表示正在进行初始化,线程挂起
2. 线程获取初始化资格(CAS(SIZECTL, sc, -1))进行初始化
3. 初始化步骤完成之后,设置sizeCtl = 0.75*n(下一次扩容阀值),表示下一次扩容大小。
1. sizeCtl < 0表示正在进行初始化,线程挂起
2. 线程获取初始化资格(CAS(SIZECTL, sc, -1))进行初始化
3. 初始化步骤完成之后,设置sizeCtl = 0.75*n(下一次扩容阀值),表示下一次扩容大小。
putVal
核心思想
根据计算的hash值计算节点插入table的位置,如果该位置为空,则直接插入,否则插入到链表/树之中
步骤
table为null,线程进入初始化步骤,如果有其它线程正在初始化,该线程挂起
如果插入的当前位置i为null,则是第一次insert,直接cas到成功即可,插入成功则调用addCount判断是否扩容。
若失败则继续匹配(自旋)
若失败则继续匹配(自旋)
若该节点的hash == MOVED(-1), 表示有线程正在扩容,则进入扩容等待中
其余情况就是按照节点插入链表或者红黑树之中,这个过程需要加锁(synchronized)
get
从链表/红黑树节点获取
扩容
多线程扩容
步骤
构建一个nextTable,其大小是原来大小的两背,这个步骤是在单线程环境下完成的
将原来的table里面的内容复制到nextTable中,这个步骤是允许多线程操作
链表转换为红黑树过程
红黑树算法
实在链表元素到达阀值8,则将链表转换为红黑树
ConcurrentLinkedQueue
基于链表节点无边界的线程安全队列,采用FIFO原则对元素进行管理,内部使用CAS算法实现
不变性
在入队的最后一个元素的next为null
队列中所有为删除的节点的item都不能为null且都能从head节点遍历到
对于要删除的节点,不是直接将其设置为null,而是先将其item域设置为null,(迭代器会自动跳过为null的节点)
允许head和tail更新滞后: head,tail不总是第一个和最后一个元素
这里使用的惰性更新策略,将数据查询的复杂度放到插入的时候。
当插入新的节点的时候,如果发现最后一个node不是tail,那么就会设置tail为最后一个node,然后再在tail.next赋值
当poll数据的时候,如果发现开头的node的item是null,那么head就被别的线程拿走了,它会移动head,并尝试取head的item
这里使用的惰性更新策略,将数据查询的复杂度放到插入的时候。
当插入新的节点的时候,如果发现最后一个node不是tail,那么就会设置tail为最后一个node,然后再在tail.next赋值
当poll数据的时候,如果发现开头的node的item是null,那么head就被别的线程拿走了,它会移动head,并尝试取head的item
精妙之处:利用CAS来完成数据操作,同时允许队列的不一致性,弱一致性表现得淋漓尽致
ConcurrentSkipListMap
跳跃表的算法,支持并发的跳跃表
实现
大量使用CAS实现
ConcurrentNavigableMap
NavigableMap
SortedMap提供了获取最大值与最小值的方法,但对于一个已经排序的数据集,除了最大值与最小值之外,
我们还想对任何一个元素,找到比它小的值和比它大的值,还可以按照原有的顺序倒序排序等,
我们还想对任何一个元素,找到比它小的值和比它大的值,还可以按照原有的顺序倒序排序等,
ConcurrentMap和NavigableMap的一个集合
jdk11之中多处使用内存屏障实现。
它和ConcurrentLinkedQueue的区别是它提供了log(n)复杂的的containsKey,get,put,remove等方法。
ConcurrentSkipListSet
内部使用ConcurrentSkipListMap实现
CopyOnWriteArrayList
一个线程安全的ArrayList
实现的原理
使用了synchronized 锁
所有对 储存元素的操作 包括读写都使用了 synchronized 锁
CopyOnWriteArraySet
内部使用CopyOnReadWriteList实现
阻塞队列
阻塞算法
用锁来实现(可以出入队用同一把锁,也可以出入对使用不同的锁)
非阻塞算法
使用自旋CAS来实现
ArrayBlockingQueue
使用一把Reentrant锁来管理数组,是一个fifo的有界队列
有界队列:一开始就要声明queue的大小
有界队列:一开始就要声明queue的大小
底层是使用数组储存数据的。有界且固定,在构造函数之中声明其大小,并且不可变
通知模式:
使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
LinkedBlockingQueue
two-lock-queue的一个变种
主要有两把锁。一把put锁,一把take锁。
两个锁都是ReentrantLock类型的锁。
一个负责入队的锁,一个负责出队的锁。
调用take,poll的时候使用take锁。
调用put, offer的时候使用put锁
两个锁都是ReentrantLock类型的锁。
一个负责入队的锁,一个负责出队的锁。
调用take,poll的时候使用take锁。
调用put, offer的时候使用put锁
通知模式:
同样使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
同样使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
注意contains这种函数会锁整个队列。
业界也将它称为无界队列,底层是用链表实现的queue,但是默认最大支持的大小是Integer.MAX_VALUE
LinkedBlockingDeque
使用的是一把锁ReentrantLock来管理队列
通知模式:
同样使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
同样使用两个condition,实现await和singale机制。
如果队列满的时候,就会通知入队的线程挂起
如果队列空的时候,通知出队的线程挂起
运用
工作窃取模式
PriorityBlockingQueue
一个逻辑上是无界的队列,有排序规则,排序规则和PriorityQueue一样
默认是按入队先后顺序排序,如果有指定的Comparator,就按指定的Comparator来堆元素进行排序
默认是按入队先后顺序排序,如果有指定的Comparator,就按指定的Comparator来堆元素进行排序
默认数组容量为11,最大数容量为Integer.MAX_VALUE-8.
因为有一些vms要求数组的header words保存在头部,占用了8*4=32个字节。
因为有一些vms要求数组的header words保存在头部,占用了8*4=32个字节。
queue的表现形式是一个平衡二叉堆 queye[n]的子节点是queue[2*n+1] 和 queue[2*(n+1)].
有扩容机制,使用的是一个volatile变量做锁,实际的扩容还是通过System.arraycopy扩容。
注意:这个queue是无界的,put方法不会block,这个方法和ArrayBlockingQueue,LinkedBlockingQueue不一样。
这个queue的put方法和offer一模一样
但是take方法还是会阻塞,直到能拿回元素。
这个queue的put方法和offer一模一样
但是take方法还是会阻塞,直到能拿回元素。
二叉堆
分类
最大堆
父节点的key总是大于或等于任何一个子节点的键值
最小堆
父节点的key总是小于或等于任何一个子节点的key
添加操作则不断“上冒”,添加操作则不断“下掉”
实现
ReentrantLock + Condition
二叉堆
DelayQueue
支持延时的无界阻塞队列
应用
缓存——清除掉缓存中超时储存的数据
任务超时处理
这个queue的put方法也和offer一样,没等待的作用
使用的是Leader-Follower模式。
储存的所有元素都是Delayed类型
需要实现long getDelay(TimeUnit unit);这个方法告诉queue需要延时多久才能出队
需要实现long getDelay(TimeUnit unit);这个方法告诉queue需要延时多久才能出队
Delayed接口
用来标记那些应该在给定延时时间之后执行的对象
该接口要求实现它的实现类必须定义一个compareTo方法,该方法提供与接口getDelay方法一直的排序
实现
PriorityQueue
ReentrantLock
储存的元素都implement 了 Delayed
根据Deplay时间排序的优先队列:PriorityQueue
condition的await来实现阻塞 ,而await使用的是LockSupport来实现的
1. 实现主要是依赖实现了Delayed的任务里面通过getDelayed获取到long类型来获取延迟的时间。
2. 在poll/take之中使用await(time),来阻塞一段时间。
3. 阻塞时间到之后,激活线程才会poll/take任务成功。
注意poll时间没到会返回null。
2. 在poll/take之中使用await(time),来阻塞一段时间。
3. 阻塞时间到之后,激活线程才会poll/take任务成功。
注意poll时间没到会返回null。
SychronousQueue
一个不存在储存的阻塞队列
每一个put操作必须等待一个take操作,否则不能继续添加元素
支持公平/非公平访问策略,如果是公平访问策略,那么等待的线程将会采用fifo顺序访问队列。
运用
适合生产者线程处理的数据直接传递给消费者。
队列本身不存任何数据,非常适合传递
队列本身不存任何数据,非常适合传递
SynchronousQueue的吞吐量大于LinkedBlockingQueue和ArrayBlockingQueue
LinkedTransferQueue
相对于其它阻塞队列,这个队列多了两个方法tryTransfer和transfer方法
transfer: 如果当前有消费者正在等待接收元素
transfer会把生产者传入的元素立刻transfer给消费者。否则就放入队列的tail。
tryTransfer: 这个方法会尝试把元素传递给消费者,成功则返回true,如果消费者没有接收到就返回false。
transfer会把生产者传入的元素立刻transfer给消费者。否则就放入队列的tail。
tryTransfer: 这个方法会尝试把元素传递给消费者,成功则返回true,如果消费者没有接收到就返回false。
这个queue的put方法也不会阻塞
PriorityQueue
并不是阻塞队列,只是blocking queue经常用到(PriorityBlockingQueue,DelayQueue)所以总结一下
这个类提供了O(log(n)) 级别的入队和出队方法。
O(N) 级别的remove,contains方法
O(1) 级别的peek,size方法
O(N) 级别的remove,contains方法
O(1) 级别的peek,size方法
实现和PriorityBlockingQueue基本没什么区别:就是一个PriorityBLockingQueue的无锁版
支持阻塞的插入/移除方法
offer和put的区别:
offer 这个queue如果offer满了,那么它不会等待,而是直接返回false。
put 这个队列如果满了那么它会阻塞到成功为止。
offer 这个queue如果offer满了,那么它不会等待,而是直接返回false。
put 这个队列如果满了那么它会阻塞到成功为止。
take和poll的区别:
take 获取queue的头部节点数据,如果获取不到就一直在那里等着(被挂起),直到能拿回第一个数据
poll 获取queue的头部节点数据,如果获取不到就返回null
take 获取queue的头部节点数据,如果获取不到就一直在那里等着(被挂起),直到能拿回第一个数据
poll 获取queue的头部节点数据,如果获取不到就返回null
peek取头部节点,但是不出队列
并发工具
CountDownLatch
允许一个或多个线程等待其它线程执行完任务
实现原理
AQS
使用的是AQS的共享模式
使用的是逆向思维,所谓的await就是争抢锁。
1. 构造函数指定了aqs的state的值
2. countDown方法 使用cas的方法state-1
3. tryAcquireShared只有state==0的时候,是允许获取锁
4. 获取到锁,await在后面执行。
1. 构造函数指定了aqs的state的值
2. countDown方法 使用cas的方法state-1
3. tryAcquireShared只有state==0的时候,是允许获取锁
4. 获取到锁,await在后面执行。
使用
final CountDownLatch countDownLatch = new CountDownLatch(2);
。。。
countDownLatch.countDown();
。。。
countDownLatch.await();
final CountDownLatch countDownLatch = new CountDownLatch(2);
。。。
countDownLatch.countDown();
。。。
countDownLatch.await();
CyclicBarrier
需要多个线程到达栅栏的时候,才能多个线程同时执行后面的逻辑。
api
CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction)
能 声明一个runnable,到最后一个线程到达栅栏的时候,
最后一个线程帮你执行barrierAction,然后再去唤醒其它线程。
最后一个线程帮你执行barrierAction,然后再去唤醒其它线程。
await
实现原理
使用的是ReentrantLock和它的Condition来实现的。(下面的流程没考虑异常情况)
1. 在最后一个线程到达之前所有的线程都会调用condition的await进入等待队列。
2. 在最后一个线程到达的时候,执行barrierAction(如果有),然后唤醒其它线程。
1. 在最后一个线程到达之前所有的线程都会调用condition的await进入等待队列。
2. 在最后一个线程到达的时候,执行barrierAction(如果有),然后唤醒其它线程。
Semaphore
信号量:给一个信号,可以让线程在那里等待着,然后发出信号之后才会执行后面的共享的逻辑。
这个信号内部实现称为许可。
api
构造函数
Semaphore(int permits)
Semaphore(int permits, boolean fair)
申请一个许可
acquire()
acquireUninterruptibly()
tryAcquire()
释放许可
release(int permits)
实现原理
1. 使用了AQS实现了一个公平和非公平模式,在构造函数之中可以声明使用的是哪种模式。默认使用非公平模式
2. 申请许可就是获取一个共享锁
3. 释放许可就是释放一个共享锁
2. 申请许可就是获取一个共享锁
3. 释放许可就是释放一个共享锁
运用
可以使用来限制某些资源的访问数目
Exchanger
两个线程之间交换数据
Atom13个类
AtomicInteger
AtomicLong
Atomic...
线程池&并发框架
Fork/Join框架
一个用于执行任务的框架,是把大的任务分成小的若干小的任务,并最终汇总每个小任务结果得到大任务的结果的框架
实现
工作窃取算法
当一个线程的任务完成的时候,检查某个线程的任务没有完成,就窃取它的任务来执行
提升整个任务的完成效率
每个工作线程维持一个任务队列
任务队列以双端队列的形式维护,不仅支持先进后出的push和pop操作,还支持先进先出的take操作
由父任务fork出来的子任务被push到运行该父任务的工作线程对应的任务队列中
工作线程以先进后出的方式处理pop自己任务队列中的任务(优先处理最年轻的任务)
当任务队列中没有任务时,工作线程尝试随机从其他任务队列中窃取任务
当工作线程没有任务可以执行,且窃取不到任务时,它会“退出”(yiled、sleep、优先级调整),经过一段时间后再次尝试。除非其他所有的线程也都没有任务可以执行,这种情况下它们会一直阻塞直到有新的任务从上层添加进来
任务队列以双端队列的形式维护,不仅支持先进后出的push和pop操作,还支持先进先出的take操作
由父任务fork出来的子任务被push到运行该父任务的工作线程对应的任务队列中
工作线程以先进后出的方式处理pop自己任务队列中的任务(优先处理最年轻的任务)
当任务队列中没有任务时,工作线程尝试随机从其他任务队列中窃取任务
当工作线程没有任务可以执行,且窃取不到任务时,它会“退出”(yiled、sleep、优先级调整),经过一段时间后再次尝试。除非其他所有的线程也都没有任务可以执行,这种情况下它们会一直阻塞直到有新的任务从上层添加进来
核心思想
分而治之
fork分解任务,join收集任务结果
fork
1. 调用fork的时候,程序会调用ForkJoinWorkerThread的pushTask进入任务的入队,然后立刻返回这个线程。
2. 通过调用ForkJoinPool的signalWork() 方法唤醒或创建一个工作线程来执行任务。
2. 通过调用ForkJoinPool的signalWork() 方法唤醒或创建一个工作线程来执行任务。
join
1. 阻塞当前线程,并等待执行结果
实际调用的是doJoin,
1. 是调用实际计算程序compute。
2. 根据compute的返回状态收集结果,可能是异常/正常结果/取消等等。
3. 窃取其它线程任务,调用await 等待
1. 是调用实际计算程序compute。
2. 根据compute的返回状态收集结果,可能是异常/正常结果/取消等等。
3. 窃取其它线程任务,调用await 等待
框架的组成部分
ForkJoinPool
一个运行ForkJoinTask的 线程池
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
ForkJoinTask
ForkJoinPool的一个基类,是ForkJoinPool一个任务体现形式
ForkJoinTask是一个类似线程的实体,都是比线程轻量得多。
巨量的任务/子任务数量可以被少数的实际线程持有。
ForkJoinTask是一个类似线程的实体,都是比线程轻量得多。
巨量的任务/子任务数量可以被少数的实际线程持有。
实现了Future接口
ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
ForkJoinWorkerThread
一个被ForkJoinPool管理的线程,在ForkJoinTask之中执行。是执行任务的线程
api
invoke
相当于 fork+join
Executor框架
组成
Executors
为 ThreadPoolExecutor,Callable,ForkJoinPool,FutureTask,Executor,ExecutorService,ScheduleExecutorService等提供静态工厂方法
api
线程池 ThreadPoolExecutor
固定线程数 newFixedThreadPool
用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线
程数量
程数量
一个线程的线程池 newSingleThreadExecutor
newCachedThreadPool
一个可根据实际情况调整线程个数的线程池,不限制最大线程
数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在 60 秒
后自动回收
一个可根据实际情况调整线程个数的线程池,不限制最大线程
数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在 60 秒
后自动回收
定时线程池 ScheduledThreadPoolExecutor
newScheduledThreadPool
创建一个可以指定线程的数量的线程池,但是这个线程池还带有
延迟和周期性执行任务的功能,类似定时器。
创建一个可以指定线程的数量的线程池,但是这个线程池还带有
延迟和周期性执行任务的功能,类似定时器。
工作窃取类型的线程池
newWorkStealingPool
线程池的构建不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。
用 Executors 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致
一个问题,比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为
Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险
而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量
线程的创建出现 CPU 使用过高或者 OOM 的问题
用 Executors 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致
一个问题,比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为
Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险
而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量
线程的创建出现 CPU 使用过高或者 OOM 的问题
ThreadPoolExecutor
ScheduledThreadPoolExecutor
ForkJoinPool
FutureTask/Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。
当我们把Runnable 接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或
ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们 返回一个FutureTask对象。
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
当我们把Runnable 接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或
ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们 返回一个FutureTask对象。
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled- ThreadPoolExecutor执行。
它们之间的区别是Runnable不会返回结果,而Callable可以返回结 果。
它们之间的区别是Runnable不会返回结果,而Callable可以返回结 果。
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个 Runnable包装成一个Callable。
下面是Executors提供的,把一个Runnable包装成一个Callable的API。
public static Callable<Object> callable(Runnable task) // 假设返回对象Callable
下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。
public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Call
下面是Executors提供的,把一个Runnable包装成一个Callable的API。
public static Callable<Object> callable(Runnable task) // 假设返回对象Callable
下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。
public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Call
示意图
子主题
线程池
线程池的好处
1. 提高性能:通过线程复用,减少线程创建/销毁 的性能消耗。
2. 提高响应速度:因为线程不需要创建就能立刻执行。
3. 提高线程可管理性:统一分配线程,方便调优,监控
2. 提高响应速度:因为线程不需要创建就能立刻执行。
3. 提高线程可管理性:统一分配线程,方便调优,监控
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
使用
创建
参考下面的构造函数,及Executors的api
ThreadFactory
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
RejectedExecutionHandler(饱和策略,拒绝策略)
AbortPolicy:直接抛出异常。默认是这个策略。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
可以自定义拒绝策略
提交任务
execute() 不需要返回值的任务
submit() 需要返回值的任务
关闭线程池
shutdown
shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
shutdownNow
shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
原理
遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。
isShutdown
调用了shutdown或者shutdownNow,isShutdown方法就会立刻返回true
isTerminaed
所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true
合理设置线程数
CPU密集型任务: 应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池
IO密集型任务: 线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实 际中如果需要 线程池创建之 后立即创建线 程,可以通过 以下两个方法 办到:
prestartCoreThread():初始化一个核心线程; prestartAllCoreThreads():初始化所有核心线
程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实 际中如果需要 线程池创建之 后立即创建线 程,可以通过 以下两个方法 办到:
prestartCoreThread():初始化一个核心线程; prestartAllCoreThreads():初始化所有核心线
程
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理
建议使用有界队列。
监视的api
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
getActiveCount:获取活动的线程数。
钩子方法
beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor使用的队列是DelayQueue
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled- FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就 是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled- FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就 是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
实现原理
继承了ThreadPoolExecutor
内部类 ScheduledFutureTask 继承了FutureTask
DelayedWorkQueue 延时队列
1. 将任务封装成ScheduleFutureTask
2. 调用 scheduleAtFixedRate,schedule等等方法都是将任务丢进DelayedWorkQueue中的,不是像ThreadPoolExecutor看核心线程不足创建核心线程的。它有一个提前创建线程的动作。ensurePrestart方法。
所以它的延时实现是靠DelayedWorkQueue
3. ThreadPoolExecutor从DelayedWorkQueue之中poll/take拿到ScheduleFutureTask来执行并返回结果。
poll/take方法之中获取到任务之后,会执行ScheduleFutureTask任务的getDelay。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
这个方法可以获取到延时的时间,然后有一个 ReentrantLock 的 condition,调用condition.await(time),阻塞一段时间。
如果获取的时间是 小于等于0的,那么就让这个任务出队,出队之后就可以线程执行了。
4. 返回结果之前会检测这个任务是否是周期性的,然后计数下次执行时间这部分依赖ScheduleFutureTask实现。
2. 调用 scheduleAtFixedRate,schedule等等方法都是将任务丢进DelayedWorkQueue中的,不是像ThreadPoolExecutor看核心线程不足创建核心线程的。它有一个提前创建线程的动作。ensurePrestart方法。
所以它的延时实现是靠DelayedWorkQueue
3. ThreadPoolExecutor从DelayedWorkQueue之中poll/take拿到ScheduleFutureTask来执行并返回结果。
poll/take方法之中获取到任务之后,会执行ScheduleFutureTask任务的getDelay。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
这个方法可以获取到延时的时间,然后有一个 ReentrantLock 的 condition,调用condition.await(time),阻塞一段时间。
如果获取的时间是 小于等于0的,那么就让这个任务出队,出队之后就可以线程执行了。
4. 返回结果之前会检测这个任务是否是周期性的,然后计数下次执行时间这部分依赖ScheduleFutureTask实现。
Future/FutureTask
Future 表示一个任务,它暴露了一个任务相应的方法来判断是否已经完成或取消,以及获
取任务的结果和取消任务等。
取任务的结果和取消任务等。
FutureTask 是 Runnable 和 Future 的结合,如果
我们把 Runnable 比作是生产者,Future 比作是消费者,那么 FutureTask 是被这两者共享的,
生产者运行 run 方法计算结果,消费者通过 get 方法获取结果。
我们把 Runnable 比作是生产者,Future 比作是消费者,那么 FutureTask 是被这两者共享的,
生产者运行 run 方法计算结果,消费者通过 get 方法获取结果。
api
run
同步方法,调用这方法相当于自己调用call方法
现在Future一般和executor一起用,如果直接使用run和同步方法无异。
(看《并发编程艺术》中描述是使用了AQS,但是在jdk8之中发现并没有,因为jdk6和jdk8的区别,所以这个点需要注意一下。)
(看《并发编程艺术》中描述是使用了AQS,但是在jdk8之中发现并没有,因为jdk6和jdk8的区别,所以这个点需要注意一下。)
get
cancel
isDone
实现原理
jdk8-14
使用的是LockSupport的park和unpark方法来实现的
1. 执行run的时候,执行完call方法(callable方法), 然后在设置执行的结果,调用LockSupport.unpark
2.执行get方法的时候,调用LockSupport.park阻塞线程。
3. LockSupport.unpark和park要指定到同一个线程才能唤醒对应的线程。
FutureTask使用一个waitQueue来实现,在执行get的方法时候将当前thread追加到waitQueue之中。
在finishCompletion方法之中会唤醒所有在waitQueue之中的线程。
2.执行get方法的时候,调用LockSupport.park阻塞线程。
3. LockSupport.unpark和park要指定到同一个线程才能唤醒对应的线程。
FutureTask使用一个waitQueue来实现,在执行get的方法时候将当前thread追加到waitQueue之中。
在finishCompletion方法之中会唤醒所有在waitQueue之中的线程。
Hook线程以及捕获线程执行异常
Hook线程:防止线程/进程重复启动
实战
生产消费模式
并发编程的基础
Thread
线程是一个运行单位,Java允许有多线程
线程有优先级priority,子线程优先级默认和父线程一样
多线程概念
多线程生命周期
NEW
RUNNABLE
RUNNING
BLOCKED
TERMINATED
线程控制和业务分离
继承Thread重写run方法
给Thread传递一个Runnable 接口
线程分守护线程/非守护线程
守护线程,只能创建守护线程
除非所有的非守护线程都挂了或者已经返回或者抛出了异常,否则程序run方法之后还在运行。
daemon线程的finally块不一定执行
Thread的基本属性
name: 每个线程都可以有name,java允许线程拥有相同的名字,不指定thread name,就会给你generate 一个。
最好给线程起一个名称,可以在jstack分析程序或查找问题的时候方便。
最好给线程起一个名称,可以在jstack分析程序或查找问题的时候方便。
priority
public static final int MIN_PRIORITY = 1;
public static final int NORM_PRIORITY = 5;
public static final int MAX_PRIORITY = 10;
有些系统会忽略优先级,程序的正确性不能依赖线程的优先级
daemon
daemon线程的finally块不一定执行
1. yield()
2. ThreadLocal.ThreadLocalMap threadLocals = null;
3. ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
。。。
2. ThreadLocal.ThreadLocalMap threadLocals = null;
3. ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
。。。
interrupt
不要使用过期的supend,resume,stop
1. 通信使用notify,wait等
2. 停止用打断 Thread.interrupt(), Feture.cancel()
1. 通信使用notify,wait等
2. 停止用打断 Thread.interrupt(), Feture.cancel()
执行这个方法之后,Thread.sleep,LockSupport.park等抛出一个InterruptException,用户可以try catch住,然后自己决定处理还是忽略
在jdk8之中,
调用这个方法之后
第一步:jvm底层会setInterrupt为true,
第二步:然后使用线程的unpark方法,唤醒这个线程
第三步:抛出异常
第四步:用户线程根据interrupt变量判断是否打断,处理异常。
根据第一步和第四步都用到了共享变量interrupt,所以打断机制也使用了共享变量来通信
具体的代码调用栈是 : jvm.cpp interrupt -> os_linux.cpp os::interrupt
调用这个方法之后
第一步:jvm底层会setInterrupt为true,
第二步:然后使用线程的unpark方法,唤醒这个线程
第三步:抛出异常
第四步:用户线程根据interrupt变量判断是否打断,处理异常。
根据第一步和第四步都用到了共享变量interrupt,所以打断机制也使用了共享变量来通信
具体的代码调用栈是 : jvm.cpp interrupt -> os_linux.cpp os::interrupt
安全的终止线程
Future.cancel()
Thread.interrupt()
Thread.join()
线程API
线程初始化
线程命名
线程默认命名
修改线程名:setName
父子线程
当前线程: currentThread()
ThreadGroup
默认使用父线程的ThreadGroup
new ThreadGourp()
Thread.currentThread().getThreadGourp()
Thread和Runnable
守护进程
User Thread线程和Daemon Thread守护线程本质上来说去没啥区别的,唯一的区别之处就在虚拟机的离开:如果User Thread全部撤离,那么Daemon Thread也就没啥线程好服务的了,所以虚拟机也就退出了。
setDaemon()
thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常
sleep
sleep(int millis)
sleep(int millis, int nanos)
TimeUint
yield
提醒调度器我愿意放弃当前资源,如果CPU不紧张,则会忽略这种提醒
yiled()
优先级
setPriority(int newPriority),只能是1到10
getPriority()
对于root用户,它会hint系统你想要的级别,否则系统会忽略它
如果CPU比较忙,优先级高的可能会获得更高的时间片,CPU闲暇几乎没什么作用
不要在设计时企图使用线程优先级绑定业务。
线程ID
getId()
设置线程上下文类加载器
getContextClassLoader()
setContextClassLoader(ClassLoader d)
interrupt
interrupt()
interrupted()
isInterrupted()
join()
join()
join(long millis)
join(long millis, long nanos)
当我们调用某个线程的这个方法时,这个方法会挂起调用线程,直到被调用线程结束执行,调用线程才会继续执行。
实现原理:线程执行完成的时候,会在jvm底层调用一个notifyAll,让在的join的线程唤醒。
关闭线程
正常执行结束
捕获中断信号关闭
使用volatile 字段关闭
异常退出
进程假死
复制线程
enumerate(Thread[] list)
enumerate(Thread[] list, boolean recurse)
ThreadGroup
线程组拥有的线程 Thread threads[];
子线程group ThreadGroup groups[];
父线程group ThreadGroup parent;
ThreadLocal
一种多线程环境下解决线程变量的方案,与线程同步无关。
思路是每个线程都有有一个ThreadLocalMap变量副本,和继承来的副本,不被其它线程影响。
思路是每个线程都有有一个ThreadLocalMap变量副本,和继承来的副本,不被其它线程影响。
实现原理
Thread 的get实现
Thread 有如下属性:
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
每个ThreadLocal都有一个独一无二的id,这个id是使用AtomicInteger不断自增的。
ThreadLocal的key是WeakRefence,当WeakRefence的key为null的时候,没有强引用会立即被回收
上下文切换
vmstat测量上下文切换次数
如何减少上下文切换
无锁开发
CAS算法
Java的atomic包
使用最少的线程
Linux内核提供了perf命令来监视Java程序运行过程之中的上下文切换的次数和频率
线程数的设置
如果是io操作密集类型,设置的线程数为 2*CPU核数
如果是计算密集类型任务,设置为CPU核心数即可
死锁
死锁演示的代码
避免死锁的方法
避免同个线程获得多个锁
避免同个线程在锁内占用太多的资源,尽量保证每个锁都只占有一个资源
使用定时锁
使用lock.tryLock(timeout) 来替换使用内部锁机制
对于数据库锁,加锁和解锁必须在同一个数据库连接中,否则会出现解锁失败的情况。
死锁
交叉锁导致死锁
内存死锁
一问一问答式的数据交换
数据库锁
文件锁
死循环
底层原理
Java原子操作的实现原理
锁
使用了锁(偏向锁,轻量级锁和互斥锁)也可以实现原子性。
JVM获取锁的方式都是使用了自旋CAS,释放锁也是使用CAS释放锁
CAS
Java的CAS是利用了CPU提供的CMPXCHG指令实现的
Compare And Swap ,整个JUC最核心、最基础的理论
内存值V,旧的值预期值A,要更新为新的值B,当且仅当 内存值V
等于旧的A时 会将内存值V修改为B,否则什么也不干。
等于旧的A时 会将内存值V修改为B,否则什么也不干。
native中存在四个参数
compareAndSetReference(
Object o, // which
long offset, //Unsafe.getUnsafe().objectFieldOffset(AtomicInteger.class, "value");
Object expected, // the expected value
Object x // the new value);
compareAndSetReference(
Object o, // which
long offset, //Unsafe.getUnsafe().objectFieldOffset(AtomicInteger.class, "value");
Object expected, // the expected value
Object x // the new value);
CAS三大问题
ABA问题
解决方案
版本号/时间戳
AtomicStampedReference
循环时间长开销大
只能保证一个共享变量的原子操作
AQS(AbstractQueueSynchronizer)
参考锁里面的AQS章节,不在这里重复。
具体实现
Unsafe
LockSupport
在AQS,还是其它同步工具,在jdk之中都是使用LockSupport实现阻塞,和唤醒。
VarHandle
OrderAccess::fence() 内存屏障,volatile也是靠这个实现的
http://ifeve.com/juc-atomic-class-lazyset-que/
http://ifeve.com/juc-atomic-class-lazyset-que/
JMM模型
JMM,java内存模型,是一个抽象概念,描述本地线程和主线程的关系
每个本地线程都有一个自己的一个共享内存的变量副本,如何保证副本和主内存还有其它本地线程的副本的一致性,
是JMM抽象之后要描述的如何解决这个问题
每个本地线程都有一个自己的一个共享内存的变量副本,如何保证副本和主内存还有其它本地线程的副本的一致性,
是JMM抽象之后要描述的如何解决这个问题
线程通信机制
要解决多个本地内存共享变量的副本和主内存的共享变量副本一致性问题,就要考虑线程的通讯机制
在操作系统中线程的通讯机制主要由两种:内存共享和消息传递。Java采用的是内存共享。
在操作系统中线程的通讯机制主要由两种:内存共享和消息传递。Java采用的是内存共享。
内存共享/共享副本遇到的问题及解决方案
问题:重排序
为了程序的性能,编译器,处理器会进行程序重排序执行
条件
数据存在依赖性不允许重排序
在单线程下,对存在控制依赖的操作重排序,不会改变执行的结果 即 as-if-searial
as-if-serial语义
所有的操作均可以为了优化而被重排序,但是必须保证重排序之后的执行结果不能被改变
解决的概念模型:顺序一致性内存模型
顺序一致性是Java内存模型的一个参考模型。
多线程程序的同步执行结果和该程序顺序执行一致性内存模型中的执行结果相同。
这里的同步包括常用的同步原语(synchronized、volatile 和final)的正确使用
这里的同步包括常用的同步原语(synchronized、volatile 和final)的正确使用
两大特性
1. 一个线程所有操作都是都必须按照程序的顺序执行 —— as-if-serial
2. 所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模
型中,每个操作都必须原子执行且立刻对所有线程可见。
型中,每个操作都必须原子执行且立刻对所有线程可见。
解决的抽象模型:happens-before
JMM最核心的理论,保证内存的可见性
如果A线程的写操作a与B线程的读操作b之间存在happens-before关系,a操作将对b可见
如果A线程的写操作a与B线程的读操作b之间存在happens-before关系,a操作将对b可见
定义
1)如果一个操作happens-before另外一个操作,那么前者对后者是可见的,并且前者执行顺序在后在之前
2)两个操作之间存在happens-before关系,但是这只是个建议,如果重排序之后的结果是一致的,那么运行编译器处理器重排序
1)如果一个操作happens-before另外一个操作,那么前者对后者是可见的,并且前者执行顺序在后在之前
2)两个操作之间存在happens-before关系,但是这只是个建议,如果重排序之后的结果是一致的,那么运行编译器处理器重排序
如果JMM中一个操作对另外一个操作是可见的,那么一定存在前者happens-before后者
Java之中解决的方案:三个同步原语 及 锁
volatile
可以这样理解:一个volatile变量使用了同一个锁对这些读/写操作做了同步。
两大特性
可见性:对一个volatile变量的读,总是能看见任意线程对这个volatile变量最后的操作
原子性:对于单个变量的读写具有原子性。但是volatile对类似volatile++这种复合操作不具备原子性
内存语义
写内存语义:当写一个volatile变量的时候,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。
读内存语义:当读一个volatile变量的时候,JMM会把本地内存置为无效。接下来从主内存之中读取共享变量。
内存语义实现机制:内存屏障
LoadLoad
StoreStore
LoadStore
StoreLoad
屏障之前不能溢出,屏障之后不能溢出
操作系统语义(CPU层次变量可见性,原子性)
主内存,高速缓存之间是否一致,多个CPU之间是否一致。
解决方案
主内存和高速缓存之间 总线锁 LOCK# 方式
多处理器之间,高速缓存之间一致——MEIS协议
重排序
编译器不会对volatile读和读后面的任意内存操作重排序
编译器不会对volatile写和写之前任意内存排序
synchronized 也是锁,参考下面即可
锁
内存语义
释放锁的内存语义:当线程释放锁的时候,JMM会把该线程对应的本地内存中的共享变量刷新到内存中。
获取锁的内存语义:当线程获取锁的时候,JMM会把线程对于的本地内存置为无效,
从而使得被监视器保护的临界区代码必须从主内存读取共享变量。
从而使得被监视器保护的临界区代码必须从主内存读取共享变量。
两种实现方式
CAS所带的内存语义
编译器不能对CAS的前面后面任意内存操作重排序(同时volatile读/写的重排序规则)
同时具备volatile 读和写的内存语义
比如:Atomic::cmpchg
利用volatile所附带的volatile读和volatile写的内存语义。
final
两个规则
构造函数对一个final域的写入,与随后把这个被构造对象的引用给引用变量,这两个操作之间不能重排序
初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序
写重排序规则
JMM禁止编译器把final的写重排序到构造函数之外
实现原理:编译器会在final的写之后,构造函数return之前,
插入一个StoreStore屏障这个屏障禁止处理器把final域的写重排序到构造函数之外。
插入一个StoreStore屏障这个屏障禁止处理器把final域的写重排序到构造函数之外。
使用它的一个好处:确保对象引用为任意线程可见。普通域可能构造函数返回之后,因为重排序还没初始化完成。
读重排序规则
编译器会在读final域操作的前面加一个LoadLoad屏障
JMM禁止重排序 初次读对象的引用与初次该对象包含的final域
DCL (double check lock)
单例模式。
DCL
问题:对象初始化的时候因为重排序而引起的问题
解决方案
volatile方案:禁止重排序
基于类初始化的解决方案:利用classLoader的机制来保证
instance时只有一个线程。JVM在类初始化阶段会获取一个锁,这个锁可以同步多个线程对同一个类的初始化
instance时只有一个线程。JVM在类初始化阶段会获取一个锁,这个锁可以同步多个线程对同一个类的初始化
线程通信
线程通信机制
要解决多个本地内存共享变量的副本和主内存的共享变量副本一致性问题,就要考虑线程的通讯机制
在操作系统中线程的通讯机制主要由两种:内存共享和消息传递。Java采用的是内存共享。
在操作系统中线程的通讯机制主要由两种:内存共享和消息传递。Java采用的是内存共享。
volatile和synchronized 关键字
volatile 在x86架构之中写的效率比较慢,不适合频繁写的变量。
等待/通知机制
synchronized配套的5个方法
notify
notifyAll
wait
wait(long)
wait(long, int)
Reenterlock配套的方法
Condition
等待队列WaitQueue和同步队列SynchronizedQueue之间的关系
比如synchronized调用wait就会进入WaitQueue,收到notify之后就会进入SynchronizedQueue,然后才回去争抢锁
比如synchronized调用wait就会进入WaitQueue,收到notify之后就会进入SynchronizedQueue,然后才回去争抢锁
管道输入/输出流
PipedOutputStream、PipedInputStream、PipedReader、PipedWriter
Thread.join
底层也是使用wait()实现的。
ThreadLocal
一种多线程环境下解决线程变量的方案,与线程同步无关。
思路是每个线程都有有一个ThreadLocalMap变量副本,和继承来的副本,不被其它线程影响。
思路是每个线程都有有一个ThreadLocalMap变量副本,和继承来的副本,不被其它线程影响。
实现原理
Thread 的get实现
Thread 有如下属性:
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
key使用的是WeakRefence
ThreadLocal不是解决多线程的共享变量的方案,而只是协调,给每个线程独自一个存储空间。
四大方法
get 返回这个线程这种副本的值
initialValue
remove
set
ThreadMap
实现线程隔离机制的关键点
Thread 有如下属性:
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
提供了一种key-value 方式存储 线程这种类型的ThreadLocal方法,key为ThradLocal对象,value为存储的对象。
缺陷
需要显式调用remove,否则会内存泄露
锁
synchronized
三种使用方式
锁 普通方法
锁 静态方法
锁 代码块
保证只有一个线程能进入临界区(代码块或者方法),同时还保证共享变量的可见性。
实现原理
都是锁对象头,即对象头Mark Word的状态变化记录锁状态
对象头记录着这个class在内存中的布局
对象头分两部分数据
Mark Word(标记字段)
对象头是设计成动态变化来记录更多的信息
记录的信息包括,哈希码(HashCode),GC分代年龄,锁状态标记,线程持有的锁,偏向线程id,偏向时间戳,monitor
monitor
Owner
初始时为Null表示当前没有任何线程拥有该monitor record,当线程成功拥有该所之后保留线程唯一表示,当线程释放时又设置为null
Klass Pointer(类型指针)
类型指针指向方法区,指向描述class的元数据的内存
jvm源码之中
instanceOop.hpp 表示实例
oop.hpp 表示一个class
markOop.hpp 描述一个对象头
instanceOop.hpp 表示实例
oop.hpp 表示一个class
markOop.hpp 描述一个对象头
偏向锁
偏向锁加锁
偏向锁撤销
关闭偏向锁 -XX:UseBiasedLocking=false,默认是关闭偏向锁的
-XX:UseBiasedLocking 开启偏向锁
-XX:BiasedLockingStartupDelay=0 设置偏向锁的启动延迟为0(这个值默认为5秒)
-XX:BiasedLockingStartupDelay=0 设置偏向锁的启动延迟为0(这个值默认为5秒)
轻量级锁
该线程等待一段时间,不会立刻被挂起,看持有锁的线程是否会很快释放锁。
加锁:CAS 竞争锁,竞争锁失败会自旋
解锁:CAS释放锁
性能依据:在大部分锁之中,整个生命周期内都是不会存在竞争的
缺点:在多线程环境下,运行效率可能比重量锁还慢
缺点:自旋次数较难控制(-XX:preBlockSpin)
原因:线程频繁挂起、唤醒负担较重,可以认为每个线程占有锁定的时间很短,线程挂起再唤醒得不偿失
轻量级锁在加锁过程中,用到了自旋锁,所谓自旋:当有另外一个线程来竞争锁时,争抢线程会在原地循环等待,而不是把该线程给阻塞,直到那个获得锁的线程释放锁之后,争抢线程就可以马上获得锁的。
注意👮:,锁在原地循环的时候,是会消耗 cpu 的。 所以,轻量级锁适用于那些同步代码块执行的很快的场景。 自旋锁的使用,其实也是有一定的概率背景,在大部分同 步代码块执行的时间都是很短的。所以通过看似无异议的循环反而能提升锁的性能。 但是自旋必须要有一定的条件控制,jdk1.6,默认启用,默认情况下自旋的次数是 10 次, 可以通过 -XX:PreBlockSpin=10来修改。
作者:biudefu
链接:https://www.jianshu.com/p/eb7dd1b05a1c
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
注意👮:,锁在原地循环的时候,是会消耗 cpu 的。 所以,轻量级锁适用于那些同步代码块执行的很快的场景。 自旋锁的使用,其实也是有一定的概率背景,在大部分同 步代码块执行的时间都是很短的。所以通过看似无异议的循环反而能提升锁的性能。 但是自旋必须要有一定的条件控制,jdk1.6,默认启用,默认情况下自旋的次数是 10 次, 可以通过 -XX:PreBlockSpin=10来修改。
作者:biudefu
链接:https://www.jianshu.com/p/eb7dd1b05a1c
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
自适应锁
在 JDK1.6 之后,引入了自适应自旋锁,自适应意味着自旋的次数不是固定不变的
自旋的次数不是固定的,它是由上一次再同一个锁上的自旋时间即锁的拥有者的状态来决定的
自旋成功,则可以增加自旋的次数,如果获取锁经常失败,那么自旋次数就会减少
重量级锁
使用monitor来实现,会调用操作系统的互斥锁MutexLock。
线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能。
线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能。
实现原理:
1. 锁方法/代码块 : 使用的原理是获得monitor(监视器)来进入临界区,monitorenter和monitorexit
2. 锁方法:方法上修饰符上加上 ACC_SYNCHRONIZED,也是通过获得monitor(监视器)来进入临界区完成
1. 锁方法/代码块 : 使用的原理是获得monitor(监视器)来进入临界区,monitorenter和monitorexit
2. 锁方法:方法上修饰符上加上 ACC_SYNCHRONIZED,也是通过获得monitor(监视器)来进入临界区完成
锁升级过程
偏向锁 -> 轻量级锁 -> 重量级锁
优化
锁粒度缩小
锁消除
若不存在数据竞争,jvm会消除锁机制
判断依据
变量逃逸
锁粗化
将多个连续的加锁,解锁操作合并到一起,扩展为一个范围较大的锁,比如for循环内部获取锁
可以使用工具jol打印对象内存布局信息。
jol打印信息
Type DESCRIPTION
类型描述,
object header表示对象头的意思
object header表示对象头的意思
value
最后边的括号里面表示 内存的位置,0表示开始的为止,1表示 0之后的位置
因为大小端问题,所以不同机器打印的顺序会不一样。
因为大小端问题,所以不同机器打印的顺序会不一样。
valotile
多cpu 之间有缓存失效协议 ——MEIS协议
lock#前缀指令 为总线锁 会引起处理器缓存会写到内存之中
底层对应的代码是OrderAccess::fence()
valotile使用优化
LinkedTransferQueue
Lock
ReentrantLock
显式加锁的时候,释放锁尽量在finally之中释放。
加锁的时候不要放在try之中,因为如果发生了异常,异常的抛出也会导致锁被无故释放。
加锁的时候不要放在try之中,因为如果发生了异常,异常的抛出也会导致锁被无故释放。
获取锁可以被打断
aqs (AbstractQueuedSynchronizer)
定义:抽象队列同步器
算是各种锁及想有一个java层面的同步队列的需求。
实现JUC的核心基础组件,另外就是volatile
算是各种锁及想有一个java层面的同步队列的需求。
实现JUC的核心基础组件,另外就是volatile
采用了CLH锁的队列一种变种
CLH在没划分内存领域(没有远程内存)的CPU模型之中非常高效
但是每个节点都需要自旋,所以这里需要优化一下
CLH同步队列
FIFO双向队列:AQS使用它来管理没抢到锁的线程,让它们在这里等待
当锁释放的时候,会唤醒首节点,所以是FIFO获取资源。但是因为上下文切换的关系,所以非公平模式下很有可能抢不到锁。
新加入的抢不到锁的线程 和 waiting queue 的线程 入队的时候,插入队尾。
同步状态的获取和释放
独占模式
获取锁
获取同步状态 acquire
响应中断的 acquireInterruptibly
获取超时的 tryAcquireNanos(int arg, long nanosTimeout)
释放锁
release(int arg)
共享模式
获取锁
acquireShared(int arg)
acquireSharedInterruptibly(int arg)
tryAcquireSharedNanos(int arg, long nanosTimeout)
共享锁
releaseShared(int arg)
子类需要注意的5个方法
独占模式
tryAcquire(int arg)
tryRelease(int arg)
共享模式
tryAcquireShared(int arg)
tryReleaseShared(int arg)
isHeldExclusively()
一般共享模式下尝试获取锁需要自旋,注意有些地方需要哪些地方使用cas。
独占模式不用,因为acquire里面有自旋入队列。
独占模式不用,因为acquire里面有自旋入队列。
线程的阻塞和唤醒
阻塞: 当获取锁失败的时候,再次尝试获取还是失败的时候,就会进入阻塞队列。
释放:当线程释放锁的时候,会修改state,然后去唤醒head节点。
释放:当线程释放锁的时候,会修改state,然后去唤醒head节点。
LockSupport
阻塞和唤醒的实现原理都是使用LockSupport,而LockSupport使用的是Unsafe.park,unpark
LockSupport.park提供了一个Java层面的阻塞原语
最重要的两类方法:park、unpark
park(blocker) 中的blocker可以在问题排查和监控的时候,可以根据blocker得知当前线程是因blocker而挂起的。
park/unpark的设计原理核心是“许可”。park是等待一个许可。unpark是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。
有一点比较难理解的,是unpark操作可以再park操作之前。也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。
有一点比较难理解的,是unpark操作可以再park操作之前。也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。
condition
Condition在AQS之中有个默认实现ConditionObject,这个也类比synchronized中的wait/notify的通信机制。
Condtion提供了比Object Monitor (就是wait/notify那些) 更多的更加灵活的功能:
1. 它的waiting queue是多个,一个condtionObject 一个,而synchronized只有一个。
2. condtion支持忽略打断,synchronized忽略不了打断
3. 支持让线程到指定的时间还没收到信号就中断,synchronized不支持
1. 它的waiting queue是多个,一个condtionObject 一个,而synchronized只有一个。
2. condtion支持忽略打断,synchronized忽略不了打断
3. 支持让线程到指定的时间还没收到信号就中断,synchronized不支持
主要方法:await, awaitUninterruptibly,signal等
底层原理实现还是维护一个FIFO双向队列来管理等待状态的线程和使用了LockSupport的park, unpark来挂起或者唤醒线程。
当一个线程进入wait队列的时候,创建一个node记录这个线程并且进入队尾。
当收到signal的时候,唤醒头部node的线程,并且尝试抢锁(非公平)或进入阻塞队列的队尾。
当一个线程进入wait队列的时候,创建一个node记录这个线程并且进入队尾。
当收到signal的时候,唤醒头部node的线程,并且尝试抢锁(非公平)或进入阻塞队列的队尾。
阻塞在Jdk14交给ForkJoinThread管理了,在Jdk8之中使用的还是LockSupport.park
实现原理
基于CLH锁算法维护一个FIFO队列。
ReentrantLock
可重入锁
比synchronized强大,灵活的锁机制,可以减少死锁发生的概率
分为公平锁、非公平锁
默认使用的策略是非公平锁,非公平锁在上下文切换上比公平锁少了很多,所以表现的性能更加优越。
都是有可能出现线程饥饿的现象。
都是有可能出现线程饥饿的现象。
实现原理
使用AQS实现,内部有一个Sync继承AQS
记录了ThreadId,判断是同个ThreadId就进入。会记录重入次数
ReentrantReadWriteLock
一般情况下,读写锁的性能比排他锁更好。特别是读比写多的场景(大多数也是这种场景)。
读写锁提供更好的并发和吞吐量。
读写锁提供更好的并发和吞吐量。
三大特性
公平性
重入性
可降级
实现上:有两把锁——读锁和写锁。读锁为共享锁,写锁为排他锁
对state按位切割来计量读写锁。高16位记录读锁,低16位记录写锁。
对state按位切割来计量读写锁。高16位记录读锁,低16位记录写锁。
锁降级
锁降级的意义:为了保证数据的可见性,如果当前有写锁的线程不获取读锁降级而是直接释放写锁,
假设另外一个线程获取了写锁并且修改了数据,那么当前线程就无法感知那个线程修改的数据。
如果获取了读锁进行了锁降级别的线程就不能获取这个写锁,就没有这个问题。
也因为这个原因,ReentrantReadWriteLock不支持锁升级。即数据的可见性:其中一个线程锁升级修改了数据,对其它读线程是无感知的。
假设另外一个线程获取了写锁并且修改了数据,那么当前线程就无法感知那个线程修改的数据。
如果获取了读锁进行了锁降级别的线程就不能获取这个写锁,就没有这个问题。
也因为这个原因,ReentrantReadWriteLock不支持锁升级。即数据的可见性:其中一个线程锁升级修改了数据,对其它读线程是无感知的。
0 条评论
下一页
为你推荐
查看更多