RocketMQ
2024-08-01 18:37:38 7 举报
AI智能生成
RocketMQ是一个分布式、高性能、易于扩展的消息中间件,适用于大规模分布式系统之间的异步通信。它提供了包括发布/订阅、请求/回复、负载均衡、容错性、事务性消息等特性。RocketMQ基于Java开发,支持多种编程语言的客户端,如Java、C++、Python等。它主要用于大规模数据采集、实时处理、在线大数据分析等场景。
作者其他创作
大纲/内容
消息中间件是什么
消息队列(MQ)是一种系统间相互写作的通信机制,目前业界通常由两种方式来实现系统间通信,其中一种是基于远程过程调用的方式;另一种是基于消息队列的方式。前一种就是我们常说的RPC调用,
客户端不需要知道调用的具体实现细节,只需要调用实际存在于远程计算机上的某个对象即可,但调用方式看起来和调用本地应用程序中的对象一样。基于消息队列的方式是指由应用中的某个系统负责发送消息,由关心这条消息的相应系统负责接收消息,并在收到消息后进行各自系统内的业务处理,消息可以非常简单,比如只包含文本字符串;也可以很复杂,比如包含字节流、字节数组,还可能包含嵌入对象,消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息发布者只管将消息发布到消息队列而不用管谁来去,消息使用者只管从消息队列中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在
客户端不需要知道调用的具体实现细节,只需要调用实际存在于远程计算机上的某个对象即可,但调用方式看起来和调用本地应用程序中的对象一样。基于消息队列的方式是指由应用中的某个系统负责发送消息,由关心这条消息的相应系统负责接收消息,并在收到消息后进行各自系统内的业务处理,消息可以非常简单,比如只包含文本字符串;也可以很复杂,比如包含字节流、字节数组,还可能包含嵌入对象,消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息发布者只管将消息发布到消息队列而不用管谁来去,消息使用者只管从消息队列中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在
为什么需要消息中间件
解耦。多个业务系统间的解耦,使得单个业务系统更加独立
异步。对于一些业务上实时性要求不高的处理,可以转为异步处理
削峰。对于某个时间段的高并发流量可以进行固定速率进行处理
数据一致性。事务消息可以帮我们更好的实现分布式事务
MQ和RPC的区别
1.RPC虽然可以分为同步调用和异步调用,但在大多数情况下,
RPC请求发出时是需要获取最终的调用结果的,需要结果进行回传
RPC请求发出时是需要获取最终的调用结果的,需要结果进行回传
2.MQ虽然也可以实现分布式系统之间的调用,但是MQ回传被调用方的处理结果给调用方是,
这个操作是比较困难的,不是很容易实现
这个操作是比较困难的,不是很容易实现
Kafka与RocketMQ
为什么Kafka不能直接通过水平扩容机制增加消费者来提升消费速度,
而RocketMQ却可以
而RocketMQ却可以
Kafka之所以这样设计的原因有以下几点:
+ 保证分区局部有序性。一个分区同一时刻只能让一个消费者消费,这样有助于保证分区内的消息是有序的,能够实现在局部消息的顺序性,如果同时让多个消费者消费,必然会破坏分区的顺序性
+ 消费者组更好地协作和高吞吐。Kafka的集群消费模式中,一个消息只能被一个消费者组中的一个消费者消费,如果你要让一个Consumer消费Partion-0和Partion-1,那么其他的Consumer也要消费Partition-0和Partion-1,如果恰好出现Partiion-0的一条消息同时被两个Consumer拉取到,将会出现消息竞争,需要加锁来控制,这样势必会降低性能,这与Kafka高吞吐的理念相悖
所以在水平扩容消费者上面,相对RocketMQ来说不是那么地直接,在Kafka中需要做进一步考虑,多说一句,在RocketMQ中由于业务场景不同,相比Kafka处理的业务场景要复杂地多,所以RocketMQ需要支持消费者的水平扩容,这样就会出现消息竞争,但是为了水平扩容,RocketMQ需要这样做。
对比RocketMQ
RocketMQ在大多数情况下只会被同一个消费者组中的一个消费者实例消费,以保证消息的有序性。
但是在有些情况下,RocketMQ也支持消息负载均衡,即允许同一个MessageQueue被同一个消费者组中的多个消费者实例共同消费,
+ 消息负载均衡: 如果消费者组中存在一个实例处理速度较快,RocketMQ可能会将同一个MessageQueue分配给这个组中的其他相对较慢的实例,以实现负载均衡
+ 动态扩容:也就是我们讨论的动态增加消费者实例时,新加入的实例可能会被分配到已有实例所消费的MessageQueue上,以实现动态扩容
+ 保证分区局部有序性。一个分区同一时刻只能让一个消费者消费,这样有助于保证分区内的消息是有序的,能够实现在局部消息的顺序性,如果同时让多个消费者消费,必然会破坏分区的顺序性
+ 消费者组更好地协作和高吞吐。Kafka的集群消费模式中,一个消息只能被一个消费者组中的一个消费者消费,如果你要让一个Consumer消费Partion-0和Partion-1,那么其他的Consumer也要消费Partition-0和Partion-1,如果恰好出现Partiion-0的一条消息同时被两个Consumer拉取到,将会出现消息竞争,需要加锁来控制,这样势必会降低性能,这与Kafka高吞吐的理念相悖
所以在水平扩容消费者上面,相对RocketMQ来说不是那么地直接,在Kafka中需要做进一步考虑,多说一句,在RocketMQ中由于业务场景不同,相比Kafka处理的业务场景要复杂地多,所以RocketMQ需要支持消费者的水平扩容,这样就会出现消息竞争,但是为了水平扩容,RocketMQ需要这样做。
对比RocketMQ
RocketMQ在大多数情况下只会被同一个消费者组中的一个消费者实例消费,以保证消息的有序性。
但是在有些情况下,RocketMQ也支持消息负载均衡,即允许同一个MessageQueue被同一个消费者组中的多个消费者实例共同消费,
+ 消息负载均衡: 如果消费者组中存在一个实例处理速度较快,RocketMQ可能会将同一个MessageQueue分配给这个组中的其他相对较慢的实例,以实现负载均衡
+ 动态扩容:也就是我们讨论的动态增加消费者实例时,新加入的实例可能会被分配到已有实例所消费的MessageQueue上,以实现动态扩容
这是因为Kafka在一开始设计Parition的时候,就已经设计成了一个Parition在同一个时刻只能被一个Consumer消费,当消费者数量大于分区数量时,新加入的消费者是消费不到消息的,除非之前的分区数量是小于消费者数量
RocketMQ
概述
RocketMQ是阿里巴巴于2012年开源的分布式消息中间件,后来捐赠给Apache软件基金会,并于2017年9月25日称为Apache的顶级项目.
作为经历多过多次阿里巴巴双11这种超级工程的洗礼并有稳定出色表现得国产中间件,以其高性能、低延迟和高可靠等特性近年来被越来越多
的国内企业所使用
作为经历多过多次阿里巴巴双11这种超级工程的洗礼并有稳定出色表现得国产中间件,以其高性能、低延迟和高可靠等特性近年来被越来越多
的国内企业所使用
RocketMQ是所有的Topic都写入到一个CommitLog文件
模型设计图
集群结构
组件结构
领域模型结构
特点
具有灵活的可扩展性。RocketMQ天然支持集群,其核心四大组件(NameServer、Broker、Producer、Consumer)
的每一个都可以在没有单点故障的情况下进行水平扩展
的每一个都可以在没有单点故障的情况下进行水平扩展
具有海量消息堆积能力。RocketMQ采用零拷贝原理实现了超大量消息的堆积能力,据说单机已经可以支持亿级消息堆积
而且在堆积了这么多消息后依然可以保持写入低延迟
而且在堆积了这么多消息后依然可以保持写入低延迟
支持顺序消息。RocketMQ可以保证消息消费者按照消息的发送的顺序对消息进行消费。顺序消息分为全局有序消息和局部有序,
一般推荐使用局部有序消息,即生产者通过将某一类的消息按顺序发送到同一个队列中来实现
一般推荐使用局部有序消息,即生产者通过将某一类的消息按顺序发送到同一个队列中来实现
支持多种消息过滤方式。消息过滤分为在服务端过滤和在消费端过滤。在服务端过滤时可以按照消息消费者的要求进行过滤,
优点是减少了不必要的消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,
这种方式更加灵活,缺点是很多无用的消息会被传输给消息消费者
优点是减少了不必要的消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,
这种方式更加灵活,缺点是很多无用的消息会被传输给消息消费者
支持事务消息,RocketMQ除了支持普通消息、顺序消息之外,还支持事务消息,这个特性对于分布式事务来说提供了另一种解决思路
支持回溯消费,回溯消费是指对于消费者已经消费成功的消息,由于业务需求需要重新消费,RocketMQ支持按照时间回溯消费
时间精确到毫秒,可以向前回溯,也可以向后回溯
时间精确到毫秒,可以向前回溯,也可以向后回溯
核心组件
Topic
主题(Topic)可以被看作是消息的归类,它是消息的第一级类型,比如一个电商系统可以分为交易信息、物流信息等,
一条消息必须有一个主题,主题与生产者和消费者的关系非常松散,一个主题可以有0个或多个生产者向其发送消息,
一个生产者也可以同时向不同的主题发送消息,一个主题也可以被多个消费者订阅
一条消息必须有一个主题,主题与生产者和消费者的关系非常松散,一个主题可以有0个或多个生产者向其发送消息,
一个生产者也可以同时向不同的主题发送消息,一个主题也可以被多个消费者订阅
Message
消息就是要传输的信息。一条消息必须有一个主题,主题可以被看作是信件要邮寄的地址
一条消息也可以拥有可选的标签和额外的键值对,它们被用于设置一个业务key并在broker上
查找此消息,以便在卡法期间查找问题
一条消息也可以拥有可选的标签和额外的键值对,它们被用于设置一个业务key并在broker上
查找此消息,以便在卡法期间查找问题
Queue
主题被划分为一个或者多个子主题,即队列(Queue),在一个主题下可以设置多个队列,在发送消息时执行该消息的主题,
RocketMQ会轮询该主题下的所有队列将消息发送出去
RocketMQ会轮询该主题下的所有队列将消息发送出去
Tag
标签(tag)可以被看作是子主题,它是消息的第二级类型,用于伪用户提供额外的灵活性。使用标签,同一业务模块的不同目的的
消息就可以用相同的主题而不同的标签来标识。比如交易消息又可以分为交易创建消息,交易完成消息等,一条消息可以没有标签
标签有助于保持代码干净和连贯,并且还可以为RocketMQ的查询系统提供帮助
消息就可以用相同的主题而不同的标签来标识。比如交易消息又可以分为交易创建消息,交易完成消息等,一条消息可以没有标签
标签有助于保持代码干净和连贯,并且还可以为RocketMQ的查询系统提供帮助
Producer
负责生产消息,生产者向消息服务器发送由业务应用程序系统生成的消息,RocketMQ提供了三种方式发送消息
同步、异步、单向
同步、异步、单向
同步发送
同步发送指消息发送方发出数据后,会在收到接收方发出的响应之后才发送下一个数据包,
一般适用于重要通知消息场景,例如重要通知邮件,营销短信等,消息最可靠,如果发送失败,
则进行重传,不会引起消息丢失,但可能会发出重复消息,性能比较低
一般适用于重要通知消息场景,例如重要通知邮件,营销短信等,消息最可靠,如果发送失败,
则进行重传,不会引起消息丢失,但可能会发出重复消息,性能比较低
异步发送
异步发送指发出数据后,不等接收方发回响应,就接着发送下一个数据包
一般适用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知
转码服务等。一般需要提供回调接口,当broker收到消息时调用回调方法,让Produce回查哪些消息
发送成功,哪些发送失败,对于发送失败的消息,进行重发,对于批量消息而言,如果其中某一个消息发送
失败,则需要重发这一批消息,容易引起消息重复,但是这种机制效率最高,实现也比较复杂
一般适用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知
转码服务等。一般需要提供回调接口,当broker收到消息时调用回调方法,让Produce回查哪些消息
发送成功,哪些发送失败,对于发送失败的消息,进行重发,对于批量消息而言,如果其中某一个消息发送
失败,则需要重发这一批消息,容易引起消息重复,但是这种机制效率最高,实现也比较复杂
单向发送
单向发送指只负责发送消息而不等服务器回应且没有回调函数触发。一般适用于某些耗时
非常短但对可靠性要求并不高的场景,例如日志收集
非常短但对可靠性要求并不高的场景,例如日志收集
Consumer
消费者负责消费消息,它从消息服务器拉取消息并将其输入用户应用程序中,
从用户应用的角度来看,消费者由两种类型,拉取型消费者和推送型消费者
从用户应用的角度来看,消费者由两种类型,拉取型消费者和推送型消费者
拉取型消费者(Pull Consumer)主动从消息服务器拉取消息,只要批量拉取到消息,用户应用就会启动消费过程,所以poll被称为主动消费类型
推送型消费者(PushConsumer)封装了消息的拉取、消费进度和其他内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现,
所以Push被称为被动消费类型。但从实现上来看,还是从消息服务器拉取消息的,不同于Pull的是,Push首先要注册消费监听器,
当监听器触发后才开始消费
所以Push被称为被动消费类型。但从实现上来看,还是从消息服务器拉取消息的,不同于Pull的是,Push首先要注册消费监听器,
当监听器触发后才开始消费
Broker
消息服务器(Broker)是消息存储中心,其主要作用是接收来自生产者的消息并进行存储,消费者从这里拉取消息,
它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等,从部署结构中可以看出,Broker有Master和Slave两种类型,
其中Master既可以读又可以写,Slave不可以写只可以读,从物理结构上看,Broker的集群部署有单Master,多Master、多Master多Slave等多种方式
它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等,从部署结构中可以看出,Broker有Master和Slave两种类型,
其中Master既可以读又可以写,Slave不可以写只可以读,从物理结构上看,Broker的集群部署有单Master,多Master、多Master多Slave等多种方式
单Master
采用这种方式,一旦Broker重启或宕机就会导致整个服务不可用,这种方式风险较大,所以不建议在线上使用
采用这种方式,一旦Broker重启或宕机就会导致整个服务不可用,这种方式风险较大,所以不建议在线上使用
多Master
所有消息服务器都是Master,没有Slave.这种方式的优点是配置简单,单个Master当即或重启维护对应用无影响
缺点是在单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响
所有消息服务器都是Master,没有Slave.这种方式的优点是配置简单,单个Master当即或重启维护对应用无影响
缺点是在单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响
多Master多Slave(同步双写)
为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用同步双写方式,主备都写成功了才返回成功,
这种方式的优点是数据与服务没有单点问题,Master宕机时消息五延迟,服务于数据的可用性非常高
缺点是相对异步复制的方式其性能略低,发送消息的延迟略高
为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用同步双写方式,主备都写成功了才返回成功,
这种方式的优点是数据与服务没有单点问题,Master宕机时消息五延迟,服务于数据的可用性非常高
缺点是相对异步复制的方式其性能略低,发送消息的延迟略高
多Master多Slave(异步复制)
为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用异步复制方式,竹北之间有毫秒级消息延迟,
这种方式的优点时丢失的消息非常少,且消息的实时性不会受到影响,Master宕机后消费者可以继续从Slave消费,
中间的过程对用户应用程序透明,不需要人工干预,性能同多Master方式几乎一样,缺点是Master宕机后在磁盘
损坏的情况下会丢失极少量的消息
为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用异步复制方式,竹北之间有毫秒级消息延迟,
这种方式的优点时丢失的消息非常少,且消息的实时性不会受到影响,Master宕机后消费者可以继续从Slave消费,
中间的过程对用户应用程序透明,不需要人工干预,性能同多Master方式几乎一样,缺点是Master宕机后在磁盘
损坏的情况下会丢失极少量的消息
Producer Group
事务消息回查机制,可以使用到,同一个生产者组的生产消息逻辑是相同的,所以当事务消息向Broker提交本地事务不成功时,
有可能是在执行完本地事务之后宕机的,那么Broker只需要向同一个Producer Group中的任意一个Producer调用事务回查就可以
获取到本地事务的执行结果
有可能是在执行完本地事务之后宕机的,那么Broker只需要向同一个Producer Group中的任意一个Producer调用事务回查就可以
获取到本地事务的执行结果
生产者组(ProducerGroup)是一类生产者地集合,这类生产者通常发送一类消息并且发送逻辑一致,所以将这些生产者分组在一起
从部署结构上看,生产者通过生产者组的名字来标识自己是一个集群
从部署结构上看,生产者通过生产者组的名字来标识自己是一个集群
Producer分组有什么用
Consumer Group
消费者组(ConsumerGroup)是一类消费者的组合,这类消费者通常消费同一类消息并且消费逻辑一致,
所以将这些消费者分组在一起,消费者组于生产者组类似,都是将相同角色的消费者分组在一起并命名的。
分组是一个很精妙的概念设计,RocketMQ正是通过这种分组机制,实现了天然的消息负载均衡。
消费消息时,通过消费者组实现了将消息分发到多个消费者服务器实例,比如某个主题由9条消息,其中
一个消费者组由3个实例(3个进程或者3台机器),那么每个实例将均摊3条消息,也就意味着我们可以很方便地
通过增加机器来实现水平扩容
>>> 为什么Kafka就不能水平扩容?(见左边或者自己写的博客:https://blog.csdn.net/Cover_sky/article/details/135728325?spm=1001.2014.3001.5501)
所以将这些消费者分组在一起,消费者组于生产者组类似,都是将相同角色的消费者分组在一起并命名的。
分组是一个很精妙的概念设计,RocketMQ正是通过这种分组机制,实现了天然的消息负载均衡。
消费消息时,通过消费者组实现了将消息分发到多个消费者服务器实例,比如某个主题由9条消息,其中
一个消费者组由3个实例(3个进程或者3台机器),那么每个实例将均摊3条消息,也就意味着我们可以很方便地
通过增加机器来实现水平扩容
>>> 为什么Kafka就不能水平扩容?(见左边或者自己写的博客:https://blog.csdn.net/Cover_sky/article/details/135728325?spm=1001.2014.3001.5501)
Consumer分组有什么用?
NameServer
名称服务器(NameServer)用来保存Broker相关元信息,并给生产者和消费者查找Broker信息,名称服务器被设计成几乎无状态,
可以横向扩展,节点之间无通信,每个NameServer都存储了全部broker的信息,通过部署多台机器来标识自己是一个伪集群,
每个Broker在启动时都会到名称服务器中注册,生产者在发送消息前会根据主题到名称服务器中获取Broker的路由信息,消费者
也会定时获取主题的路由信息。所以从功能上看,它应该和Zookeeper差不多,据说RocketMQ的早期版本确实使用了ZooKeeper,
后来改为自己实现的名称服务器
可以横向扩展,节点之间无通信,每个NameServer都存储了全部broker的信息,通过部署多台机器来标识自己是一个伪集群,
每个Broker在启动时都会到名称服务器中注册,生产者在发送消息前会根据主题到名称服务器中获取Broker的路由信息,消费者
也会定时获取主题的路由信息。所以从功能上看,它应该和Zookeeper差不多,据说RocketMQ的早期版本确实使用了ZooKeeper,
后来改为自己实现的名称服务器
为什么要自己实现注册中心,而不用Zookeeper
1.NameServer之间没有任何通信,每个都是独立的,不存在选举问题
而ZooKeeper之间需要通过选举
而ZooKeeper之间需要通过选举
2.ZooKeeper在CAP理论中保证的是CP,站在服务中心的角度来看,AP要比CP好,
如果客户端发消息请求NameServer发送失败,重发即可,在用Zookeeper,一致性期间,
NamerServer无法访问
如果客户端发消息请求NameServer发送失败,重发即可,在用Zookeeper,一致性期间,
NamerServer无法访问
3.Nacos通用的服务注册和参数配置,虽然Nacos是AP,但是RocketMQ中NameServer做的事情比较少,
只需要保管好这些Broker的信息即可,这是一个轻量级框架
只需要保管好这些Broker的信息即可,这是一个轻量级框架
4.如果要用分布式,则必然要解决脑裂问题,当出现网络分区,选出两个Master时,则需要再次进行选举
支持的消息类型
普通消息
顺序消息
将同一个订单(即具有相同的orderId)的消息按状态先后顺序消费的,所以消息生产者调用send方法发送时需要传入MessageQueueSelector接口的
实现类,将orderId相同的消息放入同一个MessageQueue中,比如对orderId进行取余,消费端还需要实现MessageListenerOrderly接口用于消费
有序的消息,MessageListenerConcurrently接口消费的消息是无序的
实现类,将orderId相同的消息放入同一个MessageQueue中,比如对orderId进行取余,消费端还需要实现MessageListenerOrderly接口用于消费
有序的消息,MessageListenerConcurrently接口消费的消息是无序的
顺序消费的原理是确保将消息投递到同一个队列中,在队列内部RocketMQ保证先进先出,而同一个队列会被投递到同一个消费者实例
再由消费者拉取数据进行消费。在消费者内部会维护本地队列锁,以保证当前只有一个线程能够进行消费,所拉到的消息先被存入消息
处理队列中,然后再从消息处理队列中顺序获取消息用MessageListenerOrderly进行消费(这也是在顺序消费时监听消息要实现
MessageListener接口)的原因
再由消费者拉取数据进行消费。在消费者内部会维护本地队列锁,以保证当前只有一个线程能够进行消费,所拉到的消息先被存入消息
处理队列中,然后再从消息处理队列中顺序获取消息用MessageListenerOrderly进行消费(这也是在顺序消费时监听消息要实现
MessageListener接口)的原因
消费者端的顺序消费,需要有个前提,那就是保证Producer、Broker要保证有序,缺一不可
Producer
在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),
而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的.
而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MesageQueue先进先出的特性
保证这一组消息有序
而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的.
而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MesageQueue先进先出的特性
保证这一组消息有序
Broker
Broker中一个队列内的消息是可以保证有序的
Consumer
消费者会从多个消息队列上去拿消息,这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息
仍然是乱序的,消费者端要保证消息是有序的,就需要按队列一个一个地来取消息,即取完一个队列的消息后,
再去取下一个队列的消息。而给Consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过
锁队列的方式保证消息是一个一个队列来取的,MessageListenerConcurrently这个消息监听器不会锁队列,
每次都是从多个Message中取一批数据(默认不超过32条),因此也无法保证消息有序
仍然是乱序的,消费者端要保证消息是有序的,就需要按队列一个一个地来取消息,即取完一个队列的消息后,
再去取下一个队列的消息。而给Consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过
锁队列的方式保证消息是一个一个队列来取的,MessageListenerConcurrently这个消息监听器不会锁队列,
每次都是从多个Message中取一批数据(默认不超过32条),因此也无法保证消息有序
延时消息
延时消息实现的效果就是再调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去,
这是RocketMQ特有的一个功能
延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),
而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m.....2h
而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改
这是RocketMQ特有的一个功能
延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),
而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m.....2h
而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改
批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去,这样的好处是可以减少网络IO,提升吞吐量
官网注释,如果批量消息大于1MB就不要使用一个批次发送,而要拆分成多个批次消息发送,也就是说
一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际的最大的限制是大约4MB.
但是使用批量消息时,这个消息长度确实是必须考虑的一个问题,而且批量消息的使用是有一定限制的,
这些消息应该有相同的Topic,相同的waitStoreMsgOK,而且不能是延迟消息、事务消息等
官网注释,如果批量消息大于1MB就不要使用一个批次发送,而要拆分成多个批次消息发送,也就是说
一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际的最大的限制是大约4MB.
但是使用批量消息时,这个消息长度确实是必须考虑的一个问题,而且批量消息的使用是有一定限制的,
这些消息应该有相同的Topic,相同的waitStoreMsgOK,而且不能是延迟消息、事务消息等
过滤消息
大多数情况下,可以使用Message的Tag属性来简单快速地过滤信息,主要时看消息消费者
tag是RocketMQ中特有的一个消息属性,RocketMQ的最佳实践中就建议,使用RocketMQ时,
一个应用可以就用一个Topic,而应用中的不同业务就用tag区分,但是,这种方式有一个很大的限制,
就是一个消息只能有一个tag,在一些比较复杂的场景就有点不足了
可以使用SQL表达式来对消息进行过滤,SQL92语法
tag是RocketMQ中特有的一个消息属性,RocketMQ的最佳实践中就建议,使用RocketMQ时,
一个应用可以就用一个Topic,而应用中的不同业务就用tag区分,但是,这种方式有一个很大的限制,
就是一个消息只能有一个tag,在一些比较复杂的场景就有点不足了
可以使用SQL表达式来对消息进行过滤,SQL92语法
过滤消息是在Broker端做,这样能节省网络带宽。
过滤时,先进行MessageTag的HashCode比较,如果相同,再进行内容比较
过滤时,先进行MessageTag的HashCode比较,如果相同,再进行内容比较
事务消息
所谓事务消息就是基于消息中间件模拟的两阶段提交(2PC),属于对消息中间件的一种特殊利用。总体思路如下:
1.系统A先向消息中间件发送一条预备消息(Half Message),消息中间件在保存好消息之后向系统A发送确认消息
2.系统A执行本地事务
3.系统A根据本地事务执行结果再向消息中间价发送提交消息,以提交二次确认,如果消息中间件得到不到系统A的本地事务执行情况,将会执行系统A实现的本地事务回查接口
4.消息中间件收到提交消息后,把预备消息标记为可投递,订阅者最终将接收到该消息
1.系统A先向消息中间件发送一条预备消息(Half Message),消息中间件在保存好消息之后向系统A发送确认消息
2.系统A执行本地事务
3.系统A根据本地事务执行结果再向消息中间价发送提交消息,以提交二次确认,如果消息中间件得到不到系统A的本地事务执行情况,将会执行系统A实现的本地事务回查接口
4.消息中间件收到提交消息后,把预备消息标记为可投递,订阅者最终将接收到该消息
事务消息设计模型图
Producer本地事务执行和Half消息发送组成一个整体,消费者消费下游服务组成一个整体
消费模式
集群消费
广播消费
消费顺序
顺序消费
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,如果正在处理的全局顺序时强制性的场景,
则需要保证所使用的主题只有一个消息队列
则需要保证所使用的主题只有一个消息队列
并行消费
不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制
系统参数调优相关
1.配置RocketMQ的JVM内存大小
runserver.sh需要定制nameserver的内存大小,
runbroker.sh中需要定制broker的内存大小
这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中还需要根据服务器的实际情况进行调整
以runbroker.sh中对G1GC的配置举例,在runbroker.sh的关键配置
-XX:+UseG1GC(使用G1垃圾回收器)
-XX:G1HeapRegion(将G1的region块大小设为16M)
-XX:G1ReservePercent(在G1的老年代中预留25%空闲内存,这个默认值是10%)
-XX:InitiaatingHeapOccupancyPercent=30(当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,
默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,
一次垃圾回收时间太长的问题)
runserver.sh需要定制nameserver的内存大小,
runbroker.sh中需要定制broker的内存大小
这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中还需要根据服务器的实际情况进行调整
以runbroker.sh中对G1GC的配置举例,在runbroker.sh的关键配置
-XX:+UseG1GC(使用G1垃圾回收器)
-XX:G1HeapRegion(将G1的region块大小设为16M)
-XX:G1ReservePercent(在G1的老年代中预留25%空闲内存,这个默认值是10%)
-XX:InitiaatingHeapOccupancyPercent=30(当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,
默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,
一次垃圾回收时间太长的问题)
2.RocketMQ的其他一些核心参数
3.Linux内核参数定制。在部署RocketMQ的时候,
还需要对Linux内核参数进行一定的定制
还需要对Linux内核参数进行一定的定制
ulimit,需要进行大量的网络通信和磁盘IO
vm.extra_freee_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外
的可用内存,RocketMQ使用此参数来避免内存分配中的长延迟(与具体内核版本相关)
的可用内存,RocketMQ使用此参数来避免内存分配中的长延迟(与具体内核版本相关)
vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙地将系统破坏,并且系统在高负载下容易出现死锁
vm.max_map_count,限制一个进程可能具有的最大内存映射数
RocketMQ将使用mmap加载CommitLog和ConsumeQueue,
因此建议将此参数设置较大的值
RocketMQ将使用mmap加载CommitLog和ConsumeQueue,
因此建议将此参数设置较大的值
vm.swappiness,定义内核交换内存页面的积极程度,较高的值会增加攻击性,
较低的值会减少交换量,建议将值设置为10来避免交换延迟
较低的值会减少交换量,建议将值设置为10来避免交换延迟
vm.descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符
建议设置文件描述符的值尽可能地调大 如:655350
CentOS7中的配置文件都在/proc/sys/vm目录下
RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,
可以根据情况进行调整
建议设置文件描述符的值尽可能地调大 如:655350
CentOS7中的配置文件都在/proc/sys/vm目录下
RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,
可以根据情况进行调整
常见问题
重复消费
产生的原因是发送消息时采用了多数分布式消息中间件产品提供的最少一次(at least once)的投递保障,对于这个问题最常见的解决方案
就是消息消费端实现业务幂等,只要保持幂等性,不管来多少条重复消息,最后处理的结果都是一样的
就是消息消费端实现业务幂等,只要保持幂等性,不管来多少条重复消息,最后处理的结果都是一样的
保障策略有at most once 最多消费一次, at least once 最少消费一次, exactly once 刚好一次,RocketMQ不支持exactly once只有一次的模式,
因为要在分布式系统下实现发送不重复并且消费不重复,将会产生非常大的开销,RocketMQ为了追求高性能并没有支持此特性
其实该问题的本质时网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才会产生消息重复的问题
因为要在分布式系统下实现发送不重复并且消费不重复,将会产生非常大的开销,RocketMQ为了追求高性能并没有支持此特性
其实该问题的本质时网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才会产生消息重复的问题
为什么RocketMQ不用ZooKeeper而要自己实现一个NameServer来注册?
见NameServer组件介绍
见NameServer组件介绍
Consumer分组有什么用? Producer分组的作用?
见Producer/Consumer Group组件介绍
见Producer/Consumer Group组件介绍
哪些环节会有丢消息的可能
这4个环节都有丢消息的可能
RocketMQ消息零丢失方案
1.生产者使用事务消息机制保证消息零丢失
1.为什么要发送这个half消息?有什么用
这个Half消息时在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。
那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ
服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备
那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ
服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备
2.half消息如果写入失败了怎么办?
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ.
这时候写入消息到MQ如果失败就会非常尴尬了,而Half消息如果写入失败,我们就可以认为MQ的服务
是有问题的,这时就不能通知下游服务了,我们可以在下单时给订单一个状态标记,然后等待MQ服务
正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务
这时候写入消息到MQ如果失败就会非常尴尬了,而Half消息如果写入失败,我们就可以认为MQ的服务
是有问题的,这时就不能通知下游服务了,我们可以在下单时给订单一个状态标记,然后等待MQ服务
正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务
3.订单系统写数据库失败了怎么办?
这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断
下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,
如果过一段时间数据库恢复过来了,这个消息就无法再次发送了,当然也可以设计另外的补偿机制,
例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。
如果使用事务机制,就可以有一种更优雅的方案,如果下单时,写数据库失败了(可能是数据库崩了,需要等待
一段时间才能恢复),那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),
然后给RocketMQ返回一个UNKNOWN状态,这样RocketMQ就会过一段时间来回查事务状态,我们就可以
在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完成正常的下单,
再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失
下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,
如果过一段时间数据库恢复过来了,这个消息就无法再次发送了,当然也可以设计另外的补偿机制,
例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。
如果使用事务机制,就可以有一种更优雅的方案,如果下单时,写数据库失败了(可能是数据库崩了,需要等待
一段时间才能恢复),那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),
然后给RocketMQ返回一个UNKNOWN状态,这样RocketMQ就会过一段时间来回查事务状态,我们就可以
在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完成正常的下单,
再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失
4.Half消息写入成功后RocketMQ挂了怎么办?
在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,
也就是如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务,
这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息
没有丢失,RocketMQ就会再次继续状态回查的流程
也就是如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务,
这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息
没有丢失,RocketMQ就会再次继续状态回查的流程
5.下单成功后如何优雅地等待支付成功?
数据库方案.
在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟内完成订单支付,支付完成
后才会通知下游服务进行进一步地营销补偿?
如果不适用事务消息,那通常会怎么办?
最简单地方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超时的
订单进行回收,这种方式显然有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小
的压力。
在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟内完成订单支付,支付完成
后才会通知下游服务进行进一步地营销补偿?
如果不适用事务消息,那通常会怎么办?
最简单地方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超时的
订单进行回收,这种方式显然有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小
的压力。
延迟消息方案.
更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制,往MQ发一个延迟一分钟的
消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知,而如果
没有支付,就再发一个延迟1分钟的消息,最终在第是个消息时把订单回收,这个方案就不用对全部的订单
表进行扫描,而只需要每次处理一个单独的订单消息
更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制,往MQ发一个延迟一分钟的
消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知,而如果
没有支付,就再发一个延迟1分钟的消息,最终在第是个消息时把订单回收,这个方案就不用对全部的订单
表进行扫描,而只需要每次处理一个单独的订单消息
事务消息方案.
利用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的位置状态。
而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。只需要配置RocketMQ的
事务消息回查次数(默认15此)和事务回查间隔时间(messageDelayLevel)就可以更优雅的完成这个支付状态
检查的需求
利用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的位置状态。
而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。只需要配置RocketMQ的
事务消息回查次数(默认15此)和事务回查间隔时间(messageDelayLevel)就可以更优雅的完成这个支付状态
检查的需求
6.事务消息机制的作用
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务
的业务的分布式事务一致性问题,而事务一致性问题一直依赖都是一个非常复杂的问题。而RocketMQ
的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件
的事务一致性,而对下游服务的事务并没有保证,但是即便如此,也是分布式事务的一个很好的降级方案,
目前来看,也是业内最好的降级方案
的业务的分布式事务一致性问题,而事务一致性问题一直依赖都是一个非常复杂的问题。而RocketMQ
的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件
的事务一致性,而对下游服务的事务并没有保证,但是即便如此,也是分布式事务的一个很好的降级方案,
目前来看,也是业内最好的降级方案
2.RocketMQ配置同步刷盘+(Dledger)Broker主从架构保证MQ主从同步时不会丢消息
1.同步刷盘
可以简单的把RocketMQ的刷盘方式flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了
2.Dledger的文件同步
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式抱着呢个文件在主从
之间成功同步
之间成功同步
简单来说,数据同步会通过两个阶段,一个是uncommited阶段,一个是commiitted阶段
Leader Broker上的Dledger收到一条消息后,会标记为uncommitted状态,然后他通过自己的
DledgerServer组件把这个uncommitted数据发送给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给
Leader Broker的Dledger,如果Leader Broker收到超过半数的Follower Broker返回的ack之后,
就会把消息标记为committed状态
再接下来,Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,
让它们把消息也标记为committed状态,这样,就基于Raft协议完成了两阶段的数据同步
Leader Broker上的Dledger收到一条消息后,会标记为uncommitted状态,然后他通过自己的
DledgerServer组件把这个uncommitted数据发送给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给
Leader Broker的Dledger,如果Leader Broker收到超过半数的Follower Broker返回的ack之后,
就会把消息标记为committed状态
再接下来,Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,
让它们把消息也标记为committed状态,这样,就基于Raft协议完成了两阶段的数据同步
3.消费者端不要使用异步消费机制
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK相应,这时MQ
就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的
这种情况会造成服务端消息丢失,这种异步消费的方式,就有可能造成消息状态返回后消费者
本地业务逻辑处理失败造成消息丢失的可能
就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的
这种情况会造成服务端消息丢失,这种异步消费的方式,就有可能造成消息状态返回后消费者
本地业务逻辑处理失败造成消息丢失的可能
4.RocketMQ特有的问题,NameServer挂了如何保证消息不丢失
NameServer在RocketMQ中,扮演一个路由中心的角色,提供到Broker的路由功能。
但是其实这样的路由中心这样的功能,在所有的MQ中都是需要的,Kafka使用ZooKeeper
和一个作为Controller的Broker一起来提供路由服务的,整个功能是相当复杂纠结的。而
RabbitMQ是由每一个Broker来提供路由服务,只有RocketMQ把这个路由中心单独抽取了
出来,并独立部署,每一个NameServer都是独立的,集群中任意多的节点挂掉,都不会影响
它提供的路由功能,如果集群中所有的NameServer节点都挂了呢?
但是其实这样的路由中心这样的功能,在所有的MQ中都是需要的,Kafka使用ZooKeeper
和一个作为Controller的Broker一起来提供路由服务的,整个功能是相当复杂纠结的。而
RabbitMQ是由每一个Broker来提供路由服务,只有RocketMQ把这个路由中心单独抽取了
出来,并独立部署,每一个NameServer都是独立的,集群中任意多的节点挂掉,都不会影响
它提供的路由功能,如果集群中所有的NameServer节点都挂了呢?
有很多人就会认为生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作
一段时间,当NameServer全部挂了后,胜场这和消费者是立即就无法工作了的
一段时间,当NameServer全部挂了后,胜场这和消费者是立即就无法工作了的
回到消息不丢失的问题。在这种情况下,RocketMQ相当于整个服务都不可用了,那它本身肯定无法给我们
保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如再订单系统中,如果多次尝试
发送RocketMQ不成功,那就只能另找地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程
定时地扫描这些失败地订单消息,尝试往RocketMQ发送,这样等RocketMQ的服务恢复过来后,就能第一
时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的
保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如再订单系统中,如果多次尝试
发送RocketMQ不成功,那就只能另找地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程
定时地扫描这些失败地订单消息,尝试往RocketMQ发送,这样等RocketMQ的服务恢复过来后,就能第一
时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的
5.RocketMQ消息零丢失方案总结
1.生产者使用事务消息机制
2.Broker配置同步刷盘+Dledger主从架构
3.消费者不要使用异步消费
4.整个MQ挂了之后准备降级方案
这套方案在各个环节都大量地降低了系统地处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失
的代价可能远大于部分消息丢失的代价。所以在使用这套方案时,要根据实际的业务情况来考虑,
例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。
而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,
而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,
那使用异步消费的机制带来的性能提升也是非常显著的。
的代价可能远大于部分消息丢失的代价。所以在使用这套方案时,要根据实际的业务情况来考虑,
例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。
而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,
而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,
那使用异步消费的机制带来的性能提升也是非常显著的。
使用RocketMQ如何保证消息顺序
1.为什么要保证消息有序?
比如,下单完之后,需要支付成功,才会进行物流快递,不能先让物流服务执行,再支付成功
2.如何保证消息有序?
全局有序。整个MQ系统的所有消息岩哥按照队列先入先出顺序进行消费
局部有序。只保证一部分关键消息的消费顺序
首先我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?
其实在大部分的MQ业务场景,我们只需要保证局部有序就可以了,对于电商订单场景,
只要保证一个订单的所有消息是有序的就可以了,全局消息的顺序并不会太关心
其实在大部分的MQ业务场景,我们只需要保证局部有序就可以了,对于电商订单场景,
只要保证一个订单的所有消息是有序的就可以了,全局消息的顺序并不会太关心
落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式
保证消息尽量均匀地分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue
上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,它们之间的消息都是互相隔离的,
在这种情况下,是无法保证消息全局有序的,而对于局部有序的要求,只需要将有序的一组消息都存入
同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。
RocketMQ中,剋在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发到
哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
保证消息尽量均匀地分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue
上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,它们之间的消息都是互相隔离的,
在这种情况下,是无法保证消息全局有序的,而对于局部有序的要求,只需要将有序的一组消息都存入
同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。
RocketMQ中,剋在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发到
哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。
这样天生就能保证消息全局有序,这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有
用MQ的必要了
这样天生就能保证消息全局有序,这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有
用MQ的必要了
使用RocketMQ如何快速处理积压消息
1.如何确定RocketMQ有大量的消息积压?
在正常情况下,使用MQ都会要尽量保证它的消息生产速度和消费速度整体上
是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。
这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,
大家大概率就会集中处理数据库得问题,等好不容易把数据库恢复过来了,这时
基于这个数据库服务得消费者程序就会积累大量的消息。或者网络波动等情况,
也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当
恐怖的,所以消息积压是个需要时时关注的问题
是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。
这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,
大家大概率就会集中处理数据库得问题,等好不容易把数据库恢复过来了,这时
基于这个数据库服务得消费者程序就会积累大量的消息。或者网络波动等情况,
也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当
恐怖的,所以消息积压是个需要时时关注的问题
对于消息积压,如果是RocketMQ或者Kafka还好,它们的消息积压不会对性能造成
很大的影响,而如果是RabbitMQ的话,那就不太好了,大量的消息积压可以瞬间造成
性能直线下滑。对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是
使用web控制台,就能直接看到消息的积压情况,另外也可以通过mqadmin指令在后台检查
各个Topic的消息延迟情况,还可以在它的${sotrePathRootDir}/config目录下落地一系列
的json文件,也可以用来跟踪消息积压情况
很大的影响,而如果是RabbitMQ的话,那就不太好了,大量的消息积压可以瞬间造成
性能直线下滑。对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是
使用web控制台,就能直接看到消息的积压情况,另外也可以通过mqadmin指令在后台检查
各个Topic的消息延迟情况,还可以在它的${sotrePathRootDir}/config目录下落地一系列
的json文件,也可以用来跟踪消息积压情况
2.如何处理大量积压的消息
如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配
多个MessageQueue来进行消费。这个时候,就可以简单地通过增加Consumer的
服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的
情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时
再继续增加Consumer的服务节点就没有用了
多个MessageQueue来进行消费。这个时候,就可以简单地通过增加Consumer的
服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的
情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时
再继续增加Consumer的服务节点就没有用了
如果Topic下的MessageQueue配置不够多的话,那就不能用上面这种增加Consumer节点个数的方法了
这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够的多MessageQueue.然后把所有
消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,
并转储到新的Topic中,这个速度是可以很快的,然后在新的Topic上,就可以通过增加消费者个数来提高
消费速度了.之后再根据情况恢复成正常的情况
这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够的多MessageQueue.然后把所有
消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,
并转储到新的Topic中,这个速度是可以很快的,然后在新的Topic上,就可以通过增加消费者个数来提高
消费速度了.之后再根据情况恢复成正常的情况
RocketMQ的消息轨迹
RocketMQ默认提供了消息轨迹的功能,这个功能在排查问题时是非常有用的
1.RocketMQ消息轨迹数据的关键属性
Producer
生产实例信息
发送消息时间
消息是否发送成功
发送耗时
Consumer
消费实例信息
投递时间,投递轮次
消息是否消费成功
消费耗时
Broker
消息的Topic
消息存储位置
消息的Key值
消息的Tag值
2.消息轨迹配置
broker.conf打开一个关键配置
traceTopicEnable=true,默认是false,关闭的
traceTopicEnable=true,默认是false,关闭的
3.消息轨迹数据存储
默认情况下,消息轨迹数据是存于一个系统级别的Topic
RMQ_SYS_TRACE_TOPIC,这个Topic在Broker节点启动时
会自动创建出来,当然也可以自定义
RMQ_SYS_TRACE_TOPIC,这个Topic在Broker节点启动时
会自动创建出来,当然也可以自定义
订阅关系不一致和不能消费时如何排查?
订阅关系不一致
调整之前,调整任意一个实例的订阅关系和另一个保持一致
调整之后
消费者不能消费消息是最常见的问题之一,也是每个消息队列服务都会遇到的问题
1.确认哪个消息未消费。在这时消费者至少需要手机消息id、消息key、消息发送时间段三者之一
2.确认消息是否发送成功。可以通过消息id、消息key、消息时间段等任意一个条件在社区提供的
RocketMQ Console查找消息。如果查到消息,说明问题在消费者自身。此时消费者可以做如下检查,确认问题:
2.1 订阅的Topic和发送消息的Topic是否一致,包含大小写一致
2.2 订阅关系是否一致
2.3 消费代码是否抛出了异常,导致没有记录日志
2.4 消费者服务器和Namesrv或者Broker是否网络通畅
3.如果在第二步中没有查到消息,说明生产者没有生产成功。消息没有生产成功的问题可能是生产者自身的问题,
也可能是Namesrv或者Broker问题导致消息发送失败。此时生产者可以做如下检查
3.1 确认生产者服务器与Namesrv或Broker网络是否通畅
3.2 检查生产者发送日志,确认生产者是否被流控
3.3 检查Broker日志,确认Broker是否繁忙
3.4 检查Broker日志,确认磁盘是否已满
2.确认消息是否发送成功。可以通过消息id、消息key、消息时间段等任意一个条件在社区提供的
RocketMQ Console查找消息。如果查到消息,说明问题在消费者自身。此时消费者可以做如下检查,确认问题:
2.1 订阅的Topic和发送消息的Topic是否一致,包含大小写一致
2.2 订阅关系是否一致
2.3 消费代码是否抛出了异常,导致没有记录日志
2.4 消费者服务器和Namesrv或者Broker是否网络通畅
3.如果在第二步中没有查到消息,说明生产者没有生产成功。消息没有生产成功的问题可能是生产者自身的问题,
也可能是Namesrv或者Broker问题导致消息发送失败。此时生产者可以做如下检查
3.1 确认生产者服务器与Namesrv或Broker网络是否通畅
3.2 检查生产者发送日志,确认生产者是否被流控
3.3 检查Broker日志,确认Broker是否繁忙
3.4 检查Broker日志,确认磁盘是否已满
RocketMQ高性能背后的核心原理
读队列与写队列
在往写队列里写Message时,会同步写入到一个对应的读队列中
如果写队列大于读队列,就会有一部分写队列无法写入到读队列中,这一部分的消息就无法被读取,
就会造成消息丢失 --消息存入了,但是读不出来
在RocketMQ的管理控制台创建Topic时,可以看到要单独设置读队列和写队列。
通常在运行时,都需要设置读队列=写队列。perm字段表示Topic的权限,有三个可选项
2:禁写禁订阅
4: 可订阅
6: 可写可订阅
这其中,写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,
负责消息读取,这其实是一种读写分离的思想。RocketMQ在设置MessageQueue的路由策略时,
就可以通过指向不同的队列来实现读写分离
通常在运行时,都需要设置读队列=写队列。perm字段表示Topic的权限,有三个可选项
2:禁写禁订阅
4: 可订阅
6: 可写可订阅
这其中,写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,
负责消息读取,这其实是一种读写分离的思想。RocketMQ在设置MessageQueue的路由策略时,
就可以通过指向不同的队列来实现读写分离
在往写队列里写Message时,会同步写入到一个对应的读队列中
如果写队列大于读队列,就会有一部分写队列无法写入到读队列中,这一部分的消息就无法被读取,
就会造成消息丢失 --消息存入了,但是读不出来
而如果反过来,写队列小于读队列,那就有一部分读队列里时没有消息写入的,如果有一个消费者
被分配的时这些没有消息的读队列,那这些消费者就无法消费消息,造成消费者空转,极大的浪费性能
被分配的时这些没有消息的读队列,那这些消费者就无法消费消息,造成消费者空转,极大的浪费性能
从这里可以看到,写队列>读队列,会造成消息丢失,写队列<读队列,又会造成消费者空转,
所以,在使用时,都是要求=读队列.只有一种情况下可以考虑将读写队列设置为不一致,就是要对
Topic的MessageQueue进行缩减的时候。例如原来四个队列,现在要缩减成两个队列。如果立即
缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失,这时,可以先缩减写队列,
待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了
所以,在使用时,都是要求=读队列.只有一种情况下可以考虑将读写队列设置为不一致,就是要对
Topic的MessageQueue进行缩减的时候。例如原来四个队列,现在要缩减成两个队列。如果立即
缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失,这时,可以先缩减写队列,
待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了
消息持久化--重点
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录下,这些存储目录
可以在broker.conf中自行指定,存储文件主要分为三个部分
可以在broker.conf中自行指定,存储文件主要分为三个部分
CommitLog
存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个人文件组成,
每个文件固定大小1G,以第一条消息的偏移量为文件名
每个文件固定大小1G,以第一条消息的偏移量为文件名
ConsumerQueue
存储消息在CommitLog的索引。一个MessageQueue一个文件,
记录当前MessageQueue被哪些消费者组,消费到了哪一条CommitLog.
记录当前MessageQueue被哪些消费者组,消费到了哪一条CommitLog.
IndexFile
为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种通过IndexFile来查找
消息的方法不影响发送与消费消息的主流程
消息的方法不影响发送与消费消息的主流程
另外还有几个辅助的存储文件
checkpoint
数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件
以及IndexFile文件最后一次刷盘的时间戳
以及IndexFile文件最后一次刷盘的时间戳
config/*.json
这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、
消费者组配置、消费者组消息偏移量Offset等等一些信息
消费者组配置、消费者组消息偏移量Offset等等一些信息
abort
这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,
会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9
这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就
可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作
会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9
这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就
可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作
整体的消息存储结构
1.CommitLog文件存储所有消息实体。所有生产者发过来的消息,都会无差别的依次
存储到commitLog文件当中。这样的好处是可以减少查找目标文件的时间,让消息以
最快的速度落盘,对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入,
当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的
场景,而RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显
文件结构:CommitLog的文件大小是固定的,但是其中存储的每个消息单元长度是不固定的,
具体格式可以参考org.apache.rokcet.store.CommitLog.正因为消息的记录大小不固定,
所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,
如果不够的话,就重新创建一个CommitLog文件,文件名为当前消息的偏移量
存储到commitLog文件当中。这样的好处是可以减少查找目标文件的时间,让消息以
最快的速度落盘,对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入,
当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的
场景,而RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显
文件结构:CommitLog的文件大小是固定的,但是其中存储的每个消息单元长度是不固定的,
具体格式可以参考org.apache.rokcet.store.CommitLog.正因为消息的记录大小不固定,
所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,
如果不够的话,就重新创建一个CommitLog文件,文件名为当前消息的偏移量
2.ConsumeQueue文件主要是加速消费者的消息索引。它的每个文件夹对应RocketMQ中的
一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog
文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件
中感兴趣的消息记录。而消费者在COnsumeQueue文件当中的消费进度,会保存在
config/consumerOffset.json文件当中
文件结构:每个COnsumeQueue文件固定由30万个固定大小20Byte的数据块组成,数据块的内容
包括:msgPhyOffset(8Byte,消息在文件中的起始位置) + msgSize(4byte,消息在文件中占用的长度)
+ msgTagCode(8Byte,消息tag的Hash值)
在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小
一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog
文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件
中感兴趣的消息记录。而消费者在COnsumeQueue文件当中的消费进度,会保存在
config/consumerOffset.json文件当中
文件结构:每个COnsumeQueue文件固定由30万个固定大小20Byte的数据块组成,数据块的内容
包括:msgPhyOffset(8Byte,消息在文件中的起始位置) + msgSize(4byte,消息在文件中占用的长度)
+ msgTagCode(8Byte,消息tag的Hash值)
在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小
3.IndexFile文件主要是辅助消息检索。消费者进行消息消费时,通过ConsumeQueue文件就足够完成
消息检索了,但是如果要按照MessageId或者Messagekey来检索文件,比如RocketMQ管理控制台的
消息轨迹功能,ConsumeQueue文件就不够用了,IndexFile文件就是用来辅助这类消息检索的,它的
文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,它也是一个固定大小的文件,
文件结构:它的文件结构由indexHeader(固定40byte) + slot(固定500w个,每个固定20Byte)
+ index(最多500W*4个,每个固定20Byte)三部分组成
消息检索了,但是如果要按照MessageId或者Messagekey来检索文件,比如RocketMQ管理控制台的
消息轨迹功能,ConsumeQueue文件就不够用了,IndexFile文件就是用来辅助这类消息检索的,它的
文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,它也是一个固定大小的文件,
文件结构:它的文件结构由indexHeader(固定40byte) + slot(固定500w个,每个固定20Byte)
+ index(最多500W*4个,每个固定20Byte)三部分组成
过期文件的删除
消息既然要持久化,就必须有对应的删除机制,RocketMQ内置了一套过期文件的删除机制,
首先:如何判断过期文件:
RocketMQ中CommitLog文件和ConsumeQueue文件都是以偏移量命名的,对于非当前写的文件,
如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。这个保留时间
就是在broker.conf中配置的fieReservedTime属性。注意,RocketMQ判断文件是否过期的唯一
标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的
消息堆积也是有时间限度的
首先:如何判断过期文件:
RocketMQ中CommitLog文件和ConsumeQueue文件都是以偏移量命名的,对于非当前写的文件,
如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。这个保留时间
就是在broker.conf中配置的fieReservedTime属性。注意,RocketMQ判断文件是否过期的唯一
标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的
消息堆积也是有时间限度的
然后:何时删除过期文件:
RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件
删除操作的执行时间.在broker.conf中deleteWhen属性指定,默认是凌晨四点
RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件
删除操作的执行时间.在broker.conf中deleteWhen属性指定,默认是凌晨四点
另外,RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定的阈值,
也会触发过期文件删除,所以RocketMQ官方就特别建议,broker的磁盘空间不要少于4G
也会触发过期文件删除,所以RocketMQ官方就特别建议,broker的磁盘空间不要少于4G
高效文件写
RocketMQ采用了类似于Kafka的文件存储机制,但是文件存储是一个比较重的操作,
需要有非常多的设计才能保证频繁的文件读写场景下的高性能
需要有非常多的设计才能保证频繁的文件读写场景下的高性能
零拷贝技术加速文件读写。
零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件
都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层面,对应着mmap和sendFile
两种方式
零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件
都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层面,对应着mmap和sendFile
两种方式
1.理解CPU拷贝和DMA拷贝
我们知道,操作系统对于内存空间,是分为用户态和内核态的,用户态的应用程序无法
直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护
操作系统的安全,正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要
在用户态和内核态之间来回的复制数据,而这些操作,原本都需要由CPU来进行任务的分配、
调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,
CPU的占用率会非常高,见上图
直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护
操作系统的安全,正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要
在用户态和内核态之间来回的复制数据,而这些操作,原本都需要由CPU来进行任务的分配、
调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,
CPU的占用率会非常高,见上图
之后,操作系统为了避免完全被各种IO调用给占用,引入了DMA(Direct Memory Access,直接存储器访问),
由DMA来负责这些频繁的IO操作,DMA是一套独立的指令集,不会占用CPU的计算资源,这样,CPU就不需要
参与具体的数据复制的工作,只需要管理DMA的权限即可,见上图,DMA拷贝极大地释放了CPU的性能,因此它
的拷贝要快很多,但是,其实DMA拷贝本身,也在不断优化。
由DMA来负责这些频繁的IO操作,DMA是一套独立的指令集,不会占用CPU的计算资源,这样,CPU就不需要
参与具体的数据复制的工作,只需要管理DMA的权限即可,见上图,DMA拷贝极大地释放了CPU的性能,因此它
的拷贝要快很多,但是,其实DMA拷贝本身,也在不断优化。
引入DMA拷贝之后,在读写请求的过程重,CPU不再需要参与具体的工作,DMA可以独立完成数据在
系统内部的复制。但是,数据复制过程中,造成总线冲突,最终还是会影响数据读写性能。
为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式,Channel是一个完全独立的
处理器,专门负责IO操作,既然是处理器,Channel就有自己的IO指令,与CPU无关,它也更适合大型的
IO操作,性能更高,这也解释了,为什么Java应用层与零拷贝相关的操作都是通过Channel的字类实现的,
这其实是借鉴了操作系统中的概念,而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝
系统内部的复制。但是,数据复制过程中,造成总线冲突,最终还是会影响数据读写性能。
为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式,Channel是一个完全独立的
处理器,专门负责IO操作,既然是处理器,Channel就有自己的IO指令,与CPU无关,它也更适合大型的
IO操作,性能更高,这也解释了,为什么Java应用层与零拷贝相关的操作都是通过Channel的字类实现的,
这其实是借鉴了操作系统中的概念,而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝
2.mmap文件映射机制
以一次文件的读写操作为例,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,
每次状态切换的过程中,就需要有大量的数据复制,见上图,在这个过程中,总共需要进行四次数据拷贝,
而磁盘与内核态之间的数据拷贝在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态
之间的拷贝依然是CPU拷贝,所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的
这两次拷贝
每次状态切换的过程中,就需要有大量的数据复制,见上图,在这个过程中,总共需要进行四次数据拷贝,
而磁盘与内核态之间的数据拷贝在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态
之间的拷贝依然是CPU拷贝,所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的
这两次拷贝
而mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始
地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制,
见上图,这个拷贝过程都是在操作系统的系统调用层面完成的,在Java应用层,其实是无法直接观测到的,
但是我们可以去JDK源码当中进行间接验证。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的
一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个
byte数组,这其实就是没有使用零拷贝的普通文件读写机制
地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制,
见上图,这个拷贝过程都是在操作系统的系统调用层面完成的,在Java应用层,其实是无法直接观测到的,
但是我们可以去JDK源码当中进行间接验证。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的
一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个
byte数组,这其实就是没有使用零拷贝的普通文件读写机制
而NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。在DirectByteBuffer中,
并没有一个数据结构来保存数内容,只保存了一个内存地址。所有对数据的读写操作,都通过unsafe魔法类
直接交由内核完成,这其实就是mmap的读写机制。mmap文件映射机制,其实并不射你,我们启动任何一个
Java程序时,其实都大量用到了mmap文件映射。例如,我们可以在Linux机器上,运行一下,通过java指令运行起来后,
使用JPS查看运行的进程ID,再使用lsof -p {PID}的方式查看文件的映射情况
import java.util.Scanner;
public class BlockDemo {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
final String s= scanner.nextLine();
System.out.println(s);
}
}
这里能看到的mem类型的FD其实就是文件映射,最后这种mmap的映射机制由于还是需要用户态保存文件的映射信息,
数据复制的过程也需要用户态的参与,这其中的变数还是非常多的,所以,mmap机制适合操作小文件,如果文件太大,
映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G.而RocketMQ做大的CommitLog
文件保持再1G固定大小,也是为了方便文件映射
并没有一个数据结构来保存数内容,只保存了一个内存地址。所有对数据的读写操作,都通过unsafe魔法类
直接交由内核完成,这其实就是mmap的读写机制。mmap文件映射机制,其实并不射你,我们启动任何一个
Java程序时,其实都大量用到了mmap文件映射。例如,我们可以在Linux机器上,运行一下,通过java指令运行起来后,
使用JPS查看运行的进程ID,再使用lsof -p {PID}的方式查看文件的映射情况
import java.util.Scanner;
public class BlockDemo {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
final String s= scanner.nextLine();
System.out.println(s);
}
}
这里能看到的mem类型的FD其实就是文件映射,最后这种mmap的映射机制由于还是需要用户态保存文件的映射信息,
数据复制的过程也需要用户态的参与,这其中的变数还是非常多的,所以,mmap机制适合操作小文件,如果文件太大,
映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G.而RocketMQ做大的CommitLog
文件保持再1G固定大小,也是为了方便文件映射
import java.util.Scanner;
public class BlockDemo {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
final String s= scanner.nextLine();
System.out.println(s);
}
}
public class BlockDemo {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
final String s= scanner.nextLine();
System.out.println(s);
}
}
3.sendFile机制是怎么运行的
sendFile主要是通过java.nio.channels.FileChannel的transferTo方法完成的
sourcereadChannel.transferTo(0,sourceFile.length(), targetWriteChannel);
还记得Kafka当中是如何使用零拷贝的吗?就是将文件从磁盘复制到网卡时,就
大量地使用了零拷贝,见上图,早期地sendfile实现机制其实还是依靠CPU进行
页缓存与Socket缓冲区之间的数据拷贝,但是,在后期的不断改进过程中,sendFile
优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而只是拷贝一个带有
文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实
的数据内容,会交由DMA控制器,从也缓存中打包异步发送到socket中
Linux操作系统的man手册可以帮助看到一部分答案,使用man systemcall sendfile
就能看到Linux操作系统对于sendfile这个系统调用的手册,在2.6.33以前的Linux内核中,
out_fd只能是一个socket,但是现在的版本已经没有了这个限制,它可以是任何文件。
最后,sendFile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种
机制的传输效率是非常稳定的,sendFile机制非常适合大数据的复制转移
sourcereadChannel.transferTo(0,sourceFile.length(), targetWriteChannel);
还记得Kafka当中是如何使用零拷贝的吗?就是将文件从磁盘复制到网卡时,就
大量地使用了零拷贝,见上图,早期地sendfile实现机制其实还是依靠CPU进行
页缓存与Socket缓冲区之间的数据拷贝,但是,在后期的不断改进过程中,sendFile
优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而只是拷贝一个带有
文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实
的数据内容,会交由DMA控制器,从也缓存中打包异步发送到socket中
Linux操作系统的man手册可以帮助看到一部分答案,使用man systemcall sendfile
就能看到Linux操作系统对于sendfile这个系统调用的手册,在2.6.33以前的Linux内核中,
out_fd只能是一个socket,但是现在的版本已经没有了这个限制,它可以是任何文件。
最后,sendFile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种
机制的传输效率是非常稳定的,sendFile机制非常适合大数据的复制转移
子主题
顺序写加速文件写入磁盘。
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片,所以在写一个文件时,
也就无法把一个文件卸载一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写,
这个过程中有大量的寻址操作,会严重影响写数据的性能,而顺序写机制是在磁盘中提前申请一块
连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
Kafka官方详细分析过顺序写的性能提升问题,Kafka官方曾说明,顺序写的性能基本能够达到内存级别,
而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存,而RocketMQ很大程度上借鉴了Kafka的这思想
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片,所以在写一个文件时,
也就无法把一个文件卸载一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写,
这个过程中有大量的寻址操作,会严重影响写数据的性能,而顺序写机制是在磁盘中提前申请一块
连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
Kafka官方详细分析过顺序写的性能提升问题,Kafka官方曾说明,顺序写的性能基本能够达到内存级别,
而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存,而RocketMQ很大程度上借鉴了Kafka的这思想
刷盘机制保证消息不丢失。
在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入
到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。
这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。
但是,本质上PageCache依然是内存形态,所以一断电就会丢失,因此,需要将内存状态的数据写入到磁盘当中,
这样数据才能真正完成持久化,断电也不会丢失这个过程就称为刷盘
在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入
到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。
这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。
但是,本质上PageCache依然是内存形态,所以一断电就会丢失,因此,需要将内存状态的数据写入到磁盘当中,
这样数据才能真正完成持久化,断电也不会丢失这个过程就称为刷盘
PageCache是源源不断产生的,而Linux操作系统显然不可能时时刻刻往硬盘写文件,
所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,
就会完成PageCache刷盘,另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty
(脏页)状态。当DirtyPage的比例达到一定的阈值时,就会触发一次刷盘操作,例如在Linux操作
系统当中,可以通过/proc/meminfo文件查看到PageCache的状态
所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,
就会完成PageCache刷盘,另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty
(脏页)状态。当DirtyPage的比例达到一定的阈值时,就会触发一次刷盘操作,例如在Linux操作
系统当中,可以通过/proc/meminfo文件查看到PageCache的状态
但是,只要操作系统的刷盘操作不是时时刻刻执行的,那么对于用户态的应用程序来说,那就
避免不了非正常宕机时的数据丢失问题,因此,操作系统也提供了一个系统调用,应用程序可以
自行调用这个系统调用,完成PageCache的强制刷盘。在Linux中时fsync(),
也可以用man systemcall fsync()进行查看
避免不了非正常宕机时的数据丢失问题,因此,操作系统也提供了一个系统调用,应用程序可以
自行调用这个系统调用,完成PageCache的强制刷盘。在Linux中时fsync(),
也可以用man systemcall fsync()进行查看
RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘
同步刷盘。
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PageCache后,
立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完后唤醒等待的线程,返回消息写
成功的状态
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PageCache后,
立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完后唤醒等待的线程,返回消息写
成功的状态
异步刷盘。
在返回写成功状态时,消息可能只是被写入了内存的PageCache,写操作的返回快,吞吐量大,
当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
在返回写成功状态时,消息可能只是被写入了内存的PageCache,写操作的返回快,吞吐量大,
当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
配置方式:刷盘方式是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成
SYNC_FLUSH、ASYNC_FLUSH中的一个,同步刷盘机制会更频繁地调用fsync,所以吞吐量
相比异步刷盘会降低,但是数据地安全性会得到提高
SYNC_FLUSH、ASYNC_FLUSH中的一个,同步刷盘机制会更频繁地调用fsync,所以吞吐量
相比异步刷盘会降低,但是数据地安全性会得到提高
消息主从复制
如果Broker以一个集群的方式部署,会有一个master节点和多个Slave节点,消息需要从master复制到slave上,
而消息复制的方式分为同步复制和异步复制。
同步复制:
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态.在同步复制下,如果Master节点故障,
Slave上有全部的数据备份,这样容易恢复数据,但是同步复制会增大数据写入的延迟,降低系统的吞吐量
异步复制:
只要Master写入消息成功,就反馈给客户端写入成功的状态,然后再异步地将消息复制给Slave节点。在异步复制下,
系统拥有较低地延迟和较高地吞吐量,但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失
配置方式:
消息复制方式是通过Broker配置文件里地brokerRole参数进行设置的,
这个参数可以被设置成ASYNC_MASTER,SYNC_MASTER,SLAVE三个值中的一个
而消息复制的方式分为同步复制和异步复制。
同步复制:
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态.在同步复制下,如果Master节点故障,
Slave上有全部的数据备份,这样容易恢复数据,但是同步复制会增大数据写入的延迟,降低系统的吞吐量
异步复制:
只要Master写入消息成功,就反馈给客户端写入成功的状态,然后再异步地将消息复制给Slave节点。在异步复制下,
系统拥有较低地延迟和较高地吞吐量,但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失
配置方式:
消息复制方式是通过Broker配置文件里地brokerRole参数进行设置的,
这个参数可以被设置成ASYNC_MASTER,SYNC_MASTER,SLAVE三个值中的一个
负载均衡--重点
Producer
Prodicer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式
往不同的MessageQueue上发送消息,已达到让消息平均落在不同的queue上的目的,而由于
MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的Broker上,见上图,
同时生产者在发送消息时,可以指定一个MessageQueueSelector,通过这个对象来将消息发送
到自己指定的MessageQueue上,这样可以保证消息局部有序
往不同的MessageQueue上发送消息,已达到让消息平均落在不同的queue上的目的,而由于
MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的Broker上,见上图,
同时生产者在发送消息时,可以指定一个MessageQueueSelector,通过这个对象来将消息发送
到自己指定的MessageQueue上,这样可以保证消息局部有序
Consumer
Consumer也是以MessageQueue为单位来进行负载均衡的,分为集群模式和广播模式
集群模式
在集群消费模式下,每条消息只需要投递到订阅的这个Topic的Consumer Group下的
一个实例即可,RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要
明确指定拉取哪一条MessageQueue,没当实例的数量有变更,都会触发一次所有实例的
负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。每次
分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配,
内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,
可以在consumer中直接set来指定,默认情况下使用的时最简单的平均分配策略
一个实例即可,RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要
明确指定拉取哪一条MessageQueue,没当实例的数量有变更,都会触发一次所有实例的
负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。每次
分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配,
内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,
可以在consumer中直接set来指定,默认情况下使用的时最简单的平均分配策略
AllocateMachineRoomNearby
将统计放的Consumer和Broker有限分配在一起。这个策略可以通过一个matchineRoomResolve对象来定制
Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对统计放的Broker和Consumer进行分配
一般也就用简单的平均分配策略或者轮询分配策略(但是比较鸡肋,直接给属性指定机房更好)
将统计放的Consumer和Broker有限分配在一起。这个策略可以通过一个matchineRoomResolve对象来定制
Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对统计放的Broker和Consumer进行分配
一般也就用简单的平均分配策略或者轮询分配策略(但是比较鸡肋,直接给属性指定机房更好)
AllocateMessageQueueAveragely
平均分配,将所有MessageQueue平均分给每一个消费者
平均分配,将所有MessageQueue平均分给每一个消费者
AllocateMessageQueueAveragelyByCircle
轮询分配。轮流地给一个消费者分配一个MessageQueue
轮询分配。轮流地给一个消费者分配一个MessageQueue
AllocateMessageQueueByConfig
不分配,直接指定一个MessageQueue,
类似于广播模式,直接指定所有队列
不分配,直接指定一个MessageQueue,
类似于广播模式,直接指定所有队列
AllocateMessageQueueByMachineRoom
按逻辑机房地概念进行分配。又是对BrokerName和COnsumerId定制化地配置
按逻辑机房地概念进行分配。又是对BrokerName和COnsumerId定制化地配置
ALlocateMessageQueueConsistenHash
这个一致性哈希策略只需要指定一个虚拟节点数,使用一个Hash环地算法,
虚拟节点是为了让Hash数据在环上分布更为均匀
这个一致性哈希策略只需要指定一个虚拟节点数,使用一个Hash环地算法,
虚拟节点是为了让Hash数据在环上分布更为均匀
广播模式
官博模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说,
而在实现上,即使在Consumer分配Queue时,所有Consumer都分到所有的Queue.
广播模式实现的关键是将消费者的消费偏移量不再保存到Broker当中
而在实现上,即使在Consumer分配Queue时,所有Consumer都分到所有的Queue.
广播模式实现的关键是将消费者的消费偏移量不再保存到Broker当中
消息重试
首先对于广播模式下的消息,是不存在消息重试的机制的,即消息消费失败后,会再重新进行发送,
而只是继续消费新的消息,而对于普通的消息,当消费者消费失败后,可以通过设置返回状态达到
消息重试的结果
而只是继续消费新的消息,而对于普通的消息,当消费者消费失败后,可以通过设置返回状态达到
消息重试的结果
如何让消息进行重试?
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。
可以有三种配置方式:
1.返回Action.ReconsumeLater (推荐)
2.返回null
3.抛出异常
如果希望消费失败后不重试,可以直接返回.CommitMessage
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。
可以有三种配置方式:
1.返回Action.ReconsumeLater (推荐)
2.返回null
3.抛出异常
如果希望消费失败后不重试,可以直接返回.CommitMessage
重试消息如何处理?
重试的消息会进入一个"%RETRY" + ConsumerGroup的队列中,然后RocketMQ默认允许
每条消息最多重试16次,每重试间隔时间如图,随着重试次数的递增,重发间隔时间也是递增的,
注:消费者实例要避免只有一个,否则重试次数是没有意义的
重试的消息会进入一个"%RETRY" + ConsumerGroup的队列中,然后RocketMQ默认允许
每条消息最多重试16次,每重试间隔时间如图,随着重试次数的递增,重发间隔时间也是递增的,
注:消费者实例要避免只有一个,否则重试次数是没有意义的
重试次数
如果消息重试16次后仍然失败,消息将不再投递,转为进入死信队列。另外一条消息无论重试多少次,
这些重试消息的MessageId始终都是一样的。这个重试次数,RocketMQ可以进行定制,例如通过
consumer.setMaxReconsumeTimes(20)// 将重试次数设置为20次,当定制的重试次数超过16次后,
消息的重试时间间隔均为2小时
这些重试消息的MessageId始终都是一样的。这个重试次数,RocketMQ可以进行定制,例如通过
consumer.setMaxReconsumeTimes(20)// 将重试次数设置为20次,当定制的重试次数超过16次后,
消息的重试时间间隔均为2小时
MessageId
在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessgeId始终都是一样的
但是在4.9.1版本中,每次重试MessageId都会重建
但是在4.9.1版本中,每次重试MessageId都会重建
配置覆盖
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效,并且最后启动的Consuemr
会覆盖之前启动的Consumer配置
会覆盖之前启动的Consumer配置
死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试,而如果消息超过最大重试次数,
RocketMQ就会认为这个消息有问题,但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,
而会将其发送到这个消费者组对应的一种特殊队列:死信队列
死信队列的名称是"%DLQ%+ConsumerGroup"
死信队列的特征:
1.一个死信队列对应一个ConsumerGroup,而不是对应某个消费者实例
2.如果一个ConsumerGroup没有产生死信队列,RocketMQ就不会为其创建相应的队列
3.一个死信队列中的消息不会再被消费者正常消费
4.死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性,
超过这个最长时间的消息都会被删除,而不管消息是否被消费过
通常,一条消息进入了死信队列,意味着消息再消费处理的过程中出现了比较严重的错误,
并且无法自行恢复,此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查,
然后对死信消息进行处理,比如转发到正常的Topic重新进行消费或者丢弃
注:默认创建出来的死信队列,它里面的消息是无法读取的,在控制台和消费者中都无法读取,
这是因为这些默认的死信队列,它们的权限perm被设置成了2:禁读(4,禁写,6:可读可写)
需要手动将死信队列的权限配置改成6,才能被消费
RocketMQ就会认为这个消息有问题,但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,
而会将其发送到这个消费者组对应的一种特殊队列:死信队列
死信队列的名称是"%DLQ%+ConsumerGroup"
死信队列的特征:
1.一个死信队列对应一个ConsumerGroup,而不是对应某个消费者实例
2.如果一个ConsumerGroup没有产生死信队列,RocketMQ就不会为其创建相应的队列
3.一个死信队列中的消息不会再被消费者正常消费
4.死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性,
超过这个最长时间的消息都会被删除,而不管消息是否被消费过
通常,一条消息进入了死信队列,意味着消息再消费处理的过程中出现了比较严重的错误,
并且无法自行恢复,此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查,
然后对死信消息进行处理,比如转发到正常的Topic重新进行消费或者丢弃
注:默认创建出来的死信队列,它里面的消息是无法读取的,在控制台和消费者中都无法读取,
这是因为这些默认的死信队列,它们的权限perm被设置成了2:禁读(4,禁写,6:可读可写)
需要手动将死信队列的权限配置改成6,才能被消费
消息幂等
幂等概念
在MQ系统中,对于消息幂等有三种实现语义
1.at most once 最多一次:每条消息最多只会被消费一次
2.at least once 至少一次:每条消息至少会被消费一次
3.exactly once 刚刚好一次:每条消息都追确定地消费一次
这三种语义都有它使用的业务场景。其中,at most once是最好保证的,
RocketMQ中可以直接使用异步发送,sendOneWay等方式就可以保证,
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证,
而这个exactly once是MQ种最理想也是最难保证的一种语义,需要有非常惊喜的设计才行,
RocketMQ只能保证at least once,保证不了exactly once,所以,使用RocketMQ时,
需要由业务系统自行保证消息的幂等性,
但是,对于Exactly once语义,阿里云上的商业版RocketMQ是有明确支持的,实现方式未开源
1.at most once 最多一次:每条消息最多只会被消费一次
2.at least once 至少一次:每条消息至少会被消费一次
3.exactly once 刚刚好一次:每条消息都追确定地消费一次
这三种语义都有它使用的业务场景。其中,at most once是最好保证的,
RocketMQ中可以直接使用异步发送,sendOneWay等方式就可以保证,
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证,
而这个exactly once是MQ种最理想也是最难保证的一种语义,需要有非常惊喜的设计才行,
RocketMQ只能保证at least once,保证不了exactly once,所以,使用RocketMQ时,
需要由业务系统自行保证消息的幂等性,
但是,对于Exactly once语义,阿里云上的商业版RocketMQ是有明确支持的,实现方式未开源
消息幂等的必要性。
在互联网应用种,由器在网络不稳定的情况下,消息队列RocketMQ的消息有可能会出现重复,
这个重复简单可以概括为以下情况:
在互联网应用种,由器在网络不稳定的情况下,消息队列RocketMQ的消息有可能会出现重复,
这个重复简单可以概括为以下情况:
1.发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,
导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,
消费者后续会收到两条内容相同并且MessageId也相同的消息
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,
导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,
消费者后续会收到两条内容相同并且MessageId也相同的消息
2.投递时消息重复。
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候
网络闪断,为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次
投递之前已被处理过的消息,消费者后续会受到两条内容相同并且MessageId也相同的消息
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候
网络闪断,为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次
投递之前已被处理过的消息,消费者后续会受到两条内容相同并且MessageId也相同的消息
3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启亿级订阅应用重启)
当RocketMQ的Broker或客户端重启,扩容,缩容时,会触发Rebalance,此时消费
者可能会受到重复消息
当RocketMQ的Broker或客户端重启,扩容,缩容时,会触发Rebalance,此时消费
者可能会受到重复消息
处理方式
在RocketMQ是无法保证每个消息制备投递一次,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程
种是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况,所以在一些对幂等要求严格
的场景,最好是使用业务上唯一的标识比较靠谱,例如订单id,而这个业务标识可以使用Message的key
来进行传递
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程
种是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况,所以在一些对幂等要求严格
的场景,最好是使用业务上唯一的标识比较靠谱,例如订单id,而这个业务标识可以使用Message的key
来进行传递
0 条评论
下一页