MQ
2021-11-12 11:25:53 0 举报
AI智能生成
针对于Java体系,常用的消息中间件,Rabbitmq、Kafka等
作者其他创作
大纲/内容
问题
消息发送
Kafka
同步/异步
发送失败后可以配置重试策略
Rabbitmq
同步/异步
发送失败后,自己进行重试策略
问题
网络抖动会造成消息丢失
解决方案
可以在本地进行临时存储消息,定时去处理没有返回Ack的消息,并且进行重发
消息丢失(可靠性投递)
Kafka
生产端
设置ACK的三种模式
Rabbitmq
消息落库
1.搞一个消息记录表,发送消息前,做一个消息状态标识
2.消息传递到Broker中后,Confirm回调通知生产者进行消息状态更新
3.可靠性投递-并不包含消息成功消费
4.搞一个定时Job,按照业务设置时间进行定时处理状态未改变的消息,进行尝试,多次失败后,不再进行重发
延时投递
说明
考虑到如上方案会进行至少两次Db的操作,因此阻塞性能,不再依赖生产者进行消息状态更新
优化
引入第三方服务,进行接收二次投递检查消息和消费端成功消费通知的消息(更新消息状态)
1.生产者在原有业务消息的基础上再传递二次确认消息进行处理(第三方服务处理该消息)-也是Mq消息吗
2.消费端服务处理完逻辑后,发送确认消费消息到第三方服务中-通过MQ吗
3.第三方服务接收消费端确认消息后,更新消息的状态
4.第三方服务接收到二次核查消息后,进行判断消息的状态,出现问题后,回调发送端服务(如何进行回调呢),进行重试发送
问题
生产者的二次核查消息优先于消费端发送的消息,到达第三方服务的时候,此时让生产者再次发送重试也是没问题的
消息重复消费
Kafka
生产者
网络抖动下,生产者重复发送
Kafka中Broker配置生产端幂等性的配置(默认false)
包含
PID
和生产者绑定一致
Sequence Number
对应分区的主键递增的数字
消费端
自动提交方式
还没来记得提交Offset就挂了
手动提交方式
消费一半服务宕机后,还没有提交Offset
默认会重试10次
需要重写Bean,然后定义多长时间重试多少次
Rabbitmq
消费端
配置消费重试策略,造成重复消费
解决
防幂等
消息消费超时
Kafka
其一生产端消息比较多,消费端消费能力比较弱,导致因逻辑耗时未及时提交到Broker,Broker判定当前的消费端能力比较弱,更换其分区对应的消费者
Rabbitmq
消息消费超时后会怎么办
姑且认为你有超时时间,只不过不知道多少了
消息顺序消费(通用)
生产
设置模式为ACK=0
采用同步发送的方式
Broker
保证一个Topic只有一份分区
消费
保证一个分组只有一个消费端进行消费
性能问题
通过消费端再次进行排序,根据业务标识排列到内部不同的队列中,采用多线程进行处理
如果就必须采用多个分区对应多个消费者的处理办法呢?
网上查阅资料-待定
消息积压(通用)
说明
消息出现积压问题,证明消费能力弱了,查看是否分区数目和消费端数目保持一致,不一致的话(消费者<分区数目),增加消费者;不可以临时扩增分区,因为会触发Rebalance机制,消费端会进行暂停处理
方案
1.另外再搞一个服务,该服务只负责接收消息,然后进行转发,并不进行逻辑处理
2.再搞一个Topic,然后增加主题对应分区的数目,再多搞一些消费者进行消费
对比
生产端发送数据
Rabbitmq
默认没有失败后,自动重发的策略,在Return/confirm中实现
Kafka
可以配置失败策略
消费端获取数据
Rabbitmq
第一次启动的时候去拉取消息,后续Server会推送数据给消费端
Kakfa
主动去Poll数据
核心关联组件
Rabbitmq
队列进行关联消费和生产端
Kafka
利用Topic进行关联生产、分区、消费端的
Kafka
服务端
服务端-Broker
消费者会长期和生产者建立一个心跳机制
分区-Partition
日志分段存储(生产者发送消息保存)
描述
消息存储文件夹是通过topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G
Segment(topic名称+分区号文件夹下存在多个Segment)
index
kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件
log
消息存储文件,主要存offset和消息体
timeindex
kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset
消费者消费消息偏移量
kafka-log日志中会存放0-50文件夹自身维护“消费偏移量”的信息
key是consumerGroupId+topic+分区号,value就是当前offset的值
选择策略
Hash(groupId)%offset记录文件大小(默认50)
副本故障处理机制
场景
适用于Leader和Follower间进行同步数据的过程中,两者中其一发生问题,导致偏移量出现不一致问题,从而利用LEO和HW进行同步消息
方案名词
LEO
每个副本数据最后一个Offset的位置
HW
消费者能够消费的最大的Offset的值
用于在某个节点宕机后,同步数据的标尺,容易造成Leader节点还未同步到Follower节点的数据丢失
话题-Topic
包含
Leader
标识哪台节点是Leader,同时给定partition的所有读写请求
Replicas
某个partition在哪几个broker上存在备份,挂掉后,也会列举出来
Isr
列举出当前还存活着的,并且已同步备份了该partition的节点
注意
同一个主题不同分区leader是不一样的
在集群数量为3的情况下,某个Topic,分为两个分区
Broker 0作为Partition 0 的Leader节点,其它节点都是该Partition的副本
Broker 1作为Partition 1的Leader节点,其它节点都是该Partition的副本
偏移量-Offset
标识消费者消费的位置(通过定时任务进行删除)
控制器-Controller
作用
集群中选举一个Broker称为控制器角色,负责维护和管理每个Broker分区副本的状态
当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本
当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息
当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知
到。
到。
Controller选举原理
在ZK上创建临时节点,成功创建的Broker成为了控制器角色,其它节点则订阅该Broker,一旦控制器角色发生宕机后,则会通知其它节点,进行重新选举控制器角色
Partition中Leader选举机制
副本进入ISR列表有两个条件
副本节点不能产生分区
副本能复制leader上的所有写操作,并且不能落后太多
ISR机制
unclean.leader.election.enable=false
挑ISR中第一个broker作为leader
unclean.leader.election.enable=true
在ISR列表里所有副本都挂
了的时候可以在ISR列表以外的副本中选leader
了的时候可以在ISR列表以外的副本中选leader
过程
1.优先创建Topic,并且指定Partition分区数量;如果生产者发送的Topic不存在的话,则默认创建;因此某个Topic下面会包含多个描述信息,详见Topic
客户端
生产者-Producer
ACK机制(集群)
0
不考虑任何Broker是否接收到消息并且持久化磁盘
延迟低,消息容易丢失
-1
Leader持久化成功,并且Follow节点成功同步消息
延迟高,Leader节点宕机后,造成消息重复存放在其它的Follow节点中
1
Leader持久化到磁盘成功,回调通知ack
延迟适中,Follow节点容易同步失败
发送消息方式
同步阻塞
异步接收
配合CountDownLatch使用
高性能设置
自身本地缓存容量(32MB)
批次(16KB)
当消息没有放满批次的容量后,可以通过配置定时(另外开一个线程)主动Send Message
消费者组-Group
同一个组内只有一个consumer消费消息
消费者-Consumer
Rebalance机制
描述
消费组里的消费者数量有变化或消费的分区数有变化,重新将分区和消费者建立联系
策略
Range分配
轮询分配
Sticky策略
保证如下两个原则
分区的分配要尽可能均匀
分区的分配尽可能与上次分配的保持相同
消费Topic的方式
从启动后开始消费(默认)
从头开始消费,下次启动后,再继续从未消费过的位置进行消费
区别于 每次都从头开始消费
剔除机制
1.如果消费端与Broker心跳超时,则剔除
2.如果消费端连续两次Poll时间间隔超时,则剔除
机制
生产者缓冲区(持久化磁盘)
减少生产端和Broker的交互次数
消费组中是如何记录是否消费过消息,也就是更改了Offset偏移量的(消费组和偏移量有关系)
老的kafka是记录到ZK了
新版本直接生成消费者的偏移量文件
描述
每个consumer会定期将自己消费分区的offset提交给kafka内部topic
内容
key是consumerGroupId+topic+分区号,value就是当前offset的值
消费者获取消息和偏移量无直接关系
设置自动提交后,先获取消息,偏移量自动加一,一旦消费端出错后,消息不可重新消费;并且服务重启后,消息不会被重复消费
不设置自动提交后,先获取消息,手动更改偏移量,如果没有更改的话,那么会接着获取消息,偏移量还是在首个消息位置;解决了消息出错后不可再次消息,但是服务重启后,消息会被重复进行消费(一般都会设置手动提交)
高性能
磁盘的顺序读写
kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写
kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写
数据传输的零拷贝(CPU拷贝过程)
DMA控制器(1)
接收内核态的IO操作指令,读取成功后,发送给内核态数据,减少CPU直接去拷贝次数
Mmap技术-“内核”缓冲区和“应用程序”缓存空间映射相互映射(2)
减少一次内核态到应用空间的拷贝次数
内核态到用户态还是照常进行切换的
SendFile(3)
版本升级
2.1版本
直接通过内核态拷贝数据到Socket缓冲区,然后发送到网卡,减少了两次内核态到用户态的切换
2.4版本
直接通过内核态缓冲区采用DMA技术把数据拷贝到网卡,减少了以此CPU拷贝的次数
结论
最终采用了两次上下文切换
两次DMA拷贝技术
事务机制
说明
只能保证Kafka本地的事务一致性,不能保证分布式的事务一致性
场景
针对于多个不同的Topic同时成功或者失败
过程
利用Kafka开启事务/提交/回滚
问题
为什么要对Topic中数据进行分区存储?
数据做了分布式存储
提高并行度
生产者是根据什么将消息投递到分区中?
1.是否指定了某个分区
2.未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
3.没指定分区,并且没有设置key的话,则采用轮询的方式选择分区
消费者是如何绑定分区的?
如何查找指定的offset的值?
1.先找到分区对应的segment,其实就是分区文件夹
2.根据segment,找到索引表中偏移量对应log中的消息地址(索引表中并不会记录所有的偏移量索引)
选举机制
其一就是控制器的选举,利用临时顺序节点在Zk上面创建,先创建成功的会优先当做控制器Broker
其二就是分区的Leader选举,其实是通过控制器来进行判断的,出现问题后会动态跟新Isr中的数据,并且会及时更新到Broker的服务器缓存中
分区设置越多吞吐量越好吗?
利用Kafka压测工具进行测试
Rabbitmq
五种消息模式
一对一模式
一对多模式
发布订阅模式
多个Topic对应多个消费端
说明
该模式开始依赖于交换机,因此交换机会存在三种类型
路由模式
通配符模式
RPC模式
交换机类型
广播-Fanout
将消息发送给绑定到交换机上面所有的队列
定向-Direct
利用router_key进行转发
主题-Topic
利用router_key通配符*号
特性
消费端限流消费消息
默认一条一条消费
Confirm
生产者将消息传达到Broker中会通知
Return
说明
交换机指定到队列中会返回
情况
1.broker中根本没有对应的exchange交换机来接受该消息
2.消息能够投递到broker的交换机上,但是交换机根据routingKey 路由不到某一个队列上
措施
针对于以上的情况,如果消息生产端 的mandatory设置为true,那么会触发Return机制;否则,消息会丢失
TTl
消息
该消息在队列头部消费的时候,判断是否过期
队列
该队列中的消息都会存在过期时间
死信队列
消息进入死信队列的条件
消息队列满了
设置延迟消息,过期时还未被消费者进行消费
消费者消费多次失败
设置消费重试的次数
basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
0 条评论
下一页