并发
2024-12-18 21:15:46 0 举报
AI智能生成
java并发
作者其他创作
大纲/内容
基础
并发是指1s内处理的请求数
之所以有并发的概念是因为CPU可以分时间片运行
最小的调度单元是线程
之所以有并发的概念是因为CPU可以分时间片运行
最小的调度单元是线程
两个维度:并发和并行
并发:服务端能承载的吞吐量,Qps,单核也可以并发,通过CPU时间片切换
并行:当前CPU能承载的线程数量,能同时执行。
并行:当前CPU能承载的线程数量,能同时执行。
锁
同步锁
Syncronized
可重入锁
是一种特殊的互斥锁,它允许同一个线程在持有锁的情况下再次获取该锁。
也就是说,同一个线程可以多次获取同一个可重入锁,而不会发生死锁。
也就是说,同一个线程可以多次获取同一个可重入锁,而不会发生死锁。
锁关联了线程和计数器
乐观锁
悲观锁
类锁
作用范围:针对类,无论多少对象,类锁只有一个
获取方式:通过Synchronized关键字加载静态方法、静态变量或者使用Class对象来获取
影响范围:会影响整个类所有对象实例上的其他线程
对象锁
作用范围:针对类的实例,每个对象实例有自己的锁
获取方式:Synchronized关键字加载代码块或者实例方法上
影响范围:不同实例之间的线程不受影响
volitale
ThreadLocal
LockSupport
park()挂起线程
unpark()唤醒线程
CAS
CompareAndSwap
比较并交换
CompareAndSwap
比较并交换
特性
无锁、非阻塞、线程安全
用于保证共享变量的原子性更新
包含三个操作:内存位置、预期原值与更新值
执行CAS时,将内存位置的值与预期原值比较,如果相同,将该位置更新为新值;
不过不匹配,不做任何操作。
不过不匹配,不做任何操作。
多个线程同时进行CAS操作,只有一个会成功
如何保证原子性
是由单个CPU执行实现的,这个指令在执行期间不会被打断
从而保证原子性
从而保证原子性
属于一条原子指令
局限
ABA问题,A变成B又变成A,CAS发现值没有变化,其实已经发生了变化
解决:AtomicStamptedReference,加时间戳或者版本号
解决:AtomicStamptedReference,加时间戳或者版本号
循环时间开销大
只保证一个共享变量的原子操作
AQS同步队列
(AbstractQueuedSynchronizer)
(AbstractQueuedSynchronizer)
FIFO先进先出的队列
volatile 配合 CAS
基于LockSupport的挂起和唤醒
volatile 配合 CAS
基于LockSupport的挂起和唤醒
公平与非公平策略
公平策略:每次查看等待队列,按照队列顺去获取锁
非公平策略:先来的线程和等待队列中的线程同时竞争锁(CAS),新来的线程竞争成功,就是非公平
非公平策略的吞吐量高
工作模式
独占模式
同一时间只能有一个线程获取锁
同一时间只能有一个线程获取锁
共享模式
同一时间可以有多个线程获取锁,比如读
同一时间可以有多个线程获取锁,比如读
工作原理
同步状态
使用一个volatile整形变量state表示同步状态
独占锁:state=0表示未持有;state=1表示有线程持有
共享锁:state表示当前持有的读锁数量
独占锁:state=0表示未持有;state=1表示有线程持有
共享锁:state表示当前持有的读锁数量
等待队列
FIFO先进先出的队列
当线程请求无法获取同步状态时,会被加入等待队列
等待队列由一个双向链表实现,每个节点表示一个等待线程
当线程请求无法获取同步状态时,会被加入等待队列
等待队列由一个双向链表实现,每个节点表示一个等待线程
实际应用
ReentrantLock
Semaphore
CountDownLatch
线程
生命周期
状态
new
new Thread() 后是new状态
ready
调用.start方法后进入ready状态
调用notify、notifyall、unpark
调用notify、notifyall、unpark
run
争夺到CPU的时间片,变为run状态
blocked
synchronized修试的方法、代码块
wait状态被notify后,重新获取锁失败
发出I/O请求时,JVM 会把该线程置为阻塞状态
wait状态被notify后,重新获取锁失败
发出I/O请求时,JVM 会把该线程置为阻塞状态
time-waitting
sleep(时间)方法,睡眠固定的时间,为time-waitting状态
wait(long)\join(long)\parkUtil(long)\parkNanos(long)
wait(long)\join(long)\parkUtil(long)\parkNanos(long)
waitting
sleep没有时间,为waitting状态
wait、join\park方法,进入waitting状态
wait、join\park方法,进入waitting状态
terminate
执行完成进入结束状态
线程间通信
以锁为中心(Object方法):wait、notify、notifyAll
针对当前线程(Thread方法):sleep、yield、join
线程方法
区别
sleep与wait的区别
sleep:不会释放锁,需要等固定的时间或者interrupt打断
wait :让线程等待,需要notify、notifyAll唤醒
sleep:不会释放锁,需要等固定的时间或者interrupt打断
wait :让线程等待,需要notify、notifyAll唤醒
yield与join的区别
yield:线程的礼让,当前线程让出时间片,再次参与争夺CPU的时间片,
只是礼让的动作,让出后可能有抢到时间片执行,也可能没抢到
join :一个线程等待另一个线程(直到结束或者持续一段时间)才执行,那么谁等待谁?
在哪个线程调用,哪个线程就会等待;调用的哪个Thread对象,就会等待哪个线程结束;
yield:线程的礼让,当前线程让出时间片,再次参与争夺CPU的时间片,
只是礼让的动作,让出后可能有抢到时间片执行,也可能没抢到
join :一个线程等待另一个线程(直到结束或者持续一段时间)才执行,那么谁等待谁?
在哪个线程调用,哪个线程就会等待;调用的哪个Thread对象,就会等待哪个线程结束;
start与run的区别
start可以重新启动一个线程,启动新的线程后调用的是run方法
run方法不可以启动一个线程,
start可以重新启动一个线程,启动新的线程后调用的是run方法
run方法不可以启动一个线程,
线程创建
继承Tread类,重写run()方法
Thread thread = new Thread()
Thread thread = new Thread()
MyThread1 thread= new MyThread1();
thread.start()
thread.start()
实现Runnable接口,实现run()方法
MyThread2 myThread=new MyThread2();
Thread thread=new Thread(myThread);
thread.start();
Thread thread=new Thread(myThread);
thread.start();
实现Callnable接口,实现run()方法
MyThread3 myThread = new MyThread3();
FutureTask<Integer> task = new FutureTask<>(myThread);
Thread thread = new Thread(task);
thread.start()
FutureTask<Integer> task = new FutureTask<>(myThread);
Thread thread = new Thread(task);
thread.start()
线程池的方式(见线程池章节)
线程池
一个池化的概念,用于管理线程的生命周期;主要是为了管理线程、避免频繁的创建于销毁、控制线程的数量等
好处
线程复用
资源控制
控制最大线程数
方便管理
核心线程、存活时间、最大线程等多种配置参数
流程
线程设置
CPU密集型
CPU核数+1
IO密集型
CPU核数 * 2
线程池关闭
shutdown(), 所有线程任务执行完成后关闭
shutdownnow(), 不等线程执行完成就立即关闭
线程池创建
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
Parameters:
// 核心线程数
corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
// 最大线程数
maximumPoolSize - the maximum number of threads to allow in the pool
// 线程空闲时间
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
// 线程空闲时间单位
unit - the time unit for the keepAliveTime argument
// 任务队列
workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
Parameters:
// 核心线程数
corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
// 最大线程数
maximumPoolSize - the maximum number of threads to allow in the pool
// 线程空闲时间
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
// 线程空闲时间单位
unit - the time unit for the keepAliveTime argument
// 任务队列
workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
任务提交
execute(),提交Runnable任务
submit(),提交Runnable和Callable任务
SpringBoot中使用
① 定义全局的线程池(或者针对不同业务定义)
@Configuration
@EnableAsync // 启用异步任务
public class AsyncRealDataThreadConfiguration {
// 声明一个线程池(并指定线程池的名字)
@Bean("realDataTaskExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数15:线程池创建时候初始化的线程数
executor.setCorePoolSize(15);
//最大线程数50:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(100);
//缓冲队列5:用来缓冲执行任务的队列,队列占满后才会增加核心线程数
executor.setQueueCapacity(5);
//允许线程的空闲时间300秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(300);
// 任务拒绝
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("RealDataTask-");
executor.initialize();
return executor;
}
}
@Configuration
@EnableAsync // 启用异步任务
public class AsyncRealDataThreadConfiguration {
// 声明一个线程池(并指定线程池的名字)
@Bean("realDataTaskExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数15:线程池创建时候初始化的线程数
executor.setCorePoolSize(15);
//最大线程数50:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(100);
//缓冲队列5:用来缓冲执行任务的队列,队列占满后才会增加核心线程数
executor.setQueueCapacity(5);
//允许线程的空闲时间300秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(300);
// 任务拒绝
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("RealDataTask-");
executor.initialize();
return executor;
}
}
②使用@Async("realDataTaskExecutor") 定义在需要异步执行的方法上
@Scheduled(initialDelay = 1 * 60 * 1000, fixedRate = 1 * 60 * 1000)
@Async("realDataTaskExecutor")
public void judgePowerOutage() { ... }
@Scheduled(initialDelay = 1 * 60 * 1000, fixedRate = 1 * 60 * 1000)
@Async("realDataTaskExecutor")
public void judgePowerOutage() { ... }
JAVA并发工具(JUC)
ReentrantLock
可重入锁、可中断锁、超时性、公平性(公平和非公平)
子主题
Semaphore
信号灯
信号灯
1、Semaphore sp = new Semaphore(int permits) 接受一个整数型的参数,表示有几盏灯。
线程可以通过semaphore.acquire()获取一盏信号灯,通过semaphore.release()释放。
它与 Lock 的区别就是 Lock 只能是一个锁,而 Semaphore 更像是多个锁的一个集合,
像一个阻塞队列一样,当队列中的锁用完了,而又需要锁的时候,就必须等待其他的线程释放锁
2、非重入锁
3、信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本
线程可以通过semaphore.acquire()获取一盏信号灯,通过semaphore.release()释放。
它与 Lock 的区别就是 Lock 只能是一个锁,而 Semaphore 更像是多个锁的一个集合,
像一个阻塞队列一样,当队列中的锁用完了,而又需要锁的时候,就必须等待其他的线程释放锁
2、非重入锁
3、信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本
构造方法
public Semaphore(int permits):permits 表示许可线程的数量
public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行
常用方法
public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个
public void release():表示释放1个许可,那么线程许可数量相应会增加
其他方法
void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定
void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
int availablePermits():返回当前信号量线程许可数量
int getQueueLength(): 返回等待获取许可的线程数的预估值
void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
int availablePermits():返回当前信号量线程许可数量
int getQueueLength(): 返回等待获取许可的线程数的预估值
CountDownLatch
计数器
计数器
基于 AQS 构建同步器【共享模式】。
它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作;
适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景
它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作;
适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景
一般用作多线程倒计时计数器,强制它们等待其他一组任务,计数器的减法是一个不可逆的过程。
即:计数器值递减到 0 的时候,不能再复原。
即:计数器值递减到 0 的时候,不能再复原。
原理概述
1.通过CountDownLatch(int count)构造函数,设置AQS的state值为,构造函数传递的state=count
2.通过countDown()方法,唤醒线程。将执行:count=state-1。判断:判断state是否等于0
不等于0,方法结束,外部的方法继续向下执行。
等于0,就会调用doReleaseShared()方法,这个方法就会自旋执行:
如果阻塞队列中有节点,且节点的状态为SIGNAL,那么就会唤醒这个节点的线程。这一步是由最后一个执行countdown方法的线程执行的。
不等于0,方法结束,外部的方法继续向下执行。
等于0,就会调用doReleaseShared()方法,这个方法就会自旋执行:
如果阻塞队列中有节点,且节点的状态为SIGNAL,那么就会唤醒这个节点的线程。这一步是由最后一个执行countdown方法的线程执行的。
3.通过await()方法,阻塞线程。判断state是否等于0
等于0,方法结束,外部的方法继续向下执行。
不等于0,调用doAcquireSharedInterruptibly(arg),阻塞当前线程,并加入CLH阻塞队列
等于0,方法结束,外部的方法继续向下执行。
不等于0,调用doAcquireSharedInterruptibly(arg),阻塞当前线程,并加入CLH阻塞队列
构造方法
CountDownLatch(int count)
常用方法
await() 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
await(long timeout, TimeUnit unit) 和 await() 类似,
若等待 timeout 时长后,count 值还是没有变为 0,不再等待,
继续执行 countDown() 减少计数器的值,当计数器的值变为0时,等待的线程将被唤醒。
若等待 timeout 时长后,count 值还是没有变为 0,不再等待,
继续执行 countDown() 减少计数器的值,当计数器的值变为0时,等待的线程将被唤醒。
CyclicBarrier
栅栏
栅栏
多个线程互相等待,直到所有的线程都达到屏障点,才可以一起接着执行。
同样可以理解为内部有一个可重置的计数器,每个线程调用await()后,计数器减1,若计数器的值不为0,将会阻塞该线程。
当最后一个线程调用await()后,计数器为0,将会唤醒所有阻塞的线程,并开启下一代,重置此计数器的值
同样可以理解为内部有一个可重置的计数器,每个线程调用await()后,计数器减1,若计数器的值不为0,将会阻塞该线程。
当最后一个线程调用await()后,计数器为0,将会唤醒所有阻塞的线程,并开启下一代,重置此计数器的值
构造方法
public CyclicBarrier(int parties, Runnable barrierAction)
parties线程数
barrierAction下一个屏障
parties线程数
barrierAction下一个屏障
public CyclicBarrier(int parties)
parties线程数
parties线程数
原理概述
CyclicBarrier借助ReentranLock与Condition来对线程进行阻塞的。
parties是传入构造方法的线程总数,在该CyclicBarrier实例的整个生命周期内,该值保持不变,并且会在换代的时候,使得count=parties
barrierCommand,换代任务,打破屏障前需要执行的任务,任务执行完成后(不管成功还是失败),才会唤醒所有线程
generation代表当前代,两个屏障之间称为一代,原点与第一个屏障可以称为第一代
count,计数器,每有一个线程到达屏障时,count值就会减1。一旦减为0后,则先同步执行换代任务,接着打破屏障,开启下一代,然后唤醒所有阻塞的线程,最后将count重置为parties
parties是传入构造方法的线程总数,在该CyclicBarrier实例的整个生命周期内,该值保持不变,并且会在换代的时候,使得count=parties
barrierCommand,换代任务,打破屏障前需要执行的任务,任务执行完成后(不管成功还是失败),才会唤醒所有线程
generation代表当前代,两个屏障之间称为一代,原点与第一个屏障可以称为第一代
count,计数器,每有一个线程到达屏障时,count值就会减1。一旦减为0后,则先同步执行换代任务,接着打破屏障,开启下一代,然后唤醒所有阻塞的线程,最后将count重置为parties
常用方法
int await()
int await(long timeout, TimeUnit unit)
CountDownLatch 与 CyclicBarrier的区别
CountDownLatch是一个或者多个线程等待 另外N个线程完成之后才能执行
CyclicBarrier是N个线程之间相互等待,任何一个线程没有完成,所有线程都必须等待
CyclicBarrier是N个线程之间相互等待,任何一个线程没有完成,所有线程都必须等待
CountDownLatch不可重用;
CyclicBarrier 可以重复使用
CyclicBarrier 可以重复使用
CountDownLatch 基于AQS实现
CyclicBarrier基于ReetrantLock和Condition实现
CyclicBarrier基于ReetrantLock和Condition实现
CountDownLatch不能设置的回调处理对象
CyclicBarrier可以设置回调处理对象
CyclicBarrier可以设置回调处理对象
0 条评论
下一页
为你推荐
查看更多