Java并发编程
2020-09-08 17:47:20 1 举报
AI智能生成
Java 并发编程实战
作者其他创作
大纲/内容
多线程基础
现代计算机的硬件结构
CPU指令结构
控制单元
控制单元是整个CPU的指挥控制中心,由指令寄存器IR(Instruction Register)、指
令译码器ID(Instruction Decoder)和 操作控制器OC(Operation Controller) 等组
成,对协调整个电脑有序工作极为重要。它根据用户预先编好的程序,依次从存储器中取出
各条指令,放在指令寄存器IR中,通过指令译码(分析)确定应该进行什么操作,然后通过
操作控制器OC,按确定的时序,向相应的部件发出微操作控制信号。操作控制器OC中主要
包括:节拍脉冲发生器、控制矩阵、时钟脉冲发生器、复位电路和启停电路等控制逻辑。
令译码器ID(Instruction Decoder)和 操作控制器OC(Operation Controller) 等组
成,对协调整个电脑有序工作极为重要。它根据用户预先编好的程序,依次从存储器中取出
各条指令,放在指令寄存器IR中,通过指令译码(分析)确定应该进行什么操作,然后通过
操作控制器OC,按确定的时序,向相应的部件发出微操作控制信号。操作控制器OC中主要
包括:节拍脉冲发生器、控制矩阵、时钟脉冲发生器、复位电路和启停电路等控制逻辑。
运算单元
运算单元是运算器的核心。可以执行算术运算(包括加减乘数等基本运算及其附加运
算)和逻辑运算(包括移位、逻辑测试或两个值比较)。相对控制单元而言,运算器接受控
制单元的命令而进行动作,即运算单元所进行的全部操作都是由控制单元发出的控制信号来
指挥的,所以它是执行部件。
算)和逻辑运算(包括移位、逻辑测试或两个值比较)。相对控制单元而言,运算器接受控
制单元的命令而进行动作,即运算单元所进行的全部操作都是由控制单元发出的控制信号来
指挥的,所以它是执行部件。
存储单元
存储单元包括 CPU 片内缓存Cache和寄存器组,是 CPU 中暂时存放数据的地方,里
面保存着那些等待处理的数据,或已经处理过的数据,CPU 访问寄存器所用的时间要比访
问内存的时间短。 寄存器是CPU内部的元件,寄存器拥有非常高的读写速度,所以在寄存
器之间的数据传送非常快。采用寄存器,可以减少 CPU 访问内存的次数,从而提高了 CPU
的工作速度。寄存器组可分为专用寄存器和通用寄存器。专用寄存器的作用是固定的,分别
寄存相应的数据;而通用寄存器用途广泛并可由程序员规定其用途。
面保存着那些等待处理的数据,或已经处理过的数据,CPU 访问寄存器所用的时间要比访
问内存的时间短。 寄存器是CPU内部的元件,寄存器拥有非常高的读写速度,所以在寄存
器之间的数据传送非常快。采用寄存器,可以减少 CPU 访问内存的次数,从而提高了 CPU
的工作速度。寄存器组可分为专用寄存器和通用寄存器。专用寄存器的作用是固定的,分别
寄存相应的数据;而通用寄存器用途广泛并可由程序员规定其用途。
CPU缓存结构
现代CPU为了提升执行效率,减少CPU与内存的交互(交互影响CPU效率),一般在CPU上集
成了多级缓存架构,常见的为三级缓存结构
成了多级缓存架构,常见的为三级缓存结构
L1 Cache,分为数据缓存和指令缓存,逻辑核独占
L2 Cache,物理核独占,逻辑核共享
L3 Cache,所有物理核共享
存储器存储空间大小:内存>L3>L2>L1>寄存器;
存储器速度快慢排序:寄存器>L1>L2>L3>内存;
缓存行
缓存是由最小的存储区块-缓存行(cacheline)组成,缓存行大小通
常为64byte。
常为64byte。
缓存行是什么意思呢?
比如你的L1缓存大小是512kb,而cacheline = 64byte,那么就是L1里有512 * 1024/64个
cacheline
cacheline
进程与线程
什么是进程
现代操作系统在运行一个程序时,会为其创建一个进程;例如,启动一个Java程序,操作系
统就会创建一个Java进程。进程是OS(操作系统)资源分配的最小单位。
统就会创建一个Java进程。进程是OS(操作系统)资源分配的最小单位。
什么是线程
线程是OS(操作系统)调度CPU的最小单元,也叫轻量级进程(Light Weight Process),
在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,
并且能够访问共享的内存变量。CPU在这些线程上高速切换,让使用者感觉到这些线程在同
时执行,即并发的概念,相似的概念还有并行!
在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,
并且能够访问共享的内存变量。CPU在这些线程上高速切换,让使用者感觉到这些线程在同
时执行,即并发的概念,相似的概念还有并行!
线程上下文切换过程
java线程与系统内核线程关系
线程的实现可以分为两类
1、用户级线程(User-Level Thread)
指不需要内核支持而在用户程序中实现的线程,其不依赖于操作系统核心,应用进程利用线程库提供创建、同步、调度和管理线程的函数来控制用户线程。另外,用户线程是由应用进程利用线程库创建和管理,不依赖于操作系统核心。不需要用户态/核心态切换,速度快。操作系统内核不知道多线程的存在,因此一个线程阻塞将使得整个进程(包括它的所有线程)阻塞。由于这里的处理器时间片分配是以进程为基本单位,所以每个线程执行的时间相对减少。
2、内核线线程(Kernel-Level Thread)
线程的所有管理操作都是由操作系统内核完成的。内核保存线程的状态和上下文信息,当一个线程执行了引起阻塞的系统调用时,内核可以调度该进程的其他线程执行。在多处理器系统上,内核可以分派属于同一进程的多个线程在多个处理器上运行,提高进程执行的并行度。由于需要内核完成线程的创建、调度和管理,所以和用户级线程相比这些操作要慢得多,但是仍然比进程的创建和管理操作要快。大多数市场上的操作系统,如Windows,Linux等都支持内核级线程。
原理 图
关系
线程的应用
如何实现多线程
继承Thread
实现Runnable接口
使用ExecutorService
Callable
Future实现带返回结果的多线程
线程的生命周期
一共有 6 种状态
NEW
初始状态,线程被构建,但是还没有调用 start 方法
RUNNABLED
运行状态,JAVA 线程把操作系统中的就绪
和运行两种状态统一称为“运行中”
和运行两种状态统一称为“运行中”
BLOCKED
阻塞状态,表示线程进入等待状态,也就是线程
因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况
因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况
等待阻塞
运行的线程执行 wait 方法,jvm 会把当前
线程放入到等待队列
线程放入到等待队列
同步阻塞
运行的线程在获取对象的同步锁时,若该同
步锁被其他线程锁占用了,那么 jvm 会把当前的线程
放入到锁池中
步锁被其他线程锁占用了,那么 jvm 会把当前的线程
放入到锁池中
其他阻塞
运行的线程执行 Thread.sleep 或者 t.join 方
法,或者发出了 I/O 请求时,JVM 会把当前线程设置
为阻塞状态,当 sleep 结束、join 线程终止、io 处理完
毕则线程恢复
法,或者发出了 I/O 请求时,JVM 会把当前线程设置
为阻塞状态,当 sleep 结束、join 线程终止、io 处理完
毕则线程恢复
TIME_WAITING
超时等待状态,超时以后自动返回
TERMINATED
终止状态,表示当前线程执行完毕
线程的终止
interupt(线程终止)
Thread.interrupt(中断线程)
Thread.isInterrupted() 判断是否被中断
Thread.interrupted() 判断是否被中断,并清除当前中断状态
通过指令的方式
volatile boolean isStop = false
线程间的协作
线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。却存在如下问题:
1) 难以确保及时性。
2)难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
1) 难以确保及时性。
2)难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
等待/通知机制
等待和通知的标准范式
等待方遵循如下原则
获取对象的锁
如果条件不满足,那么调用对象的wait方法,被 通知后让仍要检查条件。
条件满足,则执行对应的逻辑
通知方遵循如下原则
获取对象的锁
改变条件
通知所有等待在对象上的线程
notify和notifyAll应该用谁
尽可能用notifyall(),谨慎使用notify(),因为notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程,具体表现参见代码。
等待超时模式实现一个连接池
守护线程和非守护线程
实现方式
Thread t = new Thread();
t.setDaemon();
t.start
t.setDaemon();
t.start
区别
用户线程会阻止java虚拟机的正常停止,即一个java虚拟机只有在其所有用户线程都运行结束的情况下才能正常停止
守护线程不会影响java虚拟机的 正常停止,即应用程序中有守护线程在运行也不影响 java虚拟机的正常停止。
守护线程通常用于执行一些重要性不很高的任务,例如用户监视其他线程的运行情况
如果java虚拟机是被强制停止的,比如在linux系统下使用kill命令强制终止一个java虚拟机线程,那么即使是用户线程也无法阻止java虚拟机停止
总结
如果你希望在主线程结束后JVM进程马上结束,那么在创建线程时可以将其设置为守护线程
如果你希望在主线程结束后子线程继续工作,等子线程结束后再让JVM进程结束,那么就将子线程设置为用户线程。
上下文切换
导致原因
自发性切换
Thread.sleep
wait
Thread,yield
Thread,.join
LockSupport.park
发起了IO操作如读文件
等待其他线程持有的锁
非自发性切换
线程由于调度器的原因被迫切出
被切出的时间片用完
比被切出线程的优先级更高的线程需要被运行
开销
直接开销
操作系统保存和恢复上下文所需的开销
线程调度器 进行线程调度的开销
间接开销
处理器高速缓存重新加载的开销
上下文切换也可能导致一级高速缓存中的内容被冲刷
如何减少上下文 切换
无锁并发编程
CAS
使用最少线程
协程
通过相应命令显示 线程状态
1.打开终端或者命令提示符,键入“jps”,(JDK1.5 提供的一个显示当前所有 java进程 pid 的命令),可以获得相应进程的 pid
2.根据上一步骤获得的 pid,继续输入 jstack pid(jstack 是 java 虚拟机自带的一种堆栈跟踪工具。jstack 用于打印出给定的 java 进程 ID 或 core file 或远程调试服务的 Java 堆栈信息)
线程通信
共享内存
线程之间共享程序的 公共状态,通过写-读内存中的公共状态进行隐式通信
消息传递
线程之间通过发送消息 来显示进行通信
死锁
并发基础
CAS
Compare And Swap
CAS 的全称是 Compare And Swap 即比较交换,其算法核心思想如下
函数:CAS(V,E,N) 参数:V 表示要更新的变量 E 预期值 N 新值
如果 V 值等于 E 值,则将 V 的值设为 N。若 V 值和 E 值不同,则说明已经有其他线程做了
更新,则当前线程什么都不做。通俗的理解就是 CAS 操作需要我们提供一个期望值,当期
望值与当前线程的变量值相同时,说明还没线程修改该值,当前线程可以进行修改,也就是
执行 CAS 操作,但如果期望值与当前线程不符,则说明该值已被其他线程修改,此时不执
行更新操作,但可以选择重新读取该变量再尝试再次修改该变量,也可以放弃操作
函数:CAS(V,E,N) 参数:V 表示要更新的变量 E 预期值 N 新值
如果 V 值等于 E 值,则将 V 的值设为 N。若 V 值和 E 值不同,则说明已经有其他线程做了
更新,则当前线程什么都不做。通俗的理解就是 CAS 操作需要我们提供一个期望值,当期
望值与当前线程的变量值相同时,说明还没线程修改该值,当前线程可以进行修改,也就是
执行 CAS 操作,但如果期望值与当前线程不符,则说明该值已被其他线程修改,此时不执
行更新操作,但可以选择重新读取该变量再尝试再次修改该变量,也可以放弃操作
什么是CAS
CAS进阶
缺陷
ABA
ABA解决方案
循环时间长 开销大
只能保证一个共享变量的原子操作
AQS
AQS具备的特性
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
AbstractqueuedSynchronizer同步器
除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现
一般通过定义内部类Sync继承AQS
将同步器所有调用都映射到Sync对应的方法
AQS内部维护属性volatile int state (32位)
state表示资源的可用状态
State三种访问方式
getState():获取当前同步状态
setState(int newState):设置当前同步状态
compareAndSetState(int expext,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入的次数
对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数;低16位表示获取到写锁的线程的可重入次数;
对于semphore来说,state用来表示当前可用信号的个数;
对于CountDownlatch来说,state用来表示计数器当前的值
AQS定义两种资源共享方式
Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
同步器依赖内部的同步队列(`FIFO`双向队列)来完成同步状态的管理,当前线程获取状态失败时,同步器会将当前线程以及等待状态等信息构造成称为一个节点(`Node`)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
独占锁同步状态获取与释放
共享锁同步状态与释放
条件等待队列
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
AQS源码分析
自定义同步器实现时主要实现以下几种方法
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
AQS 队列同步器的实现和分析
基于AQS实现自定义同步器(自定义实现)
提供多种锁模式
独占、共享
独占模式
public final void acquire(int arg);
public final boolean release(int arg);
共享模式
public final void acquireShared(int arg);
public final boolean releaseShared(int arg);
可中断、不可中断
共享
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException;
throws InterruptedException;
独占
public final void acquireInterruptibly(int arg)
throws InterruptedException;
throws InterruptedException;
待超时时间的
独占
public final boolean tryAcquireNanos(int arg, long nanosTimeout);
共享
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
AQS使用的设计模式
模板方法模式
同步器的设计基于模板方法模式。模板方法模式的意图是,定义一个操作中的算法的骨架,而将一些步骤的实现延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。我们最常见的就是Spring框架里的各种Template。
实际例子
我们开了个蛋糕店,蛋糕店不能只卖一种蛋糕呀,于是我们决定先卖奶油蛋糕,芝士蛋糕和慕斯蛋糕。三种蛋糕在制作方式上一样,都包括造型,烘焙和涂抹蛋糕上的东西。所以可以定义一个抽象蛋糕模型
然后就可以批量生产三种蛋糕
子主题
这样一来,不但可以批量生产三种蛋糕,而且如果日后有扩展,只需要继承抽象蛋糕方法就可以了,十分方便,我们天天生意做得越来越赚钱。突然有一天,我们发现市面有一种最简单的小蛋糕销量很好,这种蛋糕就是简单烘烤成型就可以卖,并不需要涂抹什么食材,由于制作简单销售量大,这个品种也很赚钱,于是我们也想要生产这种蛋糕。但是我们发现了一个问题,抽象蛋糕是定义了抽象的涂抹方法的,也就是说扩展的这种蛋糕是必须要实现涂抹方法,这就很鸡儿蛋疼了。怎么办?我们可以将原来的模板修改为带钩子的模板。
自定义实现一个独占锁
设计一个同步工具:该工具在同一时刻,只允许至多3个线程同时访问,超过3个线程的访问将被阻塞。
首先,确定访问模式。TrinityLock能够在同一时刻支持多个线程的访问,这显然是共享式访问,因此,需要使用同步器提供的acquireShared(int args)方法等和Shared相关的方法,这就要求TwinsLock必须重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法,这样才能保证同步器的共享式同步状态的获取与释放方法得以执行。
其次,定义资源数。TrinityLock在同一时刻允许至多三个线程的同时访问,表明同步资源数为3,这样可以设置初始状态status为3,当一个线程进行获取,status减1,该线程释放,则status加1,状态的合法范围为0、1和2,3,其中0表示当前已经有3个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用compareAndSet(int expect,int update)方法做原子性保障。
最后,组合自定义同步器。前面的章节提到,自定义同步组件通过组合自定义同步器来完成同步功能,一般情况下自定义同步器会被定义为自定义同步组件的内部类。
AQS条件变量的支持
synchronized 同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。
Unsafe
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。
但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。
在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
Unsafe类为一单例实现,提供静态方法getUnsafe获取Unsafe实例,当且仅当调用getUnsafe方法的类为引导类加载器所加载时才合法,否则抛出SecurityException异常。
功能介绍
Unsafe提供的API大致可分为内存操作、CAS、Class相关、对象操作、线程调度、系统信息获取、内存屏障、数组操作等几类
内存操作
分配、拷贝、扩充、释放堆外内存
//分配内存, 相当于C++的malloc函数
public native long allocateMemory(long bytes);
public native long allocateMemory(long bytes);
//扩充内存
public native long reallocateMemory(long address, long bytes);
public native long reallocateMemory(long address, long bytes);
//释放内存
public native void freeMemory(long address);
public native void freeMemory(long address);
//内存拷贝
public native void copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes);
public native void copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes);
设置、获得给定地址中的值
//在给定的内存块中设置值
public native void setMemory(Object o, long offset, long bytes, byte value);
public native void setMemory(Object o, long offset, long bytes, byte value);
//获取给定地址值,忽略修饰限定符的访问限制。与此类似操作还有: getInt,getDouble,getLong,getChar等
public native Object getObject(Object o, long offset);
public native Object getObject(Object o, long offset);
//为给定地址设置值,忽略修饰限定符的访问限制,与此类似操作还有: putInt,putDouble,putLong,putChar等
public native void putObject(Object o, long offset, Object x);
public native void putObject(Object o, long offset, Object x);
通常,我们在Java中创建的对象都处于堆内内存(heap)中,堆内内存是由JVM所管控的Java进程内存,并且它们遵循JVM的内存管理机制,JVM会采用垃圾回收机制统一管理堆内存。与之相对的是堆外内存,存在于JVM管控之外的内存区域,Java中对堆外内存的操作,依赖于Unsafe提供的操作堆外内存的native方法。
使用堆外内存的原因
对垃圾回收停顿的改善。由于堆外内存是直接受操作系统管理而不是JVM,所以当我们使用堆外内存时,即可保持较小的堆内内存规模。从而在GC时减少回收停顿对于应用的影响
提升程序I/O操作的性能。通常在I/O通信过程中,会存在堆内内存到堆外内存的数据拷贝操作,对于需要频繁进行内存间数据拷贝且生命周期较短的暂存数据,都建议存储到堆外内存。
典型应用
DirectByteBuffer是Java用于实现堆外内存的一个重要类,通常用在通信过程中做缓冲池,如在Netty、MINA等NIO框架中应用广泛。DirectByteBuffer对于堆外内存的创建、使用、销毁等逻辑均由Unsafe提供的堆外内存API来实现。
下图为DirectByteBuffer构造函数,创建DirectByteBuffer的时候,通过Unsafe.allocateMemory分配内存、Unsafe.setMemory进行内存初始化,而后构建Cleaner对象用于跟踪DirectByteBuffer对象的垃圾回收,以实现当DirectByteBuffer被垃圾回收时,分配的堆外内存一起被释放。
CAS
api
/**
* CAS
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
* CAS
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
典型应用
AtomicInteger的实现中,静态字段valueOffset即为字段value的内存偏移地址,valueOffset的值在AtomicInteger初始化时,在静态代码块中通过Unsafe的objectFieldOffset方法获取。在AtomicInteger中提供的线程安全方法中,通过字段valueOffset的值可以定位到AtomicInteger对象中value的内存地址,从而可以根据CAS实现对value字段的原子操作
下图为某个AtomicInteger对象自增操作前后的内存示意图,对象的基地址baseAddress=“0x110000”,通过baseAddress+valueOffset得到value的内存地址valueAddress=“0x11000c”;然后通过CAS进行原子性的更新操作,成功则返回,否则继续重试,直到更新成功为止。
Class相关
动态创建类(普通类 &匿名类)
获取Field的内存地址偏移量
监测、确保类初始化
对象操作
获取对象成员属性在内存偏移量
非常规对象实例化
存储、获取指定偏移地址的变量值(包含延迟生效、volatile语义)
数组相关
返回数组元素内存大小
返回数组首元素的偏移地址
内存屏障
禁止load、store重排序
在Java 8中引入,用于定义内存屏障(也称内存栅栏,内存栅障,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作),避免代码重排序。
//内存屏障,禁止load操作重排序。屏障前的load操作不能被重排序到屏障后,屏障后的load操作不能被重排序到屏障前
public native void loadFence();
public native void loadFence();
//内存屏障,禁止store操作重排序。屏障前的store操作不能被重排序到屏障后,屏障后的store操作不能被重排序到屏障前
public native void storeFence();
public native void storeFence();
//内存屏障,禁止load、store操作重排序
public native void fullFence();
public native void fullFence();
典型应用
在Java 8中引入了一种锁的新机制——StampedLock,它可以看成是读写锁的一个改进版本。StampedLock提供了一种乐观读锁的实现,这种乐观读锁类似于无锁的操作,完全不会阻塞写线程获取写锁,从而缓解读多写少时写线程“饥饿”现象。由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题,所以当使用StampedLock的乐观读锁时,需要遵从如下图用例中使用的模式来确保数据的一致性。
系统相关
返回内存页大小
返回系统指针大小
线程调度
线程挂起、恢复
//取消阻塞线程
public native void unpark(Object thread);
public native void unpark(Object thread);
//阻塞线程
public native void park(boolean isAbsolute, long time);
public native void park(boolean isAbsolute, long time);
获取、释放锁
//获得对象锁(可重入锁)
@Deprecated
public native void monitorEnter(Object o);
@Deprecated
public native void monitorEnter(Object o);
//释放对象锁
@Deprecated
public native void monitorExit(Object o);
@Deprecated
public native void monitorExit(Object o);
//尝试获取对象锁
@Deprecated
public native boolean tryMonitorEnter(Object o);
@Deprecated
public native boolean tryMonitorEnter(Object o);
方法park、unpark即可实现线程的挂起与恢复,将一个线程进行挂起是通过park方法实现的,调用park方法后,线程将一直阻塞直到超时或者中断等条件出现;unpark可以终止一个挂起的线程,使其恢复正常。
典型应用
- java锁和同步器框架的核心类AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒的,而LockSupport的park、unpark方法实际是调用Unsafe的park、unpark方式来实现。
如何获取Unsafe类??
LockSupport
作用:挂起和唤醒 线程
是一个简单的代理类,里面的代码都是使用 Unsafe 类里面的方法。
park()
阻塞线程
unpark()
解除阻塞线程
与 wait()、notify() 的区别
LockSupport 主要是针对 Thread 进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒
Object.wait() 是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
LockSupport 阻塞和解除阻塞线程直接操作的是 Thread。而 Object 的 wait/notify 并不是直接对线程操作,是被动的方法,需要一个 Object 来进行线程的挂起或唤醒。
Thead 在调用 wait 之前,当前线程必须先获得该对象的监视器(syschronized),被唤醒之后需要重新获取到监视器才能继续执行。而 LockSupport 可以随意进行 park 或者 unpark。
Java 内存模型(JMM)
什么是JMM模型
Java内存模型(Java Memory Model简称JMM)是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
JVM运行程序的实体是线程,而每个线程创建时,JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据。
Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝。
JMM和JVM的区别
JMM是围绕原子性,有序性、可见性展开。
JMM与Java内存区域唯一相似点,都存在共享数据区域和私有数据区域,在JMM中主内存属于共享数据区域,从某个程度上讲应该包括了堆和方法区,而工作内存数据线程私有数据区域,从某个程度上讲则应该包括程序计数器、虚拟机栈以及本地方法栈。
主内存
主要存储的是Java实例对象,所有线程创建的 实例对象都存放在主存中,不管该实例对象时成员边浪还是方法中的本地变量,当然还包含共享的类信息、常量、静态 变量。由于是共享数据区域,多条线程对同一个变量进行访问可能会发生线程安全问题。
工作内存
主要存储当前方法的所有本地变量(工作内存中存储着主内存中的变量副本拷贝),每个线程只能访问自己的工作内存,即线程中的本地变量对其他线程是不可见的,就算是两个线程执行的是同一段代码,它们也会各自在自己的工作内存中创建属于当前线程的本地变量,当然也包括了字节码行号指示器、相关Native方法的信息。
由于工作内存是每个线程的私有数据,线程间无法相互访问工作内存,因此存储在工作内存的数据不存在线程安全问题。
根据JVM 虚拟机规范主内存与工作内存的数据存储类型以及操作方式,对于一个实例对象中的成员方法而言,如果方法中包含本地变量是基本类型,将直接存储在工作内存的栈帧结构中,但倘若本地变量是引用类型,那么该变量的引用会存储在工作内存的栈帧中,而对象实例将存储在主内存(共享内存区域,堆中)。
但对于实例对象的成员变量,不管它是基本数据类型或者包装类型还是引用类型,都会存储在堆中
至于static变量以及类本身相关信息将会存储在主内存中。
在主内存中的实例对象可以被多线程共享,倘若两个线程同时调用了同一对象的同一方法,那么两条线程会将要操作的数据拷贝一份到自己的工作内存中,执行完成操作后才刷新到主内存中。
Java内存模型与硬件内存架构的关系
对于硬件内存来说只有寄存器、缓存内存、主内存的概念,并没有工作内存之分,也就是说Java内存模型对内存的划分对硬件内存并没有任何影响,因为JMM只是一种抽象的概念,是一组规则,并不实际存在,不管是工作内存的数据还是主 内存的数据,对于计算机硬件来说都会存储在计算机主内存中,当然也有可能存储在到CPU缓存或者寄存器中
因此总体上来 说,Java内存模型和计算机硬件内存架构是一个相互交叉的关系,是一种抽象概念划分与真实物理硬件的交互
JMM存在的必要性
由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,线程与主内存中的变量操作必须通过工作内存间接完成,主要过程是将变量从主内存拷贝的每个线程各自的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,如果存在两个线程同时对一个主内存中的实例对象的变量进行操作就有可能诱发线程安全问题。
假设主内存中存在一个共享变量X,现在有A和B两条线程分别对该变量x=1进行操作,A和B线程各自在自己的工作内存中存在共享变量副本X.
假设现在A线程想要修改x的值 为2,而B线程却想要读取X的值,那么B线程读取到的值是A线程更新后的值2还是更新前的值1呢?答案是不确定,即B线程 有可能读取到A线程更新前的值1,也有可能读取到A 线程更新后的值2,这是因为工作内存是每个线程私有的数据区域,而线程A变量x时,首先是讲变量从主内存拷贝到A线程的工作内存中,然后对变量进行操作,操作完成 后再将变量X写回主内存中,而对于B线程也是类似,这样就有可能造成主内存与工作内存间数据存在不一致性问题。
假设A线程修改完成后正在将数据写回主内存,而B线程此时正在读取主内存,即将x=1拷贝到自己的工作呢次云中,这样B线程读取的值就是x=1,但如果A线程已将x=2写回主内存后,B线程才开始读取的话,那么此时B线程读取的就是x=2,但到底是哪种情况先发生呢?
JMM数据同步八大原子操作
(1)lock(锁定):作用于主内存的变量,把一个变量标记为一条线程独占状态
(2)unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后
的变量才可以被其他线程锁定
的变量才可以被其他线程锁定
(3)read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存
中,以便随后的load动作使用
中,以便随后的load动作使用
(4)load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工
作内存的变量副本中
作内存的变量副本中
(5)use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎
(6)assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内
存的变量
存的变量
(7)store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存
中,以便随后的write的操作
中,以便随后的write的操作
(8)write(写入):作用于工作内存的变量,它把store操作从工作内存中的一个变量的值
传送到主内存的变量中
传送到主内存的变量中
JMM内存模型的八种同步操作
read(读取),从主内存读取数据
load(载入):将主内存读取到的数据写入到工作内存
use(使用): 从工作内存读取数据来计算
assign(赋值):将计算好的值重新赋值到工作内存中
store(存储):将工作内存数据写入主内存
write(写入):将store过去的变量值赋值给主内存中的变量
lock(锁定):将主内存变量加锁,标识为线程 独占状态
unlock(解锁):将主内存变量解锁,解锁后其他线程可以锁定该变量
CPU缓存一致性协议MESI
为什么要有高速缓存?
CPU在摩尔定律的指导下以每18个月翻一番的速度在发展,然而内存和硬盘的发展速度远远不及CPU。这就造成了高性能能的内存和硬盘价格及其昂贵。然而CPU的高度运算需要高速的数据。为了解决这个问题,CPU厂商在CPU中内置了少量的高速缓存以解决I\O速度和CPU运算速度之间的不匹配问题。
在CPU访问存储设备时,无论是存取数据抑或存取指令,都趋于聚集在一片连续的区域中,这就被称为局部性原理。
在CPU访问存储设备时,无论是存取数据抑或存取指令,都趋于聚集在一片连续的区域中,这就被称为局部性原理。
时间局部 性
如果一个信息项正在被访问,那么在近期它很可能还会被再次访问。比如循环、递归、方法的反复调用等。
空间局部性
如果一个存储器的位置被引用,那么将来他附近的位置也会被引用。比如顺序执行的代码、连续创建的两个对象、数组等。
带有高速缓存的CPU执行计算的流程
1.程序以及数据被加载到主内存
2.指令和数据被加载到CPU的高速缓存
3.CPU执行指令,把结果写到高速缓存
4.高速缓存中的数据写回主内存
目前流行的多级缓存结构
由于CPU的运算速度超越了1级缓存的数据I\O能力,CPU厂商又引入了多级的缓存结构。
多核CPU多级缓存一致性协议MESI
MESI协议缓存状态
M 修改 (Modified)
该Cache line有效,数据被修改了,和内存中的数据不一致,数据只存在于本Cache中
缓存行必须时刻监听所有试图读该缓存行相对就主存的操作,这种操作必须在缓存将该缓存行写回主存并将状态变成S(共享)状态之前被延迟执行。
E 独享、互斥 (Exclusive)
该Cache line有效,数据和内存中的数据一致,数据只存在于本Cache中。
缓存行也必须监听其它缓存读主存中该缓存行的操作,一旦有这种操作,该缓存行需要变成S(共享)状态
S 共享 (Shared)
该Cache line有效,数据和内存中的数据一致,数据存在于很多Cache中
缓存行也必须监听其它缓存使该缓存行无效或者独享该缓存行的请求,并将该缓存行变成无效(Invalid)。
I 无效 (Invalid)
该Cache line无效。
MESI状态转换
触发事件
cache分类
前提:所有的cache共同缓存了主内存中的某一条数据。
本地cache:指当前cpu的cache。
触发cache:触发读写事件的cache
其他cache:指既除了以上两种之外的cache
注意:本地的事件触发 本地cache和触发cache为相同。
上图的切换解释
假设cache 1 中有一个变量x = 0的cache line 处于S状态(共享)。
那么其他拥有x变量的cache 2、cache 3等x的cache line调整为S状态(共享)或者调整为 I 状态(无效)。
多核缓存协同操作
假设有三个CPU A、B、C,对应三个缓存分别是cache a、b、 c。在主内存中定义了x的引用值为0。
单核读取
执行流程
CPU A发出了一条指令,从主内存中读取x。
从主内存通过bus读取到缓存中(远端读取Remote read),这是该Cache line修改为E状态(独享).
双核读取
执行流程
CPU A发出了一条指令,从主内存中读取x。
CPU A从主内存通过bus读取到 cache a中并将该cache line 设置为E状态。
CPU B发出了一条指令,从主内存中读取x。
CPU B试图从主内存中读取x时,CPU A检测到了地址冲突。这时CPU A对相关数据做出响应。此时x 存储于cache a和cache b中,x在chche a和cache b中都被设置为S状态(共享)。
修改数据
执行流程
CPU A 计算完成后发指令需要修改x.
CPU A 将x设置为M状态(修改)并通知缓存了x的CPU B, CPU B将本地cache b中的x设置为I状态(无效)
CPU A 对x进行赋值。
同步数据
执行流程
CPU B 发出了要读取x的指令。
CPU B 通知CPU A,CPU A将修改后的数据同步到主内存时cache a 修改为E(独享)
CPU A同步CPU B的x,将cache a和同步后cache b中的x设置为S状态(共享)。
缓存行伪共享
什么是伪共享?
CPU缓存系统中是以缓存行(cache line)为单位存储的。目前主流的CPU Cache 的 Cache Line 大小都是64Bytes。在多线程情况下,如果需要修改“共享同一个缓存行的变量”,就会无意中影响彼此的性能,这就是伪共享(False Sharing)。
举个例子: 现在有2个long 型变量 a 、b,如果有t1在访问a,t2在访问b,而a与b刚好在同一个cache line中,此时t1先修改a,将导致b被刷新!
怎么解决伪共享
Java8中新增了一个注解:@sun.misc.Contended。加上这个注解的类会自动补齐缓存行,需要注意的是此注解默认是无效的,需要在jvm启动时设置 -XX:-RestrictContended 才会生效
@sun.misc.Contended
public final static class TulingVolatileLong {
public volatile long value = 0L;
//public long p1, p2, p3, p4, p5, p6;
}
public final static class TulingVolatileLong {
public volatile long value = 0L;
//public long p1, p2, p3, p4, p5, p6;
}
CPU底层全执行流程
同步规则分析
1)不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中
2)一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或者assign)的变量。即就是对一个变量实施use和store操作之前,必须先自行assign和load操作
3)一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
4)如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量之前需要重新执行load或assign操作初始化变量的值
5)如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
6)对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)
并发编程的可见性、原子性和有序性问题
原子性
原子性指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其他线程影响。在java中,对基本数据类型的变量的读取和赋值操作是原子性操作有点要注意的是,对于32位系统的来说,long类型数据和double类型数据(对于基本数据类型,byte,short,int,float,boolean,char读写是原子操作),它们的读写并非原子性的,也就是说
如果存在两条线程同时对long类型或者double类型的数据进行读写是存在相互干扰的,因为对于32位虚拟机来说,每次原子读写是32位的,而long和double则是64位的存储单元,这样会导致一个线程在写时,操作完前32位的原子操作后,轮到B线程读取时,恰好只读取到了后32位的数据,这样可能会读取到一个既非原值又不是线程修改值的变量,它可能
是“半个变量”的数值,即64位数据被两个线程分成了两次读取。但也不必太担心,因为读取到“半个变量”的情况比较少见,至少在目前的商用的虚拟机中,几乎都把64位的数据的读写操作作为原子操作来执行,因此对于这个问题不必太在意,知道这么回事即可
如果存在两条线程同时对long类型或者double类型的数据进行读写是存在相互干扰的,因为对于32位虚拟机来说,每次原子读写是32位的,而long和double则是64位的存储单元,这样会导致一个线程在写时,操作完前32位的原子操作后,轮到B线程读取时,恰好只读取到了后32位的数据,这样可能会读取到一个既非原值又不是线程修改值的变量,它可能
是“半个变量”的数值,即64位数据被两个线程分成了两次读取。但也不必太担心,因为读取到“半个变量”的情况比较少见,至少在目前的商用的虚拟机中,几乎都把64位的数据的读写操作作为原子操作来执行,因此对于这个问题不必太在意,知道这么回事即可
可见性
可见性指的是当一个线程修改了某个共享变量的值,其他线程是否能够马上得知这个修改的值。对于串行程序来说,可见性是不存在的,因为我们在任何一个操作中修改了某个变量的值,后续的操作中都能读取这个变量值,并且是修改过的新值。
但在多线程环境中可就不一定了,前面我们分析过,由于线程对共享变量的操作都是线程拷贝到各自的工作内存进行操作后才写回到主内存中的,这就可能存在一个线程A修改了共享变量x的值,还未写回主内存时,另外一个线程B又对主内存中同一个共享变量x进行操作,但此时A线程工作内存中共享变量x对线程B来说并不可见,这种工作内存与主内存同步延迟现象就造成了可见性问题,另外指令重排以及编译器优化也可能导致可见性问题,通过前面的分析,我们知道无论是编译器优化还是处理器优化的重排现象,在多线程环境下,确实会导致程序轮序执行的问题,从而也就导致可见性问题。
有序性
有序性是指对于单线程的执行代码,我们总是认为代码的执行是按顺序依次执行的,这样的理解并没有毛病,毕竟对于单线程而言确实如此,但对于多线程环境,则可能出现乱序现象,因为程序编译成机器码指令后可能会出现指令重排现象,重排后的指令与原指令的顺未必一致,要明白的是,在Java程序中,倘若在本线程内,所有操作都视为有序行为,如果是多线程环境下,一个线程中观察另外一个线程,所有操作都是无序的,前半句指的是单线程内保证串行语义执行的一致性,后半句则指指令重排现象和工作内存与主内存同步延迟现象
JMM是如何解决原子性&可见性&有序性问题
原子性问题
除了JVM自身提供的对基本数据类型读写操作的原子性外,可以通过 synchronized和Lock实现原子性。因为synchronized和Lock能够保证任一时刻只有一个线程访问该代码块。
可见性问题
volatile关键字保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值立即被其他的线程看到,即修改的值立即更新到主存中,当其他线程需要读取时,它会去内存中读取新值。
synchronized和Lock也可以保证可见性,因为它们可以保证任一时刻只有一个线程能访问共享资源,并在其释放锁之前将修改的变量刷新到内存中。
有序性问题
通过volatile可以保证有序性
可以通过synchronized和Lock来保证有序性,很显然,synchronized和Lock保证每个时刻是有一个线程执行同步代码,相当于是让线程顺序执行同步代码,自然就保证了有序性
指令重排序
Java语言规范规定JVM线程内部维持顺序化语义。即只要程序的最终结果与它的顺序情况的结果相等,那么指令的执行顺序可以与代码顺序不一样,此过程叫指令你的重排序
指令重排序的意义是什么?
JVM能根据处理器特征适当的对机器指令进行重排序,使机器指令能更符合CPU的执行特性,最大限度的发挥机器特性。
as-if-serial 语义
不管怎么重排序,程序的执行结果不能被改变。编译器、runtime和处理器都必须遵守as-if-serial语义
为了遵守as-if-serial 语义,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。但是操作之间不存在数据依赖关系,这些操作就可能被编译器和处理器重排序。
happens-before原则
只靠synchronized和volatile关键字来保证原子性、可见性以及有序性,那么编写并发编程显得十分麻烦,从JDK5开始,Java使用新的JSR-133内存模型,提供了happeds-before原则来辅助保证程序执行的原子性、可见性以及有序性的问题,它是判断数据是否存在竞争、线程是否安全的依据。
程序顺序原则,即在一个线程内必须保证语义串行性,也就是说按照代码顺序执行。
锁规则:解锁(unlock)操作必然发生在后续的同一个锁的加锁(lock)之前,也就是说,如果对于一个锁解锁后,再加锁,那么加锁的动作必须在解锁动作之后(同一个锁)
volatile规则 volatile变量的写,先发生于读,这保证了volatile变量的可见性,简单的理解就是,volatile变量在每次被线程访问时,都强迫从主内存中读该变量的值,而当该变量发生变化时,又会强迫将最新的值刷新到主内存,任何时刻,不同的线程总是能够看到该变量的最新值
线程启动规则 线程的start()方法先于它的每一个动作,即如果线程A在执行线程B的start方法之前修改了共享变量的值,那么当线程B执行start方法时,线程A对共享变量的修改对线程B可见
传递性 A先于B ,B先于C 那么A必然先于C
线程终止规则 线程的所有操作先于线程的终结,Thread.join()方法的作用是等待当前执行的线程终止。假设在线程B终止之前,修改了共享变量,线程A从线程B的join方法成功返回后,线程B对共享变量的修改将对线程A可见。
线程中断规则 对线程 interrupt()方法的调用先行发生于被中断线程的代码检测到
中断事件的发生,可以通过Thread.interrupted()方法检测线程是否中断。
中断事件的发生,可以通过Thread.interrupted()方法检测线程是否中断。
对象终结规则对象的构造函数执行,结束先于finalize()方法
volatile
作用
保证被volatile修饰的共享 变量对所有线程总是可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总是可以被其他线程立即得知。
禁止指令重排序
volatile可见性
volatile的可见性作用,我们必须意思到被volatile修饰的变量对所有线程总是可见的,对volatile变量的所有写操作总是能立刻反应到其他线程的
样例代码
initFlag加上volatile关键字
线程A改变initFlag属性之后,线程B马上感知到
initFlag不加volatile关键字
线程B感知不到
volatile无法保证原子性
public class VolatileVisibility {
public static volatile int i =0;
public static void increase(){
i++;
} }
public static volatile int i =0;
public static void increase(){
i++;
} }
在并发场景下,i变量的任何改变都会立马反应到其他线程,但是如果存在多个线程同时调用 increase()方法的话,就会出现线程安全问题,毕竟i++操作并不具备原子性,该操作是 先读取值,然后写回一个新值,相当于原来的值加上1,分两步完成,如果第二个线程在第一个线程读取旧值和写回新值期间读取i的域值,那么第二个线程就会与第一个线程
一起看到同一个值,并执行相同值的加1操作,这也就造成了线程安全失败。
一起看到同一个值,并执行相同值的加1操作,这也就造成了线程安全失败。
因此对于increase方法必须使用synchronized修饰,以便保证线程安全,需要注意 的是一旦使用synchronized修饰方法后,由于synchronized本身也具备 与volatile相同的特性,即可见性,因此在这种情况下就完全可以省去volatile修饰变量。
volatile禁止重排序
硬件层的内存屏障
Intel硬件提供了一系列的内存屏障
Ifen,是一种Load Barrier读屏障
sfence,是一种store Barrier写屏障
mfence,是一种全能型的屏障,具备Ifence和sfnece的能力
Lock前缀,Lock不是 一种内存屏障但是它能完成类似内存屏障的功能。Lock会对CPU总线和高速缓存加锁,可以理解为CPU指令级的一种锁。
JVM中提供了四类内存屏障指令
LoadLoad
Load1; LoadLoad; Load2
保证load1的读取操作在load2及后续读取操作之前执行
StoreStore
Store1; StoreStore; Store2
在store2及其后的写操作执行前,保证store1的写操作已刷新到主内存
LoadStore
Load1; LoadStore; Store2
在store2及其后的写操作执行前,保证load1的读操作已读取结束
StoreLoad
Store1; StoreLoad; Load2
保证store1的写操作已刷新到主内存之后,load2及其后的读操作才能执行
内存屏障
是一个CPU指令,作用有两个
保证特定操作的执行顺序
保证某些变量 的内存可见性(利用该特性实现volatile的内存可见性)
由于编译器和处理器都能执行指令重排序,如果在指令间插入一条Memory barrier则会告诉编译器和CPU,不管什么指令都不能和 这条Memory Barrier指令重排序 ,也就是说通过插入内存屏障禁止再内存屏障前后的指令执行重排序优化。
Memory Barrier的另外一个作用是强制刷出各种CPU的缓存数据,因此任何CPU上的线程都能 读取到 这些数据 的 最新版本。
禁止重排序例子
双重加锁单例模式代码
这段代码在单线程环境下并没有什么问题,但是如果在多线程环境下就会 出现安全问题。原因在于某一个线程执行到第一次检测,读取到的instance不为null时,instance的引用对象可能还没有初始化
instance = new DoubleCheckLock() 可以分为以下3步骤完成
1.memory = allocate() 分配对象内存空间
2.instance(memory) 初始化对象)
3.instance = memory 设置instance指向刚分配的内存地址,此时instance != null
上面的步骤有可能重排序
1.memory = allocate() 分配对象内存空间
3.instance = memory 设置instance指向刚分配的内存地址,此时instance != null
2.instance(memory) 初始化对象)
由于2和3之间不存在数据依赖关系,而且无论重排序 前还是重排序后程序的执行结果在单线程并没有改变,因此这种重排序优化是允许的。
但是指令重排只会保证 串行语义的执行的 一致性(单线程),单并不会关心 多线程间的语义一致性。
所以当一条线程访问instance不为null时,由于instance实例未必已初始化完成,也就造成了线程安全问题。
那么如何解决呢?很简单,我们使用volatile禁止instance变量被执行指令重排优化即可;
private volatile static DoubleCheckLock instance;
重排序对多线程的影响
volatile内存语义 volatile写插入内存屏障后生成的指令序列示意图
volatile重排序的规则表
当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
当第一个操作是 volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
当第一个操作是 volatile 写,第二个操作是volatile读或写时,不能重排序
为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能。为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略。
在每个volatile写操作的前面插入一个StoreStore屏障。
在每个volatile写操作的后面插入一个StoreLoad屏障
在每个volatile读操作的后面插入一个LoadLoad屏障
在每个volatile读操作的后面插入一个LoadStore屏障
StoreStore屏障
上面的StoreStore屏障可以保证在volatle写之前,其前面的所有普通写操作已经对任意处理器可见了
这里的StoreStore屏障将保障上面所有的普通写在volatile写 之前刷新到主内存
StoreLoad屏障
此屏障的作用是避免volatile写后面可能有volatile读/写操作的重排序。因为编译器常常无法准确判断在一个volatile写的后面是否需要插入一个StoreLoad屏障。
为了能保证正确实现volatile的内存语义,JMM在采取了保守策略,在每个volatile写的后面,或者在每个volatile读的前面插入一个StroeLoad屏障。
从整体执行效率的角度考虑,JMM最终选择了在每个volatile写的后面插入一个StoreLoad屏障。
因为volatile写-读内存语义的常见使用模式是:一个写线程写volatile变量,多个 读线程读同一个volatile变量。当读线程的数量大大超过写线程时,选择在volatile写之后插入StoreLoad屏障将带来可观的执行效率的提升。从这里可以看到JMM 在实现上的一个特点:首先确保正确性,然后再去追求执行效率。
volatile读插入内存屏障
LoadLoad屏障用来禁止处理器把上面的volatile读与下面的普通读重排序。
LoadStore屏障用来禁止处理器把上面的volatile读与下面的普通写重排序。
示例
代码
针对readAndWrite()方法,编译器在生成字节码时可以做如下的优化。
注意,最后的StoreLoad屏障不能省略。因为第二个volatile写之后,方法立即return。此时编 译器可能无法准确断定后面是否会有volatile读或写,为了安全起见,编译器通常会在这里插 入一个StoreLoad屏障。
单例模式双重检查加锁问题
锁
锁的分类
重入锁
支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁
实现重进入需要解决的2个问题
线程再次获取锁
锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取
锁的最终释放
乐观锁和悲观锁
悲观锁
对数据被外界修改保持保守态度,认为数据很容易就会被其他线程修改,所以修改在数据被修改前先对数据进行加锁,并在整个数据处理过程中,使数据处于锁定状态。
乐观锁
认为数据在一般情况下不会造成冲突,所以在访问记录前不会加排他锁
公平锁和非公平锁的区别
公平锁
表示线程获取锁的顺序是按照线程请求锁的时间早晚来决定的
ReentrantLock pairLock = new ReentrantLock(true);
非公平锁
先来不一定先得
ReentrantLock pairLock = new ReentrantLock(false);如果不传,默认为非公平的
独占锁与共享锁
独占锁
保证任何时候都只有一个线程能获得锁,ReentrantLock就是以独占方式实现的
共享锁
可以同时由多个线程持有,ReadWriteLock读写锁,它允许一个资源可以被多线程同时 进行读操作
读写锁
实现缓存
实现分析
读写状态的设计
写锁的获取与释放
读锁的获取与释放
锁降级
synchronized
synchronized内置锁是一种对象锁(锁的是对象而非引用),作用粒度是对象,可以用来实现对临界资源的同步互斥访问,是可重入的
原理
- synchronized是基于JVM内置锁实现,通过内部对象Monitor(监视器锁)实现,基于进入与退出Monitor对象实现方法与代码块同步,监视器锁的实现依赖底层操作系统的Mutex lock(互斥锁)实现,它是一个重量级锁性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight Locking)、偏向锁(Biased Locking)、适应性自旋(Adaptive Spinning)等技术来减少锁操作的开销,,内置锁的并发性能已经基本与Lock持平。
synchronized关键字被编译成字节码后会被翻译成monitorenter 和 monitorexit 两条指令分别在同步块逻辑代码的起始位置与结束位置。
每个同步对象都有一个自己的Monitor(监视器锁),加锁过程如下图所示:
Monitor监视器锁
任何一个对象都有一个Monitor与之关联,当且一个Monitor被持有后,它将处于锁定状态
Synchronized在JVM里的实现都是 基于进入和退出Monitor对象来实现方法同步和代码块同步,虽然具体实现细节不一样,但是都可以通过成对的MonitorEnter和MonitorExit指令来实现。
monitorenter
每个对象都是一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者
如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1;
如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权;
monitorexit
执行monitorexit的线程必须是objectref所对应的monitor的所有者。指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
monitorexit,指令出现了两次,第1次为同步正常退出释放锁;第2次为发生异步退出释放锁;
示例代码
从编译 结果来看,方法的同步并没有通过monitorenter和monitorexit来完成,不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标识符。JVM 就是根据该标识符来实现方法的同步的。
当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象
只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。
对象的内存布局
对象的内存布局 分为三块区域
对象头(Header)
比如 hash码,对象所属的年代,对象锁,锁状态标志,偏向锁(线程)ID,偏向时间,数组长度(数组对象)等。Java对象头一般占有2个机器码(在32位虚拟机中,1个机器码等于4字节,也就是32bit,在64位虚拟机中,1个机器码是8个字节,也就是64bit),但是 如果对象是数组类型,则需要3个机器码,因为JVM虚拟机可以通过Java对象的元数据信息确定Java对象的大小,但是无法从数组的元数据来确认数组的大小,所以用一块来记录数组长度。
实例数据 (Instance Data)
存放类的属性数据信息,包括父类的属性信息;
对齐填充(Padding)
由于虚拟机要求 对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐;
第一部分Mark Word
用于存储对象自身的运行时数据,如哈希吗(HashCode)、GC代年龄、锁状态标志、线程持有的 锁、偏向线程ID、偏向时间戳等等。它是实现轻量级锁和偏向锁的关键。
锁对象
普通方法,锁的是当前实例对象
静态同步方法,锁的 是当前类的class对象
同步方法块,锁的是括号里的 对象
锁的膨胀升级过程
锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁
随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级到重量级锁,但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级。
从JDK1.6中默认是开启偏向锁和轻量级锁的,可以通过-XX:-UseBiasedLocking来禁用偏向锁
偏向锁
偏向锁是Java 6之后加入的新锁,它是一种针对加锁操作的优化手段,经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,因此为了减少同一线程获取锁(会涉及到一些CAS操作,耗时)的代价而引入偏向锁。偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。
所以,对于没有锁竞争的场合,偏向锁有很好的优化效果,毕竟极有可能连续多次是同一个线程申请相同的锁。但是对于锁竞争比较激烈的场合,偏向锁就失效了,因为这样场合极有可能每次申请锁的线程都是不相同的,因此这种场合下不应该使用偏向锁,否则会得不偿失,需要注意的是,偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁。下面我们接着了解轻量级锁。
默认开启偏向锁
开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
关闭偏向锁:-XX:-UseBiasedLocking
开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
关闭偏向锁:-XX:-UseBiasedLocking
轻量级锁
倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,它还会尝试使用一种称为轻量级锁的优化手段(1.6之后加入的),此时Mark Word 的结构也变为轻量级锁的结构。轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。
自旋锁
轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。这是基于在大多数情况下,线程持有锁的时间都不会太长,如果直接挂起操作系统层面的线程可能会得不偿失,毕竟操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,因此自旋锁会假设在不久将来,当前的线程可以获得锁,因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),一般不会太久,可能是50个循环或100循环,在经过若干次循环后,如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,这就是自旋锁的优化方式,这种方式确实也是可以提升效率的。最后没办法也就只能升级为重量级锁了。
锁消除
消除锁是虚拟机另外一种锁的优化,这种优化更彻底,Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间,如下StringBuffer的append是一个同步方法,但是在add方法中的StringBuffer属于一个局部变量,并且不会被其他线程所使用,因此StringBuffer不可能存在共享资源竞争的情景,JVM会自动将其锁消除。锁消除的依据是逃逸分析的数据支持。
锁消除,前提是java必须运行在server模式(server模式会比client模式作更多的优化),同时必须开启逃逸分析
:-XX:+DoEscapeAnalysis 开启逃逸分析
-XX:+EliminateLocks 表示开启锁消除。
-XX:+EliminateLocks 表示开启锁消除。
逃逸分析
使用逃逸分析,编译器可以对代码做如下优化:
一、同步省略。如果一个对象被发现只能从一个线程被访问到,那么对于这个对象的操作可以不考虑同步。
二、将堆分配转化为栈分配。如果一个对象在子程序中被分配,要使指向该对象的指针永远不会逃逸,对象可能是栈分配的候选,而不是堆分配。
三、分离对象或标量替换。有的对象可能不需要作为一个连续的内存结构存在也可以被访问到,那么对象的部分(或全部)可以不存储在内存,而是存储在CPU寄存器中。
是不是所有的对象和数组都会在堆内存分配空间?
不一定
在Java代码运行时,通过JVM参数可指定是否开启逃逸分析, -XX:+DoEscapeAnalysis : 表示开启逃逸分析 -XX:-DoEscapeAnalysis : 表示关闭逃逸分析。从jdk 1.7开始已经默认开启逃逸分析,如需关闭,需要指定-XX:-DoEscapeAnalysis
代码演示
synchronized锁实现与升级过程
wait notify
wait和notify 为什么需要在synchronized里面
wait方法的语义有两个,一个是释放当前对象的锁,另一个是使得当前线程进入阻塞队列,而这些操作都和监视器是相关的,所以wait必须要获得一个监视器锁。
而notify来说也 是一样的,它是唤醒一个线程,既然要去唤醒,首先得知道它在哪里?所以就必须要找到这个对象获取到这个对象的锁,然后到这个对象的等待队列中去唤醒一个线程。
对象头在JVM 源码中的定义,需要关心几个文件,oop.hpp/markOop.hpp
oop.hpp,每个 Java Object 在 JVM 内部都有一个 native 的 C++ 对象 oop/oopDesc 与之对应。先在oop.hpp中看
oopDesc的定义
oopDesc的定义
_mark 被声明在 oopDesc 类的顶部,所以这个 _mark 可以认为是一个 头部, 前面我们讲过头部保存了一些重要的
状态和标识信息,在markOop.hpp文件中有一些注释说明markOop的内存布局
状态和标识信息,在markOop.hpp文件中有一些注释说明markOop的内存布局
Lock
ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,
对该抽象类的部分方法做了实现;并且还定义了两个子类
对该抽象类的部分方法做了实现;并且还定义了两个子类
1、FairSync 公平锁的实现
2、NonfairSync 非公平锁的实现
这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized
所以这一个ReentrantLock同时具备公平与非公平特性。
上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现
上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现
实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放
nonfairTryAcquire方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
公平锁
ReentrantLock的构造函数中,默认的无参构造函数将会把Sync对象创建为NonfairSync对象,这是一个“非公平锁”;而另一个构造函数ReentrantLock(boolean fair)传入参数为true时将会把Sync对象创建为“公平锁”FairSync。
非公平锁
nonfairTryAcquire(int acquires)方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同。tryAcquire方法,该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
ReentrantReadWriteLock
之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。
在没有读写锁支持的(Java 5之前)时候,如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠synchronized关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量
ReentrantReadWriteLock其实实现的是ReadWriteLock接口
ReentrantReadWriteLock其实实现的是ReadWriteLock接口
使用ReentrantReadWriteLock实现缓存
源代码分析
Condition
Condition接口提供了类似Object的 监视器方法,与Lock配合可以实现等待/通知模式
理解Condition 和 条件变量
Condition使用范式
实现
`CondiitonObject`是同同步器`AbstractQueuedSynchronizer`的内部类 ,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也会是比较合理的,每个`Condition`对象都包含着一个队列(等待队列),该队列时`Condition`对象实现`等待/通知`功能的 关键。
分析
Lock和synchronized的简单对比
从层次上,一个是关键字、一个是类,这是最直观的差异
从使用上,lock具备更大的灵活性,可以控制锁的释放和获取;而synchronized的锁的释放是被动的,当出现异常或者同步代码执行完以后,才会释放锁。
lock可以判断锁的状态,而synchronized无法做到
lock可以实现公平锁、非公平锁 ;而synchronized只有非公平锁
阻塞队列
BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影
队列类型
1. 无限队列 (unbounded queue ) - 几乎可以无限增长
2. 有限队列 ( bounded queue ) - 定义了最大容量
队列的数据结构
通常用链表或者数组实现
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
阻塞队列
ArrayBlockingQueue:基于数组结构的有界阻塞队列(长度不可变);
数组+锁实现
LinkedBlockingQueue:基于链表结构的有界阻塞队列(默认容量 Integer.MAX_VALUE);
链表+锁实现
LinkedTransferQueue:基于链表结构的无界阻塞/传递队列;
LinkedBlockingDeque:基于链表结构的有界阻塞双端队列(默认容量 Integer.MAX_VALUE);
SynchronousQueue:不存储元素的阻塞/传递队列;
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
堆 + 锁
DelayQueue:支持延时获取元素的无界阻塞队列
PriorityQueue + 锁
ArrayBlockingQueue
实现
ReentrantLock
Condition
应用场景
在线程池中有比较多的应用,生产者消费者场景
工作原理
基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞
原理分析
构造方法
ArrayBlockingQueue 提供了三个构造方法,分别如下
capacity: 表示数组的长度,也就是队列的长度
fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。
源码分析
LinkedBlockingQueue
是一个基于链表的无界队列(理论上有界)
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。
使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。
实现
ReentrantLock + Condition
二叉堆
源码分析
DelayQueue
由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。
队列创建
BlockingQueue<String> blockingQueue = new DelayQueue();
要求
入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口
应用场景
对缓存超时的数据进行移除
当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间
为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数
当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程
任务超时处理(例如订单超时,
饿了么订餐:下单后60s之后给用户发送短信通知))
饿了么订餐:下单后60s之后给用户发送短信通知))
实现方式
使用数据库定时任务,每隔几秒扫描订单表,找出超时订单后关闭
使用spring的@Scheduled注解启动定时任务或者使用Quartz任务管理器,定时触发任务,处理超时订单
使用消息中间件,Active或者RocketMQ提供了延迟消息队列,下单后往延迟消息队列中发消息,超时后,消费端会接收到一条延迟的 订单消息,并做相应处理
使用DelayQueue来实现
其他方式
空闲连接的关闭
使用示例
BlockingQueue API
添加元素
检索元素
ConcurrentLinkedQueue
如何实现一个线程安全的队列?
1.使用阻塞算法
2.使用非阻塞算法
ConcurrentLinkedQueue
入队列
出队列
并发工具类
CyclicBarrier
它允许一组线程互相等待,直到到达某个公共屏障带你
通俗讲: 让一组线程到达一个屏障时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活
底层采用ReentrantLock + Condition 实现
API
cyclicBarrier.await();
应用场景
多线程结果的合并的操作,用于多线程计算数据,最后合并计算结果的应用场景
可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
示例代码
CountDownLatch
在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
用给定的计数初始化CountDownLatch,由于调用了countDown()方法,所以在当前计数到达零之前,await()方法会一直受阻塞,之后,会释放所有等待的 线程 ,await的所有后续调用都将立即返回。
和CyclicBarrier的区别?
CountDownLatch 的作用是允许1或者N个线程等待其他线程 完成执行;而CyclicBarrier则是允许N个线程相互等待。
CountDownLatch的计数器无法重置;CyclicBarrier的计数器可以被重置后使用,因此它被 称为是循环的barrier。
内部用共享锁来实现
API
CountDownLatch.countDown()
CountDownLatch.await();
使用场景
Zookeeper分布式锁,Jmeter模拟高并发等
实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
代码示例
用途
确保某个计算在其需要的所有资源都被初始化之后才执行
确保某个服务在其依赖的所有其他服务都已经启动之后才启动
等待直到每个操作的所有参与者都就绪再执行(比如打麻将时需要等待四个玩家就绪)
案例介绍:
主线程中启动多个线程去执行任务,
并且主线程需要等待所有子线程执行完毕后去汇总
并且主线程需要等待所有子线程执行完毕后去汇总
使用join
使用CounDownLatch
CountDownLatch 和 join 的区别
调用子线程的join方法后,该线程会一直被阻塞直到子线程运行完毕,而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的 任何 时候让await方法返回而不一定必须等到线程结束,
使用线程池来管理线程时一般都是直接添加Runable线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制。
Semaphore
信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS 的状态state,是在生产当中比较常用的一个工具类
怎么使用
构造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
permits 表示许可线程的数量
fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
重要方法
public void acquire() throws InterruptedException
acquire() 表示阻塞并获取许可
public void release()
release() 表示释放许可
需求场景
资源访问,服务限流(Hystrix里限流就有基于信号量方式)。
示例代码
应用场景
流量控制
一个数据库连接池的实现
LockSupport
作用
挂起和唤醒线程
是一个简单的代理类,里面的代码都是使用Unsafe类里面的方法
park()
阻塞线程
unpark()
解除阻塞线程
与wait()、notify()的区别
LockSupport 主要是针对 Thread 进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒
Object.wait() 是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
LockSupport 阻塞和解除阻塞线程直接操作的是 Thread。而 Object 的 wait/notify 并不是直接对线程操作,是被动的方法,需要一个 Object 来进行线程的挂起或唤醒。
Thead 在调用 wait 之前,当前线程必须先获得该对象的监视器(syschronized),被唤醒之后需要重新获取到监视器才能继续执行。而 LockSupport 可以随意进行 park 或者 unpark。
线程池
目的
线程池为线程生命周期的开销和资源不足的问题提供了解决方案,通过对多个任务重用线程,创建线程的开销被分摊到了多个任务上。
什么时候使用线程池
单个任务处理时间比较短
需要处理的任务数量很大
Executor框架
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;
AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
重要API
execute(Runnable command):履行Ruannable类型的任务
submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future
对象
对象
shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
shutdownNow():停止所有正在履行的任务并封闭办事
isTerminated():测试是否所有任务都履行完毕了。
isShutdown():测试是否该ExecutorService已被关闭
三大部分组成
任务
Runnable接口
Callable接口
任务的执行
ThreadPoolExecutor
ScheduledThreadPoolExecutor
异步计算的结果
包括接口Future和实现Future接口的FutureTask类。
线程池原理
流程
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行这一步骤要获取全局锁)
2、如果运行的线程等于或多于,则将 任务加入BlockingQueue
3、如果无法将任务将入BlockingQueue(队列已满),则创建新的线程来处理任务(需获取全局锁)
4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
任务提交
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
参数解释
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
unit
keepAliveTime的单位
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
可以通过线程工厂给每个创建出来的线程设置更有意义的名字。线程池的命名时通过给这个factory增加组前缀来实现的。在虚拟机栈分析时,就可以知道线程任务由哪个线程工厂产生的。
使用 Google Guava设置线程名字
自定义实现:ThreadFactory
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
友好的拒绝策略
保存到数据库进行削峰填谷。在 空闲时再提取出来执行
转向某个提示页
打印日志
关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
线程池监控
可以监控以下属性
taskCount
线程池需要执行的任务数量
completedTaskCount
线程池在运行过程中已完成的任务数量
largestPoolSize
线程池里曾经创建过的最大线程数量
getPoolSize
线程池的线程数量
getActiveCount
获取活动的线程数
监控的代码示例
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
源码分析
定时线程池
提交定时任务
public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit)
//向定时任务线程池提交一个延时Runnable任务(仅执行一次)
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
public
//向定时任务线程池提交一个延时的Callable任务(仅执行一次)
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
//向定时任务线程池提交一个固定时间间隔执行的任务
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
//向定时任务线程池提交一个固定延时间隔执行的任务
固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从理论上讲是确定的,当然执行任务的时间不能超过执行周期。
固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动。
定时任务超时问题
scheduleAtFixedRate中,若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,会马上开始执行。
若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行。
设置定时任务每60s执行一次,那么从理论上应该第一次任务在第0s开始,第二次任务在第60s开始,第三次任务在120s开始,但实际运行时第一次任务时长80s,第二次任务时长30s,第三次任务时长50s,则实际运行结果为:
第一次任务第0s开始,第80s结束;
第二次任务第80s开始,第110s结束(上次任务已超时,本次不会再等待60s,会马上开始);
第三次任务第120s开始,第170s结束.
第四次任务第180s开始.....
源码分析
使用线程池的注意事项
合理设置各类参数,应根据实际业务场景来设置合理的工作线程数
任务的性质
CPU密集型任务
尽可能小
IO 密集型任务
按照CPU核心数的倍数去设置
混合型任务
任务的优先级
高
中
低
任务的时间执行长短
长
中
短
任务的依赖性
是否依赖其他系统资源,如数据库连接
获取CPU个数
线程资源必须通过线程池提供,不允许在应用中自行显示创建线程
创建线程或线程池时请指定有意义的线程名字,方便出错时回溯
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
假设,我们现在有一个Web系统,里面使用了线程池来处理业务,在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。
如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
系统预定义的线程池
FixedThreadPool详解
创建使用固定线程数的FixedThreadPool的API。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的
最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
SingleThreadExecutor
创建使用单个线程的SingleThread-Executor的API,于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同。SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
CachedThreadPool
创建一个会根据需要创建新线程的CachedThreadPool的API。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool和SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
ScheduledThreadPoolExecutor
使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下。
•ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor。
•SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
CompletionService
CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。
CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。
在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。
当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。
示例代码
使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
使用方法二,使用CompletionService来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。
FutureTask
Future除了实现Future接口外,还实现了Runnable接口。因此,FutureTask还可以提交给Executor执行,也可以由调用线程直接执行。
Future模式实现方式
Data
FutureClient
FutureData
RealData
FutureTask可以处于下面3中状态
未启动
FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
已启动
FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态
已完成
FutureTask.run()方法执行完成后正常结束,或被取消或异常结束,FutureTask处于已完成状态
当一个线程需要等待另外一个线程把某个任务执行完后它才能继续被执行,此时可以使用FutureTask。
假设有多个线程执行 若干任务,每个任务最多只能被执行一次。当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。
atomic
什么是原子操作
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为”不可被中断的一个或一系列操作” 。在多处理器上实现原子操作就变得有点复杂。本文让我们一起来聊一聊在Inter处理器和Java里是如何实现原子操作的。
相关术语
缓存行
Cache line
缓存的最小操作单位
比较并交换
Compare and Swap
CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。
CPU流水线
CPU pipeline
CPU流水线的工作方式就象工业生产上的装配流水线,在CPU中由5~6个不同功能的电路单元组成一条指令处理流水线,然后将一条X86指令分成5~6步后再由这些电路单元分别执行,这样就能实现在一个CPU时钟周期完成一条指令,因此提高CPU的运算速度。
内存顺序冲突
Memory order violation
内存顺序冲突一般是由假共享引起,假共享是指多个CPU同时修改同一个缓存行的不同部分而引起其中一个CPU的操作无效,当出现这个内存顺序冲突时,CPU必须清空流水线。
处理器如何实现原子操作
32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。
1、处理器自动保证基本内存操作的原子性
首先处理器会自动保证基本的内存操作的原子性。处理器保证从系统内存当中读取或者写入一个字节是原子的,意思是当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。奔腾6和最新的处理器能自动保证单处理器对同一个缓存行里进行16/32/64位的操作是原子的,但是复杂的内存操作处理器不能自动保证其原子性,比如跨总线宽度,跨多个缓存行,跨页表的访问。但是处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。
2、使用总线锁保证原子性
第一个机制是通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写(i++就是经典的读改写操作)操作,那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致,举个例子:如果i=1,我们进行两次i++操作,我们期望的结果是3,但是有可能结果是2。如下图
原因是有可能多个处理器同时从各自的缓存中读取变量i,分别进行加一操作,然后分别写入系统内存当中。那么想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作缓存了该共享变量内存地址的缓存。
处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存
3、使用缓存锁保证原子性
第二个机制是通过缓存锁定保证原子性。在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
频繁使用的内存会缓存在处理器的L1,L2和L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在奔腾6和最近的处理器中可以使用“缓存锁定”的方式来实现复杂的原子性。所谓“缓存锁定”就是如果缓存在处理器缓存行中内存区域在LOCK操作期间被锁定,当它执行锁操作回写内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时会起缓存行无效,在例1中,当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。
但是有两种情况下处理器不会使用缓存锁定。第一种情况是:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line),则处理器会调用总线锁定。第二种情况是:有些处理器不支持缓存锁定。对于Inter486和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
以上两个机制我们可以通过Inter处理器提供了很多LOCK前缀的指令来实现。比如位测试和修改指令BTS,BTR,BTC,交换指令XADD,CMPXCHG和其他一些操作数和逻辑指令,比如ADD(加),OR(或)等,被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。
Java当中如何实现原子操作
在java中可以通过锁和循环CAS的方式来实现原子操作。
JVM中的CAS操作正是利用了上文中提到的处理器提供的CMPXCHG指令实现的。自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止
Atomic
在Atomic包里一共有12个类,四种原子更新方式,分别是原子更新基本类型,原子更新数组,原子更新引用和原子更新字段。Atomic包里的类基本都是使用Unsafe实现的包装类。
基本类:AtomicInteger、AtomicLong、AtomicBoolean;
引用类型:AtomicReference、AtomicReference的ABA实例、AtomicStampedRerence、AtomicMarkableReference;
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
属性原子修改器(Updater):AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
1、原子更新基本类型类
用于通过原子的方式更新基本类型,Atomic包提供了以下三个类
AtomicBoolean:原子更新布尔类型
AtomicInteger:原子更新整型
AtomicLong:原子更新长整型
AtomicInteger的常用方法如下
int addAndGet(int delta) :以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
boolean compareAndSet(int expect, int update) :如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。
int getAndIncrement():以原子方式将当前值加1,注意:这里返回的是自增前的值。
void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。
Atomic包提供了三种基本类型的原子更新,但是Java的基本类型里还有char,float和double等。那么问题来了,如何原子的更新其他的基本类型呢?Atomic包里的类基本都是使用Unsafe实现的,Unsafe只提供了三种CAS方法,compareAndSwapObject,compareAndSwapInt和compareAndSwapLong,再看AtomicBoolean源码,发现其是先把Boolean转换成整型,再使用compareAndSwapInt进行CAS,所以原子更新double也可以用类似的思路来实现。
2、原子更新数组类
通过原子的方式更新数组里的某个元素,Atomic包提供了以下三个类
AtomicIntegerArray:原子更新整型数组里的元素
AtomicLongArray:原子更新长整型数组里的元素
AtomicReferenceArray:原子更新引用类型数组里的元素
其常用方法如下
int addAndGet(int i, int delta):以原子方式将输入值与数组中索引i的元素相加。
boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值。
3、原子更新引用类型
原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子的更新多个变量,就需要使用这个原子更新引用类型提供的类。
Atomic包提供了以下三个类:
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型里的字段
AtomicMarkableReference:原子更新带有标记位的引用类型
4、原子更新字段类
如果我们只需要某个类里的某个字段,那么就需要使用原子更新字段类
Atomic包提供了以下三个类
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
AtomicLongFieldUpdater:原子更新长整型字段的更新器
AtomicStampedReference:原子更新带有版本号的引用类型
该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
原子更新字段类都是抽象类,每次使用都时候必须使用静态方法newUpdater创建一个更新器。原子更新类的字段的必须使用public volatile修饰符
JDK8新增的 原子操作类LongAddr,用来代替AtomicLong
AtimicLong通过CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经非常好了。
使用AtomicLong时,在高并发下大量线程会同时去 竞争更新同一个原子变量,但是由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无线循环不断进行自旋尝试CAS的操作,而这会白白浪费CPU 的 资源。
JDK8新增了一个原子性递增或者递减了LongAdder用来克服在高并发下使用AtomicLong的缺点
LongAddr思路:把一个变量分解为多个变量,让同样多的线程去竞争多个资源,不就解决了性能问题。
AtomicLong:多个线程同时竞争同一个原子变量
LongAddr
使用LongAddr时,则是在内部维护多个cell变量,每个cell里面有一个初始值为0的long性型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。
多个线程在争夺同一个Cell原子变量时,如果失败了,它并不是在当前Cell变量上一直自旋CAS重试,而是尝试在其他Cell的变量上进行CAS尝试,这个改变增加了当前线程重试CAS成功的可能性,最后,在获取LongAddr当前值时,是把所有cell变量的value值累加后再加上base返回值的
Java 并发集合
HashMap
数据结构
数组+链表+(红黑树jdk>=8)
源码分析
ConcurrentHashMap
ConcurrentHashMap的数据结构与HashMap基本类似,区别在于:1、内部在数据写入时加了同步机制(分段锁)保证线程安全,读操作是无锁操作;2、扩容时老数据的转移是并发执行的,这样扩容的效率更高。
Java7
基于ReentrantLock实现分段锁
Java8
分段锁+CAS保证线程安全
分段锁基于synchronized关键字实现
源码分析
ConcurrentHashMap实现分析
在1.7下的实现
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。
构造方法和初始化
ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel(参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小concurrencyLevel默认是DEFAULT_CONCURRENCY_LEVEL = 16;)等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。
并发级别可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。
如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。(文档的说法是根据你并发的线程数量决定,太多会导性能降低)
segments数组的长度ssize是通过concurrencyLevel计算得出的。为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14、15或16,ssize都会等于16,即容器里锁的个数也是16。
在默认情况下, ssize = 16,initialCapacity = 16,loadFactor = 0.75f,那么cap = 1,threshold = (int) cap * loadFactor = 0。
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment。ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。
ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的以及volatile关键字。
get操作
get操作先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment(使用了散列值的高位部分),再通过散列算法定位到table(使用了散列值的全部)。整个get过程,没有加锁,而是通过volatile保证get总是可以拿到最新值。
put操作
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽,在插入第一个值的时候再进行初始化。
ensureSegment方法考虑了并发情况,多个线程同时进入初始化同一个槽 segment[k],但只要有一个成功就可以了。
具体实现是
put方法会通过tryLock()方法尝试获得锁,获得了锁,node为null进入try语句块,没有获得锁,调用scanAndLockForPut方法自旋等待获得锁。
scanAndLockForPut方法里在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。
在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值,
否则新建一个HashEntry节点,采用头插法,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。
rehash操作
扩容是新创建了数组,然后进行迁移数据,最后再将 newTable 设置给属性 table。
为了避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此很多HashEntry节点在扩容前后index可以保持不变。
假设原来table长度为4,那么元素在table中的分布是这样的
扩容后table长度变为8,那么元素在table中的分布变成:
可以看见 hash值为34和56的下标保持不变,而15,23,77的下标都是在原来下标的基础上+4即可,可以快速定位和减少重排次数。
该方法没有考虑并发,因为执行该方法之前已经获取了锁。
该方法没有考虑并发,因为执行该方法之前已经获取了锁。
remove操作
与put方法类似,都是在操作前需要拿到锁,以保证操作的线程安全性。
ConcurrentHashMap的弱一致性
对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。
size、containsValue
这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment,获得对应的值以及所有Segment的modcount之和。在 put、remove 和 clean 方法里操作元素前都会将变量 modCount 进行变动,如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。
当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。所以一般来说,应该避免在多线程环境下使用size和containsValue方法。
当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。所以一般来说,应该避免在多线程环境下使用size和containsValue方法。
在1.8下的实现
改进一:取消segments字段,直接采用transient volatile HashEntry[] table保存数据,采用table数组元素作为锁,从而实现了对缩小锁的粒度,进一步减少并发冲突的概率,并大量使用了采用了 CAS + synchronized 来保证并发安全性。
改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
使用 Node(1.7 为 Entry) 作为链表的数据结点,仍然包含 key,value,hash 和 next 四个属性。 红黑树的情况使用的是 TreeNode(extends Node)。
根据数组元素中,第一个结点数据类型是 Node 还是 TreeNode 可以判断该位置下是链表还是红黑树。
用于判断是否需要将链表转换为红黑树的阈值
用于判断是否需要将红黑树转换为链表的阈值
核心数据结构和属性
Node
Node是最核心的内部类,它包装了key-value键值对
定义基本和1.7中的HashEntry相同。而这个map本身所持有的也是一个Node型的数组
增加了一个find方法来用以辅助map.get()方法。其实就是遍历链表,子类中会覆盖这个方法。
在map中还定义了Segment这个类,不过只是为了向前兼容而已,不做过多考虑。
TreeNode
树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。
与1.8中HashMap不同点:
1、它并不是直接转换为红黑树,而是把这些结点放在TreeBin对象中,由TreeBin完成对红黑树的包装。
2、TreeNode在ConcurrentHashMap扩展自Node类,而并非HashMap中的扩展自LinkedHashMap.Entry类,也就是说TreeNode带有next指针。
TreeBin
负责TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。另外这个类还带有了读写锁机制。
特殊的ForwardingNode
一个特殊的 Node 结点,hash 值为 -1,其中存储 nextTable 的引用。有 table 发生扩容的时候,ForwardingNode 发挥作用,作为一个占位符放在 table 中表示当前结点为 null 或者已经被移动。
sizeCtl
用来控制 table 的初始化和扩容操作。
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
0为默认值,代表当时的table还没有被初始化
正数表示初始化大小或Map中的元素达到这个数量时,需要进行扩容了。
核心方法
构造方法
可以发现,在new出一个map的实例时,并不会创建其中的数组等等相关的部件,只是进行简单的属性设置而已,同样的,table的大小也被规定为必须是2的乘方数。
真正的初始化在放在了是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
get操作
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。
put操作
总结来说,put方法就是,沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。
整体流程上,就是首先定义不允许key或value为null的情况放入 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。
如果这个位置是空的,那么直接放入,而且不需要加锁操作。
如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点,则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。
remove
移除方法的基本流程和put方法很类似,只不过操作由插入数据变为移除数据而已,而且如果存在红黑树的情况下,会检查是否需要将红黑树转为链表的步骤。不再重复讲述。
treeifyBin
用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。
size
在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了,可以注意一下Put函数,里面就有addCount()函数,早就计算好的,然后你size的时候直接给你。JDK1.7是在调用size()方法才去计算,其实在并发集合中去计算size是没有多大的意义的,因为size是实时在变的。
在具体实现上,计算大小的核心方法都是 sumCount()
可以看见,统计数量时使用了 baseCount、和CounterCell 类型的变量counterCells 。其实baseCount就是记录容器数量的,而counterCells则是记录CAS更新baseCounter值时,由于高并发而导致失败的值。这两个变量的变化在addCount() 方法中有体现,大致的流程就是:
1、对 baseCount 做 CAS 自增操作。
2、如果并发导致 baseCount CAS 失败了,则使用 counterCells。
3、如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。
CopyOnWrite机制
核心思想
读写分离,空间换时间,避免为保证并发安全导致的激烈的锁竞争。
关键点
1、CopyOnWrite适用于读多写少的情况,最大程度的提高读的效率;
2、CopyOnWrite是最终一致性,在写的过程中,原有的读的数据是不会发生更新的,只有新的读才能读到最新数据;
3、如何使其他线程能够及时读到新的数据,需要使用volatile变量;
4、写的时候不能并发写,需要对写操作进行加锁;
CopyOnWrite的应用场景
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:
缺点
内存占用问题
因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
数据一致性问题
CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器
CopyOnWriteMap
JDK中并没有提供CopyOnWriteMap,我们可以参考CopyOnWriteArrayList来实现一个
CopyOnWriteArrayList
CopyOnWriteArraySet
源码
ConcurrentSkipList系列
ConcurrentSkipListMap 有序Map
ConcurrentSkipListSet 有序Set
TreeMap和TreeSet使用红黑树按照key的顺序(自然顺序、自定义顺序)来使得键值对有序存储,但是只能在单线程下安全使用
多线程下想要使键值对按照key的顺序来存储,则需要使用ConcurrentSkipListMap和ConcurrentSkipListSet,分别用以代替TreeMap和TreeSet,存入的数据按key排序。在实现上,ConcurrentSkipListSet 本质上就是ConcurrentSkipListMap。
了解什么是SkipList?
二分查找和AVL树查找
二分查找要求元素可以随机访问,所以决定了需要把元素存储在连续内存。这样查找确实很快,但是插入和删除元素的时候,为了保证元素的有序性,就需要大量的移动元素了。
如果需要的是一个能够进行二分查找,又能快速添加和删除元素的数据结构,首先就是二叉查找树,二叉查找树在最坏情况下可能变成一个链表。
于是,就出现了平衡二叉树,根据平衡算法的不同有AVL树,B-Tree,B+Tree,红黑树等,但是AVL树实现起来比较复杂,平衡操作较难理解,这时候就可以用SkipList跳跃表结构。
什么是跳表
传统意义的单链表是一个线性结构,向有序的链表中插入一个节点需要O(n)的时间,查找操作需要O(n)的时间。
如果我们使用上图所示的跳跃表,就可以减少查找所需时间为O(n/2),因为我们可以先通过每个节点的最上面的指针先进行查找,这样子就能跳过一半的节点。
比如我们想查找50,首先和20比较,大于20之后,在和40进行比较,然后在和70进行比较,发现70大于50,说明查找的点在40和50之间,从这个过程中,我们可以看出,查找的时候跳过了30。
跳跃表其实也是一种通过“空间来换取时间”的一个算法,令链表的每个结点不仅记录next结点位置,还可以按照level层级分别记录后继第level个结点。此法使用的就是“先大步查找确定范围,再逐渐缩小迫近”的思想进行的查找。跳跃表在算法效率上很接近红黑树。
跳跃表又被称为概率,或者说是随机化的数据结构,目前开源软件 Redis 和 lucence都有用到它。
都是线程安全的Map实现,ConcurrentHashMap的性能和存储空间要优于ConcurrentSkipListMap,但是ConcurrentSkipListMap有一个功能: 它会按照键的顺序进行排序。
ConcurrentSkipListMap和TreeMap的不同
ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景
ConcurrentSkipListMap和TreeMap,它们虽然都是有序的哈希表。但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。
在非多线程的情况下,应当尽量使用TreeMap。此外对于并发性相对较低的并行程序可以使用Collections.synchronizedSortedMap将TreeMap进行包装,也可以提供较好的效率。对于高并发程序,应当使用ConcurrentSkipListMap,能够提供更高的并发度。
ConcurrentSkipListMap 和 ConcurrentHashMap 不不同
所以在多线程程序中,如果需要对Map的键值进行排序时,请尽量使用ConcurrentSkipListMap,可能得到更好的并发度。
在4线程1.6万数据的条件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。
但ConcurrentSkipListMap有几个ConcurrentHashMap 不能比拟的优点:
1、ConcurrentSkipListMap 的key是有序的。
2、ConcurrentSkipListMap 支持更高的并发。ConcurrentSkipListMap 的存取时间是log(N),和线程数几乎无关。也就是说在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出他的优势。
所以在多线程程序中,如果需要对Map的键值进行排序时,请尽量使用ConcurrentSkipListMap,可能得到更好的并发度。
注意,调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。
Disruptor
应用背景和介绍
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。
据目前资料显示:应用Disruptor的知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。
Disruptor是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。
传统队列问题
在JDK中,Java内部的队列BlockQueue的各种实现,仔细分析可以得知,队列的底层数据结构一般分成三种:数组、链表和堆,堆这里是为了实现带有优先级特性的队列暂且不考虑。
在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择 Array格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。但是ArrayBlockingQueue是通过加锁的方式保证线程安全,而且ArrayBlockingQueue还存在伪共享问题,这两个问题严重影响了性能。
ArrayBlockingQueue的这个伪共享问题存在于哪里呢,分析下核心的部分源码,其中最核心的三个成员变量为
是在ArrayBlockingQueue的核心enqueue和dequeue方法中经常会用到的,这三个变量很容易放到同一个缓存行中,进而产生伪共享问题。
高性能的原理
高性能的原理
高性能的原理
引入环形的数组结构:数组元素不会被回收,避免频繁的GC,
无锁的设计:采用CAS无锁方式,保证线程的安全性
属性填充:通过添加额外的无用信息,避免伪共享问题
环形数组结构是整个Disruptor的核心所在。
首先因为是数组,所以要比链表快,而且根据我们对上面缓存行的解释知道,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因此在这样的结构中,cpu无需时不时去主存加载数组中的下一个元素。而且,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。环形数组中的元素采用覆盖方式,避免了jvm的GC。
其次结构作为环形,数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。在disruptor中,这个牛逼的环形结构就是RingBuffer,既然是数组,那么就有大小,而且这个大小必须是2的n次方
其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。
每个生产者首先通过CAS竞争获取可以写的空间,然后再进行慢慢往里放数据,如果正好这个时候消费者要消费数据,那么每个消费者都需要获取最大可消费的下标。
同时,Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上图的seq),它属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。该指针是通过一直自增的方式来获取下一个可写或者可读数据。
其他
ThreadLocal
一种解决多线程环境下成员变量的问题的方案,但是与线程同步无关,其思路是为每一个线程创建一个单独的变量副本,从而每个线程都可以独立地改变所拥有的变量副本,而不会影响其他线程所对应的副本。
ThreadLocal不是用于解决共享变量的问题的,也不是为了协调线程同步而存在,而是为了方便每个线程处理自己的状态而引入的一个机制。
四个方法
void set(Object value)
设置当前线程的线程局部变量的值。
public Object get()
该方法返回当前线程所对应的线程局部变量。
public void remove()
将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。
protected Object initialValue()
返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。
public final static ThreadLocal RESOURCE = new ThreadLocal();RESOURCE代表一个能够存放String类型的ThreadLocal对象。此时不论什么一个线程能够并发访问这个变量,对它进行写入、读取操作,都是线程安全的。
使用方法
代码
存储结构
源码分析
ThreadLocal的核心机制
每个Thread线程内部都有一个Map
Map里面存储线程本地对象(key)和线程的变量副本(value)
但是,Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。
所以对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了副本的隔离,互不干扰。
ThreadLocalMap
实现线程隔离机制的关键
每个Thread内部都有 一个ThreadLocal.ThreadLocalMap类型的成员变量,该成员变量用来存储实际的ThreadLocal变量副本
提供了一种用健值对方式存储每一个线程的变量副本的 方法,key为ThreadLocal对象,value则是对应线程的 变量副本。
public class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
}
ThreadLocal.ThreadLocalMap threadLocals = null;
}
可以看到有个Entry内部静态类,它继承了WeakReference,总之它记录了两个信息,一个是ThreadLocal类型,一个是Object类型的值。getEntry方法则是获取某个ThreadLocal对应的值,set方法就是更新或赋值相应的ThreadLocal对应的值。
Entry继承自WeakReference(弱引用,生命周期只能存活到下次GC前),但只有Key是弱引用类型的,Value并非弱引用。
ThreadLocalMap 和 HashMap的区别
ThreadLocalMap结构非常简单,没有next引用,也就是说ThreadLocalMap中解决Hash冲突的方式并非链表的方式,而是采用线性探测的方式,所谓线性探测,就是根据初始key的hashcode值确定元素在table数组中的位置,如果发现这个位置上已经有其他key值的元素被占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。
ThreadLocalMap解决Hash冲突的方式就是简单的步长加1或减1,寻找下一个相邻的位置。
ThreadLocalMap的问题
ThreadLocal的原理:每个Thread内部维护着一个ThreadLocalMap,它是一个Map。这个映射表的Key是一个弱引用,其实就是ThreadLocal本身,Value是真正存的线程变量Object。
也就是说ThreadLocal本身并不真正存储线程的变量值,它只是一个工具,用来维护Thread内部的Map,帮助存和取。注意上图的虚线,它代表一个弱引用类型,而弱引用的生命周期只能存活到下次GC前。
建议
每个线程只存一个变量,这样的话所有的线程存放到map中的Key都是相同的ThreadLocal,如果一个线程要保存多个变量,就需要创建多个ThreadLocal,多个ThreadLocal放入Map中时会极大的增加Hash冲突的可能。
set方法
获取当前线程的成员变量map
map非空,则重新将ThreadLocal和新的value副本放入到map中。
map空,则对线程的成员变量ThreadLocalMap进行初始化创建,并将ThreadLocal和value副本放入map中。
get方法
1.获取当前线程的 ThreadLocalMap 对象threadLocals
2.从 map 中获取线程存储的 K-V Entry节点。
3.从Entry节点获取存储的Value副本值返回。
3.从Entry节点获取存储的Value副本值返回。
remove()方法
ThreadLocal为什么会内存泄漏
ThreadLocal 在 ThreadLocalMap 中是以一个弱引用身份被Entry中的Key引用的,因此如果ThreadLocal没有外部强引用来引用它,那么ThreadLocal会在下次JVM垃圾收集时被回收。这个时候就会出现Entry中Key已经被回收,出现一个null Key的情况,外部读取ThreadLocalMap中的元素是无法通过null Key来找到Value的。因此如果当前线程的生命周期很长,一直存在,那么其内部的ThreadLocalMap对象也一直生存下来,这些null key就存在一条强引用链的关系一直存在:Thread --> ThreadLocalMap-->Entry-->Value,这条强引用链会导致Entry不会回收,Value也不会回收,但Entry中的Key却已经被回收的情况,造成内存泄漏。
但是JVM团队已经考虑到这样的情况,并做了一些措施来保证ThreadLocal尽量不会内存泄漏:在ThreadLocal的get()、set()、remove()方法调用的时候会清除掉线程ThreadLocalMap中所有Entry中Key为null的Value,并将整个Entry设置为null,利于下次内存
由于ThreadLocalMap的key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,发生内存泄露。
由于ThreadLocalMap的key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,发生内存泄露。
我们先看看ThreadLocal的get方法的底层实现
在调用map.getEntry(this)时,内部会判断key是否为null,继续看map.getEntry(this)源码
在getEntry方法中,如果Entry中的key发现是null,会继续调用getEntryAfterMiss(key, i, e)方法,其内部回做回收必要的设置,继续看内部源码:
注意k == null这里,继续调用了expungeStaleEntry(i)方法,expunge的意思是擦除,删除的意思,见名知意,在来看expungeStaleEntry方法的内部实现:
注意这里,将当前Entry删除后,会继续循环往下检查是否有key为null的节点,如果有则一并删除,防止内存泄漏。
如何避免泄漏
既然Key是弱引用,那么我们要做的事,就是在调用ThreadLocal的get()、set()方法时完成后再调用remove方法,将Entry节点和Map的引用关系移除,这样整个Entry对象在GC Roots分析后就变成不可达了,下次GC的时候就可以被回收。
如果使用ThreadLocal的set方法之后,没有显示的调用remove方法,就有可能发生内存泄露,所以养成良好的编程习惯十分重要,使用完ThreadLocal之后,记得调用remove方法。
为什么ThreadLocalMap的key是弱引用呢?
key 使用强引用:引用的ThreadLocal的对象被回收了,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
key 使用弱引用:引用的ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。
错误使用ThreadLocal导致线程不安全
为什么每个线程都输出5?难道他们没有独自保存自己的Number副本吗?为什么其他线程还是能够修改这个值?仔细考察ThreadLocal和Thead的代码,我们发现ThreadLocalMap中保存的其实是对象的一个引用,这样的话,当有其他线程对这个引用指向的对象实例做修改时,其实也同时影响了所有的线程持有的对象引用所指向的同一个对象实例。这也就是为什么上面的程序为什么会输出一样的结果:5个线程中保存的是同一Number对象的引用,在线程睡眠的时候,其他线程将num变量进行了修改,而修改的对象Number的实例是同一份,因此它们最终输出的结果是相同的。
而上面的程序要正常的工作,应该的用法是让每个线程中的ThreadLocal都应该持有一个新的Number对象。
而上面的程序要正常的工作,应该的用法是让每个线程中的ThreadLocal都应该持有一个新的Number对象。
在父线程中设置的ThreadLocal变量,怎样在子线程中获取?
Fork/Join
分治法
基本思想
把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解
步骤
分割原子问题
求解子问题
合并子问题 的解为原问题的 解
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
典型 应用
二分搜索
大整数乘法
Strassen矩阵乘法
棋盘覆盖
合并排序
快速排序
线性时间选择
汉诺塔
特性
ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好
ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等
ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配
合使用 ManagedBlocker。
合使用 ManagedBlocker。
工作窃取算法
指某个线程从其他队列里窃取任务来执行
我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。
但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
优点
充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里
只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列
只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
fork/join的使用
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。(比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction)
RecursiveTask :用于有返回结果的任务。(可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。可以有几个水平的分割和合并)
CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
Java 1.7 引入了一种新的并发框架—— Fork/Join Framework
主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数
提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
与ThreadPool共存,并不是要替换ThreadPool
Java 8 parallelStream
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream() .forEach(out::println);
numbers.parallelStream() .forEach(out::println);
内部就是采用ForkJoinPool
ForkJoinPool 框架主要类
ForkJoinPool 实现ForkJoin的线程池 - ThreadPoo
ExecutorService 是Java Executor框架的基础类
其他ExecutorService的实现执行Runnable或Callables任务
ForkJoinPool执行ForkJoinTasks任务
ForkJoinTask<V> 一个描述ForkJoin的抽象类 Runnable/Callable
RecursiveAction 无返回结果的ForkJoinTask实现Runnable
RecursiveTask<V> 有返回结果的ForkJoinTask实现Callable
CountedCompleter<T> 在任务完成执行后会触发执行一个自定义的钩子函数
ForkJoinTask封装了数据及其相应的计算
支持细粒度的数据并行
ForkJoinTask比线程要轻量
ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask
ForkJoinTask主要包括两个方法分别实现任务的分拆与合并
fork()类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中
跟Thread.join()不同,ForkJoinTask的join()方法并不简单的阻塞线程
利用工作线程运行其他任务
当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成
内部原理
ForkJoinPool中的所有的工作线程均有一个自己的工作队列WorkQueue
双端队列(Deque)
从队头取任务
线程私有,不共享
ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头
工作线程以LIFO的顺序来处理它队列中的任务
为了最大化CPU利用率,空闲的线程将从其他线程的队列中“窃取”任务来执行
从工作队列的队尾“窃取”任务,以减少竞争
任务的“窃取”是以FIFO顺序进行的,因为先放入的任务往往表示更大的工作量
支持“窃取”线程进行进一步的递归分解
WorkQueue双端队列最小化任务“窃取”的竞争
push()/pop()仅在其所有者工作线程中调用
这些操作都是通过CAS来实现的,是Wait-free的
poll() 则由其他工作线程来调用“窃取”任务
可能不是wait-free
最佳实践
最适合的是计算密集型的任务
在需要阻塞工作线程时,可以使用 ManagedBlocker。是否内联方法
不应该在RecursiveTask<R>的内部使用ForkJoinPool.invoke()
Fork/Join框架执行流程
ForkJoinPool 中的任务执行分两种
直接通过 FJP 提交的外部任务(external/submissions task),存放在 workQueues 的偶数槽位
通过内部 fork 分割的子任务(Worker task),存放在 workQueues 的奇数槽位
Fork/Join实战
Fork/Join使用的标准范式
我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
1. RecursiveAction,用于没有返回结果的任务
2. RecursiveTask,用于有返回值的任务
task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。
join()和get方法当任务完成的时候返回计算结果。
在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。
Fork/Join实战2
随机生成2000w条数据在数组当中,然后求和
ThreadLocalRandom
为什么不使用Random
它解决了Random类在多线程下多个线程竞争内部唯一的原子性种子变量而导致大量线程自旋重试的不足
ThreadLocalRandom使用ThreadLocal的原理,让每个线程内持有一个本地的种子变量,该种子变量只有在使用随机数时候才会被初始化,多线程下计算新种子时候是根据自己线程内维护的种子变量进行更新,从而避免了竞争
SimpleDateFormat是线程不安全的
SimpleDateFormat是线程不安全的类,定义为static对象,会有数据同步风险。通过源码可以看出,SimpleDateFormat内部有一个Calendar对象,在日期转字符串或字符串转日期的过程中,多线程共享时有非常高的概率产生错误,推荐的方式之一时使用ThreadLocal,让每个线程单独拥有这个对象。
解决办法
每次使用时new一个SimpleDateFormat 的 实例,这样可以保证每个实例使用自己的Calendar实例,但是每次使用都需要new一个对象,并且使用后由于没有其他引用,又需要回收,开销会很大。
可以使用syncheronized 对SimpleDtaFormat实例进行同步
使用ThreadLocl,这样每个线程只需要使用一个SimpleDateFormate实例,这相比第一种方式 节省了对象的创建销毁开销,并且不需要使多个线程同步。
Hook线程以及捕获线程执行异常
获取线程运行时异常
在Thread类中,关于处理运行时异常API总共有四个
setUncaughtExceptionHandler(UncaughtExceptionHandler eh) :为某个特定线程指定UncaughtExceptionHandler
setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) : 设置全局的UncaughtExceptionHandler
UncaughtExceptionHandler getUncaughtExceptionHandler() : 获取特定线程的UncaughtExceptionHandler
UncaughtExceptionHandler getDefaultUncaughtExceptionHandler(): 获取去全局的UncaughtExceptionHandler
UncaughtExceptionHandler介绍
线程在执行单元中是不允许抛出checked异常的额,而且线程运行在自己的上下文中,派生它的线程将无法直接获得它运行中出现的以常规信息,对此,Java 为我们提供了一个 UncaughtExceptionHandler 接口,,当线程在运行过程中出现异常时,会回调UncaughtExceptionHandler接口,从而得知是哪个线程在运行时出错,以及出现了什么样的错。
代码演示
注入钩子线程
Hook线程介绍
JVM进程的退出时由于JVM进程中没有活跃的非守护线程,或者收到了系统中断信号,向JVM程序注入一个Hook线程,在JVM进程退出的时候,Hook线程会启动执行,通过RunTime可以为JVM注入多个Hook线程,
ThreadHook
Hook线程实战
在我们的开发中 经常会遇到Hook线程,比如为了防止某个程序被重复启动,在进程启动时 会创建一个lock文件,进程收到中断信号的 时候会删除这个lock文件,我们在mysql服务器、zookeeper/kafka 等系统中都能看到lock文件的存在。
代码演示
Hook线程使用应用场景
Hook线程只有在收到退出信号的时候会被执行,如果在kill的时候使用了参数-9, 那么Hook线程不会得到执行,进程将会立即退出,因此.lock文件将得不到清理。
Hook线程中也可以执行一些资源释放的工作,比如关闭文件句柄、socket链接、数据库connection等。
尽量不要再Hook线程中执行一些耗时长的操作,因为其会导致程序迟迟不会退出。
Callable、Future 和 Futuretask
Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
因此我们通过一个线程运行Callable,但是Thread不支持构造方法中传递Callable的实例,所以我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。
要new一个FutureTask的实例,有两种方法
查看运行代码的汇编指令的工具
实战
项目实战:并发任务执行框架
架构师定义
架构设计、软件开发
确认需求
系统分解
技术选型
制定技术规格说明
开发管理
深深介入开发的方方面面
沟通协调
与用户,与产品,与升级,与团队成员
架构师方方面面
作用
设计架构
救火
布道
效果
攻关
信念
职责
产品架构
基础服务架构
需求的产生和分析
考试组
有批量离线文档要生成
题库组
批量的题目进行排重
根据条件批量修改题目的内容
共同痛点
都有批量任务要完成且速度慢
都要求可以查询进度
在使用省尽可能的对业务开发人员友好
我们需要做什么
提高性能,采用多线程,屏蔽细节
封装线程池和阻塞队列
每个批量任务拥有自己的上下文环境
需要一个并发安全的容器保存每个任务
自动清除已完成和过期任务
定时轮询?
框架业务示意图
框架流程图
具体实现-可查询进度的并发任务执行框架
用户业务方法结果?
如何执行业务的业务方法?
用户如何提交他的 工作任务和查询任务进度?
整个框架的流程和实现?
具体实现
1、用户业务方法的结果?
一个方法执行的结果有几种可能?三种,成功:按预想的流程出了结果;失败:按按预想的流程没出结果;异常:没按预想的流程抛出了预料之外的错误。因此我们定义了一个枚举,表示这三种情况,
对于方法的业务执行结果,返回值有很多种可能,基本类型,系统定义的对象类型,用户自定义的对象类型都是存在的,我们需要用泛型来说表示这个结果。同时方法执行失败了,我们还需要告诉用户或者业务开发人员,失败的原因,我们再定义了一个任务的结果类。
2、如何执行用户的业务方法?
我们是个框架,用户的业务各种各样,都要放到我们框架里执行,怎么办?当然是定义个接口,我们的框架就只执行这个方法,而使用我们框架的业务方都应该来实现这个接口,当然因为用户业务的数据多样性,意味着我们这个方法的参数也应该用泛型。
3、用户如何提交他的工作和查询任务进度?
用户在前端提交了工作(JOB)到后台,我们需要提供一种封装机制,让业务开发人员可以将任务的相关信息提交给这个封装机制,用户的需要查询进度的时候,也从这个封装机制中取得,同时我们的封装机制内部也要负责清除已完成任务。
在这个封装机制里我们定义了一个类JobInfo,抽象了对用户工作的封装,一个工作可以包含多个子任务(TASK),这个JobInfo中就包括了这个工作的相关信息,比如工作名,用以区分框架中唯一的工作,也可以避免重复提交,也方便查询时快速定位工作,除了工作名以外,工作中任务的列表,工作中任务的处理器都在其中定义。
同时JobInfo还有相当多的关于这个工作的方法,比如查询工作进度,查询每个任务的处理结果,记录每个任务的处理结果等等
负责清除已完成任务,我们则交给CheckJobProcesser类来完成,定时轮询的机制不够优雅,因此我们选用了DelayQueue来实现这个功能
并且在其中定义了清除已完成任务的Runnable和相关的工作线程。
4、框架的主体类
主体类则是PendingJobPool,这也是业务开发人员主要使用的类。这个类主要负责调度,例如工作(JOB)和任务(TASK)的提交,任务(TASK)的保存,任务(TASK)的并发执行,工作进度的查询接口和任务执行情况的查询等等。
流程图
代码
单体代码
和Spring集成代码
项目实战-应用性能优化实战
项目简介
考试后从题库中抽取题目生成大量的离线练习册文档并打印
题目在数据库中存储形式,平均长度800字节:
下图是Diameter协议中的那部分?《img src = "001.jpg"》
项目原来存在的问题
一份PDF文档的生成平均时长在50~55秒左右
为什么慢?
一份离线练习册的生成过程
一份离线练习册的生成过程
分离出需要处理的题目(60~120个,平均大约80个题目左右)
解析处理题目文本,对题目中的图片下载到本地,然后调用第三方工具生成PDF文档(耗时大约40~45秒)
将PDF文档上传到云空间进行存储(耗时大约9、10秒)
提供文档地址让用户去下载打印
分析和改进
第一版:单web
代码
第二版:服务化
代码
改进
考察业务特点:
1、从容量为10万左右的题库中为每个学生抽取适合他的题目;
2、每道题目都含有大量的图片需要下载到本地,和文字部分一起渲染;
3、存在热点题目
1、解决方案:缓存避免重复工作、题目处理并行和异步化
比如A 80 题 B 20 题 A和B有共同的10题
2、如何实现
1)先检索缓存,新题目在生成时要考虑并发安全和充分利用Future
2)题目有更新时,怎么处理?
比较题目有没有变化,可以使用MD5,SHA摘要
改进之后题目处理的流程
代码
见附件
能否继续改进呢?
如何继续改进
1、题库数量太多时,如何处理?
2、服务器重启的时候,已缓存的题目怎么办?
3、还可以继续优化吗?
解决方案
数据结构改进
使用Google ConcurrentLinkedHashMap 代替 concurrentHashMap
线程数的设置
缓存的改进
使用了本地文件存储,启用了一个二级缓存机制
用户体验的改进
使用并发任务执行框架中的思想相结合,在前端显示处理进度,给用户带来更好的使用体验
启示
性能优化一定要建立在对业务的深入分析上,比如我们在性能优化的切入点,在缓存数据结构的选择就建立在对业务的深入理解上;
性能优化要善于利用语言的高并发特性,
性能优化多多利用缓存,异步任务等机制,正是因为我们使用这些特性和机制,才让我们的应用在性能上有个了质的飞跃;
引入各种机制的同时要注意避免带来新的不安全因素和瓶颈,比如说缓存数据过期的问题,并发时的线程安全问题,都是需要我们去克服和解决的。
0 条评论
下一页