004 - 同步容器类
2022-03-08 09:36:22 0 举报
AI智能生成
同步容器类,系统性总结
作者其他创作
大纲/内容
这类容器有Vector、Hashtable、Stack
这类容器的方法上都加了synchronized锁, 是线程安全的实现。
Vector、Hashtable、Stack这些容器,现在几乎都不在使用
因为这些容器在多线程环境下的效率不高。
本来就线程安全实现的容器
Collections会把这些容器类的状态封装起来, 并对每个同步方法进行同步,使得每次只有一个线程能够访问容器的状态。
同步容器将所有对容器状态的访问都串行化,以实现他们之间的线程安全性。
用Collections.synchronized会把它们封装起来编程线程安全的容器
汇总
其中每个synchronizedxx都是,相当于创建了一个静态内部类
要不为啥要称Collections为集合工具类呢?
Collections.synchronizedList
Collections.synchronizedMap
举出两个例子
这些线程安全的实现
可以通过Collections源码可以看出这些线程安全的实现
虽然同步容器类都是线程安全的,但是在某些情况下需要额外的客户端加锁来保证一些复合操作的安全性
如果线程A在包含这么多元素的基础上调用getVector方法, 会得到一个数值, getVector只是取得该元素, 而并不是从vector中移除, removeVector方法是得到一个元素进行移除, 这段代码的不安全因素就是, 因为线程的时间片是乱序的, 而且getVector和remove Vector并不会保证互斥, 所以在removeVector方法把某个值比如6666移除后, vector中就不存在这个6666的元素, 此时getVector方法取得6666, 就会抛出数组越界异常。
vector的源码
为什么是数组越界异常呢?可以看一下vector的源码
如果用图表示的话,则会是下面这样
所以,从系统的层面来看,上面这段代码也要保证线程安全性才可以,也就是在客户端加锁实现,
只要让复合操作使用一把锁,那么这些操作就和其他单独的操作一样都是原子性的。
如下面例子所示
子主题
也可以通过锁住.class来保证原子性操作, 也能达到同样的效果。
在调用size和get之间, Vector的长度可能会发生变化, 这种变化在对Vector进行排序时出现, 如下所示
这种迭代的操作正确性取决于运气, 即在调用size和get之间会修改Vector, 在单线程环境中, 这种假设完全成立, 但是再有其他线程并发修改Vector时, 则可能会导致麻烦。
仍旧可以通过客户端加锁的方式来避免这种情况
这种方式为客户端的可靠性提供了保证,但是牺牲了伸缩性,而且这种在遍历过程中进行加锁,也不是我们所希望看到的。
例如
这些复合操作在没有客户端加锁的情况下是线程安全的,但是当多个线程并发修改容器时,可能会表现出意料之外的行为。
有两个及以上的方法组成的操作
若没有则添加
比如最典型的就是,若没有则添加,用伪代码表示则是
比如可以用来判断Map中是否有某个key, 如果没有则添加进Map中。
什么是复合操作?
客户端加锁的讲解
某些情况下需要额外的客户端加锁,来保证一些复合操作的安全性
由Collections.synchronizedxxx实现的非线程安全的容器
同步容器主要包括两类
客户端加锁,对可靠性提供了保证,但是牺牲了伸缩性,而且这种在遍历过程中进行加锁,不是希望看到的
诞生背景
在系统设计中,快速失效系统一种可以立即报告任何可能表明故障的情况的系统。
快速失效系统通常设计用于停止正常操作,而不是试图继续可能存在缺陷的过程。
这种设计通常会在操作中的多个点检查系统的状态,因此可以及早检测到任何故障。
快速失败模块的职责是检测错误,然后让系统的下一个最高级别处理错误。
其实,这是一种理念,说白了就是在做系统设计的时候先考虑异常情况,一旦发生异常,直接停止并上报。
什么是fail-fast/快速失效系统/快速失败机制
通常说的Java 中的fail-fast 机制,默认指的是Java 集合的一种错误检测机制。
当多个线程对部分集合进行结构上的改变的操作时,有可能会产生fail-fast 机制,
这个时候就会抛出ConcurrentModificationException(后文用CME 代替)。
因为大部分集合内部都是使用Iterator进行遍历, 在循环中使用同步锁的开销会很大,
而Iterator的创建是轻量级的, 所以在集合内部如果有并发修改的操作, 集合会进行快速失败, 也就是fail-fast
很多集合类都提供了一种fail-fast机制 、集合类中的fail-fast
明明自己的代码并没有在多线程环境中执行,
为什么会抛出这种并发有关的异常呢?
这种情况在什么情况下才会抛出呢?
代码中为什么抛出了CMException?
CMException,当方法检测到对象的并发修改,但不允许这种修改时就抛出该异常。
当发现容器在迭代过程中被修改时, 会抛出ConcurrentModificationException异常
这种快速失败不是一种完备的处理机制,而只是意的捕获并发错误。
造成这种异常的原因是由于多个线程在让历集合的同时对集合类内部进行了修改, 这也就是fail-fast机制
抛出的原则由两种
这个问题也是很经典的一个问题
使用ArrayList来举例子
一个是modCount, ,
一个是expectedModCount
该段代码会发生异常, 因为在ArrayList内部, 有两个属性
checkForComodification的判断
ArrayList在remove等对集合结构的元素造成数量上的操作会有checkForComodification的判断, 如下所示, 这也是这段代码的错误原因。
该注解还声明了另外一种方式
ConcurrentModificationException的注解
在Java 中, 如果在foreach 循环里对某些集合元素进行元素的remove/add 操作时,就会触发fail-fast 机制,进而抛出CMException。
以上代码,使用增强for 循环遍历元素,并尝试删除其中的Hollis 字符串元素
如以下代码:
运行以上代码,会抛出以下异常:
同样的,可以尝试下在增强for 循环中使用add 方法添加元素,结果也会同样抛出该异常。
看一下foreach 具体如何实现的。
使用jad 工具,对编译后的class 进行反编译,得到以下代码:
可以发现,foreach 其实是依赖了while 循环和Iterator 实现的。
把foreach 进行解语法糖
ConcurrentModificationException异常复现
java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
通过以上代码的异常堆栈,我们可以跟踪到真正抛出异常的代码是:
该方法是在iterator.next()方法中调用的。
我们看下该方法的实现:
如上,在该方法中对modCount 和expectedModCount 进行了比较,如果二者不想等,则抛出CMException。
是ArrayList 中的一个成员变量。
它表示该集合实际被修改的次数。
List<String> userNames = new ArrayList<String>() {{add(\"Hollis\");add(\"hollis\");add(\"HollisChuang\");add(\"H\");}};当使用以上代码初始化集合之后该变量就有了。初始值为0。
modCount
是ArrayList 中的一个内部类——Itr 中的成员变量。
Iterator iterator = userNames.iterator();
以上代码,即可得到一个Itr 类,该类实现了Iterator 接口。
表示这个迭代器预期该集合被修改的次数。
其值随着Itr 被创建而初始化。
只有通过迭代器对集合进行操作,该值才会改变。
expectedModCount
那么,modCount 和expectedModCount 是什么?是什么原因导致他们的值不想等的呢?
可以看到,它只修改了modCount,并没有对expectedModCount 做任何操作。
通过翻阅代码,我们也可以发现,remove 方法核心逻辑如下:
简单画一张图描述下以上场景:
userNames.remove(userName);方法里面做了什么事情,为什么会导致expectedModCount 和modCount 的值不一样。
之所以会抛出CMException 异常,是因为代码中使用了增强for循环
而在增强for循环中, 集合遍历是通过iterator进行的, 但是元素的add/remove 却是直接使用的集合类自己的方法。
这就导致iterator在遍历时,会发现有一个元素在自己不知不觉的情况下就被删除/添加了,就会抛出一个异常,用来提示用户,可能发生了并发修改!
所以,在使用Java 的集合类时,如果发生CMException,优先考虑fail-fast有关的情况
实际上这里并没有真的发生并发,只是Iterator 使用了fail-fast 的保护机制,只要他发现有某一次修改是未经过自己进行的,那么就会抛出异常。
简单总结一下,
ConcurrentModificationException异常原理
抛出ConcurrentModificationException异常(CME)
上面的代码是一个对两个整数做除法的方法
在divide 方法中,对被除数做了个简单的检查,如果其值为0,那么就直接抛出一个异常,并明确提示异常原因。
这其实就是fail-fast 理念的实际应用。
举一个最简单的fail-fast 的例子/fail-fast 理念的实际应用
一方面,可以避免执行复杂的其他代码,
另外一方面,这种异常情况被识别之后也可以针对性的做一些单独处理。
使用fail-fast的好处,可以预先识别出一些错误情况
原因是Java 的集合类中运用了fail-fast 机制进行设计,
一旦使用不当,触发fail-fast 机制设计的代码,就会发生非预期情况。
为什么禁止在foreach 循环里进行元素的remove/add 操作
为什么说fail-fast 会有坑呢?
fail-fast机制
Java中的一种安全失败机制
它表示的是,在遍历时,不是直接在原集合上进行访问,而是先复制原有集合内容,在拷贝的集合上进行遍历。
为了避免触发fail-fast 机制,导致异常,可以使用Java 中提供的一些采用了fail-safe 机制的集合类。
这样的集合容器在遍历时,不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历。
fail-safe 集合的所有对集合的修改都是先拷贝一份副本,然后在副本集合上进行的,并不是直接对原集合进行修改。
并且这些修改方法,如add/remove 都是通过加锁来控制并发的
是什么?
JUC包下的容器都是安全失败的, 可以在多线程条件下使用,并发修改。
JUC包下的容器都是fail-safe 的,可以在多线程下并发使用,并发修改。同时也可以在foreach 中进行add/remove 。
由于迭代时,是对原集合的拷贝,进行遍历
所以在遍历过程中,对原集合所作的修改,并不能被送代器检测到
所以不会触发ConcurrentModificationException。
不会触发ConcurrentModificationException
一种fail-safe机制的集合,它就不会出现异常
ArrayList的一种线程安全的变体
(比如add和set等)
所有可变操作都是通过对数组进行全新复制来实现的。
(因为fail-fast 的主要目的就是识别并发,然后通过异常的方式通知用户)
迭代器在迭代的过程中,不需要做fail-fast 的并发检测。
拿CopyOnWriteArrayList 这个fail-safe 的集合类来简单分析一下
CopyOnWriteArrayList的使用代码1
以上代码,使用CopyOnWriteArrayList 代替了ArrayList,就不会发生异常。
源码
CopyOnWriteArrayList的使用代码2
得到CopyOnWriteArrayList 的Iterator 之后,通过for 循环直接删除原数组中的值,最后在结尾处输出Iterator,结果发现内容如下:
迭代器遍历的是开始遍历那一刻拿到的集合拷贝,在遍历期间原集合发生的修改迭代器是不知道的。
CopyOnWriteArrayList的使用代码3
虽然基于拷贝内容的优点是避免了CME,但同样地,迭代器并不能访问到修改后的内容。
比如CopyOnWriteArrayList
fail-safe机制
讲到并发容器,就不得不提操作系统级别实现了哪些进程/线程间的井发容器
其实就是数据结构的设计
什么是操作系统级别的并发工具
E.W.Dijkstra在1965年提出的一种方法
它使用一个整形变量来累计唤醒次数, 以供之后使用。
在他的观点中, 有一个新的变量类型称作信号量(semaphore)
不需要任何唤醒
0
唤醒次数
任意正数
一个信号量的取值
用sleep来表示
down
用wakeup来表示
up
信号量有两个操作
down这个指令的操作,会检查值是否大于0
如果大于0, 则将其值减11若该值为0, 则进程将睡眠, 而且此时down操作将会继续执行。
检查数值、修改变量值以及可能发生的睡眠操作均为一个单一的、不可分割的原子操作(atomic action) 完成
原理
信号量
mutex(互斥量)
如果不需要信号量的计数能力时, 可以使用信号量的一个简单版本
0表示解锁
解锁(unlocked)
其他所有的值表示加锁
比1大的值表示加锁的次数。
加锁(locked)
互斥量是一个处于两种状态之一的共享变量
只需要一个二进制位来表示它, 不过一般情况下, 通常会用一个整型(integer) 来表示
当一个线程(或者进程) 需要访问关键区域时, 会调用mutex_lock进行加锁
如果互斥锁当前处于解锁状态(表示关键区域可用),则调用成功,并且调用线程可以自由进入关键区域。
另一方面, 如果mutex互斥量已经锁定的话, 调用线程会阻塞直到关键区域内的线程执行完毕并且调用了mutex_unlock.
如果多个线程在mutex互斥量上阻塞, 将随机选择一个线程并允许它获得锁
说明
mutex使用两个过程
在一些共享资源和一段代码中保持互斥
由于互斥的实现既简单又有效,这使得互斥量在实现用户空间线程包时非常有用。
优势
互斥量
有效的同步(synchronization)
互斥/锁定(locking)
随着并行的增加, 同步和互斥对于性能来说是非常重要的。
但是如果等待时间比较长, 那么这会浪费CPU周期。
如果进程等待时间很短, 那么自旋锁(Spinlock) 是非常有效:
如果进程很多, 那么阻塞此进程, 并仅当锁被释放时,让内核解除阻塞是更有效的方式。
不幸的是,这种方式也会导致另外的问题:它可以在进程竞争频繁的时候运行良好
但是在竞争不是很激烈的情况下,内核切换的消耗会非常大,而且更困难的是,预测锁的竞争数量更不容易。
Linux中的特性
把两者的优点结合起来
有一种有趣的解决方案是把两者的优点结合起来, 提出一种新的思想
或者是快速用户空间互斥(fast userspace mutex)
实现了基本的锁定(很像是互斥锁)
而且避免了陷入内核中, 因为内核的切换的开销非常大, 这样做可以大大提高性能
特点
提供了一个等待队列(wait queue) 允许多个进程在锁上排队等待
除非内核明确的对他们解除阻塞,否则它们不会运行。
内核服务
用户库
图解Futex
由两部分组成
Futex
Futexes
最基本的机制是使用互斥量变量, 可以锁定和解锁, 用来保护每个关键区域,
如果mutex没有加锁, 线程能够马上进入并且互斥量能够自动锁定, 从而阻止其他线程进入。
如果mutex已经加锁, 调用线程会阻塞,直到mutex解锁。
希望进入关键区域的线程,首先要尝试获取mutex.
如果多个线程在相同的互斥量上等待, 当互斥量解锁时, 只有一个线程能够进入并且重新加锁。
这些锁并不是必须的,程序员需要正确使用它们。
与互斥量有关的函数调用
Phread_mutex_init
Pthread_mutex_destroy
和想象中的一样, mutex能够被创建和销毁, 扮演这两个角色的分别是
mutex也可以通过Pthread_mutex_Lock来进行加锁, 如果互斥量已经加锁, 则会阻塞调用者。
当mutex已经被加锁时,会返回一个错误代码而不是阻塞调用者。
这个调用允许线程有效的进行忙等。
还有一个调用Pthread_mutex_try Lock用来尝试对线程加锁
最后, Pthread_mutex_unlock会对mutex解锁并且释放一个正在等待的线程。
与互斥量有关的重要的pthread函数调用
互斥量(mutex)
互斥量(mutex),可以很好的允许或阻止对关键区域的访问
条件变量(condition variables),允许线程由于未满足某些条件而阻塞
条件变量(不像是信号量)不会存在于内存中。
如果将一个信号量,传递给一个没有线程等待的条件变量,那么这个信号就会丢失
注意
下面进一步来研究线程、互斥量、条件变量之间的关联
一个线程将东西放在一个缓冲区内,由另一个线程将它们取出。
如果生产者发现缓冲区没有空槽可以使用了,生产者线程会阻塞起来直到有一个线程可以使用。
生产者使用mutex来进行原子性检查从而不受其他线程干扰。
但是当发现缓冲区已经满了以后, 生产者需要一种方法来阻塞自己并在以后被唤醒。
这便是条件变量做的工作。
重新认识一下生产者和消费者问题
与条件变量有关的重要的pthread函数调用
表中给出了一些调用用来创建和销毁条件变量。
Pthread_cond_wait
Pthread_cond_signal
条件变量上的主要属性是
阻塞的线程,通常需要等待唤醒的信号,以此来释放资源或者执行某些其他活动。
只有这样阻塞的线程才能继续工作。
条件变量允许等待与阻塞原子性的进程。
前者阻塞调用线程, 直到其他线程发出信号为止(使用后者调用)
Pthread_cond_broadcast用来唤醒多个阻塞的、需要等待信号唤醒的线程。
条件变量(condition variables)
绝大多数情况下这两种方法是一起使用的
Pthreads提供的两种同步机制(Pthreads提供了一些功能用来同步线程)
Pthreads中的互斥量与条件变量
编写更加准确无误的程序
目的
一个更高级的同步原语叫做管程(monitor)
Brinch Hansen和Hoare提出
在任何时候管程中只能有一个活跃的进程
这一特性使管程能够很方便的实现互斥操作。
一个很重要的特性
管程是编程语言的特性,所以编译器知道它们的特殊性,因此可以采用与其他过程调用不同的方法,来处理对管程的调用
如果有活跃的进程,调用进程将被挂起,直到另一个进程离开管程才将其唤醒。
如果没有活跃进程在使用管程,那么该调用进程才可以进入。
通常情况下,当进程调用管程中的程序时,该程序的前几条指令会检查管程中是否有其他活跃的进程
即使管程提供了一种简单的方式来实现互斥,但在我们看来,这还不够.
因为还需要一种在进程无法执行被阻塞,
很容易将针对缓冲区满和缓冲区空的测试放在管程程序中,
但是生产者在发现缓冲区满的时候该如何阻塞呢?
在生产者-消费者问题中,
解决的办法是引入条件变量(condition variables) 以及相关的两个操作wait和signal
当一个管程程序,发现它不能运行时(例如, 生产者发现缓冲区已满) , 它会在某个条件变量(如full) 上执行wait操作。
在前面的pthread中我们已经探讨过条件变量的实现细节了,
这个操作造成调用进程阻塞, 并且还将另一个以前等在管程之外的进程调入管程。
另一个进程, 比如消费者可以通过执行signal来唤醒阻塞的调用进程。
为什么需要引入条件变量以及相关操作?
进入管程中的互斥由编译器负责, 但是一种通用做法是使用互斥量(mutex) 和二进制信号量(binary semaphore)
由于编译器而不是程序员在操作, 因此出错的几率会大大降低
在任何时候,编写管程的程序员都无需关心编译器是如何处理的。
他只需要知道将所有的临界区转换成为管程过程即可。
程序员无需关心
通过临界区自动的互斥,管程比信号量更容易保证并行编程的正确性。
绝不会有两个进程,同时执行临界区中的代码。
更容易保证并行编程的正确性。
管程是一个编程语言的概念
编译器必须要识别管程,并用某种方式,对其互斥作出保证
C、Pascal以及大多数其他编程语言,都没有管程,所以不能依靠编译器来遵守互斥规则。
大多数编程语言都没有管程
与管程和信号量有关的另一个问题是, 这些机制都是设计用来解决访问共享内存的一个或多个CPU上的互斥问题的。
通过将信号量放在共享内存中并用TSL或XCHG指令来保护它们, 可以避免竞争,
同时具有多个CPU的情况
每个CPU都有自己的私有内存
信号量太低级了
管程在少数几种编程语言之外无法使用
它们通过网络相连,那么这些原语将会失效,
所以还需要其他方法
但是如果是在分布式系统中
不适用于分布式系统
缺点
管程
上面提到的其他方法就是消息传递(messa age passing)
这种进程间通信的方法使用两个原语send和receive
它们像信号量而不像管程, 是系统调用,而不是语言级别
如果没有消息,接受者可能被阻塞,直到接收一条消息或者带着错误码返回,
send方法用于向一个给定的目标发送一条消息
receive从一个给定的源接收一条消息。
实例如下
消息传递系统现在面临着许多信号量和管程所未涉及的问题和设计难点,尤其是对那些在网络中不同机器上的通信状况
为了防止消息丢失,发送方和接收方可以达成一致:
一旦接受到消息后, 接收方马上回送一条特殊的确认(acknowLedgement) 消息,
如果发送方在一段时间间隙内未收到确认,则重发消息。
如何保证消息不被网络丢失?重发机制
现在考虑消息本身被正确接收,而返回给发送着的确认消息丢失的情况。
发送者将重发消息,这样接受者将收到两次相同的消息。
img src=\
图解
对于接收者来说,如何区分新的消息和一条重发的老消息是非常重要的。
通常采用在每条原始消息中嵌入一个连续的序号来解决此问题。
如果接受者收到一条消息,它具有与前面某一条消息一样的序号,就知道这条消息是重复的,可以忽略。
如何保证返回给发送者的确认消息不丢失?
以便在发送或接收调用中清晰的指明进程。
如何命名进程的问题
比如客户端怎么知道它是在与一个真正的文件服务器通信
从发送方到接收方的信息有可能被中间人所篡改。
身份验证( authentication) 也是一个问题
常见问题
消息传递系统问题和设计难点
消息传递
屏障(barrier)
一种同步机制
屏障可用于一组进程同步
最后一个同步机制是准备用于进程组,而不是进程间的生产者-消费者情况的。
在某些应用中划分了若干阶段,并且规定,
除非所有的进程都就绪准备着手下一个阶段,否则任何进程都不能进入下一个阶段,
可以通过在每个阶段的结尾安装一个屏障来实现这种行为。
当一个进程到达屏障时, 它会被屏障所拦截,直到所有的屏障都到达为止。
实现原理
图解屏障
图a,有四个进程接近屏障,这意味着每个进程都在进行运算,但是还没有到达每个阶段的结尾。
图b,过了一段时间后,A、B、D进程都到达了屏障,各自进程被挂起,但此时还不能进入下个阶段,因为进程B还没有执行完毕,
结果,当最后一个C到达屏障后,这个进程组才能够进入下一个阶段。
屏障
最快的锁是根本没有锁。
问题在于没有锁的情况下,是否允许对共享数据结构的并发读写进行访问,答案当然是不可以。
假设进程A正在对一个数字数组进行排序,而进程B正在计算其平均值,而此时你进行A的移动,会导致B会多次读到重复值,而某些值根本没有遇到过。
然而,在某些情况下,可以允许写操作来更新数据结构,即便还有其他的进程正在使用,窍门在于确保每个读操作要么读取旧的版本,要么读取新的版本
上面的树中,读操作从根部到叶子遍历整个树。
加入一个新节点X后,为了实现这一操作,要让这个节点在树中可见之前使它*恰好正确*:
对节点X中的所有值进行初始化,包括它的子节点指针。然后通过原子写操作,使X称为A的子节点。
所有的读操作都不会读到前后不一致的版本
在上面的图中,我们接着移除B和D,首先,将A的左子节点指针指向C。所有原本在A中的读操作将会后续读到节点C,而永远不会读到B和D。、
也就是说,它们将只会读取到新版数据。同样,所有当前在B和D中的读操作将继续按照原始的数据结构指针并且读取旧版数据。
所有操作均能正确运行,不需要锁佳任何东西,而不需要锁住数据就能够移除B和D的主要原因就是RUC, 将更新过程中的移除和再分配过程分离开。
读-复制-更新(Ready-Copy-Update, RCU)
例如下面的树
避免锁:读-复制-更新
操作系统级别的并发工具
Vector
Hashtable
Stack
原理: 同步容器将所有对容器状态的访问都串行化,以实现他们之间的线程安全性。
这种方法的代价是严重降低了并发性能,当多个线程争抢容器锁的同时,产重降低吞吐量。
传统同步容器
JDK 1.5提供了许多种并发容器,来改进同步容器的性能
并发容器
用来替代基于散列的Map容器
ConcurrentHashMap继承了AbstractMap接口,并实现了ConcurrentMap和Seralizable接口
AbstractMap和ConcurrentMap都是Map的实现类, 只不过AbstractMap是抽象实现。
在并发集合中的位置(继承关系)
Hashtable 类(因性能差的线性安全被弃用)
HashTable底层采用数组+链表存储键值对, 由于被弃用, 后人也没有对它进行任何改进
它是一个线程安全的集合,一个线程安全的Map,所有的操作都是线程安全的
只不过Hashtable容器在激烈竞争的场景中,会表现出效率低下的现象
与新的集合实现不同,Hashtable是线程安全的, 但是因为这个线程安全, 它就被淘汰掉了
它所有的方法都被加上了synchronized关键字, 也是因为这个关键字,它注定成为了时代的弃儿。
它被淘汰的原因也主要因为两个字:性能
所有访问Hashtable的线程都想获取同一把锁
如果容器里面有多把锁,并且每一把锁都只用来锁定一段数据,
那么,当多个线程访问不同的数据段时,就不存在竞争关系。
在这种锁实现中, 任意数量的读取线程可以并发的访问Map,
执行读取操作的线程和执行写入的线程可以并发的访问Map, 并且在读取的同时,也可以并发修改Map。
图解ConcurrentHashMap采用的分段锁实现
ConcurrentHashMap采用的分段锁实现
和Hashtable的构造非常相似,但是原理大不相同
实现带来的结果是
在并发环境下,可以实现更高的吞吐量
在单线程环境下,只损失非常小的性能
底层原理:采用的分段锁实现
具有fail-fast机制
它是一种强一致性的集合
在数据不一致的情况下,会抛出ConcurrentModificationException异常
HashMap
一种弱一致性的集合
在并发修改其内部结构时, 它不会抛出ConcurrentModificationException异常, 弱一致性能够容忍并发修改。
ConcurrentHashMap
它是一种弱一致性的集合,能够容忍并发修改
一般使用的size、empty、containsKey等方法都是标准方法
其返回的结果是一定的, 包含就是包含, 不包含就是不包含
可以作为判断条件
size、empty、containsKey等方法
这些方法只是参考方法, 它不是一个精确值,
这些方法在并发场景下用处很小, 因为返回值总是在不断变化,所以这些操作的需求就被弱化了。
ConcurrentHashMap图解
标准方法如size、empty、containsKey等方法被弱化
如Hashtable
如Collections.synchronizedMap
在线程安全的Map实现都实现了独占访问, 因此只能单个线程修改Map,
在ConcurrentHashMap中,没有实现对Map加锁从而实现独占访问
ConcurrentHashMap与这些Map容器相比, 具有更多的优势和更少的劣势
只有当需要独占访问的需求时,才会使用Hashtable或者是Collections.synchronizedMap,
否则其他并发场景下, 应该使用Concurrent HashMap.
使用场景
没有实现对Map加锁,从而实现独占访问
ConcurrentHashMap(高性能的HashMap)
ConcurrentHashMap 和HashMap 的实现方式不一样,虽然都是使用桶数组实现的,但是还是有区别,ConcurrentHashMap 对桶数组进行了分段,而HashMap 并没有。
实现方式
ConcurrentHashMap 在每一个分段上都用锁进行了保护。HashMap 没有锁机制。所以,前者线程安全的,后者不是线程安全的。
锁机制与线程安全
HashMap 和ConcurrentHashMap 的区别?
PS:以上区别基于jdk1.8 以前的版本。
一个接口
四个新的方法
这四个方法都是原子性方法, 进一步扩展了Map的功能
它继承了Map接口并提供了Map接口中四个新的方法
java.util.concurrent.Concurrent Navigable Map类是java.util.NavigableMap的子类
视图就是集合中的一段数据序列
什么是视图呢?
ConcurrentNavigableMap中支持使用headMap、subMap、tailMap返回的视图
与其重新解释一下NavigableMap中找到的所有方法,不如看一下
headMap方法:headMap方法返回一个严格小于给定键的视图
tailMap方法:tailMap方法返回包含大于或等于给定键的视图
subMap方法:subMap方法返回给定两个参数的视图
ConcurrentNavigableMap中添加的方法
它支持并发访问,并且允许其视图的并发访问
descendingkeySet()
descendingMap()
navigableKeySet()
ConcurrentNavigableMap
线程安全的有序的哈希表
适用于高并发的场景
继承关系
底层数据结构是基于跳表实现的。
ConcurrentSkipListMap可以提供Comparable内部排序或者是Comparator外部排序, 具体取决于使用哪个构造函数。
ConcurrentSkipListMap
线程安全的有序的集合
底层是通过ConcurrentNavigableMap来实现的, 它是一个有序的线程安全的集合。
有序的, 基于元素的自然排序或者通过比较器确定的顺序
线程安全的
ConcurrentSkipListSet
ConcurrentMap
新增加了用来替代基于散列的Map容器
替代ArrayList接口实现类
CopyOnWriteArrayList就是ArrayList的一种线程安全的变体
是ArrayList的变体,
每次并发修改,CopyOnWriteArrayList都相当于重新创建副本
其内部有一个指向数组的引用, 数组是不会被修改的
CopyOnWriteArrayList中的所有可变操作(比如add和set等)都是通过对数组进行全新复制来实现的。
这个过程存在一定的开销,所以,一般在规模很大时,读操作要远span lang=\"EN-US\" style=\"font-size:8.0pt;mso-bidi-font-size:11.0pt;font-family:宋体; mso-fareast-font-family:宋体;mso-fareast-theme-font:minor-fareast;mso-bidi-font-family: 宋体;color:black\"远多于写操作时,span lang=\"EN-US\" style=\"font-size:8.0pt; mso-bidi-font-size:11.0pt;font-family:宋体;mso-fareast-font-family:宋体; mso-fareast-theme-font:minor-fareast;mso-hansi-font-family:Calibri; mso-hansi-theme-font:minor-latin;mso-bidi-font-family:"Times New Roman"; mso-bidi-theme-font:minor-bidi;color:black\" span lang=\"EN-US\" style=\"font-size:8.0pt;mso-bidi-font-size:11.0pt;font-family:宋体;mso-fareast-font-family: 宋体;mso-fareast-theme-font:minor-fareast;mso-bidi-font-family:宋体;color:black\"为了保证线程安全性,span lang=\"EN-US\" style=\"font-size:8.0pt;mso-bidi-font-size:11.0pt;font-family:宋体; mso-fareast-font-family:宋体;mso-fareast-theme-font:minor-fareast;mso-hansi-font-family: Calibri;mso-hansi-theme-font:minor-latin;mso-bidi-font-family:"Times New Roman"; mso-bidi-theme-font:minor-bidi;color:black\" span lang=\"EN-US\" style=\"font-size:8.0pt;mso-bidi-font-size:11.0pt;font-family:宋体;mso-fareast-font-family: 宋体;mso-fareast-theme-font:minor-fareast;mso-bidi-font-family:宋体;color:black\"会使用Copyspan lang=\"EN-US\" style=\"font-size:8.0pt;mso-bidi-font-size:11.0pt;font-family:宋体; mso-fareast-font-family:宋体;mso-fareast-theme-font:minor-fareast;mso-hansi-font-family: Calibri;mso-hansi-theme-font:minor-latin;mso-bidi-font-family:"Times New Roman"; mso-bidi-theme-font:minor-bidi;color:black\" On Wrte ArrayList.
每次并发写操作,都会创建新的副本
它就不会出现异常
一种fail-safe机制的,它不会抛出Concurrent Modification Exception异常,
并且返回元素与迭代器创建时的元素相同。
它就是一种fail-safe机制的集合
比如CopyOnWriteArrayList案例
CopyOnWriteArrayList案例代码
图解CopyOnWriteArraySet继承关系
图解CopyOnWriteArrayList继承关系
CopyOnWriteArrayList
替代Set接口实现类
类似的, CopyOnWriteArraySet的作用也相当于替代了Set接口。
CopyOnWriteArraySet
新增加了分别替代ArrayList和Set接口实现类
队列
传统的先进先出队列
ConcurrentLinkedQueue是一个先入先出的队列
它的操作不会阻塞, 如果队列为空,那么获取元素的操作会返回空值,
ConcurrentLinkedQueue
并发优先级队列
PriorityQueue扩展了Queue, 增加了可阻塞的播入和获取等操作
如果队列为空, 那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素为止
如果队列已满,那么插入操作则一直阻塞,直到队列中有可用的空间为止。
PriorityQueue
它有一些实现分别是
Queue
阻塞队列,关键字是阻塞,先理解阻塞的含义,
译为阻塞队列
它是JDK 1.5添加的新的工具类
它继承于Queue队列, 并扩展了Queue的功能
在检索元素时,会等待队列变成非空, 并在存储元素时,会等待队列变为可用
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
图1 线程阻塞情况1
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
图2 线程阻塞情况2
阻塞队列中,线程阻塞两种情况
BlockingQueue的继承关系
第一种是抛出异常
特殊值:第二种是根据情况会返回null或者false
阻塞:第三种是无限期的阻塞当前线程直到操作变为可用后
超时:第四种是给定一个最大的超时时间,超过后才会放弃
有四种实现形式, 以不同的方式来处理
在其实现类的方法add、put或者offer后时,添加null会抛出空指针异常。
不允许添加null元素
在任意时间内, 它都会有一个remainCapacity,
超过该值之前, 任意put元素都会阻塞。
有容量限制
一般用于实现生产者-消费者队列
如下图所示
阻塞队列的主要方法
抛出异常:抛出一个异常;
阻塞:在成功操作之前,一直阻塞线程
超时:放弃前只在最大的时间内阻塞
方法类型
1:public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是NULL,则会抛出NullPointerException异常。
2:public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
3:public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)
插入操作
4.drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
获取数据操作:
阻塞队列家族
一个用数组实现的有界队列,和ArrayList对应
比同步List具有更好的并发性能
FIFO先入先出队列,此队列顺序按照先入先出的原则对元素进行排序。
阻塞的线程,可以按照阻塞的先后顺序访问
即先阻塞线程先访问队列
公平访问队列是什么?
对先等待的线程是非公平的
有可能先阻塞的线程,最后才访问队列。
非公平性访问队列是什么?
默认情况下不保证线程公平的访问队列
由数组结构组成的有界阻塞队列。
用数组实现的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下,不保证访问者公平的访问队列,
(公平、非公平)
所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。
通常情况下为了保证公平性会降低吞吐量。
ArrayBlockingQueue
FIFO先入先出队列
一种BlockingQueue的实现,和LinkedList对应
队列的head也就是头元素是在队列中等待时间最长的元素;
队列的tail也就是队尾元素是队列中等待时间最短的元素。
新的元素会被插入到队尾中,检索操作将获取队列中的头部元素,
它是一种基于链表的构造、先入先出的有界阻塞队列
链表队列通常比基于数组的队列具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。
由链表结构组成的有界阻塞队列。
基于链表的阻塞队列
同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序。
(两个独立锁提高并发)
而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
LinkedBlockingQueue
一个优先级排序的阻塞队列,一个支持优先级的阻塞队列
注意:不能保证同优先级元素的顺序
如果你希望按照某种顺序,而不是FIFO处理元素时,这个队列将非常有用。
既可以按照自然顺序来比较元素,
默认情况下的元素采取自然顺序生序或者降序,
自然排序
也可以使用Comparator比较器进行外部元素比较。
也可以自己定义Comparator进行外部排序。
自定义排序
正如其他有序的容器一样, PriorityBlockingQueue
支持优先级排序的无界阻塞队列
PriorityBlockingQueue(compareTo排序实现优先)
是一个支持优先级的无界队列。
默认情况下元素采取自然顺序升序排列。
可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。
需要注意的是不能保证同优先级元素的顺序。
PriorityBlockingQueue
一个支持延时获取元素的无阻塞队列
其中的元素只能在延迟到期后才能使用
队列头是延迟最长时间的元素, 如果没有延迟, 则没有head头元素, poll方法会返回null
getDelay(TimeUnit.NANOSECONDS) 方法返回一个值小于或者等于0,就会发生过期.
判断依据
使用优先级队列实现的无界阻塞队列
DelayQueue(缓存失效、定时任务 )
是一个支持延时获取元素的无界阻塞队列。
队列使用PriorityQueue来实现。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。
只有在延迟期满时才能从队列中提取元素。
可以用DelayQueue保存缓存元素的有效期,
使用一个线程循环查询DelayQueue
一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
缓存系统的设计:
使用DelayQueue保存当天将会执行的任务和执行时间
一旦从DelayQueue中获取到任务就开始执行
比如TimerQueue就是使用DelayQueue实现的。
定时任务调度
可以将DelayQueue运用在以下应用场景
DelayQueue
它的每个insert操作必须等待其他相关线程的remove方法后才能执行,反之亦然,
它维护的是一组线程而不是一组队列, 实际上它不是一个队列,
不存储元素的阻塞队列
SynchronousQueue(不存储数据、可用于传递数据)
是一个不存储元素的阻塞队列。
每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。
SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
SynchronousQueue
继承于BlockingQueue
它是一个接口, 一个BlockingQueue是一个生产者可能等待消费者接受元素
Transfer Queue则更进一步, 生产者会一直阻塞,直到所添加到队列的元素被某一个消费者所消费, 新添加的transfer方法用来实现这种约束。
一个是非阻塞的
参数设置超时时间的
另一个是带有timeout
两个tryTransfer方法
hasWaitingConsumer
getWaitingConsumerCount
两个辅助方法
Transfer Queue有下面这些方法
TransferQueue
由链表结构组成的无界阻塞队列
这个队列对任何给定的生产者进行FIFO排序
head是队列中存在时间最长的元素, tail是队列中存在时间最短的元素,
一个无界的基于链表的TransferQueue
是一个由链表结构组成的无界阻塞TransferQueue队列。
相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时)
transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。
如果没有消费者在等待接收元素,
transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
transfer方法是必须等到消费者消费了才返回。
transfer方法
则是用来试探下生产者传入的元素是否能直接传给消费者。
如果没有消费者等待接收元素,则返回false。
tryTransfer方法无论消费者是否接收,方法立即返回。
tryTransfer方法
试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,
如果超时还没消费元素,则返回false,
如果在超时时间内消费了元素,则返回true。
LinkedTransferQueue
由链表结构组成的双向阻塞队列
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列
所谓双向队列指的你可以从队列的两端插入和移出元素。
双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法
以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。
以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。
另外插入方法add等同于addLast
移除方法remove等效于removeFirst
但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。
在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
Java中的阻塞队列BlockingQueue有多种实现
BlockingQueue
还新增加了两种容器类型, 分别是
Java 5.0中
还引入了分别作为同步的SortedMap和SortedSet的并发替代品
JDK 1.6被提出
在工作密取的设计中,每个消费者都有各自的双端队列
如果一个消费者完成了自己双端队列的任务,就会去其他双端队列的末尾进行消费。
密取方式要比传统的生产者-消费者队列具有更高的可伸缩性
因为每个工作密取的工作者都有自己的双端队列,不存在竞争的情况。
而双端队列适用于工作密取
分别实现了在队列头和队列尾的插入
Deque是一个双端队列
Deque的可动态调整大小的数组实现
ArrayDeque的数据结构:数组, 并提供头尾指针下标对数组元素进行操作
使用数组实现的双端队列, 它是无界的双端队列,
一个更完善,可靠性更强的LIFO栈操作,
由Deque接口和他的实现 提供
底层实现
在日常使用得不多
禁止空元素,
这个类作为栈使用时要比Stack快,
Deque<Integer> stack = new ArrayDeque<Integer>()
应该优先选择使用ArrayDeque实现栈
采用链表实现双端队列
LinkedList
作为queue使用时,要比LinkedList快。
ArrayDeque
它与LinkedList的对比
ArrayDeque作为栈时比Stack性能好, 作为队列时比LinkedList性能好
如果没有外部加锁的情况下, 不支持多线程访问。
ArrayDeque不是线程安全的
如果创建了迭代器之后, 却使用了迭代器外部的remove等修改方法,
那么这个类将会抛出ConcurrentModificationException异常。
ArrayDeque具有fail-fast的特性
由于双端队列只能在头部和尾部操作元素,所以删除元素和插入元素的时间复杂度大部分都稳定在0(1),
除了remove、removeFirstOccurrence、removeLastOccurrence、contains、interator、remove外,
大部分的ArrayDeque都以恒定的开销运行。
时间复杂度大部分都稳定在0(1)
除非在扩容时会涉及到元素的批量复制操作,但是在大多数情况下,使用它时应该指定一个大概的数组长度,避免频繁的扩容。
数组下标的移动要比指针的操作要廉价,而且数组采用连续的内存地址空间,
而链表元素的内存地址是不连续的,所以数组操作元素的效率在寻址上会比链表要快。
链表的插入、除操作涉及到指针的操作
最小的容量是8(JDK 1.8) , 在JDK 11看到它默认容量已经是16了。
最小的容量是8(JDK 1.8) , JDK 11,默认容量已经是16
内部没有容量限制,会根据需要进行增长
该数据结构更加完善、可靠性更好
依靠队列也可以实现LIFO的栈操作
优点
双向链表的无界并发队列
JDK 1.7引入
它与ConcurrentLinkedQueue的区别是:ConcurrentLinkedDeque同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除}
支持happen-before原则
不允许空元素
ConcurrentLinkedDeque
Deque的实现有
Deque
对Queue和BlockingQueue做了扩展
继承关系:对Queue和BlockingQueue做了扩展
阻塞模式一般用于生产者-消费者队列,
一个由链表结构组成的双向阻塞队列
即可以从队列的两端插入和移除元素
双向队列,因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争
把初始容量和构造函数绑定, 这样能够有效过度拓展。
初始容量如果没有指定,就取的是Integer.MAX_VALUE, 这也是Linked Blocking Deque的默认构造函数。
BlockingDeque的实现
BlockingDeque
与BlockingQueue相对的,还有
Java 6.0
Java并发工具包JUC综述
在多线程程序中,诸如++i 或 i++等运算不具有原子性,是不安全的线程操作之一。
通常会使用synchronized将该操作变成一个原子操作,
但JVM为此类操作特意提供了一些同步类,使得使用更方便,且使程序运行效率变得更高。
首先说明,此处AtomicInteger,一个提供原子操作的Integer的类,常见的还有AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference等
实现原理相同,区别在与运算对象类型的不同。
还可以通过AtomicReference<V>将一个对象的所有操作转化成原子操作。
相关资料显示,通常AtomicInteger的性能是ReentantLock的好几倍。
AtomicInteger:一个提供原子操作的Integer的类
JDK1.5的原子包:java.util.concurrent.atomic这个包里面提供了一组原子类
其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性
即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,
而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入
这只是一种逻辑上的理解。
getAndIncrement采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,
如果成功就返回结果,否则重试直到成功为止。
而compareAndSet利用JNI来完成CPU指令的操作。
由于一般CPU切换时间比CPU指令集操作更加长, 所以J.U.C在性能上有了很大的提升。
原子包 java.util.concurrent.atomic(锁自旋)
什么是CAS(比较并交换-乐观锁机制-锁自旋)
CAS(Compare And Swap/Set)比较并交换
相对于对于synchronized这种阻塞算法,CAS是非阻塞算法的一种常见实现。
CAS操作是抱着乐观的态度进行的(乐观锁),它总是认为自己可以成功完成操作。
当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。
失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
概念及特性
内存地址V
旧的预期值A
即将要更新的目标值B
CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。
CAS需要有3个操作数
V表示要更新的变量(内存值),E表示预期值(旧的),N表示新值。
当且仅当V值等于E值时,才会将V的值设为N,
如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。
最后,CAS返回当前V的真实值。
CAS算法的过程
整个比较并替换的操作是一个原子操作。
CAS操作是一条CPU指令,不会被打断,所以是原子操作
CAS是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败。
CAS是一种更新的原子操作
CAS会导致“ABA问题”。
存在CAS的ABA问题
CAS算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。
如果内存地址V初次读取的值是A,并且在准备赋值的时候检查到它的值仍然为A,但可能期间它已经被修改过了
通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。
如果 V 处的值尚未同时更改,则 CAS 操作成功。
举例1
比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,
这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。
尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。
举例2
使用AtomicStampedReference的版本号机制来管理
乐观锁每次在执行数据的修改操作时,都会带上一个版本号
一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1操作,否则就执行失败。
因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加不会减少。
部分乐观锁的实现是通过版本号(version)的方式
解决ABA问题
ABA问题
CAS算法
JDK增加了LongAdder等四个类,高并发时效率更高,但是消耗空间更多
对数据进行操作时(使用CAS来保证)可以保证原子性
Unsafe底层实际上是调用C代码,C代码调用汇编,最后生成出一条CPU指令cmpxchg,完成操作
CompareAndSwap...()
通过unsafe实现的包装类
Atomic(原子操作)
AbstractQueuedSynchronizer类如其名,抽象的队列式的同步器,
一个抽象的队列式的同步器,定义了一套多线程访问共享资源的同步器框架(实现锁的框架),
AQS定义了一套多线程访问共享资源的同步器框架,
许多同步类实现都依赖于它,本质上其实就是对于state的获取和释放
ReentrantLock/ReentrantReadWriteLock /Semaphore/ CountDownLatch的实现都依赖于它
AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现
(通过state的get/set/CAS)
AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了
之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。
如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
不同的自定义同步器争用共享资源的方式也不同。
自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了
什么是 AQS(抽象的队列同步器)
1. isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
2. tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
3. tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
4. tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
5. tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
自定义同步器实现时主要实现以下几种方法
同步器的实现是ABS核心
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
同步器的实现是ABS核心(state资源状态计数)
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
这里volatile是核心关键词,具体volatile的语义,在此不述。
共享资源 volatile int state
getState()
setState()
compareAndSetState()
state的访问方式有三种:
数据结构
tryAcquire(int)/tryAcquireShared(int )
Exclusive独占资源-ReentrantLock
Exclusive(独占,只有一个线程能执行,如ReentrantLock)
独享模式
tryRelease/tryReleaseShared(int)
Share共享资源-Semaphore/CountDownLatch
Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
共享模式
一般来说,自定义同步器要么是独占方法,要么是共享方式,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。
但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
既可独占,也可共享(ReentrantReadWriteLock)
ReentrantReadWriteLock实现独占和共享两种方式
线程模式(AQS定义两种资源共享方式)
图解AQS
AQS
tryAcquire
acquireQueued
CAS
NonfairSync
hasQueuedPredecessors
如果是当前持有锁的线程 可重入
FairSync
入队 出队
头结点设计
共享和独享的实现
实际应用
cpu开销
AtomicReference
只能保证一个共享变量原子操作
标志位 时间戳
ABA
存在的问题
AbstractQueuedSynchronizer
ReentrantLock
ReadLock
WriteLock
ReentrantReadWriteLock
StampedLock锁支持工具
state的变量高16位是读锁,低16位是写锁。
读锁不能升级为写锁
写锁可以降级为读锁
当访问方式是读取操作时,使用读锁即可,当访问方式是修改操作时,则使用写锁
ReentrantReadWriteLock默认实现在sync中(sync继承了AQS)
WriteLock(实现Lock接口)
ReadLock(实现Lock接口)
内部类
Locks(并发锁)
同步工具类可以是任何一个对象
只要它根据自身状态来协调线程的控制流
什么是同步工具类
阻塞队列
翻译过来就是信号量
在进程间通信时, 就会谈到信号量进行通信
在Linux操作系统,采取中断时,也会向进程发出中断信号,根据进程的种类和信号的类型判断是否应该结束进程。
它其实就是一种信号, 在操作系统中, 也有信号量的这个概念
什么是Semaphore?
用来控制同时访问特定资源的线程数量, 它通过协调各个线程,以保证合理的使用公共资源。
Semaphore管理着一组许可(permit) , 许可的初始数量由构造函数来指定。
在获取某个资源之前,应该先从信号量获取许可(permit), 以确保资源是否可用
当线程完成对资源的操作后, 会把它放在池中并向信号量返回一个许可,从而允许其他线程访问资源,这叫做释放许可。
如果没有许可的话,那么acquire将会阻塞直到有许可(中断或者操作超时) 为止。release方法将返回一个许可信号量,
在Java中的Semaphore(信号量)
如果连接池不为空时,解除阻塞
如果数据库连接池为空,则阻塞线程,直接返回失败,
例如常用的数据库连接池, 线程请求资源时,
可以用来实现流量控制
信号量(Semaphore)
翻译成字面意思为 信号量
控制同时访问的线程个数
一种基于计数的信号量。
Semaphore(信号量)
它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。
Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池
创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。
实现互斥锁(计数器为1)
例子:若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。通过Semaphore来实现
可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接
应用场景
Semaphore可以控制同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
消费一个许可证。如果没有许可证了,会阻塞起来
acquire()
添加一个许可证
release()
可以控制同时访问的线程个数,它维护了一组\"许可证\"。
public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
public void acquire(int permits):获取permits个许可
public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。
public void release(int permits) { }:释放permits个许可
上面4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits):尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
availablePermits()方法得到可用的许可数目。
Semaphore类中比较重要的几个方法
Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
Semaphore基本能完成ReentrantLock的所有工作,使用方法也与之类似,通过acquire()与release()方法来获得和释放临界资源。
经实测,Semaphone.acquire()方法默认为可响应中断锁,与ReentrantLock.lockInterruptibly()作用效果一致,也就是说在等待临界资源的过程中可以被Thread.interrupt()方法中断。
此外,Semaphore也实现了可轮询的锁请求与定时锁的功能,除了方法名tryAcquire与tryLock不同,其使用方法与ReentrantLock几乎一致。
Semaphore也提供了公平与非公平锁的机制,也可在构造函数中进行设定。
Semaphore的锁释放操作也由手动进行,因此与ReentrantLock一样,为避免线程因抛出异常而无法正常释放锁的情况发生,释放锁的操作也必须在finally代码块中完成。
Semaphore 与ReentrantLock
闭锁(Latch) 是一种同步工具类
它可以延迟线程的进度以直到其到达终止状态
在闭锁达到结束状态前,门是一直关着的,没有任何线程能够通过。
当闭锁到达结束状态后,这扇门会打开井且允许任何线程通过,然后就一直保持打开状态。
闭锁的作用相当于是一扇门,
通过闭锁来启动一组相关的操作,使用闭锁来等待一组事件的执行。
闭锁是一种一次性对象,一旦进入终止状态后,就不能被置
闭锁(Latch)
闭锁的一种实现
它可以使一个或者多个线程等待一组事件的发生
闭锁有一个计数器, 闭锁需要对计数器进行初始化, 表示需要等待的次数
闭锁在调用await处进行等待, 其他线程在调用countDown把闭锁count次数进行递减, 直到递减为0, 唤醒await。
略
如下代码所示
CountDownLatch
Barrier的特点和闭锁也很类似, 它也是阻塞一组线程直到某个事件发生。
栅栏与闭锁的区别在于,所有线程必须同时到达栅栏的位置,才能继续执行
ABCD四条线程, 必须同时到达Barrier, 然后手牵手一起走过幸福的殿堂。
当线程到达Barrier的位置时,会调用await方法, 这个方法会阻塞直到所有线程都到达Barrier的位置,
如果所有线程都到达Barrier的位置, 那么Barrier将会打开使所有线程都被释放, 而Barrier将被重置以等待下次使用.
如果调用await方法导致超时, 或者await阻塞的线程被中断, 那么Barrier就被认为被打破, 所有阻塞的await都会抛出BrokenBarrierException.
如果成功通过栅栏后,await方法返回一个唯一索引号, 可以利用这些索引号选举一个新的leader, 来处理一下其他工作
栅栏(Barrier)
CountDownLatch类位于JUC包下,利用它可以实现类似计数器的功能
只有一个构造方法 只会被赋值一次
另外,CountDownLatch是不能够重用的,
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
没有别的方法可以修改 count
count数减1
countDown()
线程阻塞直到count等于0
await()
CountDownLatch允许一个或多个线程等待其他线程完成操作
开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段。
应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
确保一个计算不会执行,直到所需要的资源被初始化。
比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch(线程计数器 )
等待至barrier状态再全部同时执行
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
当cyclicBarrier的count数等于0的时候,阻塞的线程都继续执行
CyclicBarrier(int ),参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier中最重要的方法就是await方法
public int await():用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
它有2个重载版本
多线程计算
CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
而CyclicBarrier是可以重用的。
CyclicBarrier是可以重用的。
CyclicBarrier(回环栅栏)
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同;
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
CountDownLatch.await一般阻塞主线程,而CyclicBarrierton一般阻塞工作线程
CountDownLatch主要用于描述一个或者多个线程等待其他线程执行完毕
CyclicBarrier主要用于描述多个线程之间相互等待
CountDownLatch和CyclicBarrier的区别
与Barrier相关联的还有一个工具类就是Exchanger
Exchanger是一个用于线程间协作的工具类。
Exchanger用于进行线程间的数据交换.
它提供一个同步点, 在这个同步点两个线程可以交换彼此的数据。
这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法, 它会一直等待第二个线程也执行exchange,
当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方,因此使用Exchanger的重点是成对的线程使用exchange) 方法, 当有一对线程达到了同步点, 就会进行交换数据。
因此该工具类的线程对象是成对的。
Exchanger
Exchanger用于进行线程间的数据交换。
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
多种类型的同步控制类、同步工具类
Tools(同步工具)
继承了ReentrantLock
一个concurrentHashMap包含了一个segment数组
一个segment包含了一个hashEntry链表
segment
链表结构的元素
为了维护链表结构,防止并发问题
hashEntry的成员变量除了value都定义为final
hashEntry
JDK1.7
Node数组
链表
红黑树
JDK1.8
1.首先Hash定位到Segment
2.对当前Segment加锁,如果Segment中元素的数量超过了阈值,则需要进行扩容并且进行rehash
3.再定位到链表头部
流程
put()
根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景
volatile能够保证内存可见性
因为共享变量都定义为了volatile
不用加锁,是非阻塞的
需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部
取出的value是null的原因是可能现在正在进行put操作
定位到链表头部之后根据key取出对应的value值
get()
因为HashEntry中的next是final的,一经赋值以后就不可修改,所以在定位到待删除元素e的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去。尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。e之前的元素在remove()之后为remove之前的逆置
remove()
size()操作涉及到多个segment
size操作就是遍历了两次Segment,每次记录Segment的modCount值,然后将两次的modCount进行比较,如果相同,则表示期间没有发生过写入操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次,如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了
size()
常用方法底层实现(JDK1.7)
key和value都不可以为null
get()方法不加锁,是非阻塞的
是线程安全的
JDK 1.8 使用了 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized。并且 JDK 1.8 的实现也在链表过长时会转换为红黑树。
ConcurrentLinkedQueue
一个内部使用跳表,并且支持排序和并发的一个Map
一般很少会被用到,也是一个比较偏门的数据结构。
跳表是一种允许在一个有顺序的序列中进行快速查询的数据结构。
在普通的顺序链表中查询一个元素,需要从链表头部开始一个一个节点进行遍历,然后找到节点。
跳表可以解决这种查询时间过长,其元素遍历的图示
跳表是一种使用”空间换时间”的概念,用来提高查询效率的链表。
跳表
ConcurrentSkipListMap 和ConcurrentSkipListSet
它们是CopyOnWriteArrayList 和CopyOnWriteArraySet。
它的add/remove 等方法都已经加锁了,还要copy 一份再修改干嘛?多此一举?
同样是线程安全的集合,这玩意和Vector 有啥区别呢?
疑问?
Copy-On-Write
简称COW
是一种用于程序设计中的优化策略
Copy-On-Write是什么?
从一开始大家都在共享同一个内容,当某个人想要修改这个内容时,才会真正把内容Copy 出去形成一个新的内容然后再改,
这是一种延时懒惰策略。
Copy-On-Write的基本思路
CopyOnWrite 容器即写时复制的容器。
CopyOnWrite 容器非常有用,可以在非常多的并发场景中使用到。
通俗的理解是
不直接往当前容器添加,
而是先将当前容器进行Copy,复制出一个新的容器,
然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
当往一个容器添加元素时,
CopyOnWrite 容器
适合多读少写的场景
CopyOnWrite 并发容器用于读多写少的并发场景。
比如白名单,黑名单,商品类目的访问和更新场景。
CopyOnWrite 并发容器的使用场景
CopyOnWriteArrayList 中add/remove 等写方法是需要加锁的,
目的是为了避免Copy 出N 个副本出来,导致并发写。
注意:CopyOnWriteArrayList 的整个add 操作都是在锁的保护下进行的。也就是说add 方法是线程安全的。
写方法是需要加锁的
这样做的好处是我们可以对CopyOnWrite 容器进行并发的读,当然,这里读到的数据可能不是最新的。
因为写时复制的思想是通过延时更新的策略来实现数据的最终一致性的,并非强一致性。
读方法是没有加锁的
所以CopyOnWrite 容器是一种读写分离的思想,读和写不同的容器
而Vector 在读写时,使用同一个容器,读写互斥,同时只能做一件事儿。
CopyOnWriteArrayList和Vector的本质区别
(add()、set() 和remove() 等等)
因为通常需要复制整个基础数组,所以可变操作的开销很大
但不支持可变remove()等操作
迭代器,
使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。
在构造迭代器时,迭代器依赖于不变的数组快照。
支持高效率并发且是线程安全的
和ArrayList基本一模一样,但它是线程安全的
CopyOnWriteArrayList 相当于线程安全的ArrayList
CopyOnWriteArrayList 使用了一种叫写时复制的方法
当有新元素add 到CopyOnWriteArrayList 时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。
任何对array在结构上有所改变的操作(add、remove、clear等),CopyOnWriterArrayList都会copy现有的数据,再在copy的数据上修改,font color=\"#c41230\
font color=\"#c41230\
CopyOnWriteArrayList的特点总结
1.复制的数组会消耗内存
2.不能读取实时性的数据
3.会产生大量的对象
CopyOnWriteArrayList的缺点
CopyOnWriteArrayList(线程安全的ArrayList)
两个Copy-On-Write机制实现的并发容器
Collections(并发容器)
继承Thread类
实现Runnable接口
常见的创建多线程的方式有两种
这两种方式都没有返回值
Callable接口
Future就是:对具体的Runnable或者Callable任务的执行结果进行一系列的操作
必要时可通过get方法获取执行结果, 这个方法会阻塞直到执行结束。
Future中的主要方法有
尝试取消任务的执行, 如果任务已经完成、已经被取消或者由于某些原因而无法取消,那么这个尝试会失败。
如果取消成功,或者在调用cancel时此任务尚未开始, 那么此任务永远不会执行,
如果任务已经开始, 那么mayInterruptIfRunning参数会确定是否中断执行任务以便于尝试停止该任务。
这个方法返回后,会对isDone的后续调用也返回true,
如果cancel返回true, 那么后续的调用isCancelled也会返回true,
cancel(boolean mayInterruptIfRunning)
如果此任务在正常完成之前被取消, 则返回true
boolean isCancelled()
如果任务完成, 返回true
boolean isDone()
等待必要的计算完成,然后检索其结果
V get() throws InterruptedException,ExecutionException
必要时最多等待给定时间以完成计算, 然后检索其结果。
因为Future只是一个接口, 所以是无法直接用来创建对象使用的, 因此就有了下面的FutureTask。
Future接口
FutureTask实现了RunnableFuture接口
RunnableFuture接口又继承了Runnable接口和Future接口。
纳尼?在Java中不是只允许单继承么?
是的,单继承更多的是说的类与类之间的继承关系
子类继承父类,扩展父类的接口,这个过程是单向的, 就是为了解决多继承引起的过渡引用问题。
Java编程思想
而接口之间的继承是接口的扩展, 在Java编程思想中也印证了这一点
成功执行的run方法,会使Future接口的完成并允许访问其结果。
所以它既可以作为Runnable被线程执行, 又可以作为Future得到Callable的返回值,
对RunnableFuture接口的解释是
RunnableFuture接口是什么
等待运行
正在运行
运行完成
FutureTask也可以用作闭锁, 它可以处于以下三种状态
FutureTask在Executor框架中表示异步任务,
此外还可以表示一些时间较长的计算, 这些计算可以在使用计算结果之前启动。
FutureTask类
创建多线程还有其他三种方式
Future和FutureTask
Executor(并发框架)
图解JUC
图解JUC并发包
JUC
同步容器类
0 条评论
下一页