消息中间件(MQ)
2021-03-26 16:03:12 161 举报
AI智能生成
消息中间件(MQ)是一种基于异步通信、分布式的消息传递架构,用于在分布式系统中实现应用程序之间的解耦、缓冲和削峰填谷。它通过将消息发送者与接收者分离,使得系统具有更高的可扩展性、可靠性和灵活性。消息中间件通常采用发布-订阅模式,支持多种消息传输协议,如AMQP、MQTT和STOMP等。常见的消息中间件有RabbitMQ、Kafka、ActiveMQ和RocketMQ等。
作者其他创作
大纲/内容
吞吐量低
点对点(p2p)
广播(发布-订阅)
订阅形式
没有经过大规模吞吐量场景验证,社区不活跃
现状
基于内存的队列,极其成熟
activeMq
比rabbitMq高
基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模
社区活跃度不算太高
国内相对活跃
概述
没有实现优先级队列,但可以通过定义高优先级队列和低优先级队列的方式来实现
优先级队列Message Priority
顺序队列message order
要点
同步发送
异步发送
消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
普通顺序消息
消费者收到的所有消息均是有顺序的
严格顺序消息
顺序发送
单向发送
生产者消息推送方式
需要Broker返回确认消息
客户端不断的轮询请求服务端,来获取新的消息。
pull
只要有数据Broker就会一直推,不关注消费端是否能够处理
客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
非真正意义推送,基于长轮询实现
push(默认)
消费者消费消息两种方式
相同Consumer Group的每个Consumer实例平均分摊消息(只有一个能消费)
集群消费
相同Consumer Group的每个Consumer实例都接收全量的消息
广播消费
两种消费模式
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
主题(Topic)
充当路由消息的提供者。生产者或消费者能够通过Nameserver查找各主题相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。
名字服务器(name server)
每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
消息(Message)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
标签(Tag)
基本概念
指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
适用场景
全局顺序(严格顺序)
指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
分区顺序(普通顺序)
消息顺序
消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点:减少了对于Consumer无用消息的网络传输;缺点:增加了Broker的负担、而且实现相对复杂。
消息过滤
1.Broker非正常关闭2.Broker异常Crash3.OS Crash4.机器掉电,但是能立即恢复供电情况5.机器无法开机(可能是cpu、主板、内存等关键设备损坏)6.磁盘设备损坏
影响可靠性原因
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5),6)通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合
对策
消息可靠性
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
回溯消费
基于“半消息“实现
如生产者发送“半消息”之后,发生异常或者本地事物失败等情况,一直没有确认“半消息”,则rocketmq等待一定时间后会调用生产者接口回查数据已确认事物是否已经完成,从而做到事物一致性
基于回查确认的方式保证故障情况下的事物有一致性
事务消息
消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel
messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1slevel > maxLevel,则level== maxLevel,例如level==20,延迟2h
延迟消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level(因为相同延时的消息放到同一个队列,如果用户自定义将产生大量队列),例如定时5s,10s,1m等
延迟队列
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重试(消费者消费失败)
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证
消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题
同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
同步发送retryTimesWhenSendFailed
消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
同步双写retryAnotherBrokerWhenNotStoreOK
异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
异步发送retryTimesWhenSendAsyncFailed
消息重投策略
消息重投(生产者投递消息失败)
1.commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。2.如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。3.broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。4.broker通过拒绝send 请求方式实现流量控制。
生产者流控,不会尝试消息重投。需要自己实现逻辑
注意
生产者流控
1.超数-消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。2.超量-消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。3.超时-消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
消费者流控
流量控制
用于处理无法被正常消费的消息。达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
死信队列
特性
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳(默认30秒)。Producer完全无状态
启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择(可设置策略)一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Producer
与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳(默认30秒)。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取
获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道
Consumer
Topic路由注册中心,支持Broker的动态注册与发现
通常也是集群的方式部署,font color=\"#f15a23\
接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活
Broker管理
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
路由信息管理
功能
NameServer
主要负责消息的存储、投递和查询以及服务高可用保证
整个Broker的实体,负责处理来自clients端的请求
Remoting Module
负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Client Manager
提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
Store Service
高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
HA Service
根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
Index Service
子模块
Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave;Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
目前在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载
集群部署
span style=\
架构
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容;消息内容不是定长的
比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推
单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量
消息主要是顺序写入日志文件,当文件满了,写入下一个文件
CommitLog
作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值;consumequeue文件可以看成是基于topic的commitlog索引文件
consumequeue文件夹的组织方式如下:topic/queue/file三层结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M
ConsumeQueue
提供了一种可以通过key或时间区间来查询消息的方法
存储位置:$HOME \\store\\index${fileName}
文件名fileName:以创建时的时间戳命名的
单个文件大小约为400M,可以保存 2000W个索引
底层存储设计为在文件系统中实现HashMap结构,故底层实现为hash索引。
Index File
文件系统
OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上
写数据
一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取
读数据
对文件进行顺序读写的速度几乎接近于内存的读写速度;主要原因OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache(预读取)
页面缓存
ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取
优化方向:选择合适的系统IO调度算法
CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能
利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(减少内存拷贝)
RocketMQ主要通过MappedByteBuffer对文件进行读写操作
将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
页面缓存与内存映射
在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应
同步刷盘(性能损耗大)
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
异步刷盘(极端情况会丢失数据)
消息刷盘
存储架构
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块
Reactor多线程设计
通讯机制
Consumer端订阅消息时再做消息过滤的
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
Tag过滤方式
和Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
SQL92的过滤方式
2种的过滤方式
发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息
有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的\"latencyFaultTolerance\",是指对之前失败的,按一定的时间做退避。
容错策略均在MQFaultStrategy这个类中定义
默认轮询发送消息到consumer queue上
Producer的负载均衡
两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取
每个consumer平均分配consumer queue
Consumer的负载均衡
所有负载均衡都由客户端完成
集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue
RocketMQ原理(3)——水平扩展及负载均衡详解
负载均衡
采用2PC模式实现事务,增加一个补偿逻辑来处理二阶段超时或失败的情况
1.提交“半消息”到队列,内部实现为将该消息放到一个特殊的topic下,使其cumsumer不可见;2.commit的时候直接修改message的topic索引即可;3.rfont color=\"#f15a23\
(1) 发送消息(half消息)。(2) 服务端响应消息写入结果。(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
消息发送和提交逻辑
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(2) Producer收到回查消息,检查回查消息对应的本地事务的状态(3) 根据本地事务状态,重新Commit或者Rollback
默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息
补偿流程(解决消息Commit或者Rollback发生超时或者失败的情况)
流程
事物消息
MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。
Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送请求;读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回
按照MessageId查询
基于RocketMQ的IndexFile索引文件来实现的
按照MessageKey查询
3.0 RocketMQ消息查询
消息查询
检查这个文件最后访问时间;判断是否大于过期时间指定时间删除;默认凌晨4点
4.6版本默认48小时后会删除不再使用的CommitLog文件
未被消费的消息不会存在超时删除这情况。
过期消息清理
架构设计
rocketMq
每秒几十万条消息,延迟最低只有几毫秒
高吞吐、低延迟
kafka集群支持热扩展
可扩展
消息被持久化到本地磁盘,并且支持数据备份防止丢失
持久性、可靠性
允许集群中节点失败
容错性
比rocketMq高
支持数千个客户端同时读写
高并发
至此以集合未单位进行发送消息,在此基础上,kafka还支持对消息集合进行压缩,减少传输的数据量,减少对网络传输的压力
消费端采用拉取的方式消费
Kafka只保证一个Partition内的消息的有序性。
顺序性
大数据领域的实时计算,日志采集的行业标准
消息可能丢失,但绝不会重复传输
At most once
消息绝对不会丢失,但可能会重复传输
At least one
每条消息肯定会被传输一次且仅传输一次
Exactly once
管理broker与consumer的动态加入与离开。
触发负载均衡
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。记录在zk中
维护消费关系及每个partition的消费信息
zookeeper协调控制
选举
当leader Partition分区宕掉之后,由Broker Controller负责在有用分区(ISR)中选择新的Leader
controller(Leader)
可以动态增加broker
没有副本机制,不保存订阅者状态,由订阅者自己保存
无状态
broker
Topic
topic中的数据分割为一个或多个partition
每个topic至少有一个partition
物理上一个partion对应一个文件夹,该文件夹存储所有消息和索引文件
基于时间删除
基于文件大小删除
提供两种策略删除旧数据
物理上每个partition中的数据使用多个segment文件存储
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序;需要严格保证消息的消费顺序的场景下,需要将partition数目设为1
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
临时顺序节点
多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
节点名称唯一性
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
ISR(in-sync replicas)
Leader Election算法(有两种方式实现基于ZooKeeper的分布式锁)
leader负责跟踪所有follower的状态
leader
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower
当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower
从Leader中复制数据,不与producer/consumer交互
1.将所有Broker(假设共n个Broker)和待分配的Partition排序2.将第i个Partition分配到第(i mod n)个Broker上3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
在broker上的分配策略
Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将向Producer发送ACK
每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中
为了提高性能
消息同步传输策略
一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)
二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”
replication是否“活着”包含两个条件
follower
角色
Partition
表示需要所有partition成功,才返回成功
ack = -1
表示发送即代表发送成功,不等broker返回确认信息
ack = 0
producer写道partition leader成功后,broker就返回
ack = 1
表示有2个partition成功,就返回成功
ack = 2
可靠性投递partition ack
1、 指定了 partition,则直接使用;2、 未指定 partition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition3、 partition 和 key 都未指定,使用轮询选出一个 patition。
producer路由
zookeeper中保存该Consumer在该Partition中读取消息的offset
手动commit
自动commit
消息确认方式
采用pull的消费模式,可以逐条消费也可以批量消费
每个Consumer属于一个特定的Consumer Group
可为每个Consumer指定group name,若不指定group name则属于默认的group
Consumer Group
消息有一个定长的header和边长的字节数组组成
推荐消息大小不要超过1MB
单个消息的大小无限制
message
kafka学习之路一
可能回永远起不起来
等待ISR中的任一个Replica“活”过来,并且选它作为Leader
将会丢失没有同步到的数据
选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案
Kafka史上最详细原理总结上
kafka
异步处理、应用解耦、流量消峰
优点
需要依赖中间件服务(存在该服务崩溃的风险)
系统可用性减低
如果保证数据一致性(事务)
如何保证不被重复消费
系统复杂度提高
缺点
保证生产者-mqServer-消费者一对一关系
解决方案
并行度低
只要消费端出现问题,就会导致整个处理流程阻塞
方案一
通过合理的设计或者将问题分解来规避
不关注乱序的应用实际大量存在
从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。
方案二
消息的顺序问题
网络不达
根本原因
利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
消费者保证幂等性
解决办法
消息的重复问题
临时紧急扩容
可以给消息设置过期时间
消息挤压
常见问题
rabbitMq、kafka支持 AMQP协议,可以集成企业级消息总线,可以和spring bus集成
性能比activeMq高
erlang语言阻碍了二次开发,不过项目稳定,社区活跃
font color=\"#f15a23\
消费端,采用推送的方式消费
Broker: 简单来说就是消息队列服务器实体Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列Queue: 消息队列载体,每个消息都会被投入到一个或多个队列Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来 (最大长度255字节)Routing Key: 路由关键字,exchange根据这个关键字进行消息投递(最大长度255字节)VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。Producer: 消息生产者,就是投递消息的程序Consumer: 消息消费者,就是接受消息的程序Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
一个queue 一个 consumer
顺序性保证
可以设置消息的过期时间也就是TTL
Mq消息失效
消息挤压解决办法
TCP协议连接和销毁成本大,rabbitMq采用信道方式传输数据,信道是创建在TCP连接内的虚拟连接,且每条TCP连接上的信道连接没有限制
传输协议
direct
消息广播到绑定的队列,不处理路由键
fanout
通过使用“*”和“#”,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,“*”匹配1个,“#”匹配一个或多个
topic
header模式取消routingkey,使用header中的 key/value(键值对)匹配队列
headers
交换器类型
在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。
备用交换器就是普通的交换器,没有任何特殊的地方。
使用备用交换器,向往常一样,声明Queue和备用交换器,把Queue绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数,font color=\"#00bcd4\
备用交换器
投递消息被拒绝后的一个可选行为,往往用在对问题消息的诊断上。
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange。
消息被拒绝,并且设置 requeue 参数为 false
消息过期
队列达到最大长度
包含情况
死信交换器DLX
交换器
当最后一个消费者也断开连接时,队列将会被删除
属性auto-delete标识为true即可
自动删除队列
普通队列允许的消费者没有限制,多个消费者绑定到多个队列时,RabbitMQ会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
单消费者队列
指队列在超过一定时间没使用,会被从RabbitMQ中被删除。
一定时间内没有Get操作发生没有Consumer连接在队列上特别的:就算一直有消息进入队列,也不算队列在被使用。通过声明队列时,设定x-expires参数即可,单位毫秒。
过期的定义
自动过期队列
临时队列
持久化队列会被保存在磁盘中,固定并持久的存储,当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理
队列的持久性
永久队列(普通队列)
为每个队列设置消息的超时时间。只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。如果声明队列时指定了死信交换器,则过期消息会成为死信消息。
队列级别消息过期
队列
当队列消息的TTL 和消息TTL都被设置,时间短的TTL设置生效。如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。
RabbitMQ只对处于队头的消息判断是否过期(即延迟判断,不会扫描队列)
存活时间
将队列和交换器的durable属性设为true,缺省为false,还需要将消息在发布前,将投递模式设置为2。消息要持久化,必须要有持久化的队列、交换器和投递模式都为2。
持久化
消息
生产者不知道消费者是否真正到达rabbitMq服务器
1.不做任何配置
失败才通知生产者,成功则不通知
2.失败通知
主要是对信道的设置,分为:启动事务、提交事务、回滚事务;会伴随严重的性能问题
3.事务
消息不可路由时
消息可路由时
该模式比事务模式轻,性能消耗几乎不计
4.发送确认模式;
消息发送方式
如果没有消息会返回一个表示为空的回复
性能低,即使没有数据也要不断的盲循
属于一种轮询模型,发送一次get请求,获得一个消息
拉取Get
注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者
在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。
Qos预取模式(批量消费)
推送Consume
消息获取方式
在拒绝消息时,可以使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者。不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒绝一条消息。
Reject(单个拒绝)
对Reject的扩展,可以一次性拒绝多个消息
Nack(批量拒绝)
消息拒绝策略
消息到达队列的确认
发送方确认模式
接收方确认消息
接收方确认机制
消息必达性保证
生产者丢失
消息列表丢失
消费者丢失
数据丢失原因
它只会让RabbitMQ向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。
发送消息时设置mandatory标志 : mandatory=true
缺点: 无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。
失败通知
transaction机制
所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回true。
channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出IOException异常。
channel.addConfirmListener()异步监听发送方确认模式;
三种实现方式
confirm模式
transaction和confirm模式来确保生产者不丢消息
消息队列持久化
开启消息接收确认模式,处理消息成功后,手动回复确认消息
可靠性传输
一个生产者,一个消费者
点对点模式
Simple模式(最简单的收发模式)
一个生产者,多个消费者,每个消费者获取到的消息唯一(竞争消费者模式(默认循环发送给每个消费者))
Work工作模式(资源的竞争)
一个生产者发送的消息会被多个消费者获取(发布一次,消费多个)
Publish/Subscribe发布订阅模式(共享资源)
发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
Routing路由模式
将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词
Topic主题模式(路由模式的一种)
rpc
官网
工作模式
订阅模式:是分发到所有绑定到交换机的队列,路由模式:只分发到绑定在交换机上面指定路由键的队列
主节点提供读写;从节点不提供服务,只备份数据,当主节点不可用时,完成主从切换
并发与数据量都不高的情况
适用
主备模式(Warren)
多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。创建的 queue,只会放在一个 RabbitMQ 实例上,只是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,实际上如果连接到了另外一个实例,那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,让集群中多个节点来服务某个 queue 的读写操作
普通集群
保证数据100%不丢失
用 KeepAlived 做了 HA-Proxy 的高可用,建议有 3 个及以上节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠。
镜像模式(mirro)
简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制
双活的一种模式
当我们的消息到达 exchange,它会判断当前的负载情况以及设定的阈值,如果负载不高就把消息放到我们正常的 warehouse_goleta 队列中,如果负载过高了,就会放到 backup_orders 队列中。backup_orders 队列通过 shovel 插件与另外的 MQ 集群进行同步数据,把消息发到第二个 MQ 集群上。
描述
远程模式(shovel)
rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的
federation 插件
多活模式
多中心模式
RabbitMQ 的4种集群架构
集群
消息中间件MQ与RabbitMQ面试题(2020最新版)
rabbitMq
比kafka高
zeroMq
消息中间件(MQ)
0 条评论
回复 删除
下一页