并发编程
2021-09-12 15:53:13 3 举报
AI智能生成
并发编程,多线程编程
作者其他创作
大纲/内容
多线程基础
sleep方法没有释放锁
wait方法释放了锁,使当前线程阻塞,进入等待状态,notify并不释放锁,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在别人手里,别人还没释放,这地方很多人会懵逼,记反,notify只是唤醒一个等待线程,执行完synchronized 代码块的代码才会释放锁。一般notify之后没有其他代码了,所以notify之后紧接着代码块也就执行完了,但是notify方法本身不释放锁
必须在synchronized代码块中使用,因为必须是拥有monitor lock的线程才可以执行wait与notify操作
yield()方法是停止当前线程,让同等优先权的线程运行。如果没有同等优先权的线程,那么Yield()方法将不会起作用。
相当于只是礼让一下
join()方法强制使当前线程停下来等待,直至另一个调用join方法的线程终止才继续顺序执行主线程。join的线程的在被激活后不一定马上就运行,而是进入到可运行线程的队列中
Interrupt():中断线程
synchronized
每个对象都有一个关联的monitor,比如一个对象实例就有一个monitor,一个类的Class对象也有一个monitor,
如果要对这个对象加锁,那么必须获取这个对象关联的monitor的lock锁
如果要对这个对象加锁,那么必须获取这个对象关联的monitor的lock锁
monitor里面有一个计数器,从0开始的。如果一个线程要获取monitor的锁,就看看他的计数器是不是0,如果是0的话,那么说明没人获取锁,他就可以获取锁了,然后对计数器加1
这个monitor的锁是支持重入加锁的
加锁,一般来说都是必须对一个对象进行加锁,第二次synchronized那里,会再次获取对象的monitor的锁,这个就是重入加锁了,然后计数器会再次加1,变成2
加锁,一般来说都是必须对一个对象进行加锁,第二次synchronized那里,会再次获取对象的monitor的锁,这个就是重入加锁了,然后计数器会再次加1,变成2
出了synchronized修饰的代码片段的范围,就会有一个monitorexit的指令,在底层。此时获取锁的线程就会对那个对象的monitor的计数器减1
synchronized锁升级
JDK1.6以后,引入了偏向锁,轻量级锁,重量级锁,减少竞争带来的上下文切换
Java对象头
Mark World,记录了对象和锁有关的信息
指向类的指针
数组长度(如果当前对象是数组)
锁升级主要依赖Mark Word中的锁标志位和释放偏向锁标识位
用户程序都是运行在用户态的,但是有时候程序确实需要做一些内核的事情,
例如从硬盘读取数据,或者从硬盘获取输入,而唯一可以做这些事情的就是操作系统,这时候就需要:将用户态程序切换到内核态,
也就是说synchronized是依赖操作系统实现的,因此在使用synchronized同步锁的时候需要进行用户态到内核态的切换。
例如从硬盘读取数据,或者从硬盘获取输入,而唯一可以做这些事情的就是操作系统,这时候就需要:将用户态程序切换到内核态,
也就是说synchronized是依赖操作系统实现的,因此在使用synchronized同步锁的时候需要进行用户态到内核态的切换。
synchronized 内核态切换
简单来说在JVM中monitorenter和monitorexit字节码依赖于底层的操作系统的Mutex Lock来实现的,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。所以就出现了锁升级的机制
锁在升级到重量锁之前,是不需要进行状态切换的
优化后的synchronized锁
JDK1.6开始对synchronized做了优化,通过上文讲的Mark World 来区分了不同场景下同步锁的不同类型,来减少线程切换的次数
偏向锁的作用
当有线程访问同步代码或方法时,线程只需要判断对象头的Mark Word中判断一下是否有偏向锁指向线程ID.
偏向锁记录过程
线程抢到了对象的同步锁(锁标志为01即无其他线程占用,无锁情况下,锁标志位也是01)
对象Mark World 将是否偏向标志位设置为1
记录抢到锁的线程ID
进入偏向状态,此时锁标志位仍然是01
偏向锁的优势
对象中记录了获取到对象锁的线程ID,这就意味如果短时间同一个线程再次访问这个加锁的同步代码或方法时,该线程只需要对对象头Mark Word中去判断一下是否有偏向锁指向它的ID,不需要在进入Monitor去竞争对象了
什么时候升级成轻量级锁
一旦出现其他线程竞争资源时,偏向锁就会被撤销
如果线程在全局安全点检查时,还需要使用该锁 则进行锁升级,如果线程已经不需要使用锁,并有其他线程需要使用时,将偏向锁的拥有者切换为另外线程
另外一个线程,发现对象头 Mark Word 中的线程 ID 不是自己的线程 ID,就会进行 CAS 操作获取锁
如果获取成功
直接替换 Mark Word 中的线程 ID 为自己的 ID,该锁会保持偏向锁状态
如果获取锁失败
代表当前锁有一定的竞争,偏向锁将升级为轻量级锁,锁标志位改成00
举个例子:假设A线程 持有锁 X(此时X是偏向锁) 这时有个B线程也同样用到了锁X,而B线程在检查锁对象的Mark World时发现偏向锁的线程ID已经指向了线程A。这时候就需要升级锁X为轻量级锁。轻量级锁意味着标示该资源现在处于竞争状态。
当有其他线程想访问加了轻量级锁的资源时,会使用自旋锁优化,来进行资源访问
自旋策略:JVM 提供了一种自旋锁,可以通过自旋方式不断尝试获取锁,从而避免线程被挂起阻塞。这是基于大多数情况下,线程持有锁的时间都不会太长,毕竟线程被挂起阻塞可能会得不偿失。
重量级锁
JDK1.7 开始,自旋锁默认启用,因为 CAS 重试操作意味着长时间地占用 CPU,自旋锁重试之后如果抢锁依然失败,同步锁就会升级至重量级锁,锁标志位改为 10。在这个状态下,未抢到锁的线程都会进入 Monitor,之后会被阻塞在 _WaitSet 队列中。
自旋失败,很大概率 再一次自选也是失败,因此直接升级成重量级锁,进行线程阻塞,减少cpu消耗
当锁升级为重量级锁后,未抢到锁的线程都会被阻塞,进入阻塞队列
总结
synchronized锁升级实际上是把本来的悲观锁变成了 在一定条件下 使用无锁(同样线程获取相同资源的偏向锁),以及使用乐观(自旋锁 cas)和一定条件下悲观(重量级锁)的形式
偏向锁:适用于单线程适用锁的情况
轻量级锁:适用于竞争较不激烈的情况(这和乐观锁的使用范围类似)
重量级锁:适用于竞争激烈的情况
synchronized双重锁,单例模式
存在问题:
在线程执行到第1处,代码读取到instance不为null时,instance引用的对象有可能还没有完成初始化
instance = new Instance();
分解为如下的3行伪代码
分解为如下的3行伪代码
emory = allocate(); //1.分配对象的内存空间
ctorInstance(memory); //2.初始化对象
instance = memory; //3.设置instance指向刚分配的内存地址
ctorInstance(memory); //2.初始化对象
instance = memory; //3.设置instance指向刚分配的内存地址
上面3行伪代码中的2和3之间,可能会被重排序
解决方案一
不允许2和3重排序
懒汉式单例模式,想保证线程安全,一定要吧变量修饰为volatile
基于volatile的解决方案
正常的初始化要优于延迟初始化。如果确实需要对实例字段使用线程安全的延迟初始化,请使用基于volatile的延迟初始化的方案;
解决方案二
允许2和3重排序,但不允许其他线程“看到”这个重排序
相当于饿汉式
基于类初始化的解决方案
在执行类的初始化期间,JVM会去获取一个锁.这个锁可以同步多个线程对同一个类的初始化。
如果确实需要对静态字段使用线程安全的延迟初始化,请使用基于类初始化的方案
CAS的底层实现原理
FC
函数调用function call:调用的过程,运用的是cpu的指令集,由cpu直接交互代码指令,很快。
cas是cpu层面执行,自旋,但不用考虑内核
SC
系统调用system call:synchronized是在内核里实现的,虽然也是靠cpu实现的,但是复杂
cpu执行过程中,靠晶振器做时钟中断来控制cpu
sync方法放到cpu寄存器,会从寄存器中去sync,也就是内核里处理
中断
中断向量表,包括GDT(一个表,描述那些是内核和用户空间),IDT中断号(根据中断号,做进程调度,调度活动队列,阻塞队列)
阻塞到运行:io阻塞,资源到位,数据到位,从阻塞队列进入运行队列,等着时钟中断
中断之后,轮到你了才运行
中断之后,轮到你了才运行
内存相当于一个线性地址空间,线型数组,内核空间也是在内存里的,但是内核空间是有一个保护模式
时钟中断后在IDT里做进程调度
尽管JAVA 1.6为synchronized做了优化,增加了从偏向锁到轻量级锁再到重量级锁的过过度,但是在最终转变为重量级锁之后,性能仍然比较低,
synchronized属于悲观锁,CAS属于乐观锁
synchronized属于悲观锁,CAS属于乐观锁
原子操作类的底层正是用到了“CAS机制”,即各种Atomic
实现原理
CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。
CAS的缺点
CPU开销过大,并发量比较高的情况下,多线程反复尝试更新某一个变量,可能有很多线程会不停的自旋,占用cpu
不能保证代码块的原子性,比如需要保证3个变量共同进行原子性的更新,就不得不使用synchronized了
ABA问题:线程A进行CAS逻辑,在从内存中获取到var值到开始进行逻辑之间,会有一个时间差;如果刚好在这个时间差内,有其他某线程对var做了一系列的操作,但最后又恢复了var的值,即:出现“偷梁换柱”的情况;虽然此时线程A仍然能CAS成功,但是中间多出的那些过程仍然可能引发问题。
解决:使用AtomicStampedReference类,简单说AtomicStampedReference类引入了版本概念
Java 8对CAS机制的优化
Java 8推出了一个新的类,LongAdder,尝试使用分段CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性能!
在LongAdder的底层实现中,首先有一个base值,刚开始多线程来不停的累加数值,都是对base进行累加的,比如刚开始累加成了base = 5
接着如果发现并发更新的线程数量过多,就会开始施行分段CAS的机制,也就是内部会搞一个Cell数组,每个数组是一个数值分段
大量的线程分别去对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到了不同的Cell分段数值中了
自动分段迁移的机制:如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作
最后,如果你要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回给你,解决了线程空旋转、自旋不停等待执行CAS操作的问题,吞吐量增加
缺点:牺牲了空间,因为多了个cell分段数组
使用场景:那个变量是不是有人读,有人写,直接就是volatile就可以了。如果大家都要写,再判断一下,仅仅只是简单的数值累加或者变更,数值的一些操作,建议可以用 Atomic原子类,CAS机制,无锁化,并发性要比synchronized要好不少的
服务心跳计数器
Lock
Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问
多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到
通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的
java.util.concurrent.locks包中,Lock是一个接口
Lock,必须主动去释放锁,一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false
lockInterruptibly()方法
如果线程正在等待获取锁,则这个线程能够响应中断
当一个线程获取了锁之后,是不会被interrupt()方法中断的
ReentrantLock是唯一实现了Lock接口的类
ReadWriteLock
只定义了两个方法,readLock,writeLock,将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作
ReentrantReadWriteLock
如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁
可中断锁
synchronized就不是可中断锁,而Lock是可中断锁。
如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,调用lockInterruptibly()方法来实现
ConcurrentHashMap
hashmap在JDK1.8的链表插入元素为什么改为了尾插法
首先,原因是因为jdk1.7在hashmap扩容的时候,多线程的情况下,会出现链表成环的问题,导致之后查询hashmap时死循环
,jdk1.8在hashmap扩容时多线程情况下,仍然有数据异常,但只是不会导致链表成环,所以无论1.8还是1.7多线程情况下,不加锁都会有问题
,jdk1.8在hashmap扩容时多线程情况下,仍然有数据异常,但只是不会导致链表成环,所以无论1.8还是1.7多线程情况下,不加锁都会有问题
jdk1.7链表成环的过程
首先hashmap扩容分为两步
扩容:创建一个新的Entry空数组,长度是原数组的2倍
ReHash:遍历原Entry数组,把所有的Entry重新Hash到新数组
单线程情况下,将原链表1->2->3倒叙插入到新扩容链表3->2->1(如果在扩容后还存在于table相同下标的链表中,这也太巧了,可以买彩票了)
多线程情况下,单链表的头插法,同一位置上新元素总会被放在链表的头部位置,源码中看出,会导致生成环型链表,也就是有可能1的next又指向3了
jdk1.8使用尾插法,在扩容时会保持链表元素原本的顺序,就不会出现链表成环的问题了
多线程情况下使用synchronized保证安全
缺点:两个不同的位置添加元素,也被锁管理了,这明显是没有必要的,会造成效率低下
ConcurrentHashMap闪亮登场
JDK 1.7 版本,它的实现方式是分段加锁,将HashMap在底层的数组分段成几个小数组,然后给每个数组分别加锁,
所以JDK1.7的底层是:segments+HashEntry数组
所以JDK1.7的底层是:segments+HashEntry数组
JDK1.8以及之后,锁粒度的细化,底层是散列表+红黑树和HashMap是一样的
数组中每个元素进行put都是有一个不同的锁,如果两个线程都是在数组[5]这个位置进行put,这个时候,对数组[5]这个位置进行put的时候,采取的是CAS策略
如果很多个线程对数组中不同位置的元素进行操作,大家是互相不会影响的
如果多个线程对同一个位置进行操作,产生冲突,CAS失败的线程,就会在这个位置基于链表+红黑树来进行处理,synchronized([5]),进行加锁。
ConcurrentHashMap的key和Value都不能为null
hashmap的key和Value都可以为null
AQS的实现原理
AQS的全称是AbstractQueuedSynchronizer,抽象队列同步器
ReentrantLock和AQS之间的关系:说白了,ReentrantLock内部包含了一个AQS对象,也就是AbstractQueuedSynchronizer类型的对象。这个AQS对象就是ReentrantLock可以实现加锁和释放锁的关键性的核心组件
实现原理
1,AQS对象内部有一个核心的变量叫做state,是int类型的,代表了加锁的状态。初始状态下,这个state的值是0
2,AQS内部还有一个关键变量,用来记录当前加锁的是哪个线程,初始化状态下,这个变量是null
线程1跑过来调用ReentrantLock的lock()方法尝试进行加锁,这个加锁的过程,直接就是用CAS操作将state值从0变为1。设置当前加锁线程是线程1
线程2跑过来一看,state的值不是0啊?所以CAS操作将state从0变为1的过程会失败,因为state的值当前为1,说明已经有人加锁了!
线程2会看一下,是不是自己之前加的锁啊?如果不是,此时就是加锁失败。
线程2会将自己放入AQS中的一个等待队列
如果是自己之前加的锁,将aqs里面的state加1。
线程1在执行完自己的业务逻辑代码之后,就会释放锁
将AQS内的state变量的值递减1,如果state值为0,则彻底释放锁
将“加锁线程”变量也设置为null
从等待队列的队头唤醒线程2重新尝试加锁
ReentrantLock这种东西只是一个外层的API,内核中的锁机制实现都是依赖AQS组件的
线程池
系统是不可能频繁的创建线程有销毁线程的,这样会非常影响性能,所以我们需要线程池。
运行原理
提交任务,先看一下线程池里的线程数量是否小于corePoolSize,也就是3,如果小于,直接创建一个线程出来执行你的任务
如果执行完任务之后,这个线程是不会死掉的,他会尝试从一个无界的LinkedBlockingQueue里获取新的任务,如果没有新的任务,此时就会阻塞住,等待新的任务到来
持续提交任务,上述流程反复执行,只要线程池的线程数量小于corePoolSize,都会直接创建新线程来执行这个任务,执行完了就尝试从无界队列里获取任务,直到线程池里有corePoolSize个线程
接着再次提交任务,会发现线程数量已经跟corePoolSize一样大了,此时就直接把任务放入队列中就可以了,线程会争抢获取任务执行的,如果所有的线程此时都在执行任务,那么无界队列里的任务就可能会越来越多
这个时候,如果你的maximumPoolSize是比corePoolSize大的,此时线程池就会继续创建额外的线程放入线程池中,来处理这些任务。这些额外创建的线程如果处理完了一个任务也会尝试从队列中获取任务来执行。线程池总共可以创建的线程的数量就是maximumPoolSize
如果任务非常多,额外线程全部创建完了,队列还是满的,此时还是有新的任务来,此时只能reject掉,有几种不同的reject策略,可以传入RejectedExecutionHandler
默认:AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,比较关键的业务,推荐使用此拒绝策略
DiscardPolicy:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,无关紧要的业务采用此策略。例如,博客网站统计阅读量就是采用的这种拒绝策略
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务,根据实际业务是否允许丢弃老任务来认真衡量。
CallerRunsPolicy:由调用主线程处理该任务
后续慢慢没有任务了,额外创建的线程出去空闲状态,那么线程会等待最大存活时间,如果在这个时间内没有获取新的任务,它就会销毁。
Java提供的四种线程池实现
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
如果线上机器突然宕机,线程池的阻塞队列中的请求怎么办?
我们可以在提交任务之前,在数据库中插入这个任务的信息,更新任务的状态:未提交、已提交、已完成。提交成功后,更新它的状态是已提交状态。
系统重启后,用一个后台线程去扫描数据库里的未提交和已提交状态的任务,可以把任务的信息读取出来,重新提交到线程池里去,继续进行执行。
系统重启后,用一个后台线程去扫描数据库里的未提交和已提交状态的任务,可以把任务的信息读取出来,重新提交到线程池里去,继续进行执行。
JAVA内存模型
read(读取):作用于主内存变量,表示把一个主内存变量的值传输到线程的工作内存
load(载入):作用于线程的工作内存的变量,表示把read操作从主内存中读取的变量的值放到工作内存的变量副本中
use(使用):作用于线程的工作内存中的变量,表示把工作内存中的一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时就会执行该操作
assign(赋值):作用于线程的工作内存的变量,表示把执行引擎返回的结果赋值给工作内存中的变量,每当虚拟机遇到一个给变量赋值的字节码指令时就会执行该操作
store(存储):作用于线程的工作内存中的变量,把工作内存中的一个变量的值传递给主内存,以便随后的write操作使用
write(写入):作用于主内存的变量,把store操作从工作内存中得到的变量的值放入主内存的变量中
原子性:就是当有一个线程在对内存中的某个数据进行操作的时候,必须要等这个线程完全操作结束后,其他线程才能够操作
有序性:就是代码的顺序应该和指令的顺序相同。在执行过程中不会发生指令重排
可见性:如果一个线程成功修改了数据,那么其他线程能够立即更新工作内存中的该数据,即随时保持最新数据状态
volatile
volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的
地方,因此在读取 volatile 类型的变量时总会返回最新写入的值
地方,因此在读取 volatile 类型的变量时总会返回最新写入的值
变量可见性,其一是保证该变量对所有线程可见,这里的可见性指的是当一个线程修改了变量的值,那么新的
值对于其他线程是可以立即获取的
值对于其他线程是可以立即获取的
volatile 禁止了指令重排
为啥一定要让每个线程用一个工作内存来存放变量的副本以供读取
要是每次需要一个变量的值,都从主内存加载,性能会比较差!
所以说后来想了一个办法,就是线程有工作内存的概念,类似于一个高速的本地缓存
所以说后来想了一个办法,就是线程有工作内存的概念,类似于一个高速的本地缓存
volatile最关键的几个作用
强制将这个data变量最新的值刷回主内存,必须让主内存里的data变量值立马变成最新的值
如果此时别的线程的工作内存中有这个data变量的本地缓存,也就是一个变量副本的话,那么会强制让其他线程的工作内存中的data变量缓存直接失效过期,不允许再次读取和使用了
如果线程2在代码运行过程中再次需要读取data变量的值,此时尝试从本地工作内存中读取,就会发现这个data已经过期了
此时,他就必须重新从主内存中加载data变量最新的值,所以volitile保证了可见性
常常使用的一个场景是对于一个变量,有的线程要更新它有的线程要读取它来进行判断操作,这个时候就需要使用volatile关键字,来保证读取到最新的数据。
基于内存屏障保证可见性和有序性
lock指令:volatile保证可见性
对volatile修饰的变量,执行写操作的话,JVM会发送一条lock前缀指令给CPU,CPU在计算完之后会立即将这个值写回主内存,同时因为有MESI缓存一致性协议,所以各个CPU都会对总线进行嗅探,自己本地缓存中的数据是否被别人修改
如果发现别人修改了某个缓存的数据,那么CPU就会将自己本地缓存的数据过期掉,然后这个CPU上执行的线程在读取那个变量的时候,就会从主内存重新加载最新的数据了
内存屏障:volatile禁止指令重排序
对于volatile修改变量的读写操作,都会加入内存屏障
每个volatile写操作前面,加StoreStore屏障,禁止上面的普通读写和他重排;每个volatile写操作后面,加StoreLoad屏障,禁止跟下面的volatile读/写重排
每个volatile读操作后面,加LoadLoad屏障,禁止下面的普通读和voaltile读重排;每个volatile读操作后面,加LoadStore屏障,禁止下面的普通写和volatile写重排
使用场景:如果仅仅只是有一些线程会来写一个变量,标志位,另外一个线程是来读取这个标志位的值,那么此时优先使用volatile
标志位修改等可见性场景优先使用volatile
ThreadLocal
ThreadLocal是通过每个线程单独一份存储空间,牺牲空间来解决冲突,并且相比于Synchronized,ThreadLocal具有线程隔离的效果,只有在线程内才能获取到对应的值,线程外则不能访问到想要的值
数据结构
ThreadLocal的静态内部类ThreadLocalMap为每个Thread都维护了一个数组table(table里面存的就是ThreadLocal变量),
源码中:private Entry[] table;
也就是说一个线程中可以定义多个ThreadLocal变量
源码中:private Entry[] table;
也就是说一个线程中可以定义多个ThreadLocal变量
当方法中定义一个ThreadLocal类型的变量时,支持泛型,假设定义变量名称是maBaoGuo,
代码:private static ThreadLocal<Map<String,Integer>> maBaoGuo = new ThreadLocal<>();
代码:private static ThreadLocal<Map<String,Integer>> maBaoGuo = new ThreadLocal<>();
maBaoGuo.get()方法
原理:ThreadLocal,连接ThreadLocalMap和Thread。来处理Thread的TheadLocalMap属性,包括init初始化属性赋值、get对应的变量,set设置变量等。通过当前线程,获取线程上的ThreadLocalMap属性,对数据进行get、set等操作
源码分析:
public T get() {
//获取当前线程,线程互不干扰的操作ThreadLocal的原因就是,它的set、get方法是要先获取当前线程,然后修改、操作这个线程对象的成员属性
Thread t = Thread.currentThread();
//ThreadLocalMap 是一个静态内部类,getMap方法返回的就是当前线程的变量threadLocals
ThreadLocalMap map = getMap(t);
//初始threadLocals是null,调用set方法以后才会有值
if (map != null) {
//ThreadLocalMap有个静态内部类Entry,key是ThreadLocal,值是我们自己设定的maBaoGuo,即类型是当初定义ThreadLocal类型的变量时,泛型的类型。且这个Entry还是个弱引用
ThreadLocalMap.Entry e = map.getEntry(this);//this就是ThreadLocal对象本身
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
//把变量maBaoGuo就返回了
return result;
}
}
//初始化操作,返回null的同时,里面会给当前线程Thread的变量threadLocals赋值new ThreadLocalMap(this, firstValue);,这个threadLocals变量的类型是ThreadLocal.ThreadLocalMap,再强调下,threadLocals是thread类里面的变量。
return setInitialValue();
}
public T get() {
//获取当前线程,线程互不干扰的操作ThreadLocal的原因就是,它的set、get方法是要先获取当前线程,然后修改、操作这个线程对象的成员属性
Thread t = Thread.currentThread();
//ThreadLocalMap 是一个静态内部类,getMap方法返回的就是当前线程的变量threadLocals
ThreadLocalMap map = getMap(t);
//初始threadLocals是null,调用set方法以后才会有值
if (map != null) {
//ThreadLocalMap有个静态内部类Entry,key是ThreadLocal,值是我们自己设定的maBaoGuo,即类型是当初定义ThreadLocal类型的变量时,泛型的类型。且这个Entry还是个弱引用
ThreadLocalMap.Entry e = map.getEntry(this);//this就是ThreadLocal对象本身
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
//把变量maBaoGuo就返回了
return result;
}
}
//初始化操作,返回null的同时,里面会给当前线程Thread的变量threadLocals赋值new ThreadLocalMap(this, firstValue);,这个threadLocals变量的类型是ThreadLocal.ThreadLocalMap,再强调下,threadLocals是thread类里面的变量。
return setInitialValue();
}
ThreadLocalMap有个静态内部类Entry,为什么entry对象里的 k是弱引用指向这个ThreadLocal对象
maBaoGuo会有两个引用,一个是强引用,另外一个就是当前线程里面的ThreadLocalMap的内部类entry又有一个弱引用。
方法执行完毕后,栈帧销毁,强引用 maBaoGuo也就没有了,但此时线程的ThreadLocalMap里某个entry的 k 引用还指向这个对象。若这个k 引用是强引用,就会导致k指向的ThreadLocal对象及v指向的对象不能被gc回收,造成内存泄漏,但是弱引用就不会有这个问题(弱引用及强引用等这里不说了)。使用弱引用,就可以使ThreadLocal对象在方法执行完毕后顺利被回收,而且在entry的k引用为null后,再调用get,set或remove方法时,就会尝试删除key为null的entry,可以释放value对象所占用的内存。
概括说就是:在方法中新建一个ThreadLocal对象,就有一个强引用指向它,在调用set()后,线程的ThreadLocalMap对象里的Entry对象又有一个引用 k 指向它。如果后面这个引用 k 是强引用就会使方法执行完,栈帧中的强引用销毁了,对象还不能回收,造成严重的内存泄露。
那假如引用为null后,我不再调用get,set或remove方法了呢?不还是会内存泄漏,那没办法,我们只能在代码里,要在不使用某个ThreadLocal对象后,手动调用remove方法来删除它,比如:maBaoGuo.remove();,避免maBaoGuo=null的操作
当然,可以等到线程运行结束后,整个Map都会被回收,但很多线程要运行很久,在线程结束之前,便会一直占着内存空间
尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取到上个线程遗留下来的value值,造成bug。
ThreadLocalMap为什么要定义在ThreadLocal中,而不直接定义在Thread中?
一个ThreadLocal对应一个线程,但是可没人说一个线程也只能有一个ThreadLocal
将ThreadLocalMap定义在Thread类内部看起来更符合逻辑,但是ThreadLocalMap并不需要Thread对象来操作,所以定义在Thread类内只会增加一些不必要的开销
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
//注意,value 当前为null
createMap(t, value);
return value;
}
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
//注意,value 当前为null
createMap(t, value);
return value;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
maBaoGuo.set()方法
假如set语句是,maBaoGuo.set(new HashMap<>());
源码分析:
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);//this是对象本身
else
createMap(t, value);
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);//this是对象本身
else
createMap(t, value);
}
private void set(ThreadLocal<?> key, Object value) {
//table表示每个线程里有多个ThreadLocal变量
Entry[] tab = table;
int len = tab.length;
//找到通过key找到当前threadlocal在entry[]中的下标位置
int i = key.threadLocalHashCode & (len-1);
//因为存储到table时,可能发生碰撞,set时的解决方案是找下一个下标中有没有值,如果没有就利用,所以会在此处用for循环去遍历,而不一定直接利用计算出来的下标就能找到你想要的entry(上面说过ThreadLocalMap有个静态内部类Entry,key是ThreadLocal)
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//代表从table里找到了entry,直接替换
if (k == key) {
e.value = value;
return;
}
//如果==null,说明这个位置还没人用,直接set
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
//如果没找到位置,说明已经空间不足,这时候对table进行扩容生成新的 Entry[],且重新rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
//table表示每个线程里有多个ThreadLocal变量
Entry[] tab = table;
int len = tab.length;
//找到通过key找到当前threadlocal在entry[]中的下标位置
int i = key.threadLocalHashCode & (len-1);
//因为存储到table时,可能发生碰撞,set时的解决方案是找下一个下标中有没有值,如果没有就利用,所以会在此处用for循环去遍历,而不一定直接利用计算出来的下标就能找到你想要的entry(上面说过ThreadLocalMap有个静态内部类Entry,key是ThreadLocal)
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//代表从table里找到了entry,直接替换
if (k == key) {
e.value = value;
return;
}
//如果==null,说明这个位置还没人用,直接set
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
//如果没找到位置,说明已经空间不足,这时候对table进行扩容生成新的 Entry[],且重新rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
应用场景:用redis实现分布式可重入锁,用到TreadLocal实现可重入的效果
加锁:
String uuid = UuidUtil.uuid();
distributeLock.distributeLock(key, uuid, seconds)
String uuid = UuidUtil.uuid();
distributeLock.distributeLock(key, uuid, seconds)
假如定义:ThreadLocal<Map<String,Integer>> REENTRANT_LOCAL_MAP = new ThreadLocal<>();
当第一次加锁时,在Threadlocal对象里新建TreadLocalMap,并且赋值value,代码:REENTRANT_LOCAL_MAP.set(new HashMap<>());
判断当前加锁次数:Integer reentrantCount = REENTRANT_LOCAL_MAP.get().getOrDefault(key,0);
大于0,表示已经加过锁,可重入,接着执行:REENTRANT_LOCAL_MAP.get().put(key,reentrantCount);,然后返回加锁成功
等于0;进行第一次加锁往redis存值后(当然在别的线程占得锁的情况下不会成功加锁),
不成功怎么办?while循环自旋,当前线程就阻塞
假如成功加锁后再执行:REENTRANT_LOCAL_MAP.get().put(key,1);
不成功怎么办?while循环自旋,当前线程就阻塞
假如成功加锁后再执行:REENTRANT_LOCAL_MAP.get().put(key,1);
解锁:distributeLock.releaseLock(key, uuid);
判断当前加锁次数:Integer reentrantCount = REENTRANT_LOCAL_MAP.get().get(key);
大于0,减1,然后再判断是否==0
假如==0,释放锁的同时执行:REENTRANT_LOCAL_MAP.get().remove(key);
为什么用uuid来保证全局唯一
首先这个跟可重入没关系,这是分布式锁要考虑的。顺便写一下
加锁的时候把uuid存到redis的value里,解锁时判断传参是否和redis中存的value一致,达到的目的就是保证线程自己加的锁,自己解锁,防止别的线程来解自己的锁
用treadID会保证唯一吗?理论上不会,因为多台服务器之间创建的线程id有可能重复。因为线程id不能保证全局唯一
应用场景:
如果你不需要多个线程共享读写一个数据的话,可以让每个线程保持一个本地变量的副本的话,那么你其实可以搞一个ThreadLocal,让每个线程都维护一个变量的副本,每个线程就操作自己本地的副本就可以了
CountDownLatch、CyclicBarrier、Semaphore
CountDownLatch(线程计数器)
比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用 CountDownLatch
来实现这种功能了
来实现这种功能了
在线程外定义:final CountDownLatch latch = new CountDownLatch(2);
线程内部start:latch.countDown();,每次减1
线程外部latch.await();等待两个线程执行完毕
继续执行主线程
CyclicBarrier
回环栅栏-等待至 barrier 状态再全部同时执行
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环
是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做
barrier,当调用 await()方法之后,线程就处于 barrier 了。
是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做
barrier,当调用 await()方法之后,线程就处于 barrier 了。
await 方法, 2 个重载版本
public int await():用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任
务
务
public int await(long timeout, TimeUnit unit):让这些线程等待至一定的时间,如果还有
线程没有到达 barrier 状态就直接让到达 barrier 的线程执行后续任务
线程没有到达 barrier 状态就直接让到达 barrier 的线程执行后续任务
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
new Writer(barrier).start();
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完
毕,等待其他线程写入完毕");
cyclicBarrier.await();
try {
Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完
毕,等待其他线程写入完毕");
cyclicBarrier.await();
Semaphore
信号量-控制同时访问的线程个数,通过acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可
1. public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许
可。
2. public void acquire(int permits):获取 permits 个许可
3. public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。
4. public void release(int permits) { }:释放 permits 个许可
上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
可。
2. public void acquire(int permits):获取 permits 个许可
3. public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。
4. public void release(int permits) { }:释放 permits 个许可
上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
1. public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回 true,若获取失
败,则立即返回 false
2. public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的
时间内获取成功,则立即返回 true,否则则立即返回 false
3. public boolean tryAcquire(int permits):尝试获取 permits 个许可,若获取成功,则立即返
回 true,若获取失败,则立即返回 false
4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取 permits
个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false
5. 还可以通过 availablePermits()方法得到可用的许可数目。
败,则立即返回 false
2. public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的
时间内获取成功,则立即返回 true,否则则立即返回 false
3. public boolean tryAcquire(int permits):尝试获取 permits 个许可,若获取成功,则立即返
回 true,若获取失败,则立即返回 false
4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取 permits
个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false
5. 还可以通过 availablePermits()方法得到可用的许可数目。
例子:若一个工厂有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用
Semaphore semaphore = new Semaphore(5); //机器数目
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限
CountDownLatch和CyclicBarrier区别
CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
另外,CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。
jdk并发包
java.util.concurrent
Condition
java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作
使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效
await对应Object的wait()
signal()对应Object的notify()
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
BlockingQueue
BlockingQueue通常用于一个线程生产对象,另外一个线程消费这些对象的场景
对应的方法中都使用了锁, 所以不会出现线程安全问题
ReentrantLock lock = this.lock
插入
add
无法立即执行,抛一个异常
offer
无法立即执行,返回一个特定的值(常常是 true / false)。
offer(o,timeOut,timeUnit)
发生阻塞,直到能够执行,但等待时间不会超过给定值
put
阻塞
移除
remove
同上
poll
同上
take
同上
poll(o,timeOut,timeUnit)
同上
元素检查
element
无法立即执行,抛一个异常
peek
无法立即执行,返回一个特定的值(常常是 true / false)。
阻塞队列
通过Condition的await、signal实现
所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作
阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)
实现类
ArrayBlockingQueue
ArrayBlockingQueue queue = new ArrayBlockingQueue(1024);
ArrayBlockingQueue是有界的,必须指定队列的大小
ArrayBlockingQueue中的锁是没有分离的,即生产和消费用的是同一个锁
基于数组,在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例
LinkedBlockingQueue
没有定义上限将使用Integer.MAX_VALUE( 2 的 31 次方),也可以人为定义大小
LinkedBlockingQueue中的锁是分离的,即生产用的是putLock,消费是takeLock
基于链表,在生产和消费的时候,需要把枚举对象转换为Node<E>进行插入或移除,会生成一个额外的Node对象
需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别
SynchronousQueue
内部只能容纳一个元素,如果队列已有一元素的话,试图插入一个元素会阻塞
直到另一个线程将该元素从队列中抽走,注意与线程池结合,不放入队列,直接执行
DelayQueue
class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
implements BlockingQueue<E> {
DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素
如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉
Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用
PriorityQueue
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
Comparator<? super E> comparator) {
优先级队列的元素必须实现Comparable接口
闭锁 CountDownLatch
CountDownLatch以一个给定的数量初始化,countDown()每被调用一次,这一数量就减一,通过调用await()方法,线程可以阻塞等待这一数量到达零
栅栏 CyclicBarrier
就是一个所有线程必须等待的栅栏,所有(一定数量的)线程等待栅栏CyclicBarrier达成,所有线程将释放掉继续运行
执行器服务ExecutorService
Executor
接口,只有一个execute(Runnable command)方法
ExecutorService 接口表示一个异步执行机制,使我们能够在后台执行任务
继承Executor
ExecutorService 实现就是一个线程池实现
ExecutorService executorService = Executors.newFixedThreadPool(10);//return new ThreadPoolExecutor
executeService.execute(new Runnable(){
public void run(){
System.out.println("Asynchronous task");
}
});
executeService.execute(new Runnable(){
public void run(){
System.out.println("Asynchronous task");
}
});
Executors工具类
ThreadPoolExecutor实现了ExecutorService
ExecutorService扩展类
ScheduledExecutorService
继承ExecutorService,是一个接口
它能够将任务延后执行,或者间隔固定时间多次执行。 任务由一个工作者线程异步执行,而不是由提交任务给 ScheduledExecutorService 的那个线程执行
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() {
public Object call() throws Exception {
System.out.println("Executed!");
return "Called!";
}
}, 5, TimeUnit.SECONDS);
public Object call() throws Exception {
System.out.println("Executed!");
return "Called!";
}
}, 5, TimeUnit.SECONDS);
ScheduledThreadPoolExecutor
是 ScheduledExecutorService接口的一个实现类
方法
schedule (Callable task, long delay, TimeUnit timeunit)
方法计划指定的 Callable 在给定的延迟之后执行
scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
规划一个任务将被定期执行。该任务将会在首个 initialDelay 之后得到执行
scheduleWithFixedDelay
延迟是执行结束之间的间隔,而不是执行开始之间的间隔
ThreadPoolExecutor
ThreadPoolExecutor 是 ExecutorService 接口的一个实现类
当一个任务委托给线程池时,如果池中线程数量低于 corePoolSize,一个新的线程将被创建,即使池中可能尚有空闲线程。
如果内部任务队列已满,而且有至少 corePoolSize 正在运行,但是运行线程的数量低于 maximumPoolSize,一个新的线程将被创建去执行该任务。
如果内部任务队列已满,而且有至少 corePoolSize 正在运行,但是运行线程的数量低于 maximumPoolSize,一个新的线程将被创建去执行该任务。
Executors提供的线程池配置方案
构造一个固定线程数目的线程池
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
配置的corePoolSize与maximumPoolSize大小相同
同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理
构造一个缓冲功能的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁
只支持一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行
构造有定时功能的线程池
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的
定制属于自己的非阻塞线程池
ExecutorService pool = new ThreadPoolExecutor(
10,
30,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
10,
30,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
拒绝策略
定义一个Handler例如:RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
1,ThreadPoolExecutor.AbortPolicy()抛出 RejectedExecutionException
2,CallerRunsPolicy调用当前线程池的所在的线程去执行被拒绝的任务
3,DiscardOldestPolicy抛弃任务队列中最旧的任务
4,ThreadPoolExecutor.DiscardPolicy丢弃被拒绝的任务
往线程池加入线程
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
public void run() {
System.out.println("Asynchronous task");
}
});
execute(Runnable)方法要求一个java.lang.Runnable对象,然后对它进行异步执行
submit(Runnable)也要求一个Runnable实现类,返回一个Future对象用来检查Runnable是否已经执行完毕
Future future = executorService.submit(new Callable()
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() { System.out.println("Asynchronous task"); }
});
executorService.shutdown();
executorService.execute(new Runnable() {
public void run() { System.out.println("Asynchronous task"); }
});
executorService.shutdown();
Callable接口及Futrue接口详解
创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果
Java中,也提供了使用Callable和Future来实现获取任务结果的操作。Callable用来执行任务,产生结果,而Future用来获得结果。
class AddNumberTask implements Callable<Integer> {
public AddNumberTask() {
}
@Override
public Integer call() throws Exception {
System.out.println("####AddNumberTask###call()");
Thread.sleep(5000);
return 5000;
}
public AddNumberTask() {
}
@Override
public Integer call() throws Exception {
System.out.println("####AddNumberTask###call()");
Thread.sleep(5000);
return 5000;
}
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(new AddNumberTask());
System.out.println(Thread.currentThread().getName() + "线程执行其他任务");
Integer integer = future.get();
System.out.println(integer);
Future<Integer> future = executor.submit(new AddNumberTask());
System.out.println(Thread.currentThread().getName() + "线程执行其他任务");
Integer integer = future.get();
System.out.println(integer);
Callable接口
使线程返回结果
实现Callable而必须重写call方法
创建线程
通过实现callable接口的方式,可以创建一个线程
需要重写其中的call方法。启动线程时,需要新建一个Callable的实例
再用FutureTask实例包装它,最终,再包装成Thread实例,调用start方法启动
Futrue接口
当call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果
cancel(boolean mayInterrupt)
用于停止任务。如果尚未启动,它将停止任务。如果已启动,则仅在mayInterrupt为true时才会中断任
get()
用于获取任务的结果。如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果
isDone()
如果任务完成,则返回true,否则返回false
区别
一个产生结果,一个拿到结果
Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果
这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值
java.util.concurrent.locks 包
Lock 接口
实现类ReentrantLock
方法
lock()
调用 lock() 方法的线程将会阻塞
lockInterruptibly() 方法将会被调用线程锁定,除非该线程被打断
如果一个线程在通过这个方法来锁定 Lock 对象时进入阻塞等待,而它被打断了的话,该线程将会退出这个方法调用
tryLock()
试图立即锁定 Lock 实例。如果锁定成功,它将返回 true,如果 Lock 实例已被锁定该方法返回 false。这一方法永不阻塞
tryLock(long timeout, TimeUnit timeUnit)
类似于 tryLock() 方法,除了它在放弃锁定 Lock 之前等待一个给定的超时时间之外
ReadWriteLock 接口
实现类ReentrantReadWriteLock
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReadLock rl = readWriteLock.readLock();
WriteLock wl = readWriteLock.writeLock();
ReadLock rl = readWriteLock.readLock();
WriteLock wl = readWriteLock.writeLock();
java.util.concurrent.atomic包
原子类
AtomicBoolean atomicBoolean = new AtomicBoolean(); //默认false
getAndSet() 方法
交换一个 AtomicBoolean 实例的值。getAndSet() 方法将返回 AtomicBoolean 当前的值,并将为 AtomicBoolean 设置一个新值
AtomicInteger atomicInteger = new AtomicInteger(); //默认为0
通过了一个原子性的 compareAndSet() 方法。这一方法将 AtomicInteger 实例的当前值与期望值进行比较,如果二者相等,为 AtomicInteger 实例设置一个新值
AtomicReference
提供了一个可以被原子性读和写的对象引用变量
原子性的意思是多个想要改变同一个 AtomicReference 的线程不会导致 AtomicReference 处于不一致的状态
非泛型
String initialReference = "the initially referenced string";
AtomicReference atomicReference = new AtomicReference(initialReference);
String reference = (String) atomicReference.get();
AtomicReference atomicReference = new AtomicReference(initialReference);
String reference = (String) atomicReference.get();
泛型
AtomicReference<String> atomicReference = new AtomicReference<String>("first value referenced");
String reference = atomicReference.get();
String reference = atomicReference.get();
get() 方法来获取保存在 AtomicReference 里的引用
如何在两个线程之间共享数据
将数据抽象成一个类,并将对这个数据的操作作为这个类的方法
然后把实例对象传入Runnable
然后把实例对象传入Runnable
假如想要共享的数据是private int j=0;
public class MyData {
private int j=0;
public synchronized void add(){
j++;
}
public synchronized void dec(){
j--;
}
public int getData(){
return j;
} }
private int j=0;
public synchronized void add(){
j++;
}
public synchronized void dec(){
j--;
}
public int getData(){
return j;
} }
public class AddRunnable implements Runnable{
MyData data;
public AddRunnable(MyData data){
this.data= data;
}
public void run() {
data.add();
} }
MyData data;
public AddRunnable(MyData data){
this.data= data;
}
public void run() {
data.add();
} }
Runnable 对象作为一个类的内部类,共享数据作为这个类的成员变量,每个线程对共享数
据的操作方法也封装在外部类,以便实现对数据的各个操作的同步和互斥,作为内部类的各
个 Runnable 对象调用外部类的这些方法。
据的操作方法也封装在外部类,以便实现对数据的各个操作的同步和互斥,作为内部类的各
个 Runnable 对象调用外部类的这些方法。
public class MyData {
private int j=0;
public synchronized void add(){
private int j=0;
public synchronized void add(){
public static void main(String[] args) {
final MyData data = new MyData();
for(int i=0;i<2;i++){
new Thread(new Runnable(){
public void run() {
data.add();
}
}).start();
final MyData data = new MyData();
for(int i=0;i<2;i++){
new Thread(new Runnable(){
public void run() {
data.add();
}
}).start();
copyonwrite
做Write更新的操作时,进行复制一个副本出来进行lock
copyonwrite的机制虽然是线程安全的,但是在add操作的时候不停的拷贝是一件很费时的操作
内部用到了ReentrantLock,写的时候用到lock
如果不用copy, 只用了写lock, 只能保证写的安全,如果不用copy,那么读是不安全的
例如,迭代器正在迭代,然后就被删除了一个,那么会有问题
1,写操作效率其实很低, 对于数据量比较大, 并且写操作比较频繁的场景是很不合适的
2,读和写其实是分隔开了的, 除了写在同步时(setArray), 这两种操作不会互相影响,所以适合读写不互斥,读多写少的并发场景
举例:
1,kafka生产端,写入缓存时,batchs用的就是copyonwrite的思想,自己封装了一个copyonwriteMap
通过topic分区,从copyonwriteMap获取一个队列,队列里包含了多个batch
2,用于读多写少的场景,比如白名单,黑名单,搜索中有一些关键字不可以搜索,这些关键字放在一个黑名单中,只有偶尔才更新黑名单
优点:CopyOnWrite容器可以并发的进行读操作,而不需要加锁,因为 当前容器不会添加任何元素,所以这也是一种读写分离的思想,读和写的操作分开了
缺点
1.内存占用问题,产生了两个容器
2.只能保持数据的最终一致性,无法保持 实时性,所以如果希望读到新数据,不要用copyOnWrite
2.只能保持数据的最终一致性,无法保持 实时性,所以如果希望读到新数据,不要用copyOnWrite
0 条评论
下一页