消息中间件(MQ)
2021-03-26 16:03:12 161 举报
AI智能生成
消息中间件(MQ)是一种基于异步通信、分布式的消息传递架构,用于在分布式系统中实现应用程序之间的解耦、缓冲和削峰填谷。它通过将消息发送者与接收者分离,使得系统具有更高的可扩展性、可靠性和灵活性。消息中间件通常采用发布-订阅模式,支持多种消息传输协议,如AMQP、MQTT和STOMP等。常见的消息中间件有RabbitMQ、Kafka、ActiveMQ和RocketMQ等。
作者其他创作
大纲/内容
activeMq
吞吐量低
订阅形式
点对点(p2p)
广播(发布-订阅)
现状
没有经过大规模吞吐量场景验证,社区不活跃
基于内存的队列,极其成熟
rocketMq
概述
比rabbitMq高
订阅形式
基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模
现状
社区活跃度不算太高
国内相对活跃
要点
优先级队列
Message Priority
Message Priority
没有实现优先级队列,但可以通过定义高优先级队列和低优先级队列的方式来实现
顺序队列
message order
message order
基本概念
生产者消息推送方式
同步发送
异步发送
顺序发送
普通顺序消息
消费者通过同一个消费队列收到的消息是有顺序的,
不同消息队列收到的消息则可能是无顺序的。
不同消息队列收到的消息则可能是无顺序的。
严格顺序消息
消费者收到的所有消息均是有顺序的
单向发送
消费者消费消息两种方式
pull
客户端不断的轮询请求服务端,来获取新的消息。
push
(默认)
(默认)
只要有数据Broker就会一直推,不关注消费端是否能够处理
客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
非真正意义推送,基于长轮询实现
两种消费模式
集群消费
相同Consumer Group的每个Consumer实例平均分摊消息(只有一个能消费)
广播消费
相同Consumer Group的每个Consumer实例都接收全量的消息
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
名字服务器(name server)
充当路由消息的提供者。生产者或消费者能够通过Nameserver查找各主题相应的Broker IP列表。
多个Namesrver实例组成集群,但相互独立,没有信息交换。
多个Namesrver实例组成集群,但相互独立,没有信息交换。
消息(Message)
每条消息必须属于一个主题。
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
系统提供了通过Message ID和Key查询消息的功能。
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
系统提供了通过Message ID和Key查询消息的功能。
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。
消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。
消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
特性
消息顺序
全局顺序(严格顺序)
指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
分区顺序(普通顺序)
指定的一个 Topic,所有消息根据 sharding key 进行区块分区。
同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。
Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。
Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景
性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
消息过滤
消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
消息过滤目前是在Broker端实现的,
优点:减少了对于Consumer无用消息的网络传输;
缺点:增加了Broker的负担、而且实现相对复杂。
消息过滤目前是在Broker端实现的,
优点:减少了对于Consumer无用消息的网络传输;
缺点:增加了Broker的负担、而且实现相对复杂。
消息可靠性
影响可靠性原因
1.Broker非正常关闭
2.Broker异常Crash
3.OS Crash
4.机器掉电,但是能立即恢复供电情况
5.机器无法开机(可能是cpu、主板、内存等关键设备损坏)
6.磁盘设备损坏
2.Broker异常Crash
3.OS Crash
4.机器掉电,但是能立即恢复供电情况
5.机器无法开机(可能是cpu、主板、内存等关键设备损坏)
6.磁盘设备损坏
对策
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,
RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5),6)通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。
通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合
通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合
回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,
要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
事务消息
基于“半消息“实现
基于回查确认的方式保证故障情况下的事物有一致性
如生产者发送“半消息”之后,发生异常或者本地事物失败等情况,一直没有确认“半消息”,
则rocketmq等待一定时间后会调用生产者接口回查数据已确认事物是否已经完成,从而做到事物一致性
则rocketmq等待一定时间后会调用生产者接口回查数据已确认事物是否已经完成,从而做到事物一致性
延迟队列
消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic
broker有配置项messageDelayLevel,
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel
messageDelayLevel是broker的属性,不属于某个topic。
发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
延迟消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level(因为相同延时的消息放到同一个队列,如果用户自定义将产生大量队列),例如定时5s,10s,1m等
消息重试
(消费者消费失败)
(消费者消费失败)
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重投
(生产者投递消息失败)
(生产者投递消息失败)
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证
消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题
消息重投策略
同步发送
retryTimesWhenSendFailed
retryTimesWhenSendFailed
同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。
不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
同步双写
retryAnotherBrokerWhenNotStoreOK
retryAnotherBrokerWhenNotStoreOK
消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,
默认false。十分重要消息可以开启。
默认false。十分重要消息可以开启。
异步发送
retryTimesWhenSendAsyncFailed
retryTimesWhenSendAsyncFailed
异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
流量控制
生产者流控
1.commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
2.如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
3.broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
4.broker通过拒绝send 请求方式实现流量控制。
2.如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
3.broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
4.broker通过拒绝send 请求方式实现流量控制。
注意
生产者流控,不会尝试消息重投。需要自己实现逻辑
消费者流控
1.超数-消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
2.超量-消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
3.超时-消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
2.超量-消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
3.超时-消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
死信队列
用于处理无法被正常消费的消息。达到最大重试次数后,若消费依然失败,
则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
架构
Producer
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳(默认30秒)。Producer完全无状态
启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择(可设置策略)一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳(默认30秒)。
Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取
Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取
获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道
NameServer
Topic路由注册中心,支持Broker的动态注册与发现
通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
功能
Broker管理
接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活
路由信息管理
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。
然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
BrokerServer
主要负责消息的存储、投递和查询以及服务高可用保证
子模块
Remoting Module
整个Broker的实体,负责处理来自clients端的请求
Client Manager
负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Store Service
提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service
高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service
根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
集群部署
Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave;
Master也可以部署多个。
每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
Master也可以部署多个。
每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
目前在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载
架构设计
存储架构
文件系统
CommitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容;
消息内容不是定长的
消息内容不是定长的
单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量
比如
00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;
当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推
00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;
当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推
消息主要是顺序写入日志文件,当文件满了,写入下一个文件
ConsumeQueue
作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值;
consumequeue文件可以看成是基于topic的commitlog索引文件
consumequeue文件可以看成是基于topic的commitlog索引文件
consumequeue文件夹的组织方式如下:topic/queue/file三层结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M
Index File
提供了一种可以通过key或时间区间来查询消息的方法
存储位置:$HOME \store\index${fileName}
文件名fileName:以创建时的时间戳命名的
单个文件大小约为400M,可以保存 2000W个索引
底层存储设计为在文件系统中实现HashMap结构,故底层实现为hash索引。
页面缓存与内存映射
页面缓存
对文件进行顺序读写的速度几乎接近于内存的读写速度;
主要原因OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache(预读取)
主要原因OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache(预读取)
写数据
OS会先写入至Cache内,
随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上
随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上
读数据
一次读取文件时出现未命中PageCache的情况,
OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取
OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取
ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取
CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能
优化方向:选择合适的系统IO调度算法
RocketMQ主要通过MappedByteBuffer对文件进行读写操作
利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(减少内存拷贝)
将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率
(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
消息刷盘
同步刷盘
(性能损耗大)
(性能损耗大)
在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应
异步刷盘
(极端情况会丢失数据)
(极端情况会丢失数据)
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。
消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
通讯机制
概述
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块
Reactor多线程设计
消息过滤
Consumer端订阅消息时再做消息过滤的
2种的过滤方式
Tag过滤方式
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。
其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
SQL92的过滤方式
和Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
负载均衡
Producer的负载均衡
发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,
RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息
RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息
容错策略均在MQFaultStrategy这个类中定义
有一个sendLatencyFaultEnable开关变量,
如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。
所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。
如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。
所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。
默认轮询发送消息到consumer queue上
Consumer的负载均衡
两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取
每个consumer平均分配consumer queue
所有负载均衡都由客户端完成
集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue
事物消息
采用2PC模式实现事务,增加一个补偿逻辑来处理二阶段超时或失败的情况
1.提交“半消息”到队列,内部实现为将该消息放到一个特殊的topic下,使其cumsumer不可见;
2.commit的时候直接修改message的topic索引即可;
3.rollback不是真正的删除message,由于顺序读写的方式,无法真正删除,只是修改message的一个特殊标志位,标识改message作废。
2.commit的时候直接修改message的topic索引即可;
3.rollback不是真正的删除message,由于顺序读写的方式,无法真正删除,只是修改message的一个特殊标志位,标识改message作废。
流程
消息发送和提交逻辑
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
补偿流程
(解决消息Commit或者Rollback发生超时或者失败的情况)
(解决消息Commit或者Rollback发生超时或者失败的情况)
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息
消息查询
按照MessageId查询
MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。
Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送请求;
读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回
读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回
按照MessageKey查询
基于RocketMQ的IndexFile索引文件来实现的
过期消息清理
4.6版本默认48小时后会删除不再使用的CommitLog文件
检查这个文件最后访问时间;判断是否大于过期时间指定时间删除;
默认凌晨4点
默认凌晨4点
未被消费的消息不会存在超时删除这情况。
kafka
特性
高吞吐、低延迟
每秒几十万条消息,延迟最低只有几毫秒
可扩展
kafka集群支持热扩展
持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止丢失
容错性
允许集群中节点失败
高并发
支持数千个客户端同时读写
比rocketMq高
要点
至此以集合未单位进行发送消息,在此基础上,kafka还支持对消息集合进行压缩,减少传输的数据量,减少对网络传输的压力
消费端采用拉取的方式消费
顺序性
Kafka只保证一个Partition内的消息的有序性。
现状
大数据领域的实时计算,日志采集的行业标准
消息可靠性
At most once
消息可能丢失,但绝不会重复传输
At least one
消息绝对不会丢失,但可能会重复传输
Exactly once
每条消息肯定会被传输一次且仅传输一次
架构
zookeeper协调控制
管理broker与consumer的动态加入与离开。
触发负载均衡
维护消费关系及每个partition的消费信息
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。记录在zk中
broker
controller
(Leader)
(Leader)
选举
所有Broker节点再zookeeper上注册一个临时节点,
唯一成功的哪个成为leader,称为Broker Controller, 其他的称为Broker follower
唯一成功的哪个成为leader,称为Broker Controller, 其他的称为Broker follower
当leader Partition分区宕掉之后,由Broker Controller负责在有用分区(ISR)中选择新的Leader
可以动态增加broker
无状态
没有副本机制,不保存订阅者状态,由订阅者自己保存
Topic
Partition
概述
topic中的数据分割为一个或多个partition
每个topic至少有一个partition
物理上一个partion对应一个文件夹,该文件夹存储所有消息和索引文件
物理上每个partition中的数据使用多个segment文件存储
提供两种策略删除旧数据
基于时间删除
基于文件大小删除
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序;
需要严格保证消息的消费顺序的场景下,需要将partition数目设为1
需要严格保证消息的消费顺序的场景下,需要将partition数目设为1
角色
leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Leader Election算法
(有两种方式实现基于ZooKeeper的分布式锁)
(有两种方式实现基于ZooKeeper的分布式锁)
临时顺序节点
所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
节点名称唯一性
多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。
controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。
同时controller也负责增删Topic以及Replica的重新分配。
controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。
同时controller也负责增删Topic以及Replica的重新分配。
ISR(in-sync replicas)
leader负责跟踪所有follower的状态
follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower
当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower
从Leader中复制数据,不与producer/consumer交互
在broker上的分配策略
1.将所有Broker(假设共n个Broker)和待分配的Partition排序
2.将第i个Partition分配到第(i mod n)个Broker上
3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
2.将第i个Partition分配到第(i mod n)个Broker上
3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
消息同步传输策略
Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。
每个Follower都从Leader pull数据。Follower在收到该消息并写入其Log后,向Leader发送ACK。
一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将向Producer发送ACK
每个Follower都从Leader pull数据。Follower在收到该消息并写入其Log后,向Leader发送ACK。
一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将向Producer发送ACK
为了提高性能
每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中
replication是否“活着”包含两个条件
一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)
二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”
Producer
可靠性投递
partition ack
partition ack
ack = -1
表示需要所有partition成功,才返回成功
ack = 0
表示发送即代表发送成功,不等broker返回确认信息
ack = 1
producer写道partition leader成功后,broker就返回
ack = 2
表示有2个partition成功,就返回成功
producer路由
1、 指定了 partition,则直接使用;
2、 未指定 partition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3、 partition 和 key 都未指定,使用轮询选出一个 patition。
2、 未指定 partition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3、 partition 和 key 都未指定,使用轮询选出一个 patition。
Consumer
消息确认方式
手动commit
zookeeper中保存该Consumer在该Partition中读取消息的offset
自动commit
采用pull的消费模式,可以逐条消费也可以批量消费
Consumer Group
每个Consumer属于一个特定的Consumer Group
可为每个Consumer指定group name,若不指定group name则属于默认的group
message
消息有一个定长的header和边长的字节数组组成
单个消息的大小无限制
推荐消息大小不要超过1MB
如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案
等待ISR中的任一个Replica“活”过来,并且选它作为Leader
可能回永远起不起来
选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
将会丢失没有同步到的数据
概述
优点
异步处理、应用解耦、流量消峰
缺点
系统可用性减低
需要依赖中间件服务(存在该服务崩溃的风险)
系统复杂度提高
如果保证数据一致性(事务)
如何保证不被重复消费
常见问题
消息的顺序问题
方案一
解决方案
保证生产者-mqServer-消费者一对一关系
缺点
并行度低
只要消费端出现问题,就会导致整个处理流程阻塞
方案二
通过合理的设计或者将问题分解来规避
不关注乱序的应用实际大量存在
从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。
消息的重复问题
根本原因
网络不达
解决办法
消费者保证幂等性
利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
消息挤压
临时紧急扩容
可以给消息设置过期时间
rabbitMq、kafka支持 AMQP协议,可以集成企业级消息总线,可以和spring bus集成
rabbitMq
概述
性能比activeMq高
现状
erlang语言阻碍了二次开发,不过项目稳定,社区活跃
要点
可以控制每一次给消费者消费的消息数量(prefetchCount=???),控制消费者压力
消费端,采用推送的方式消费
基本概念
- Broker: 简单来说就是消息队列服务器实体
- Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
- Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来 (最大长度255字节)
- Routing Key: 路由关键字,exchange根据这个关键字进行消息投递(最大长度255字节)
- VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
- Producer: 消息生产者,就是投递消息的程序
- Consumer: 消息消费者,就是接受消息的程序
- Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
顺序性保证
一个queue 一个 consumer
消息挤压解决办法
临时紧急扩容
Mq消息失效
可以设置消息的过期时间也就是TTL
传输协议
TCP协议连接和销毁成本大,rabbitMq采用信道方式传输数据,
信道是创建在TCP连接内的虚拟连接,且每条TCP连接上的信道连接没有限制
信道是创建在TCP连接内的虚拟连接,且每条TCP连接上的信道连接没有限制
交换器
交换器类型
direct
路由键完全匹配,消息被投递到对应的队列,每个amqp的实现都必须有一个direct交换器,包含一个空白字符串名称的默认交换器。
声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键:channel->basic_public($msg,’ ’,’queue-name’)
声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键:channel->basic_public($msg,’ ’,’queue-name’)
fanout
消息广播到绑定的队列,不处理路由键
topic
通过使用“*”和“#”,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,“*”匹配1个,“#”匹配一个或多个
headers
header模式取消routingkey,使用header中的 key/value(键值对)匹配队列
备用交换器
在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。
备用交换器就是普通的交换器,没有任何特殊的地方。
使用备用交换器,向往常一样,声明Queue和备用交换器,把Queue绑定到备用交换器上。
然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。
然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。
死信交换器DLX
投递消息被拒绝后的一个可选行为,往往用在对问题消息的诊断上。
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。
在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,
相关的参数是x-dead-letter-exchange。
在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,
相关的参数是x-dead-letter-exchange。
包含情况
消息被拒绝,并且设置 requeue 参数为 false
消息过期
队列达到最大长度
队列
临时队列
自动删除队列
当最后一个消费者也断开连接时,队列将会被删除
属性auto-delete标识为true即可
单消费者队列
普通队列允许的消费者没有限制,多个消费者绑定到多个队列时,RabbitMQ会采用轮询进行投递。
如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
自动过期队列
指队列在超过一定时间没使用,会被从RabbitMQ中被删除。
过期的定义
- 一定时间内没有Get操作发生
- 没有Consumer连接在队列上
- 特别的:就算一直有消息进入队列,也不算队列在被使用。
- 通过声明队列时,设定x-expires参数即可,单位毫秒。
永久队列
(普通队列)
(普通队列)
队列的持久性
持久化队列会被保存在磁盘中,固定并持久的存储,
当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理
当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理
队列级别消息过期
为每个队列设置消息的超时时间。
只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。
如果声明队列时指定了死信交换器,则过期消息会成为死信消息。
只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。
如果声明队列时指定了死信交换器,则过期消息会成为死信消息。
消息
存活时间
当队列消息的TTL 和消息TTL都被设置,时间短的TTL设置生效。
如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。
如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。
RabbitMQ只对处于队头的消息判断是否过期(即延迟判断,不会扫描队列)
持久化
将队列和交换器的durable属性设为true,缺省为false,
还需要将消息在发布前,将投递模式设置为2。消息要持久化,必须要有持久化的队列、交换器和投递模式都为2。
还需要将消息在发布前,将投递模式设置为2。消息要持久化,必须要有持久化的队列、交换器和投递模式都为2。
消息发送方式
1.不做任何配置
生产者不知道消费者是否真正到达rabbitMq服务器
2.失败通知
失败才通知生产者,成功则不通知
3.事务
主要是对信道的设置,
分为:启动事务、提交事务、回滚事务;
会伴随严重的性能问题
分为:启动事务、提交事务、回滚事务;
会伴随严重的性能问题
4.发送确认模式;
该模式比事务模式轻,性能消耗几乎不计
消息不可路由时
消息可路由时
消息获取方式
拉取Get
属于一种轮询模型,发送一次get请求,获得一个消息
如果没有消息会返回一个表示为空的回复
性能低,即使没有数据也要不断的盲循
推送Consume
注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者
Qos预取模式
(批量消费)
(批量消费)
在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。
如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。
如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。
消息拒绝策略
Reject
(单个拒绝)
(单个拒绝)
在拒绝消息时,可以使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者。
不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒绝一条消息。
不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒绝一条消息。
Nack
(批量拒绝)
(批量拒绝)
对Reject的扩展,可以一次性拒绝多个消息
可靠性传输
消息必达性保证
发送方确认模式
消息到达队列的确认
接收方确认机制
接收方确认消息
数据丢失原因
生产者丢失
消息列表丢失
消费者丢失
解决办法
transaction和confirm模式来确保生产者不丢消息
失败通知
它只会让RabbitMQ向你通知失败,而不会通知成功。
如果消息正确路由到队列,则发布者不会受到任何通知。
如果消息正确路由到队列,则发布者不会受到任何通知。
发送消息时设置mandatory标志 : mandatory=true
缺点: 无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。
transaction机制
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),
缺点:吞吐量下降; 一般选择使用confirm模式替代
缺点:吞吐量下降; 一般选择使用confirm模式替代
confirm模式
所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),
一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
三种实现方式
channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回true。
channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,
只要有一个消息未到达交换器就会抛出IOException异常。
只要有一个消息未到达交换器就会抛出IOException异常。
channel.addConfirmListener()异步监听发送方确认模式;
消息队列持久化
开启消息接收确认模式,处理消息成功后,手动回复确认消息
工作模式
Simple模式(最简单的收发模式)
点对点模式
一个生产者,一个消费者
Work工作模式(资源的竞争)
一个生产者,多个消费者,每个消费者获取到的消息唯一
(竞争消费者模式(默认循环发送给每个消费者))
(竞争消费者模式(默认循环发送给每个消费者))
Publish/Subscribe发布订阅模式(共享资源)
一个生产者发送的消息会被多个消费者获取(发布一次,消费多个)
Routing路由模式
发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
Topic主题模式(路由模式的一种)
将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,
“#”匹配一个词或多个词,“*”只匹配一个词
“#”匹配一个词或多个词,“*”只匹配一个词
rpc
集群
主备模式(Warren)
主节点提供读写;从节点不提供服务,只备份数据,当主节点不可用时,完成主从切换
适用
并发与数据量都不高的情况
普通集群
多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。
创建的 queue,只会放在一个 RabbitMQ 实例上,
只是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。
消费的时候,实际上如果连接到了另外一个实例,那个实例会从 queue 所在实例上拉取数据过来。
这方案主要是提高吞吐量的,让集群中多个节点来服务某个 queue 的读写操作
创建的 queue,只会放在一个 RabbitMQ 实例上,
只是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。
消费的时候,实际上如果连接到了另外一个实例,那个实例会从 queue 所在实例上拉取数据过来。
这方案主要是提高吞吐量的,让集群中多个节点来服务某个 queue 的读写操作
镜像模式(mirro)
保证数据100%不丢失
用 KeepAlived 做了 HA-Proxy 的高可用,建议有 3 个及以上节点的 MQ 服务,
消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠。
消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠。
多中心模式
远程模式(shovel)
描述
双活的一种模式
简称 shovel 模式,
所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制
所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制
当我们的消息到达 exchange,它会判断当前的负载情况以及设定的阈值,
如果负载不高就把消息放到我们正常的 warehouse_goleta 队列中,
如果负载过高了,就会放到 backup_orders 队列中。
backup_orders 队列通过 shovel 插件与另外的 MQ 集群进行同步数据,把消息发到第二个 MQ 集群上。
如果负载不高就把消息放到我们正常的 warehouse_goleta 队列中,
如果负载过高了,就会放到 backup_orders 队列中。
backup_orders 队列通过 shovel 插件与另外的 MQ 集群进行同步数据,把消息发到第二个 MQ 集群上。
多活模式
rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,
各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
federation 插件
federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的
zeroMq
比kafka高
订阅形式
点对点(p2p)
现状
0 条评论
下一页