消息队列
2024-03-18 12:38:26 0 举报
AI智能生成
消息队列
作者其他创作
大纲/内容
定义
一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。
由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。
中间件
中间件(Middleware),又译中间件、中介层,一类为应用软件服务的软件,应用软件是为用户服务的,用户不会接触或者使用到中间件。
作用
通过异步处理提高系统性能(减少响应所需时间)
削峰/限流
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
降低系统耦合性
对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
实现分布式事务
RocketMQ、 Kafka、Pulsar、QMQ 都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。
问题
系统可用性降低: 系统可用性在某种程度上降低。
系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
一致性问题: 消息的真正消费者并没有正确消费消息,这样就会导致数据不一致的情况。
JMS
定义
JMS(JAVA Message Service)Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。
JMS API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。
JMS API 使分布式通信耦合度更低,消息服务更加可靠以及异步性。
ActiveMQ(已被淘汰) 就是基于 JMS 规范实现的。
消息格式
StreamMessage:Java 原始值的数据流。
MapMessage:一套名称-值对。
TextMessage:一个字符串对象。
ObjectMessage:一个序列化的 Java 对象。
BytesMessage:一个字节的数据流。
消息模型
点到点(P2P)模型
使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
发布/订阅(Pub/Sub)模型
使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。
AMQP
定义
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的。
JMS 和 AMQP
AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。
在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
RPC 和 消息队列
从用途来看:RPC 主要用来解决两个服务的远程通信问题,不需要了解底层网络的通信机制。
通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。
消息队列主要用来降低系统耦合性、实现任务异步、有效地进行流量削峰。
通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。
消息队列主要用来降低系统耦合性、实现任务异步、有效地进行流量削峰。
从通信方式来看:RPC 是双向直接网络通讯,消息队列是单向引入中间载体的网络通讯。
从架构上来看:消息队列需要把消息存储起来,RPC 则没有这个要求,因为前面也说了 RPC 是双向直接网络通讯。
从请求处理的时效性来看:通过 RPC 发出的调用一般会立即被处理,存放在消息队列中的消息并不一定会立即被处理。
RPC 和 消息队列 本质上是网络通讯的两种不同的实现机制,两者的用途不同,万不可将两者混为一谈。
消息模型
队列模型
使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
缺点
将生产者产生的消息分发给多个消费者,不好解决。
发布-订阅模型
发布订阅模型/主题模型(Pub-Sub) 使用主题(Topic) 作为消息通信载体,类似于广播模式。
消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber) ,存放消息的容器称为 主题(Topic) 。
发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
发布-订阅模型主要是为了解决队列模型存在的问题。
在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以,发布 - 订阅模型在功能层面上是可以兼容队列模型的。
Kafka、RocketMQ、RabbitMQ 采用的都是发布 - 订阅模型。但底层设计不一样,如 Kafka 的 分区 ,RocketMQ 的 队列 ,RabbitMQ 的 Exchange 。
技术选型
Kafka
定义
Kafka 是 LinkedIn 开源的一个分布式流式处理平台,Apache 顶级项目,早期用于处理海量的日志,后面才慢慢发展成一款功能全面的高性能消息队列。
Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成,可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。
在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper 做元数据管理和集群的高可用。
在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。
在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。
如果要使用 KRaft 模式,建议选择较高版本的 Kafka(Kafka 3.3.1 及以上),低版本此功能还不完善。
关键功能
消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
核心概念
Producer(生产者) : 产生消息的一方。
Consumer(消费者) : 消费消息的一方。
Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,
并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。
并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。
Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。
多副本机制
Kafka 为分区(Partition)引入了多副本(Replica)机制。
分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。
发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。
发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
优点
Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,这样便能提供比较好的并发能力(负载均衡)。
Partition 可以指定对应的 Replica 数,这也极大地提高了消息存储的安全性,提高了容灾能力,不过也相应的增加了所需要的存储空间。
Zookeeper 和 Kafka
ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,
即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去。
即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去。
Topic 注册:同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。
比如创建一个名为 my-topic 的主题并且它有两个分区,zookeeper 创建文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1。
比如创建一个名为 my-topic 的主题并且它有两个分区,zookeeper 创建文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1。
负载均衡:Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,这样便能提供比较好的并发能力。
对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。
当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。
当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。
当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。
当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
消息模型
发布 - 订阅模型。
底层:分区。
消息安全
消费顺序
原理
每次添加消息到 Partition(分区) 的时候都会采用尾加法。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区) 的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
方法
1 个 Topic 只对应一个 Partition。
(推荐)发送消息的时候指定 key/Partition。
消息不丢失
生产者丢失消息
生产者(Producer) 调用 send 方法(异步)发送消息之后,消息可能因为网络问题并没有发送过去。
方法
异步调用 send 后,通过 get() 方法获取调用结果,但会变为同步操作,不推荐。
异步调用 send 后,添加回调函数形式获取结果。若失败,检查原因后重新发送即可。推荐设定一个 retries(重试次数)。
消费者丢失消息
当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
方法
手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。
消息理论上会被消费两次。
Kafka 弄丢了消息
假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,
但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
方法
设置 acks = all。
acks 是 Kafka 生产者(Producer) 很重要的一个参数,默认为 1。
当 acks = 1 时,代表我们的消息被 leader 副本接收之后就算被成功发送。
当 acks = all 时,表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应。最高级别且最安全的,但延迟会很高。
acks 是 Kafka 生产者(Producer) 很重要的一个参数,默认为 1。
当 acks = 1 时,代表我们的消息被 leader 副本接收之后就算被成功发送。
当 acks = all 时,表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应。最高级别且最安全的,但延迟会很高。
设置 replication.factor >= 3。
为了保证 leader 副本能有 follower 副本能同步消息,一般会为 topic 设置 replication.factor >= 3。
这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
为了保证 leader 副本能有 follower 副本能同步消息,一般会为 topic 设置 replication.factor >= 3。
这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1。
一般情况下我们还需要设置 min.insync.replicas > 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。
min.insync.replicas 默认值为 1 ,在实际生产中应尽量避免默认值 1。
为保证整个 Kafka 服务的高可用性,需确保 replication.factor > min.insync.replicas 。一般推荐设置成 replication.factor = min.insync.replicas + 1。
一般情况下我们还需要设置 min.insync.replicas > 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。
min.insync.replicas 默认值为 1 ,在实际生产中应尽量避免默认值 1。
为保证整个 Kafka 服务的高可用性,需确保 replication.factor > min.insync.replicas 。一般推荐设置成 replication.factor = min.insync.replicas + 1。
设置 unclean.leader.election.enable = false。
Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false
当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false
当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
消息不重复消费
原因
服务端侧已经消费的数据没有成功提交 offset(根本原因)。
Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
方法
消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。
手动提交 offset 时机
处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样。
拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
重试机制
消费失败
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。
重试规则
默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。
如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
自定义重试规则
只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。
重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。
重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。
重试失败后告警
自定义重试失败后逻辑,需要手动实现,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。
DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。
手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
重试失败后再处理
死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。
它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。
它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。
@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。
对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。
对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。
应用场景
消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
数据处理: 构建实时的流数据处理程序来转换或处理数据流。
优势
极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
RocketMQ
定义
RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式的特点。采用 Java 语言开发的分布式的消息系统。
阿里开源的一款云原生“消息、事件、流”实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。
核心特性
云原生:生与云,长与云,无限弹性扩缩,K8s 友好。
高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
金融级:金融级的稳定性,广泛用于交易核心链路。
架构极简:零外部依赖,Shared-nothing 架构。
生态友好:无缝对接微服务、实时计算、数据湖等周边生态。
消息模型
发布 - 订阅模型。
底层:队列。
RocketMQ 通过使用在一个 Topic 中配置多个队列并且每个队列维护每个消费者组的消费位置 实现了 主题模式/发布订阅模式 。
角色
Producer Group 生产者组:代表某一类的生产者,比如有多个秒杀系统作为生产者,合在一起就是一个 Producer Group 生产者组,一般生产相同的消息。
Consumer Group 消费者组:代表某一类的消费者,比如有多个短信系统作为消费者,合在一起就是一个 Consumer Group 消费者组,一般消费相同的消息。
Topic 主题:代表一类消息,比如订单消息,物流消息等等。
角色
Broker:主要负责消息的存储、投递和查询以及服务高可用保证。就是消息队列服务器,生产者生产消息到 Broker ,消费者从 Broker 拉取消息并消费。
一个 Topic 分布在多个 Broker上,一个 Broker 可以配置多个 Topic ,它们是多对多的关系。
一个 Topic 分布在多个 Broker上,一个 Broker 可以配置多个 Topic ,它们是多对多的关系。
NameServer:注册中心 ,主要提供 Broker 管理 和 路由信息管理。就是 Broker 注册后,消费者和生产者从中获取信息来与 Broker 通信。
Producer:消息发布的角色,支持分布式集群方式部署。就是生产者。
Consumer:消息消费的角色,支持分布式集群方式部署。
支持以 push 推,pull 拉 两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。
支持以 push 推,pull 拉 两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。
消息类型
普通消息
一般应用于微服务解耦、事件驱动、数据集成等场景,这些大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
生命周期
初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
定时消息
可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
优势
精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息具有高并发和水平扩展的能力。
生命周期
初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
顺序消息
仅支持使用 MessageType 为 FIFO 的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。
和普通消息发送相比,顺序消息发送必须要设置消息组。(推荐实现 MessageQueueSelector 的方式)。要保证消息的顺序性需要单一生产者串行发送。
单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费。
事务消息
高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
发送消息
不建议单一进程创建大量生产者
Apache RocketMQ 的生产者和主题是多对多的关系,支持同一个生产者向多个主题发送消息。
对于生产者的创建和初始化,建议遵循够用即可、最大化复用原则,如果有需要发送消息到多个主题的场景,无需为每个主题都创建一个生产者。
不建议频繁创建和销毁生产者
Apache RocketMQ 的生产者是可以重复利用的底层资源,类似数据库的连接池。
不需要在每次发送消息时动态创建生产者,且在发送结束后销毁生产者。这样频繁的创建销毁会在服务端产生大量短连接请求,严重影响系统性能。
消费者分类
PushConsumer
高度封装的消费者类型,消费消息仅仅通过消费监听器监听并返回结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端 SDK 完成。
监听执行结果
返回消费成功:以 Java SDK 为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。
返回消费失败:以 Java SDK 为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。
出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。
错误方式
消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。
在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。
此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。
此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。
PushConsumer 严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:
消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的耗时的消息,PushConsumer 的可靠性保证会频繁触发消息重试机制造成大量重复消息。
无异步化、高级定制场景:PushConsumer 限制了消费逻辑的线程模型,由客户端 SDK 内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。
消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的耗时的消息,PushConsumer 的可靠性保证会频繁触发消息重试机制造成大量重复消息。
无异步化、高级定制场景:PushConsumer 限制了消费逻辑的线程模型,由客户端 SDK 内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。
SimpleConsumer
一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
应用
消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。
建议使用 SimpleConsumer 消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
建议使用 SimpleConsumer 消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
需要异步化、批量消费等高级定制场景:SimpleConsumer 在 SDK 内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
需要自定义消费速率:SimpleConsumer 是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。
PullConsumer
主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。
一旦获取了批量消息,应用就会启动消费过程。Pull指的是客户端主动向服务端请求,拉取数据。
一旦获取了批量消息,应用就会启动消费过程。Pull指的是客户端主动向服务端请求,拉取数据。
分组
生产者分组
RocketMQ 服务端 5.x 版本开始,生产者是匿名的,无需管理生产者分组(ProducerGroup);
对于历史版本服务端 3.x 和 4.x 版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。
对于历史版本服务端 3.x 和 4.x 版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。
消费者分组
多个消费行为一致的消费者的负载均衡分组。消费者分组不是具体实体而是一个逻辑资源。通过消费者分组实现消费性能的水平扩展以及高可用容灾。
策略
消费者分组中的订阅关系、投递顺序性、消费重试策略是一致的。
订阅关系:Apache RocketMQ 以消费者分组的粒度管理订阅关系,实现订阅关系的管理和追溯。
投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持顺序投递和并发投递,投递方式在消费者分组中统一配置。
消费重试策略: 消费者消费消息失败时的重试策略,包括重试次数、死信队列设置等。
RocketMQ 服务端 5.x 版本:消费者的消费行为从关联的消费者分组中统一获取,因此,同一分组内所有消费者的消费行为必然是一致的,客户端无需关注。
RocketMQ 服务端 3.x/4.x 历史版本:消费逻辑由消费者客户端接口定义,因此,需要在消费者客户端设置时保证同一分组下的消费者的消费行为一致。
RocketMQ 服务端 3.x/4.x 历史版本:消费逻辑由消费者客户端接口定义,因此,需要在消费者客户端设置时保证同一分组下的消费者的消费行为一致。
消费安全
顺序消费
RocketMQ 在主题上是无序的、它只有在队列层面才是保证有序的。
普通顺序
消费者通过同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。
普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间) 。
严格顺序
消费者收到的 所有消息 均是有顺序的。严格顺序消息 即使在异常情况下也会保证消息的顺序性 。
如果你使用严格顺序模式,Broker 集群中只要有一台机器不可用,则整个集群都不可用。
MQ 都是能容忍短暂的乱序,所以推荐使用普通顺序模式。
可以使用 Hash 取模法 来保证将同一语义下的消息放入同一个队列。
队列选择算法
轮询算法
轮询算法就是向消息指定的 topic 所在队列中依次发送消息,保证消息均匀分布
RocketMQ 默认队列选择算法。
最小投递延迟算法
每次消息投递的时候统计消息投递的延迟,选择队列时优先选择消息延时小的队列,导致消息分布不均匀。
解决消息分布不均匀
按照 producer.setSendLatencyFaultEnable(true); 设置即可。
特殊情况处理
发送异常
选择队列后会与 Broker 建立连接,通过网络请求将消息发送到 Broker 上,如果 Broker 挂了或者网络波动发送消息超时此时 RocketMQ 会进行重试。
重新选择其他 Broker 中的消息队列进行发送,默认重试两次,可以手动设置。
producer.setRetryTimesWhenSendFailed(5);
消息过大
消息超过 4k 时 RocketMQ 会将消息压缩后在发送到 Broker 上,减少网络资源的占用。
重复消费
幂等
其任意多次执行所产生的影响均与一次执行的影响相同。
实现
这个还是需要结合具体的业务的。
可以使用 写入 Redis 来保证,因为 Redis 的 key 和 value 就是天然支持幂等的。
使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。
分布式事务
在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。
消息队列中的分布式事务是本地事务和存储消息到消息队列才是同一个事务。
这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了。
这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了。
消息堆积
当流量到峰值的时候是因为生产者生产太快,可以使用一些 限流降级 的方法。
增加多个消费者实例去水平扩展增加消费能力来匹配生产的激增,不过 同时你还需要增加每个主题的队列数量 。(最快速)
如果消费者消费过慢的话,可以先检查消费者否是出现了大量的消费错误 ,或打印一下日志查看是否是哪一个线程卡死,出现了锁资源不释放等等的问题。
回溯消费
Consumer 已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ 中,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留 。
高性能读写
传统 IO 方式
传统的 IO 读写其实就是 read + write 的操作。
过程
用户调用 read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换。
将磁盘数据通过 DMA 拷贝到内核缓存区。
将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据。
read()方法返回,此时就会从内核态切换到用户态。
当我们拿到数据之后,就可以调用 write()方法,此时上下文会从用户态切换到内核态。
CPU 将用户缓冲区的数据拷贝到 Socket 缓冲区。
将 Socket 缓冲区数据拷贝至网卡。
write()方法返回,上下文重新从内核态切换到用户态。
整个过程发生了 4 次上下文切换和 4 次数据的拷贝,这在高并发场景下肯定会严重影响读写性能,故引入了零拷贝技术。
零拷贝技术
mmap
一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次 CPU 拷贝。
基于 mmap IO 读写其实就变成 mmap + write 的操作,也就是用 mmap 替代传统 IO 中的 read 操作。
过程
当用户发起 mmap 调用的时候会发生上下文切换 1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap 返回,发生上下文切换 2;
随后用户调用 write,发生上下文切换 3,将内核缓冲区的数据拷贝到 Socket 缓冲区,write 返回,发生上下文切换 4。
随后用户调用 write,发生上下文切换 3,将内核缓冲区的数据拷贝到 Socket 缓冲区,write 返回,发生上下文切换 4。
发生 4 次上下文切换和 3 次 IO 拷贝操作。
sendfile
sendfile()跟 mmap()一样,也会减少一次 CPU 拷贝,但是它同时也会减少两次上下文切换。
过程
用户在发起 sendfile()调用时会发生切换 1,之后数据通过 DMA 拷贝到内核缓冲区,
之后再将内核缓冲区的数据 CPU 拷贝到 Socket 缓冲区,最后拷贝到网卡,sendfile()返回,发生切换 2。发生了 3 次拷贝和两次切换。
之后再将内核缓冲区的数据 CPU 拷贝到 Socket 缓冲区,最后拷贝到网卡,sendfile()返回,发生切换 2。发生了 3 次拷贝和两次切换。
sendfile 并没有文件的读写操作,而是直接将文件的数据传输到 target 目标缓冲区,也就是说,sendfile 是无法知道文件的具体的数据的;
但是 mmap 不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有 mmap 可以满足。
但是 mmap 不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有 mmap 可以满足。
基于零拷贝技术,可以减少 CPU 的拷贝次数和上下文切换次数,从而可以实现文件高效的读写操作。
RocketMQ 内部主要是使用基于 mmap 实现的零拷贝,用来读写文件,这也是 RocketMQ 为什么快的一个很重要原因。
刷盘机制
刷盘
同步刷盘
在同步刷盘中需要等待一个刷盘成功的 ACK ,对 MQ 消息可靠性来说是一种不错的保障,但性能上会有较大影响 ,一般地适用于金融等特定业务场景。
异步刷盘
异步刷盘是开启一个线程去异步地执行刷盘操作。
消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。
消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。
异步刷盘只有在 Broker 意外宕机的时候会丢失部分数据,可以设置 Broker 的参数 FlushDiskType 来调整刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。
复制
同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker 主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。
同步复制
也叫 “同步双写”,也就是说,只有消息同步双写到主从节点上时才返回写入成功 。
异步复制
消息写入主节点之后就直接返回写入成功 。
问题
复制方式无法保证 严格顺序。
解决:采用 Dledger,要求写入消息时,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。
解决:采用 Dledger,要求写入消息时,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。
对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了可用性 。
主要原因是 RocketMQ 是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了。
主要原因是 RocketMQ 是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了。
存储机制
存储架构角色
CommitLog
消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。
单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量。
消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
ConsumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能。
RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。
Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,
保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。
保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。
consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,
故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
IndexFile
IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。
整个消息存储的结构,最主要的就是 CommitLoq 和 ConsumeQueue 。而 ConsumeQueue 你可以大概理解为 Topic 中的队列。
RocketMQ 采用的是 混合型的存储结构 ,即为 Broker 单个实例下所有的队列共用一个日志数据文件来存储消息。
在同样高并发的 Kafka 中会为每个 Topic 分配一个存储文件。
在同样高并发的 Kafka 中会为每个 Topic 分配一个存储文件。
应用
大规模消息流处理:RocketMQ 能够处理大量的消息流,并提供高吞吐量和低延迟的消息传递能力。
因此,它适用于需要处理大规模消息流的场景,如实时日志处理、实时数据分析和监控系统等。
因此,它适用于需要处理大规模消息流的场景,如实时日志处理、实时数据分析和监控系统等。
异步通信:RocketMQ 的发布-订阅模式和队列模式可以实现异步通信,将消息发送方和接收方解耦,提高系统的可伸缩性和弹性。
它适用于需要异步通信的场景,如异步任务处理、解耦系统组件和微服务架构等。
它适用于需要异步通信的场景,如异步任务处理、解耦系统组件和微服务架构等。
高可靠性和顺序性要求:RocketMQ 提供了可靠性投递和消息顺序保证的特性,
适用于对消息的可靠性和顺序性要求较高的场景,如金融交易系统、订单处理和支付系统等。
适用于对消息的可靠性和顺序性要求较高的场景,如金融交易系统、订单处理和支付系统等。
数据集成和异构系统集成:RocketMQ 可以作为数据集成和异构系统集成的中间件,将不同系统之间的数据进行传递和转换。
它适用于需要数据集成和系统间通信的场景,如数据同步、消息驱动的架构和异构系统集成等。
它适用于需要数据集成和系统间通信的场景,如数据同步、消息驱动的架构和异构系统集成等。
RabbitMQ
定义
采用 Erlang 语言实现 AMQP 的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
在易用性、扩展性、可靠性和高可用性等方面有着卓著表现。
特点
可靠性: RabbitMQ 使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
灵活的路由: 在消息进入队列之前,通过交换器来路由消息。
对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。
针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。
针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
多语言客户端: RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
易用的管理界面: RabbitMQ 提供了一个易用的用户界面,使用户可以监控和管理消息、集群中的节点等。
插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI 机制。
消息模型
发布 - 订阅模型。
底层:Exchange。
核心概念
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
Producer(生产者)
生产消息的一方(邮件投递者)。
消息
消息头(标签 Label)
由可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
消息体
消息体也可以称为 payLoad,消息体是不透明的。
生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
Consumer(消费者)
消费消息的一方(邮件收件人)。消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
Exchange
生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,
中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,
如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。
如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。
生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),
用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建),
这样 RabbitMQ 就知道如何正确将消息路由到队列了。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,
所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
这样 RabbitMQ 就知道如何正确将消息路由到队列了。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,
所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。
在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。
BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。
BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
类型
fanout
fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,
不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
direct
direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。
如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到 Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到 Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
topic
topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,
也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同。
也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同。
约定
RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),
如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”。
如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”。
BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串。
BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
示例
路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2。
路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中。
路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中。
路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中。
路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。
headers(不推荐)
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),
对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),
对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
Queue
RabbitMQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。
一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。
Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。
RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。
RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,
而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。
Broker
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。
大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
AMQP
定义
RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )。
AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
协议层
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
组件
交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
队列类型
死信队列
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。
当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
原因
消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
消息 TTL 过期。
队列满了,无法再添加。
延迟队列
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
实现方式
通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。插件依赖 Erlang/OPT 18.0 及以上。
优先级队列
RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。
可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
工作模式
简单模式
work 工作模式
pub/sub 发布订阅模式
Routing 路由模式
Topic 主题模式
消息
传输
由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。
信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。
就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,
这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。
信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。
就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,
这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。
安全
可靠性
消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。
方式
生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。
顺序性
拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
高可用
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的。
模式
单机模式
Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。
普通集群模式
在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。
镜像集群模式
RabbitMQ 的高可用模式。
跟普通集群模式不一样,在镜像集群模式下,创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,
就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。
然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,
也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。
然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,
也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
优点
任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。
缺点
这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重。
过期失效
RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。
解决
批量重导,即手动补回来。
Disruptor
定义
Disruptor 是一个开源的高性能内存队列,诞生初衷是为了解决内存队列的性能和内存安全问题,由英国外汇交易公司 LMAX 开发。
基于 Disruptor 开发的系统 LMAX(新的零售金融交易平台),单线程就能支撑每秒 600 万订单。
Disruptor 提供的功能优点类似于 Kafka、RocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。
Disruptor 主要解决了 JDK 内置线程安全队列的性能和内存安全问题。
Kafka 和 Disruptor
Kafka:分布式消息队列,一般用在系统或者服务之间的消息传递,还可以被用作流式处理平台。
Disruptor:内存级别的消息队列,一般用在系统内部中线程间的消息传递。
应用
Log4j2:Log4j2 是一款常用的日志框架,它基于 Disruptor 来实现异步日志。
SOFATracer:SOFATracer 是蚂蚁金服开源的分布式应用链路追踪工具,它基于 Disruptor 来实现异步日志。
Storm : Storm 是一个开源的分布式实时计算系统,它基于 Disruptor 来实现工作进程内发生的消息传递(同一 Storm 节点上的线程间,无需网络通信)。
HBase:HBase 是一个分布式列存储数据库系统,它基于 Disruptor 来提高写并发性能。
核心概念
Event:你可以把 Event 理解为存放在队列中等待消费的消息对象。
EventFactory:事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
EventHandler:Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
EventProcessor:EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
Disruptor:事件的生产和消费需要用到 Disruptor 对象。
RingBuffer:RingBuffer(环形数组)用于保存事件。
WaitStrategy:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
Producer:生产者,只是泛指调用 Disruptor 对象发布事件的用户代码,Disruptor 没有定义特定接口或类型。
ProducerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
Sequencer:Sequencer 是 Disruptor 的真正核心。
此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
等待策略
BlockingWaitStrategy:基于 ReentrantLock+Condition 来实现等待和唤醒操作,实现代码非常简单,是 Disruptor 默认的等待策略。
虽然最慢,但也是 CPU 使用率最低和最稳定的选项生产环境推荐使用。
虽然最慢,但也是 CPU 使用率最低和最稳定的选项生产环境推荐使用。
BusySpinWaitStrategy:性能很好,存在持续自旋的风险,使用不当会造成 CPU 负载 100%,慎用。
LiteBlockingWaitStrategy:基于 BlockingWaitStrategy 的轻量级等待策略,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,因此不建议使用。
TimeoutBlockingWaitStrategy:带超时的等待策略,超时后会执行业务指定的处理逻辑。
LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy的策略,当没有锁竞争的时候会省去唤醒操作。
SleepingWaitStrategy:三段式策略,第一阶段自旋,第二阶段执行 Thread.yield 让出 CPU,第三阶段睡眠执行时间,反复的睡眠。
YieldingWaitStrategy:二段式策略,第一阶段自旋,第二阶段执行 Thread.yield 交出 CPU。
PhasedBackoffWaitStrategy:四段式策略,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行 Thread.yield 交出 CPU,
第四阶段调用成员变量的waitFor方法,该成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy三个中的一个。
第四阶段调用成员变量的waitFor方法,该成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy三个中的一个。
快的原因
RingBuffer(环形数组) : Disruptor 内部的 RingBuffer 是通过数组实现的。
由于这个数组中的所有元素在初始化时一次性全部创建,因此这些元素的内存地址一般来说是连续的。
由于这个数组中的所有元素在初始化时一次性全部创建,因此这些元素的内存地址一般来说是连续的。
避免了伪共享问题:CPU 缓存内部是按照 Cache Line(缓存行)管理的,一般的 Cache Line 大小在 64 字节左右。
为了确保目标字段独占一个 Cache Line,会在目标字段前后增加了 64 个字节的填充(前 56 后 8 ),可以避免 伪共享(False Sharing)问题。
为了确保目标字段独占一个 Cache Line,会在目标字段前后增加了 64 个字节的填充(前 56 后 8 ),可以避免 伪共享(False Sharing)问题。
无锁设计:Disruptor 采用无锁设计,避免了传统锁机制带来的竞争和延迟。
Disruptor 的无锁实现起来比较复杂,主要是基于 CAS、内存屏障(Memory Barrier)、RingBuffer 等技术实现的。
Disruptor 的无锁实现起来比较复杂,主要是基于 CAS、内存屏障(Memory Barrier)、RingBuffer 等技术实现的。
总结
Disruptor 之所以能够如此快,是基于一系列优化策略的综合作用,既充分利用了现代 CPU 缓存结构的特点,又避免了常见的并发问题和性能瓶颈。
Pulsar
定义
下一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。
Pulsar 集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,
具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
特性
是下一代云原生分布式消息流平台。
Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
极低的发布延迟和端到端延迟。
可无缝扩展到超过一百万个 topic。
简单的客户端 API,支持 Java、Go、Python 和 C++。
主题的多种订阅模式(独占、共享和故障转移)。
通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如 S3、GCS)中。
ActiveMQ
目前已经被淘汰,不推荐使用,不建议学习。
选择
ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用,已经被淘汰了。
RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。
但也因为基于 Erlang 开发,国内很难进行源码级的研究和定制,如果业务场景对并发量要求不高(十万级、百万级),RabbitMQ 是首选。
但也因为基于 Erlang 开发,国内很难进行源码级的研究和定制,如果业务场景对并发量要求不高(十万级、百万级),RabbitMQ 是首选。
RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求比较高的场景可以使用。
RocketMQ 阿里出品,Java 系开源项目,源代码可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。
Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。
同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。
Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。
天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,几乎是全世界这个领域的事实性规范。
同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。
Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。
天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,几乎是全世界这个领域的事实性规范。
0 条评论
下一页