RocketMQ-1
2022-05-28 12:33:46 1 举报
RocketMQ机制,功能
作者其他创作
大纲/内容
OS线程
注意Consumer的类名:DefaultMQPushConsumerPush消费模式:Broker主动推送消息给消费者
SocketChannel
备份消息
系统C
系统E
生产者如何从NameServer拉取路由信息的?如何选择Broker机器建立连接以及发送消息的?
sendOneway:发送消息给MQ,然后代码就往下走了,根本不会关注MQ有没有返回结果,也不需要MQ返回的结果,无论发送的消息是成功还是失败,都不关心
消费者组
系统F
Master Broker
异步发送消息到RocketMQ
长连接发送消息
数据集合,不同类型的数据放入不同的Topic中
磁盘存储消息
线程1
分布式存储,Topic数据分散存储在多台Broker机器上
RocketMQ集群
获取路由信息
线程n
DLedger是如何基于Raft协议进行多副本同步
虚拟内存
劣势
PageCache
数据拷贝
线程2
如果Leader Broker挂了,此时剩下的两个Follower Broker就会重新发起选举,他们会基于DLedger采用的Raft协议的算法,选举出来一个新的Leader Broker继续对外提供服务,而且会对没有完成的数据同步进行一些恢复性的操作,保证数据不会丢失。新选举出来的Leader会把数据通过DLedger同步给剩下的一个Follower Broker。
ConsumeQueue文件是基于os cache来进行优化的
Kafaka、RabbitMQ和RocketMQ对比
RocketMQ是否实现读写分离?1、消费者在获取消息的时候先发送请求到Master2、Master返回一批消息给消费者3、Master返回消息给消费者的时候,会根据当时Master的负载情况和Slave Broker的同步情况,向消费者建议下次拉取消息的时候是从Master拉取还是从Slave拉取
MessageQueue 数据存储
优势
Slave Broker
在进行消息拉取的时候第一步:读os cache里的少量ConsumeQueue的数据,这个性能是极高的第二步:根据读取到的offset去CommitLog里读取消息的完整数据
程序对磁盘文件发起IO操作读取里面的数据到自己这来,会经过以下一个顺序(如左图):1、从磁盘上把数据读取到内核IO缓冲区里去2、再从内核IO缓存区里读取到用户进程私有空间中3、程序才能拿到这个文件里的数据
基于DLedger技术的Broker主从同步原理
数据同步会分为两个阶段,一个是 uncommitted阶段,一个是 commited阶段1、首先Leader Broker上的DLedger收到一条数据之后,会标记为 uncommitted状态,然后他会通过自己的 DLedgerServer组件把 uncommitted数据 发送给Follower Broker的 DLedgerServer2、接着Follower Broker的DLedgerServer收到uncommitted消息后,必须返回一个 ack 给Leader Broker的DLedgerServer,然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态3、然后Leader Broker上的DLedgerServer就会发送commited消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为comitted状态以上就是基于Raft协议实现的两阶段完成的数据同步机制
如果消费组中出现机器宕机或者扩容加机器,会怎么处理
为了读取磁盘文件里的数据,发生了两次数据拷贝这个就是普通的IO操作的一个弊端,必然涉及到两次数据拷贝操作,对磁盘读写性能是有影响的反之将数据写入到磁盘文件中,是逆向操作,也是发生两次数据拷贝
RocketMQ
使用最传统和基本的普通文件IO操作去进行磁盘文件的读写,那么会存在什么样的性能问题?答案:多次数据拷贝问题
端口
Reactor主线程
Broker
集群化部署
生产者
消费者如何获取消费?
Kafaka
消息
地址映射
适用场景
很关键的点:拉取消息的时候必然会先读取ConsumeQueue文件
在Broker中,对Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件在Broker的磁盘上,会有下面这种格式的一系列文件:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}可查看配置文件看存储位置,比如右图对存储在这台Broker机器上的Topic下的一个MessageQueue,他有很多的ConsumeQueue文件,这个ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的offset偏移量
秒杀活动主要涉及到的并发压力就是两块:一个是高并发的读,一个是高并发的写
2
系统B
消费者机器拉取到一批消息之后,就会将这批消息回调我们注册的一个函数
1、MQ功能强大2、完善的可视化管理工作台3、国内大中小公司落地实践案例多
心跳机制 Broker和NameServer建立TCP长连接Broker会每隔30s给所有的NameServer发送心跳,告诉每个NameServer自己目前还活着每次NameServer收到一个Broker的心跳,就会更新Broker的最近一次心跳的时间NameServer每隔10s检查各个Broker的最近一次心跳时间,如果某个Broker超过120s都没发送心跳,就认为这个Broker已经挂掉了Broker心跳的时候会汇报给NameServer自己的数据情况,这样每个NameServer都知道集群里有哪些Broker,每个Broker存放了哪些Topic的数据。Slave Broker也会向所有的NameServer进行注册Slave Broker也会向所有的NameServer每30s发送心跳
MessageQueue与消费者的关系
Master Broker什么时候会让消费者从Slave Broker拉取数据
写入PageCache
Push模式和Pull模式 对比
CommitLog是基于os cache+磁盘一起读取的
系统A
Topic数据分散存储在多台Broker机器上比如一个Topic里有1000万条数据,此时有2台Broker,那么就可以让每台Broker上都放500万条数据因为Dledger技术要求每个Master都至少带两个Slave来进行选举。故一个订单Topic的数据分布式存储在两个Master Broker上了
Worker 线程池
内存映射:把物理上磁盘文件的地址和用户进程私有空间的虚拟内存地址进行了一个映射。刚开始建立映射的时候,并没有任何的数据拷贝操作,其实磁盘文件还是停留在那里。PageCache:实际上在这里就是对应于虚拟内存注意:mmap技术在进行文件映射的时候,一般有大小限制,在1.5GB~2GB之间所以RocketMQ才让CommitLog单个文件在1GB,ConsumeQueue文件在5.72MB,不会太大这样限制了RocketMQ底层文件的大小,就可以在进行文件读写的时候,很方便的进行内存映射了
1、抗高并发方面较弱,吞吐量比较低2、erlang语言开发,不方便改造源码
如何让消息写入CommitLog文件性能接近内存写性能的
大数据领域的用户日志传输,高吞吐,允许数据丢失
在创建Topic的时候需要指定一个很关键的参数,就是MessageQueue简单来说,就是要指定Topic对应了多少个队列,也就是多少个MessageQueue
基于MQ实现秒杀订单系统的异步化架构以及精准扣减库存的技术方案
是否有数据?
集群模式:默认情况下是集群模式,也就是一个消费组获取到一条消息,只会交给组内的一台机器去处理广播模式:可以通过如下设置改变为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);对于消费组获取到的一条消息,组内每台机器都可以获取到这条消息
NameServer
NameServer负责管理集群里所有Broker的信息,让使用MQ的系统可以感知到集群里有哪些Broker
Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息区别:Push方式:consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。实时性比较高,但是会增加broker的负载;而且消费端能力不同,如果push推送过快,消费端会出现很多问题Pull方式:取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueuefont color=\"#f44336\
Consumer类是DefaultMQPullConsumerPull消费模式:Broker不会主动推送消息给Consumer,而是消费者主动发送请求到Broker去拉取消息过来
转交请求处理消息写入
内核IO缓冲区
异步刷盘
1、根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP, 也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候, 整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于-个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。2、基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
大中小公司的业务系统
例子:订单系统需要往MQ里发送订单消息,那么此时建一个Topic,名字可以叫做:topic_order_info,也就是一个包含了订单信息的数据集合。然后订单系统投递的订单消息都是进入到这个“topic_order_info”中,如果仓储系统要获取订单消息,那么可以指定从“topic_order_info”这里面去获取消息。
读取数据的时候,也仅仅发生了一次拷贝
转交请求进行一系列预处理
一个Topic的多个MessageQueue会均匀分摊给消费组内的多个机器去消费原则:一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理
Broker是基于OS操作系统的 PageCache 和 顺序写 两个机制,来提升写入CommitLog文件的性能的首先Broker是以顺序的方式将消息写入CommitLog磁盘文件的,对文件进行顺序写的性能要比对文件随机写的性能提升很多数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的PageCache内存缓存中,然后后续由OS的后台线程,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件所以在这样的优化之下,采用 磁盘文件顺序写+OS PageCache内存缓存写入+OS异步刷盘 的策略,基本上可以让消息写入CommitLog的性能跟直接写入内存里是差不多的,正是如此,才可以让Broker高吞吐的处理每秒大量的消息写入
建立TCP长连接
从磁盘文件里读取数据步骤(如下图):1、判断当前要读取的数据是否在PageCache里?2、如果在的话,就可以直接从PageCache里读取(比如刚写入CommitLog的数据还在PageCache里,此时Consumer消费是从PageCache里读取数据)3、如果PageCache里没有数据,那么此时就会从磁盘文件里加载数据到PageCache中,而且PageCache技术在加载数据的时,还会将加载的数据块的临近的其他数据块也一起加载到PageCache中
Leader Broker崩溃了怎么处理
异步:发送消息的时不会阻塞,会继续执行下面的别的代码,当MQ返回结果时,会回调你的函数
只有一次数据拷贝的过程就是从PageCache里拷贝到磁盘文件中这就是使用mmap技术之后,相比于传统磁盘IO的一个性能优化
NameServer支持集群化部署(保证高可用性)若NameServer只部署一台机器的话,一旦NameServer宕机了,会导致RocketMQ集群出现故障
Broker对磁盘文件的写入主要是借助直接写入os cache来实现性能优化的,因为直接写入os cache,相当于就是写入内存一样的性能,后续等os内核中的线程异步把cache中的数据刷入磁盘文件即可
RabbitMQ的吞吐量是比较低的,一般就是每秒几万的级别
一致性(Consistency):Name Server 集群中的多个实例,彼此之间是不通信的,这意味着某一时刻,不同实例上维护的元数据可能是不同的,客户端获取到的数据也可能是不一致的。可用性(Availability):只要不是所有NameServer节点都挂掉,且某个节点可以在指定之间内响应客户端即可。 分区容错(Partiton Tolerance):对于分布式架构,网络条件不可控,出现网络分区是不可避免的,只要保证部分NameServer节点网络可达,就可以获取到数据。具体看公司如何实施,例如:为了实现跨机房的容灾,可以将NameServer部署在不同的机房,某个机房出现网络故障,其他机房依然可用,当然Broker集群/Producer集群/Consumer集群也要跨机房部署。
offset等信息
假设每台机器抗10万并发,集群化部署,将几十万请求分散到多台机器本质上RocketMQ存储海量消息的机制就是分布式的存储
数据路由 NameServer负责去管理集群里所有Broker的信息独立部署在机器上,所有Broker会把自己注册到NameServer上解决生产者发送消息到哪台Broker,消费者从哪台Broker消费
创建连接
同步复制、异步复制
从CommitLog里读取消息完整数据是如何读取的?是从os cache里读取?还是从磁盘里读取?答案:两者都有
生产者到底如何发送消息的?(MessageQueue)
处理完这批消息之后,消费者机器就会提交目前的消费进度到Broker上去,然后Broker就会存储消费进度
...
Broker是如何持久化存储消息的
基于DLedger技术替换Broker的CommitLog
监听
Rebalance机制:将一个Topic下的多个队列(MessageQueue),在同一个消费者组下的多个消费者实例之间进行重新分配Rebalance机制本意是为了提升消息的并行处理能力例如,一个Topic下5个队列,在只有1个消费者的情况下,那么这个消费者将负责处理这5个队列的消息。如果此时增加一个消费者,那么可以给其中一个消费者分配2个队列,给另一个分配3个队列,从而提升消息的并行处理能力Rebalance限制:由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。Rebalance危害: 消费暂停: 1、考虑在只有Consumer 1的情况下,其负责消费所有5个队列 2、在新增Consumer 2,触发Rebalance时,需要分配2个队列给其消费。 3、那么Consumer 1 就需要停止这2个队列的消费,等到这两个队列分配给Consumer 2后,这两个队列才能继续被消费。重复消费: 1、Consumer 2 在消费分配给自己的2个队列时,必须接着从Consumer 1之前已经消费到的offset继续开始消费。 2、然而默认情况下,offset是异步提交的,如consumer 1当前消费到offset为10,但是异步提交给broker的offset为8; 3、那么如果consumer 2从8的offset开始消费,就会有2条消息重复。也就是Consumer 2 并不会等待Consumer1提交完offset后,再进行Rebalance,因此提交间隔越长,可能造成的重复消费就越多。消费突刺: 由于rebalance可能导致重复消费,如果需要重复消费的消息过多;或者因为rebalance暂停时间过长,导致积压了部分消息。那么都有可能导致在rebalance结束之后瞬间可能需要消费很多消息
RabbitMQ
1
基于mmap技术+pagecache技术实现高性能的文件读写
基于DLedger替换各个Broker上的CommitLog管理组件,那么每个Broker上都有一个DLedger组件DLedger是基于 Raft协议 来进行Leader Broker选举的,那么Raft协议是如何进行多台机器的Leader选举的:1、发起一轮一轮的投票,通过三台机器互相投票选出来一个Broker作为Leader2、第一轮中三台Broker机器启动的时候,他们都会投票自己作为Leader,然后把这个投票发送给其他Broker,所以第一轮选举失败3、接着每个Broker会进入一个随机时间的休眠,比如Broker01休眠3秒,Broker02休眠5秒,Broker03休眠4秒。4、此时Broker01必然是先苏醒过来的,他苏醒过来之后,直接会继续尝试投票给自己,并且发送自己的选票给别人5、接着Broker03休眠4秒后苏醒过来,发现Broker01已经发送来了一个选票是投给Broker01,此时他自己因为没投票,所以会尊重别人的选择,就直接把票投给Broker01了,同时把自己的投票发送给别人6、接着Broker02苏醒,进行与Broker03类似的操作7、此时所有Broker都会收到三张投票,都是投给Broker01的,那么Broker01就会当选为Leader。其实只要有(3台机器 / 2) + 1个Broker投票给某个Broker,就会选举他当Leader总结:Raft协议中选举leader算法的简单描述:确保有Broker可以成为Leader的核心机制就是一轮选举不出来Leader的话,就让全体 随机休眠 一下,先苏醒过来的Broker会投票给自己,其他Broker苏醒过后发现自己收到选票了,就会直接投票给那个人。依靠这个随机休眠的机制,基本上几轮投票过后,一般都是可以快速选举出来一个Leader。只有Leader可以接收数据写入,Follower只能接收Leader同步过来的数据
从之前的讲解可知:生产者会跟NameServer进行通信获取Topic的路由数据所以生产者从NameServer中就会知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上,哪些MesssageQueue在另外一台Broker机器上生产者然后根据写入MessageQueue的策略,来发送到对应MessageQueue中
监听请求
Master-Slave模式采取的是Pull模式Slave Broker不停的发送请求到Master Broker去拉取消息
长连接接收消息
DLedger是如何基于Raft协议选举Leader Broker
传统文件IO操作的多次数据拷贝问题
同步刷盘与异步刷盘
磁盘文件
总结:消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景,在性能和消息零丢失上做平衡实际应用中,推荐把Master和Slave设置成ASYNC_FLUSH的异步刷盘方式,主从之间配置成SYNC_MASTER的同步复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。
秒杀系统的技术难点以及秒杀商品详情页系统的架构设计
Broker集群必须在多台机器上部署一个集群,而且还得用主从架构实现数据多副本存储和高可用
架构原理
MessageQueue是RocketMQ中非常关键的一个数据分片机制,他通过MessageQueue将一个Topic的数据拆分成很多个数据分片,然后在每个Broker机器上都存储一些MessageQueue。比如 假设一个Topic有1万条数据,并且有4个MessageQueue,2个Broker,那么大致可以认为每个MessageQueue放入2500条数据(MessageQueue中存放的数据量是根据消息写入MessageQueue的策略来定,这里假设平均分配)MessageQueue则存放在Broker上,也就是2个Broker,每个Broker放2个MessageQueue
基于mmap内存映射实现磁盘文件的高性能读写
生产者一定是投递消息到Master Broker的,然后Master Broker会同步数据给他的Slave Brokers,实现一份数据多份副本,保证Master故障的时候数据不丢失,而且可以自动把Slave切换为Master提供服务
RocketMQ底层对CommitLog、ConsumeQueue之类的磁盘文件的读写操作,基本上都会采用mmap技术来实现。如果具体到代码层面,就是基于JDK NIO包下的MappedByteBuffer的map()函数(底层就是基于mmap技术实现的),来先将一个磁盘文件(比如一个CommitLog文件或者是一个ConsumeQueue文件)映射到内存里来
因为CommitLog是用来存放消息的完整数据的,所以内容量是很大的,毕竟他一个文件就要1GB,所以整体完全有可能多达几个TB,所以os cache对于CommitLog而言,是无法把全部数据都放在里面os cache对于CommitLog而言,主要是提升文件写入性能,当不停的写入的时候,很多最新写入的数据都会先停留在os cache里,之后os会自动把cache里的比较旧的数据刷入磁盘里,腾出来空间给新写入的数据,所以大部分数据可能多达几个TB都是在磁盘上的结论:1、如果消费者机器一直快速的在拉取和消费处理,紧紧的跟上了生产者写入broker的消息速率,那么每次拉取几乎都是在拉取最近刚写入CommitLog的数据,那几乎都在os cache里(内存读取,性能很高)2、若跟跟不上生产者写入的速率,读取的是比较早之前写入CommitLog的数据,那些数据早就被刷入磁盘了,那么此时就只能从磁盘上的文件里读取了,这个性能是比较差一些的
在构造Producer的时候加入下面红框中的代码
为什么这套网络通信框架会是高性能以及高并发的
之前讲解提到,producer写入消息到broker之后,broker会将消息写入本地CommitLog磁盘文件中,然后ConsumeQueue会存储Topic下各个MessageQueue的消息的物理位置如果要让Broker实现高可用,那么必须有一个Broker组,里面有一个是Leader Broker写入数据,然后让Leader Broker接收到数据之后,直接把数据同步给其他的Follower Broker这样一条数据就会在三个Broker上有三份副本,此时如果Leader Broker宕机,那么就直接让其他Follower Broker自动切换为新的Leader Broker,继续接受客户端的数据写入就可以了
MQ作用:解耦、异步、削峰
Reactor 线程池
写入消息的磁盘物理位置,消息长度,tag hashcode信息
如果使用 同步刷盘模式 ,生产者发送一条消息,broker收到消息,必须直接强制把消息刷入底层物理磁盘文件中,然后才会返回ack给producer,此时才知道消息写入成功了。只要消息进入了物理磁盘上,那么除非是物理磁盘坏了导致数据丢失,否则正常来说数据就不会丢失了但是如果强制每次消息写入都要直接进入磁盘中,必然导致每条消息写入性能急剧下降,导致消息写入吞吐量急剧下降,但是可以保证数据不会丢失。
服务器中存储目录结构:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}topic对应消息的topic,queueId对应messagequeue,fileName对应ConsumeQueue文件
线程3
写入消息到CommitLog文件 操作步骤(如下图):1、先把一个CommitLog文件通过MappedByteBuffer的map()函数映射其地址到虚拟内存地址2、接着对MappedByteBuffer执行写入操作,写入的时候会直接进入PageCache中3、过一段时间后,由os的线程异步刷入磁盘中
RocketMQ四个核心技术
对比消费者当前没有拉取消息的数量和大小,以及最多可以存放在os cache内存里的消息的大小如果没拉取的消息超过了最大能使用的内存的量,那么说明后续会频繁从磁盘加载数据,此时就让消费者从slave broker去加载数据。
RocketMQ几乎同时解决了Kafka和RabbitMQ的缺陷。RocketMQ的吞吐量很高,单机可以达到10万QPS以上,而且可以保证高可用性,性能很高,而且支持通过配置保证数据绝对不丢失,可以部署大规模的集群,还支持各种高级的功能,比如说延迟消息、事务消息、消息回溯、死信队列、消息积压,等等
Push消费模式
实际上类似RocketMQ、Kafka、RabbitMQ的消息中间件系统,他们不只是写入消息和获取消息那么简单,他们本身最重要的就是提供强大的数据存储能力,可以把亿万级的海量消息存储在自己的服务器的磁盘上。Broker数据存储实际上才是一个MQ最核心的环节,他决定了生产者消息写入的吞吐量,决定了消息不能丢失,决定了消费者获取消息的吞吐量。
每台Broker启动都要向所有的NameServer进行注册RocketMQ 4.5之前,Slave Broker用作同步数据,但是一旦Master故障了,Slave是没法自动切换成Master的RocketMQ 4.5之后,基于Dledger(基于Raft协议实现的一个机制)实现RocketMQ的高可用自动切换让一个Master Broker对应多个Slave Broker,一旦Master Broker宕机,通过Dledger技术和Raft协议算法进行leader选举,新的Master Broker对外提供服务
Broker高可用架构原理回顾
生产者发送消息的时候写入哪个MessageQueue?
发送单向消息到RocketMQ
Broker是如何将消息读取出来返回给消费机器的
在上述的 异步刷盘模式 下,生产者把消息发送给Broker,Broker将消息写入OS PageCache中,就直接返回ACK给生产者了但是这样会出现问题:如果生产者认为消息写入成功了,但是实际上那条消息此时是在Broker机器上的os cache中的,如果此时Broker直接宕机,那么os cache中的这条数据就会丢失所以异步刷盘的的策略下,可以让消息写入吞吐量非常高,但是可能会有数据丢失的风险
Broker收到一条消息,会把消息直接写入磁盘上的一个日志文件,叫做CommitLog,直接顺序写入这个文件CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接 追加写 入这个文件的末尾。如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件
同步发送消息到RocketMQ
CommitLog
发送消息和消费消息的模式
多个消费组订阅同一个Topic,生产者发送消息到这个Topic,进入Broker后哦,消费组都会拉取到这条消息,但是在一个消费组中,只有一台机器会获取到这条消息(集群模式)不同的系统应该设置不同的消费组,如果不同的消费组订阅了同一个Topic,对Topic里的一条消息,每个消费组都会获取到这条消息实现了负载均衡和容错性
CommitLog消息顺序写入机制
丢数据的原因:因为Kafka收到消息之后会写入一个磁盘缓冲区里,并没有直接落地到物理磁盘上去,所以要是机器本身故障了,可能会导致磁盘缓冲区里的数据丢失功能非常的单一,主要是支持发送消息给他,然后从里面消费消息,其他就没有什么额外的高级功能了。所以基于Kafka有限的功能,可能适用的场景并不是很多
SendMessage线程池
Pull消费模式
Producer
集群模式消费 vs 广播模式消费
同步:通过这行代码发送消息到MQ去,SendResult sendResult = producer.send(msg),然后会阻塞,要一直等待MQ返回一个结果
1、专门分配一个Reactor主线程出来,就是专门负责跟各种Producer、Consumer之类的建立长连接2、一旦连接建立好之后,大量的长连接均匀的分配给Reactor线程池里的多个线程3、每个Reactor线程负责监听一部分连接的请求,通过多线程并发的监听不同连接的请求,可以有效的提升大量并发请求过来时候的处理能力,可以提升网络框架的并发能力4、接着后续对大量并发过来的请求都是基于Worker线程池进行预处理的,当Worker线程池预处理多个请求的时候,Reactor线程还是可以有条不紊的继续监听和接收大量连接的请求是否到达5、最终的读写磁盘文件之类的操作都是交给业务线程池来处理的,当并发执行多个请求的磁盘读写操作的时候,不影响其他线程池同时接收请求、预处理请求
一 一对应的关系
官方文档相对较为简单
消费者被组合在一起并命名消费组举例,有a,b两台机器,消费组名字都是A,则这两台机器就同属一个消费者组在代码中设置消费组的方式:
1、天生设计允许丢失数据,保证高吞吐2、MQ功能过于简单3、可以实现数据零丢失,但是吞吐量下降
1、MQ功能强大2、完善的可视化管理工作台3、阿里、滴滴、网易等互联网大厂都在用4、高并发能力极强5、支持数据0丢失的配置6、Java语言开发,方便改造源码
1、Reactor主线程在端口上监听Producer建立连接的请求,建立长连接2、Reactor线程池并发的监听多个连接的请求是否到达3、Worker线程池并发的对多个请求进行预处理(比如SSL加密验证、编码解码、连接空闲检查、网络连接管理等)4、业务线程池(SendMessage线程池)并发的对多个请求进行磁盘读写业务操作
Broker主从架构以及多副本策略,Master Broker收到消息后同步给Slave Broker解决Master宕机导致消息丢失,但Slave有数据副本,可以保证数据不丢失,继续对外提供服务,保证MQ的可靠性和高可用性
预映射机制 + 文件预热机制
超高吞吐
用户进程私有空间
路由消息:集群里的Broker信息、Topic信息、系统发送消息或者获取信息到哪台Broker以及其他相关的数据信息生产者和消费者主动去NameServer拉取路由信息
RocketMQ 是如何基于Netty扩展出高性能网络通信架构的
Broker针对上述的磁盘文件高性能读写机制做的一些优化:内存预映射机制:Broker会针对磁盘上的各种CommitLog、ConsumeQueue文件预先分配好MappedFile,提前对一些可能接下来要读写的磁盘文件,提前使用MappedByteBuffer执行map()函数完成映射,这样后续读写文件的时候,就可以直接执行了文件预热:在提前对一些文件完成映射之后,因为映射不会直接将数据加载到内存中,那么后续在读取尤其是CommitLog、ConsumeQueue的时候,其实有可能会频繁的从磁盘里加载数据到内存中。所以其实在执行完map()函数之后,会进行madvise系统调用,就是提前尽可能多的把磁盘文件加载到内存中通过上述优化,才真正能实现一个效果,就是写磁盘文件的时候都是进入PageCache的,保证写入高性能;同时尽可能多的通过map + madvise的映射后预热机制,把磁盘文件里的数据尽可能多的加载到PageCache里来,后续对CosumeQueue、CommitLog进行读取的时候,才能尽可能从内存里读取数据。
DLedger技术可以做什么:基于DLedger技术来实现Broker高可用架构,实际上就是用DLedger先替换掉原来Broker自己管理的CommitLog,由DLedger来管理CommitLogBroker还是可以基于DLedger管理的CommitLog去构建出来机器上的各个ConsumeQueue磁盘文件
如果某个Broker临时出现故障了,比如Master Broker挂了,此时正在等待的其他Slave Broker自动热切换为Master Broker,那么这个时候对这一组Broker就没有Master Broker可以写入了。建议在Producer中开启一个开关,就是sendLatencyFaultEnable这个开关有一个自动容错机制,比如某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了。就可以避免一个Broker故障之后,短时间内生产者频繁的发送消息到这个故障的Broker上去,出现较多次数的异常,过一段时间再去访问这个Broker
为什么RocketMQ不使用Zookeeper作为注册中心呢?RocketMQ的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要Zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本。根据CAP理论,RocketMQ在名称服务这个模块的设计上选择了AP,而不是CP
ConsumeQueue文件
长连接,注册,心跳
ConsumeQueue会被大量的消费者发送的请求给高并发的读取,所以ConsumeQueue文件的读操作是非常频繁的,而且同时会极大的影响到消费者进行消息拉取的性能和消费吞吐量Broker机器的磁盘上的大量的ConsumeQueue文件,在写入的时候也都是优先进入os cache中的;而且os自己有一个优化机制,就是读取一个磁盘文件的时候,会自动把磁盘文件的一些数据缓存到os cache中ConsumeQueue文件主要是存放消息的offset,所以每个文件很小,30万条消息的offset就只有几兆而已。所以实际上ConsumeQueue文件们是不占用多少磁盘空间的,几乎可以完全被os缓存在内存cache里所以实际上在消费者机器拉取消息的时候,第一步大量的频繁读取ConsumeQueue文件,几乎可以说就是跟读内存里的数据的性能是一样的,通过这个就可以保证数据消费的高性能以及高吞吐
如果某个Broker出现故障该怎么办?
缺点
集群部署
1、在前端/客户端设置秒杀答题/验证码,错开大量人下单的时间,阻止作弊器刷单 2、独立出来一套秒杀系统,专门负责处理秒杀请求 3、优先基于Redis进行高并发的库存扣减,一旦库存扣完则秒杀结束 4、秒杀结束之后,Nginx层过滤掉无效的请求,大幅度削减转发到后端的流量 5、瞬时生成的大量下单请求直接进入RocketMQ进行削峰,订单系统慢慢拉取消息完成下单操作
MQ的核心数据模型:Topic
中小型公司业务系统适用,无超高并发场景,无需改造源码
消费者是根据什么策略从Master或Slave上拉取消息的
秒杀活动的第一个问题就是大量用户同一时间频繁的访问 同一个秒杀商品的页面方案:页面数据静态化 + 多级缓存页面数据静态化:首先需要将秒杀活动的商品详情页里的数据做成静态化的,也就是提前从数据库里把页面需要的数据都提取出来组装成一份静态数据放在别的地方,避免每次访问页面都要访问后端数据库多级缓存:CDN + Nginx + Redis1、秒杀商品详情页的数据,首先会放一份在离用户地理位置比较近的CDN上,不同地方的用户在加载这个秒杀商品的详情页数据时,都是从就近的CDN上加载的,不需要每次请求都发送到公司的机房中。2、若缓存过期等问题,导致CDN上没有用户要加载的商品详情页数据,此时用户就会发送请求到机器中加载商品的数据了,这个时候需要在Nginx服务器里做一级缓存。 在Nginx中是可以基于Lua脚本实现本地缓存的,提前把秒杀商品详情页的数据放到Nginx中进行缓存,如果请求发送过来,可以从Nginx中直接加载缓存数据,不需要把请求转发到商品系统中。3、若Nginx中没有数据,此时就可以由Nginx中的Lua脚本发送请求到Redis集群中去加载提前放进去的秒杀商品数据。4、若Redis中没有数据,那么就由Nginx中的Lua脚本直接把请求转发到商品详情页系统里去加载就可以了,此时就会直接从数据库中加载数据出来。一般来说数据是可以从CDN、Nginx、Redis中加载到的,可能只有极少的请求会直接访问到商品系统去从数据库里加载商品页数据
消费者机器如何处理消息、进行ACK以及提交消费进度
大量系统往RcoketMQ中高并发写入数据,若每秒有几十万请求
系统D
消费者是如何获取消息处理以及进行ACK的
1、根据要消费的MessageQueue以及开始消费的位置2、去找到对应的ConsumeQueue读取里面对应位置的消息在CommitLog中的物理offset偏移量3、到CommitLog中根据offset读取消息数据,返回给消费者机器
Topic、MessageQueue以及Broker之间是什么关系?
0 条评论
下一页