消息队列MQ汇总
2022-05-13 09:40:24 3 举报
AI智能生成
消息队列MQ汇总
作者其他创作
大纲/内容
消息队列综述
消息队列简介、什么是消息队列
消息队列是一个容器,可以对程序产生的消息进行存储。
MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。
通俗点说,就是一个先进先出的数据结构。
子主题
消息队列的主要用途
(削峰、异步、解耦)
用一个实际场景来解释下
有一家果汁生产企业
张三是采购员,负责采购水果;
李四、赵五是配送员,分别负责将苹果、香蕉配送到生产车间。
削峰(消息队列是一个仓库)
传统模式下,张三采购完成,回到公司后,联系李四、赵五配送采购的水果。
但是随着公司业务量大增,张三一次性采购的水果,李四、赵五得需要几天才能配送完。
所以需要一个仓库,张三采购完成直接放到仓库里,李四、赵五慢慢从仓库取出配送。
此处的仓库就是消息队列,张三是采购消息的生产者,李四、赵五是消费者。
当生产的消息太多时,可以使用队列削峰,这样消费者可以慢慢处理消息。
异步(不必等待消费者消费)
传统模式下,张三采购完成后,需要等待李四、赵五来取,实际上极大浪费了张三的时间。
如果直接放入仓库,可以不必等待,直接进行下面的工作。
也就是说,张三与李四、赵五的工作是异步的,减少了等待时间。
解耦(不用管消费者的事情)
之前张三采购完成后,有责任通知李四、赵五来取。
万一李四、赵五忘带手机,张三还得联系领导协调处理,说实话张三就是个大老粗,整天为这些破事烦得不行。
如果直接放入仓库,张三根本不用管李四、赵五的事情,感觉愉快极了。
张三与李四、赵五的工作不再互相依赖,都变得更加简单了,这就是解耦。
异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。
主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。
同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
传统的做法如下:
传统的做法如下:
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续
的注册短信和邮件不是即时需要关注的步骤。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续
的注册短信和邮件不是即时需要关注的步骤。
改善后的做法
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返
回用户结果,由消息队列 MQ 异步地进行这些操作。
回用户结果,由消息队列 MQ 异步地进行这些操作。
架构图
流量削峰
流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流
量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解
决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。
量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解
决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。
流量削峰
秒杀处理流程如下所述:
1. 用户发起海量秒杀请求到秒杀业务处理系统。
2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
4. 用户收到秒杀成功的通知。
2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
4. 用户收到秒杀成功的通知。
为啥要用消息系统 ?为什么需要消息队列?为什么要用MQ
灵活性&峰值处理能力(削峰)
削峰
而不会因为突发的超负荷的请求而完全奔溃
异步通信
异步
消息队列提供了异步处理机制,允许把一个消息放入队列中,当时不处理,等想要处理的时候才处理
解耦
解耦
基于统一的数据的接口,生产者和消费者可以独立的扩展或者修改两边的处理过程
冗余
消息队列把数据进行持久化直到它们已经完全处理,防止数据处理过程中数据丢失
扩展性
消息队列解耦了你的处理过程,可以灵活的增大消息的入队和处理
可恢复性
消息队列降低了进程间的耦合度,但一部分组件失效,不会影响整个系统,挂掉的进程仍然可以在恢复队后处理队列中的消息
送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可,“通过”消息预定的方式“保证”只送达一次
顺序保证
消息队列本来就是排序的,并且能够保证数据按照特定的顺序来处理。
大部分消息系统通过FIFO的顺序来处理
缓冲
消息队列的缓冲能够高效的执行写入队列的处理而不受从队列读的预备处理的约束。
缓冲有利于控制和优化数据流经过系统的速度
理解数据流
通过消息队列消息被处理的频率来辅助确定表现不佳的处理过程
常见消息队列
消息队列带来的问题
可用性降低
复杂度提高
Kafka
Kafka诞生、简介、基本概念
背景/解决问题
解决大数据各个子系统之间的数据的高性能、低延迟的不停流转
解决在线应用(消息)和离线应用(数据文件,日志)大规模的数据处理。作为一个沟通生产者produce和消费者消费Consume(处理分析)的桥梁
Kafka诞生、简介
Apache Kafka
之后成为Apache项目的一部分
目前是Apache的开源项目
2010贡献给Apache基金会称为顶级开源项目
Apache下的一个子项目
Kafka是Apache下的一个子项目
LinkedIn公司
最初由LinkedIn公司开发
Scala
用Scala语言编写
使用Scala语言编写
日志服务
提交日志服务
消息系统
消息队列系统
消息队列系统
分布式的
设计内在就是分布式的
分布式
基于发布和订阅的
发布-订阅
发布/订阅
基于发布/订阅的
Publish/Subscribe
可分区的
分区的
可划分的
可复制的
持久性的
高性能
高性能
跨语言
跨语言
快速
可扩展的
高吞吐量的
冗余备份的
相对于ActiveMQ是一个非常轻量级的消息系统
基于hadoop的批处理系统
用于处理活跃的流式数据
它主要用于处理流式数据。
Kafka需要结合ZooKeeper使用
kafka中的 zookeeper 起到什么作用,可以不用zookeeper么 ?
一个分布式,支持分区(partition),多副本的(replica),基于zookeeper协调的分布式消息系统
Kafka用到了zookeeper,所有首先启动zookeeper服务,再启动Kafka
可以在命令的结尾加个&符号,这样就可以启动后离开控制台。【守护进程】
保存着meta数据
集群broker、topic、partition等
另外,还负责broker故障发现,partition leader选举,负载均衡等功能
zookeeper 是一个分布式的协调组件
早期版本
kafka用zookeeper做meta信息存储
consumer的消费状态,group的管理以及 offset的值
新版本
考虑到zk本身的一些因素以及整个架构较大概率存在单点问题
新版本中逐渐弱化了zookeeper的作用
新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
Kafka的各种版本情况
Apache
社区版
Confluent
集群监控、跨数据中心备份
Cloudera/Hortonworks
云版本,配合云使用
kafka开发环境搭建
maven项目
org.apache.kafka
kafka_2.10
0.8.0 version>
参数配置
package com.sohu.kafkademon;
public interface KafkaProperties
{
final static String zkConnect = "10.22.10.139:2181";
final static String groupId = "group1";
final static String topic = "topic1";
final static String kafkaServerURL = "10.22.10.139";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
final static String clientId = "SimpleConsumerDemoClient";
}
producer,consumer等
下载解压
https://www.apache.org/dyn/closer.cgi?path=/kafka//kafka_.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
启动服务
Kafka用到了Zookeeper,所有首先启动Zookper。
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh config/server.properties
测试安装
创建topic
创建一个叫做“test”的topic,它只有一个分区,一个副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
通过list命令查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动consumer
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
大致步骤
Kafka,将消息以Topic主题为单位进行归纳。
将向Kafka topic发布消息的程序,称为producers.
将订阅topics,并消费消息的程序,称为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络,将消息发送到Kafka集群,集群向消费者提供消息
kafka架构
架构
producer,broker,consumer关系图
Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。
Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。
broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
消息发送图
流程图
图解Kafka
图解Kafka组件之间的关系
图解Kafka组件之间的关系
图解Kafka
Kafka的实际应用/使用场景/适用场景
消息服务、消息系统、消息中间件、消息队列
分布式消息引擎
集群
消息中间件
低延迟
高可用
消息服务
(Messaging)
消息
异步通信
峰值压力缓冲
系统之间解耦合
降低系统组网复杂度
降低编程复杂度
建立实时数据流管道 从而可靠的在系统或程序间共享数据
有更好的吞吐量,内置的分区,冗余以及容错性,这让kafka成为一个很好的大规模消息处理应用的解决方案
对于一些常规的消息系统,kafka是个不错的选择
partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势
kafka并没有提供JMS中的企业级特性
事务性
消息传输担保(消息确认机制)
消息分组
kafka只能使用作为"常规"的消息系统
在一定程度上,尚未确保消息的发送与接收绝对可靠
消息重发
消息发送丢失
信息监控
监控数据处理
作为操作记录的监控模块来使用,即汇聚记录一些操作信息,可以理解为运维性质的数据监控
monitor 监控
监控
kafka-consumer-groups 脚本
关注 LAG
Kafka Java Consumer API
Kafka JMX 监控指标
records-lag-max
records-lead-min
lead 越小,可能要丢消息,kafka 会定期清理数据
日志聚合
性能
低延迟
高可用
日志整合(Log Aggregation)
kafka的特性决定它非常适合作为"日志收集中心"
application可以将操作日志批量异步的发送到kafka集群中,而不是保存在本地或者DB中
kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支
此时consumer端,可以使hadoop等其他系统化的存储和分析系统.
日志收集
文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。
这让kafka处理过程延迟更低,更容易支持多数据源和分布式处理。
相对于Flume,kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低端到端延迟
持久性日志(Commit Log)
Kafka可以为一种外部的持久性日志的分布式系统提服务。
这种日志可以在节点间备份数据,并为故障点数据恢复提供一种重新同步的机制。
kafka中日志压缩功能为这种用法提供了条件。
在这种用法中,kafka类似于Apache BookKeeper项目
持久化队列,像日志简单的读取好追加到文件尾
所有操作O1
无性能代价无限量磁盘空间
保存消息时间长
存储
写入持久化日志,不需要立刻刷新磁盘
以pagecache为中心的设计风格
事件源(Event Sourcing)
事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。
Kafka可以存储大量的日志数据,使得它成为一个对这种方式的应用来说绝佳的后台,比如动态汇总New feed
存储系统
消息数据写到磁盘和备份分区
磁盘结构很好扩展
与数据量无关
特殊的分布式文件系统
高性能
低延迟
只能自备份和自我复制
行为跟踪、网站活性跟踪
访问日志
网站活动跟踪
实时处理
实时监控
数据量大
跟踪用户的浏览页面,搜索及其它行为,
以发布-订阅的方式实时记录到对应的topic里,这些结果被订阅者拿到后,可进一步实时处理或监控,或放到hadoop/离线数仓中处理
各大公司一般把kafka用在用户行为日志的采集和传输上,比如大数据团队要收集APP上用户的一些日志,这种日志就是用kafka来收集和传输的,因为这种日志适当丢失数据也没有关系,而且一般量特别大,要求吞吐量高,一般就是收发消息,不需要太多高级功能。所以比较适合这种场景。
作为"网站活性跟踪"的最佳工具
可以将网页/用户操作等信息发送到kafka中
实时监控
离线统计分析
活性跟踪(Website activity tracking)
大数据处理的消息系统/大数据场景
在大数据中,使用了大量的数据。 关于数据,我们有两个主要挑战。
第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据。
为了克服这些挑战,您必须需要一个消息系统。
Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。
与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。
适用于需要高吞吐量,允许适当的消息丢失,不需要太高级功能|日志采集|标杆
支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。
kafka对于可扩展的数据持久性的支持,非常适合向hadoop或者数仓中进行数据装载
流处理,数据ETL的粘合剂
在各个App,流处理,数据ETL(Extract Transform and Load)等应用服务中,担任着“粘合剂”的角色。
分布式流平台
发布,订阅记录流
消息队列
企业消息系统
持久化接受到的记录流
处理接收到的记录流
可以实时的处理大量的数据满足各种需求场景
构建实时流应用程序,能够变换或者对数据进行相应处理
允许应用程序充当流处理器
数据流处理
流处理实时数据流
从输入的主题联系获取数据流
进行一系列处理
产生联系的数据流到输出主题
从一个或多个主题中获取输入流,并产生一个或多个输出流,能够有效的变输入流为输出流
storm/spark流式处理引擎
流式处理场景可能比较多
批量消费
因此可用于批量消费,例如ETL,以及实时应用程序。
例如ETL,以及实时应用程序。
kafka对于可扩展的数据持久性的支持,非常适合向hadoop或者数仓中进行数据装载
图解使用场景
图解使用场景
图解应用场景
Kafka的设计目标、特征特点、独特设计、核心设计要点、选用原因
目标
支持分区,分布式,实时数据处理数据源并创建新的数据源,并保证容错性
【高吞吐率】高吞吐,低延迟
高吞吐率
吞吐量
高吞吐
吞吐量高
吞吐量极高
同时为发布和订阅,提供高吞吐量
高吞吐是kafka核心目标之一
低延迟的实时系统
它的延迟最低只有几毫秒
10W/s的吞吐速率
在一台普通的服务器,也可以达到10W/s的吞吐速率;
kafka每秒可以处理几十万条消息,
常规机器配置,一台机器可达每秒十几万的QPS,相当强悍。
即使在非常廉价的商用机器上,也能做到单机支持每秒100K条以上消息的传输。
据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
同步异步
producer采用异步push方式,极大提高kafka系统的吞吐率
可以通过参数控制采用同比和异步
pull&push模式
kafka中producer和consumer采用的是pull&push模式,
即producer只管向broker push消息,consumer只管从broker pull消息,
两者对消息的生产和消费是异步的
【冗余与持久化】快速消息持久化
为什么需要持久化?
有些情况下,处理数据的过程会失败。
除非数据被持久化,否则将造成丢失。
消息队列把数据进行持久化,直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
"插入-获取-删除"范式
许多消息队列所采用的"插入-获取-删除"范式中,
在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,
从而确保你的数据被安全的保存直到你使用完毕。
持久化到磁盘
将消息持久化到磁盘
消息不会存储在内存中,直接写入磁盘中
可进行持久化操作,将消息持久化到磁盘
消息持久化到磁盘,且支持数据备份防止数据丢失
把消费持久化到本地文件系统中,并保持极高的效率
数据磁盘持久化:
消息不在内存中cache,直接写入磁盘,充分利用磁盘的顺序读写性能
常数时间复杂度
以时间复杂度为O(1)的方式,提供消息持久化能力
可以在O(1)的系统开销下进行消息持久化;
即使对TB级以上数据也能保证常数时间复杂度的访问性能。
性能是常量级:O(1)
物理存储
每个分区分段存储,可以按时间和文件大小分段,
例如按每天或者文件大小超过1G进行分段
分区索引:索引把偏移量映射到片段文件和偏移量在文件的位置
kafka对于可扩展的数据持久性的支持,非常适合向hadoop或者数仓中进行数据装载
【数据持久化】在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。
通过将数据持久化到硬盘以及replication,防止数据丢失。
【消息有效期】会长久保留其中的消息,以便consumer可以多次消费,当前其中很多细节可以配置
自己不丢数据,默认每隔7天清理数据
日志追加
高吞吐量
日志段
定时删除
【可扩展性】分布式,高可用、可扩展性、易扩展
可扩展的持久化能力
直接导数据
失败数据不丢失 重新处理
高可用
被设计为一个分布式系统,易于向外扩展
Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
【完全的分布式系统】
kafka集群支持热扩展
分布式系统,易于向外扩展。
不需要改变代码、不需要调节参数。
扩展就像调大电力按钮一样简单。
性能高,基本上发送给kafka都是毫秒级的性能。可用性很高。
所有的producer、broker和consumer都会有多个,均为分布式的。
因为消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
kafka运行在一个或多个服务器集群cluster上
无需停机即可扩展机器。
当需要增加broker节点时,新增的broker会向zookeeper注册,
而producer以及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作为调整
【消息顺序传输】更强的消息顺序能力 与 消息状态
顺序保证
在大多使用场景下,数据处理的顺序都很重要。
大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
Kafka保证一个Partition内的消息的有序性。
【消息顺序传输】
支持Kafka Server间的消息分区,及分布式消费
同时保证每个Partition内的消息顺序传输。
并行性分区主题
每个分区指定给消费者群众的一个消费者
消费者群中的消费者数多于分区数时 闲置
消息状态
消息的状态被保存在consumer中,只记录一个offset值,
指向partition中下一个要被消费的消息位置
如果consumer处理不好,broker上的一个消息可能会被消费多次
【生产者消费者】生产消费者模式、拉取系统、支持高并发
高并发
支持数千个客户端同时读写
多partition的设计消息被处理的状态是在consumer端维护,而不受有server端维护,
当失败是能自动平衡
拉取系统
因为kafka broker会持久化数据,broker没有内存压力,consumer非常适合pull的方式消费数据
简化kafka设计
consumer根据消费能力自主控制消息拉取速度
consumer根据自身情况自主选择消费模式,比如批量、重复消费、从尾端开始消费
【多订阅者】
支持多订阅者
当失败时,能自动平衡消费者;
【容错性】容错性、容灾、可恢复性
容灾核心是日志复制
维护一个同步备份的集合
当数据同步到所有节点时候提交
每个节点都是可以作为leader
可以容忍n-1个备份不可用
客户的决定是否提交
不需要故障节点回恢复数据
producer写数据时候可以选择等待几个节点相应
消息被处理的状态是在consumer端维护,而不是由server端维护。
当失败时能自动平衡。
允许集群中节点失败
系统的一部分组件失效时,不会影响到整个系统。
消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
【轻量级的消息系统】
Apache Kafka,相对于ActiveMQ是一个非常轻量级的消息系统
除了性能非常好之外,还是一个工作良好的分布式系统。
集群
支持集群化部署
可支持集群部署,部分机器宕机可以继续运行。
集群关系
Kafka使用ZooKeeper来维护群成员信息。
每个broker都有一个唯一的id,broker启动时,会创建临时节点将自己id注册到ZooKeeper的/brokers/ids路径。
Kafka组件,订阅ZooKeeper的/brokers/ids路径,当有broker加入或者退出时,这些组件会得到通知
移除broker
当控制器发现某个broker离开集群后,会为分区(首领恰好在改broker上的分区)选举出新的首领,然后将新首领信息发送给相关broker
复制
首领副本
每个分区只有一个首领副本,所有的消费者和生产者请求都经过这个副本
跟随者副本
首领以外的副本都是跟随者副本
跟随者副本不处理任何请求,只保持与首领副本的同步,
当首领副本不可用时,其中一个同步的副本会成为新的首领副本。
同步副本
与首领副本同步的跟随者副本
同步条件:
10S内有请求最新数据的请求
消费者只能获取已经写入所有同步副本的消息
首选首领
首选首领在各broker是均衡分布的
创建主题时选定的首领就是分区的首领
首选首领在各broker是均衡分布的
消息可靠性
事务 transaction
跨会话、跨分区幂等
消息原子提交到分区
生产者的事务能够保证一条消息仅仅会保存在kafka的某一个分区上,不会出现在多个分区上,另外,能够保证多条消息原子性的发送到多个分区。
也就是说它只保证了从producer端到broker端消息不丢失不重复。
但对于consumer端,由于偏移量的提交和消息处理的顺序有前有后,依然可能导致重复消费或者消息丢失消费,如果要实现消费者消费的精确一次,还需要通过额外机制在消费端实现偏移量提交和消息消费的事务处理。
幂等 Idempotence
单分区幂等
单会话幂等
三种消息策略、语义 level
最多一次(at most once)
至少一次(at least once)
精确一次(exactly once)
生产者
设置ack=all
使用producer.send(msg, callback) 或者使用同步方式发送消息
设置retries 次数
消费者
enable.auto.commiot=false,确保消息消费完成再提交
Broker
设置unclean.leader.election.enable = false 分区落后太多不允许竞选为leader
replication.factor >= 3
min.insync.replicas > 1
确保 replication.factor > min.insync.replicas
可靠性
可靠性保证
Kafka可以保证分区消息的顺序
只有当消息被写入分区的所有同步副本时,才会被认为是已提交的
只要有一个副本活跃,已提交的信息不会丢失
消费者只能读取已提交的信息
可靠性保障
复制
Kafka的复制机制和分区的多副本架构是可靠性保证的核心
配置
replication.factor
复制因子,指定主题分区的副本个数
unclean.leader.election
集群范围配置
是否允许不完全分区首领选举,即是否允许不同步的跟随副本称为首领副本
是否允许不完全分区首领选举,即是否允许不同步的跟随副本称为首领副本
min.insync.replicas
最小同步副本
当同步副本个数小于最小同步副本时,分区拒绝写入消息,直到同步副本个数恢复至大于等于最小同步副本数
当同步副本个数小于最小同步副本时,分区拒绝写入消息,直到同步副本个数恢复至大于等于最小同步副本数
健壮性
消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
冗余
可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
异步通信
很多时候,用户不想也不需要立即处理消息。
消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
通信方式
TCP通信
很多时候,用户不想也不需要立即处理消息。
消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
提高parallelism
解耦和扩展性
解耦
项目启动初期来预测将来项目会碰到什么需求,是极其困难的。
消息系统,在处理过程中间,插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。
这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
扩展性
项目开始时,并不能确定具体需求。
消息队列可以作为一个接口层,解耦重要的业务流程。
只需要遵守约定,针对数据编程即可获取扩展能力。
缓冲和削峰
在任何重要的系统中,都会有需要不同的处理时间的元素。
例如,加载一张图片比应用过滤器花费更少的时间。
消息队列,通过一个缓冲层,来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。
该缓冲有助于控制和优化数据流经过系统的速度。
上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;
如果为以能处理这类峰值访问为标准来投入资源随时待命,无疑是巨大的浪费。
使用消息队列,能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求,而完全崩溃。
【离线】灵活与离线场景
离线数据加载
支持online和offline的场景
支持离线数据处理和实时数据处理。
批量处理、批量发送
kafka支持以消息集合为单位进行批量发送,以提高push效率
数据批量发送
因此可用于批量消费
批量发送
是提高消息吞吐量重要的方式,
Producer端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker,
从而大大减少broker存储消息的IO操作次数。但也一定程度上影响了消息的实时性,
相当于以时延代价,换取更好的吞吐量。
消息聚合
producer一次 发送多个消息
压缩消息
直接向leader节点发消息
可以自定义策略发送到那个分区
同一个用户顺序消费
consumer一次收一批消息
通过给leader partition发fetch请求指定offset
consumer对消费有绝对的控制权,可以重新设置offset从新消费
consumer pull数据
每个consumer对应一个分区,对于同一条消息只分给一个consmer
重新消费特别容易
标准二进制消息格式
pagecache 和sendfile组合 在消费者只写的情况下看不到磁盘活动
sendfile和更多的zero-copy背景知识见zero-copy
负载均衡方面
produce根据用户指定的算法,将消息传递给指定的partition
存在多个partition,每个partition有自己的replica,每个replica分布在不同的broker节点上
多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
通过zookeeper管理broker与 consumer的动态加入与离开
对kafka0.8而言,提供一个metadata API来管理broker之间的负载,对kafka0.8而言
对kafka0.7而言,主要靠zookeeper来实现负载均衡
数据压缩、日志压缩
Compress
发生位置
Broker
Consumer
Producer 端压缩、Broker 端保持、Consumer 端解压缩
Broker 重新压缩
压缩算法不一致
Broker 发生消息格式转换
压缩日志
所有consumer能够消费的消息都有存储
消息顺序不变
消息偏移量,offset不变
从头开始处理日志的consumer都能拿到每个key的最终状态
删除的消息在某个时间段内可以看到
零拷贝
zero-copy:减少IO操作步骤
限流
每个唯一客户端group都有一个集群配置的固定限额
基于broker
限流以后降速
插件支持
现在开发出不少插件来拓展kafka的功能
比如来配合storm,hadoop,flume相关的插件
官方文档很详细
kafka 为什么那么快 ?
Cache Filesystem 缓存文件系统
Cache PageCache缓存
顺序写
由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
Zero-copy零拷贝技术
零拷技术减少拷贝次数
Batching of Messages 批量量处理
合并小的请求,然后以流的方式进行交互,直顶网络上限。
Pull 拉模式
使用拉模式进行消息的获取消费,与消费端处理能力相符。
Kafka的组件、基本组成要素、部件消息系统术语
Broker
Kafka 服务端
Kafka Cluster包含一个或多个服务器,这种服务器被称为broker。
Kafka 服务端
Kafka服务器
一个服务器节点就是一个`broker`,
特点
有序性
同一分区
负载均衡
不同分区
集群
多个`broker`构成了`kafka`集群
运行在多个集群上
集群服务broker
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个Broker.
缓存代理,Kafa集群中的一台或多台服务器统称为broker。
Broker
作用
负责消息存储和转发
kafka集群中Broker之间的关系
不是主从关系
各个Broker在集群中的定位一样,可随意的增加或删除任意一个broker节点
Broker是消息的代理
Producers往Brokers里面的指定Topic中写消息,
Consumers从Brokers里面拉取指定Topic的消息,
然后进行业务处理,Broker在中间起到一个代理保存消息的中转站。
Topic主题
主题 topic
优点
这使得每个日志的数量不会太大,可以在单个服务上保存。
每个分区可以单独发布和消费,为并发操作topic,提供了一种可能。
类别/分类/归纳/逻辑集合
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(主题)。
数据的逻辑集合
消息类别
消息归纳topic
一个topic是对一组消息的归纳
Kafka处理的消息源的不同分类
kafka集群分类存储的记录流
作用
Kafka按照topic来分类消息
物理上,不同Topic的消息分开存储
逻辑上一个Topic的消息,虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
每个消息记录包含一个键 一个值和时间戳
支持多用户订阅
可以有0个1个或多个消费者订阅
分布式
服务器共享分区进行数据请求处理
一个 leader
处理所有读取和写入分区的请求
0个或多个followers
被动拷贝数据
选举产生leader
日志分区
有序、不可变消息序列
偏移量
唯一标识
分区会给每个消息分配一个顺序id号
集群保留所有发布记录,不管有没有被消费过
可配置删除旧数据策略
可设置保留策略
性能与数据量大小无关
保留每个消费者元数据中最基础的数据就是消费者正在处理的当前记录的偏移量offset或位置position
由消费者 consumer控制
消费者可以在任何喜欢的位置读取数据
实现kafka消费者代价很小
消费者增加或减少不对集群产生较大影响
好处
允许数据扩展到更多服务器上
分区是并行处理的基础单元
Topics 和Logs
图解Topic细节图
Topic细节图
对每个topic,Kafka 对它的日志进行了分区
图解Kafka Log Implementation
图解Kafka Log Implementation
topic存储结构
Message消息
消息,是通信的基本单位。
每个Producer可以向一个Topic发布一些消息
Partition 分区
一个主题可以有多个分区
主题划分多个区
topic的分区
每个Topic包含一个或多个partition
一个topic可以包含多个partition
topic消息保存在各个partition上
一个topic可以有多个partition组成
一个topic可以分为多个partition
Topic划分多个partition,
物理集合与分组
Topic物理上的分组
Partition是物理上的概念
数据的物理集合
有序的队列
每个partition是一个有序的队列。
整体不是严格的先进先出FIFO,但是分区是严格的FIFO
在一个分区中消息的顺序就是producer发送消息的顺序
partition中的每条消息都会被分配一个有序的id(offset)
其他
片段
每条消息只在一个分区上
分区机制
具体分区的数据可配置
kafka的broker端支持消息分区,producer可以决定把消息发送到那个分区
一个partition可以有多个副本,副本分布在不同的broker上。
把数据分割在多个 Broker 上
高伸缩性 Scalability
负载均衡
轮询
随机
Key-ordering
同 Key 的消息有序
地理位置
Producers 消息生产者
如何选择分区
简单的由负载均衡机制,随机选择分区
【推荐】通过特定的分区函数选择分区。
Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。
负责发布消息到Kafka broker。
允许应用程序发布记录流或多个kafka的topics
发布消息
发布数据到topic
配置
acks
`acks = all`所有可用分区确认写入才算发送成功
`acks = 0`不需要等待写入确认,数据发出就算成功
`acks = 1`分区首领确认写入就算发送成功
max.in.flight.requests.per.connection
在收到服务器的响应前,生产者最多可以发送多少消息。
当设为1时,可以保证单个连接中消息顺序写入分区。
分区器
当消息`key`为`null`时,默认分区器使用轮训算法将消息发送到不同的分区
当消息`key`不为`null`时,默认分区器使用hash算法将消息映射到不同分区,相同`key`的消息会分到同一分区
在进行映射操作时,使用的是所有的分区,不仅仅是可用分区
在进行映射操作时,使用的是所有的分区,不仅仅是可用分区
当分区数量发生变化后,具有相同`key`的新消息可以被映射到不同分区
批量发送
是提高消息吞吐量重要的方式,
Producer端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker,
从而大大减少broker存储消息的IO操作次数。但也一定程度上影响了消息的实时性,
相当于以时延代价,换取更好的吞吐量。
压缩(GZIP或Snappy)
Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。
Producer端进行压缩之后,在Consumer端需进行解压。
压缩的好处
减少传输的数据量
减轻对网络传输的压力
在对大数据处理上,瓶颈往往体现在网络上而不是CPU
(压缩和解压会耗掉部分CPU资源)
生产者设计
发送方式
同步发送
异步发送
发送并忘记
异步发送
发送并忘记
负载均衡
(partition会均衡分布到不同broker上)
由于消息topic由多个partition组成,且partition会均衡分布到不同broker上,
因此,为了有效利用broker集群的性能,提高消息的吞吐量,
producer可以通过随机或者hash等方式,将消息平均发送到多个partition上,以实现负载均衡。
负责把记录分配到分区
可以轮训进行平衡负载
也可以用语义分区算法
发送消息时根据时间属性给一个偏移量
消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。
通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。
图解生产者
图解生产者
Consumers 消息消费者
connector api
例如连接到数据的连接器可能获取每个表的变化
允许构建和允许可重用消费者,能够把kafka主题连接到先用的应用程序或数据系统
图解消费者设计
图解消费者设计1
图解消费者设计2
Kafka的两种模式:队列模式和发布-订阅模式
Consumer Group消费者组
每个消费者接受主题中一部分分区消息
消费者组中的消费者数量不能大于主题分区个数,否则会有部分消费者分配不到分区,无法消费消息
Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。
如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。
如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。
消费者实例
负载均衡
同一个消费者群 consumer group
广播
不同消费者群
图
每个消息只会发给消费者群中的一个消费者的一个实例
消费者群 consumer group来标识自己
消费者分组
Kafka 提供的可扩展且具有容错性的消费者机制
可以有一个或多个 Consumer
同一个 GroupId
每个Consumer必须属于一个group
消息只能被同组内一个 Consumer 消费
各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,
如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组
每个消费者接受主题中一部分分区消息
消费者组中的消费者数量不能大于主题分区个数,否则会有部分消费者分配不到分区,无法消费消息
消费者隶属于消费者组
消费者组中的消费者订阅同一主题
同一Consumer Group中的多个Consumer实例,不同时消费同一个partition,等效于队列模式。
partition内消息是有序的,Consumer通过pull方式消费消息。
对于partition,顺序读写磁盘数据,以时间复杂度O(1)方式提供消息持久化能力。
消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
消费消息
消息消费者,向Kafka broker读取消息的客户端。
允许应用程序订阅一个或多个主题,并处理执行记录接收的记录流
集群关系图
只能保证一个分区中的消息有序,不能保证多个分区中消息有序
只分一个分区
拦截器
消息审计
监控
消费者组监控
lead
计算方式
=Consumer Offset -LogStartOffset
作用
lead越接近于0,那么就表示有可能要丢消息
lag
计算方式
=HW-Consumer Offset
作用
消费者当前落后于生产者的程度
心跳
消费者向群组协调器发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系
当心跳停止时,会触发再均衡
当心跳停止时,会触发再均衡
轮询
消费者通过轮询API从各分区获取数据
在第一次轮询时还会完成join group的操作,同时还能接受分配的分区,再均衡过程也会在轮询API中完成
配置
auto.offset.reset
当消费一个没有偏移量信息的分区时,
`latest`从最新的记录开始消费,
`earlist`从起始位置开始消费
partition.assignment.strategy
分区分配策略
Range会将若干连续分区分配给消费者
RoundRobin会将所有分区逐个分配给消费者
enable.auto.commit
`true`根据配置的时间间隔,自动提交偏移量
`false`人工提交偏移量
提交
自动提交
同步提交
异步提交
提交特定偏移量
独立消费者
不属于任何群组的消费者,需要手动分区
Controller 控制器
Controller 控制器概述
在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群
Controller 控制器的选举
第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
第一个在`ZooKeeper`中创建`/controller`节点的`broker`就是控制器
成为控制器后,会从`ZooKeeper`获取一个递增的`controller epoch`,其他`broker`会忽略`epoch`较小的消息,防止脑裂
Controller 控制器的作用
分区重分配
Preferred 领导者选举
特殊的`broker`
负责分区首领的选举
主题管理
创建分区
删除分区
增加分区
数据服务
集群成员管理
Broker 宕机
Broker 主动关闭
新增 Broker
Controller 控制器故障转移(Failover)
选一个broker作为controller
负责一个故障节点引起的所有分区leader变更
controller故障选取新的broker
各broker watch /controller临时节点,故障后重新选举
Controller 控制器内部设计原理
0.11 版本后,把把多线程的方案改成了单线程加事件队列的方案
将之前同步操作 ZooKeeper 全部改为异步操作
Replica 副本 、Replication 备份
本质就是一个只能追加写消息的提交日志
Replication 备份
基于领导者(Leader-based)的副本机制
分区首领
即首领副本
领导者副本
承担读写
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
Kafka的读写只能发生在leader副本上
追随者副本
同一个分区下的所有副本保存有相同的消息序列
同步
ISR(In Sync Replicas)
判断标准 replica.lag.time.max.ms(默认10s)
leader 副本也在其中
超过设置的落后时间,踢出 ISR
赶上可以加回
Unclean 领导者选举
Unclean 领导者选举(Unclean Leader Election)
参数
Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举
选择
在这个问题上,Kafka 赋予你选择 C 或 A 的权利
建议
unclean.leader.election.enable=false,牺牲可用性换取一致性
Replication 备份的作用
只有一个作用,就是提供冗余以实现高可用
ISR
In-Sync Replicas 副本同步队列
ISR是由leader维护,follower从leader同步数据有一些延迟
包括两个维度
延迟时间replica.lag.time.max.ms
当前最新的版本0.10.x中只支持这个维度
延迟条数replica.lag.max.messages
任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
AR
Assigned Replicas
所有副本
AR=ISR+OSR。
什么情况下一个 broker 会从 ISR 中踢出去 ?
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),
每个Partition都会有一个ISR,而且是由leader动态维护 ,
如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。
Coordinator 协调者/群组协调器
消费者群组的偏移量会写入到`__consumer_offsets topic`的某个分区,该分区的`leader`所在`broker`就是该消费者组的群组协调器
群组协调器的主要作用是
偏移量管理
再均衡
Offset位移、偏移量
是什么?
消息在日志中的位置
消息在partition上的偏移量
代表该消息的唯一序号
Consumer Offset
OFFSET由consumer维护
消费进度
Map<GroupId, Partition, Offset>
zookeeper -> 内部主题
用户不可修改消息格式
自动创建
Compact 策略来定期清理过期消息
Kafka如何搭建及创建topic、发送消息、消费消息?
每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。
分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.
这个offset有consumer来维护:
一般情况下随着consumer不断的读取消息,这offset的值不断增加,
但其实consumer可以以任意的顺序读取消息,
比如它可以将offset设置成为一个旧的值来重读之前的消息。
优点
相比传统的消息系统,Kafka可以很好的保证有序性。
Kafka可以在多个consumer组并发的情况下,提供较好的负载均衡。
偏移量
`0.9.0`和之前版本,`kafka`将分区偏移量信息保存在`ZooKeeper`中
`0.9.0`以后,分区偏移量写入到`__consumers_offsets topic`中
__consumer_offsets
实现方式
使用 __consumer_offsets内部主题的方式来保存位移
消息格式
普通消息格式
Value格式:主要是offset的值,还包括时间戳等
Key的格式:<GroupID,主题名,分区号 >
保存Consumer Group 信息的消息
用于删除 Group 过期位移甚至是删除 Group 的消息
默认设置
如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3
消息Compaction
提交位移
手动提交
直接提交最新一条消息的位移
commitSync()
Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果
commitAsync()
立即返回,不会阻塞,不会影响 TPS,但是有错误不会重试
精细化提交位移
commitSync(Map<TopicPartition, OffsetAndMetadata>
commitAsync(Map<TopicPartition, OffsetAndMetadata>
自动提交
参数设置
enable.auto.commit=true
auto.commit.interval.ms(默认5s)
auto.commit.interval.ms(默认5s)
提交位移时机
在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
导致问题:它可能会出现重复消费
导致问题:它可能会出现重复消费
最佳实践
手动提交,组合异步提交和同步提交
CommitFailedException
发生时机
CommitFailedException 异常通常发生在手动提交位移时,
即用户显式调用 KafkaConsumer.commitSync() 方法时
即用户显式调用 KafkaConsumer.commitSync() 方法时
原因
同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序
消息处理的总时间超过预设的 max.poll.interval.ms 参数值
解决方案
缩短单条消息处理的时间
增加 Consumer 端允许下游系统消费一批消息的最大时长,调大max.poll.interval.ms
减少下游系统一次性消费的消息总数,调小max.poll.records
下游系统使用多线程来加速消费
高水位 High Watermark
位移值等于高水位的消息也属于未提交消息。
也就是说,高水位上的消息是不能被消费者消费的
Log End Offset 日志末端位移
Rebalance再均衡
是什么 ?
再均衡
本质上是一种协议
规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区
分区的所有权从一个消费者转义到另一个消费者
再均衡期间消费者不会消费信息,服务暂时不可用
第一个加入群组的消费者是该群群主
再均衡时,群主向群组协调器获取群组成员列表,并对成员分配分区。
分区完毕后将分区信息发送给群组协调器,群组协调器再将信息发送给所有消费者,各消费者只知道自己的分区信息
触发条件、触发时机
组成员数发生变更
新组成员加入或老组成员离开
组成员崩溃,或者网络异常被踢出组
订阅主题数发生变更
使用正则表达式来订阅topic
订阅主题的分区数发生变更
增加分数区
问题
【类似 STW 过程】在 Rebalance 过程中,TPS不高。所有 Consumer 实例都会停止消费,等待 Rebalance 完成
【所有 Consumer 重新分配】在Rebalance 时,效率不高。所有 Consumer 实例共同参与,全部重新分配所有分区
【太慢】Rebalance过程太慢
Coordinator
Coordinator 协调者
Rebalance 的执行者
订阅主题分区的分配由Coordinator负责
所有Broker 都有各自的 Coordinator 组件
Consumer Group 如何确定为它服务的
Coordinator 在哪台 Broker 上?
Coordinator 在哪台 Broker 上?
第 1 步:
确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第 2 步:
找出该分区 Leader 副本所在的 Broker
该 Broker 即为对应的 Coordinator
如何避免
可避免情况
非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的
Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group
非必要 Rebalance 是 Consumer 消费时间过长导致的
相关参数
session.timeout.ms
每个 Consumer 实例都会定期地向Coordinator 发送心跳请求,表明它还存活着
heartbeat.interval.ms
控制发送心跳请求频率
max.poll.interval.ms
它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。
它的默认值是 5 分钟
它的默认值是 5 分钟
方案
设置 session.timeout.ms = 6s
设置 heartbeat.interval.ms = 2s
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求
设置 heartbeat.interval.ms = 2s
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求
max.poll.interval.ms的值要大于消费者消费的最大时长
FGC
过程
重平衡过程是如何通知到其他消费者实例的?
靠消费者端的心跳线程(Heartbeat Thread)
心跳分支来通知
Kafka的三种消息策略、语义 level、消息传输的事务定义
最多一次(at most once)
最多一次(at most once)
消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
消息可能会丢失,但绝不会被重复发送
可能丢失 不会重复
log 位置信息
至少一次、最少一次(at least once)
至少一次(at least once)
消息不会丢失,但有可能被重复发送。
消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
可能重复 不会丢失
log 消息处理结果
精确一次(Exactly Once)
精确一次(exactly once)
消息不会丢失,也不会被重复发送。
不丢失 不重复,只发一次
上面两个都存且存到同一个位置上
不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,
这是大家所期望的。
Kafka的缺点
数据丢失问题
丢消息
kafka收到消息后会写入磁盘缓冲区里,没有直接落到物理磁盘上去,所以机器本身故障了,可能导致磁盘缓冲区里的数据丢失。
功能单一
主要支持发送消息,消费消息。
其他场景比较受限制。
kafka快的原因
顺序读写
索引
批量读写和文件压缩(清除已经更新消息的历史版本)
零拷贝
拓展知识点
用户空间和内核空间
传统I/O过程
Kafka的两种模式
队列模式
(queuing)
一组消费者可以从服务器读取数据记录,每个记录都会被其中的一个消费者处理
consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到。
队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到。
发布-订阅模式
消息被广播到所有的consumer中。
记录被广播到所有的消费者
(publish-subscribe)
发布-订阅模式中消息被广播到所有的consumer中。
Kafka数据存储设计
partition的数据文件(offset,MessageSize,data)
partition中的每条Message包含了以下三个属性
offset
offset表示Message在这个partition中的偏移量
offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值
它唯一确定了partition中的一条Message,可以认为offset是partition中Message的id
MessageSize
表示消息内容data的大小
data
Message的具体内容
数据文件分段segment(顺序读写、分段命令、二分查找)
partition物理上由多个segment文件组成,每个segment大小相等,顺序读写。
每个segment数据文件以该段中最小的offset命名,文件扩展名为.log。
这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个segment数据文件中。
数据文件索引(分段索引、稀疏存储)
Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。
这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
图解
Kafka的分布式
副本
每个分区在Kafka集群的若干服务中,都有副本
这样这些持有副本的服务,可以共同处理数据和请求
副本使Kafka具备了容错能力。
副本数量是可以配置的。
每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,
leader,负责处理消息的读和写
followers,则去复制leader
如果leader down了,followers中的一台则会自动成为leader。
集群中的每个服务都会同时扮演两个角色,
作为它所持有的一部分分区的leader
作为其他分区的followers
这样集群就会据有较好的负载均衡
Kafka的工作原理说明
设计目标
支持在线水平扩展。
工作原理解释
因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理。
为了均衡负载,将Topic分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。
消息由生产者通过Kafka Producer API push到Kafka对应Topic中的Partition
再被所关注此Topic的消费者通过Consumer API pull到本地消费
同时Consumer自己管控消息的偏移量offset,即记录此次消息的消息位置。
图解Kafka工作原理
Kafka的工作原理快速理解
kafka follower如何与leader同步数据
Kafka的复制机制
既不是完全的同步复制
也不是单纯的异步复制
完全同步复制方式
要求All Alive Follower都复制完,
这条消息才会被认为commit,
这种复制方式极大的影响了吞吐率
异步复制方式
Follower异步的从Leader复制数据,
数据只要被Leader写入log就被认为已经commit,
这种情况下,如果leader挂掉,会丢失数据,
kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。
Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,
这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
命令行使用指导
Step 1:下载Kafka
点击下载最新的版本并解压.
> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1
Step 2:启动服务
Kafka用到了ZK,所有首先启动ZK
下面简单的启用一个单实例的ZK服务
可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
现在启动Kafka
>bin/kafka-server-start.sh config/server.properties
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
Step 3:创建 topic,查看创建的topic
手动创建一个叫做“test”的topic
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
它只有一个分区,一个副本。
除了手动创建topic,还可以配置broker,让它自动创建topic.
通过list命令,查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
Step 4:发送消息
Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息,并发送到服务端。
默认的每条命令将发送一条消息。
运行producer,并在控制台中输一些消息,这些消息将被发送到服务端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
ctrl+c可以退出发送。
Step 5: 启动consumer
Kafka也有一个命令行consumer,可以读取消息并输出到标准输出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
这两个命令都有自己的可选参数,可以在运行时,不加任何参数,查看帮助信息。
Step 6: 搭建一个多个broker的集群
刚才只是启动了单个broker
现在启动有3个broker组成的集群,这些broker节点也都是在本机上的
(1)为每个节点编写配置文件
拷贝文件
(2)在拷贝出的新文件中,添加以下参数
config/server-1.properties:
配置内容1
config/server-2.properties:
配置内容2
说明
broker.id在集群中,唯一的标注一个节点,
因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。
(3)刚才已经启动了ZK和一个节点,现在启动另外两个节点
启动命令 > bin/kafka-server-start.sh config/server-1.properties &
启动命令 > bin/kafka-server-start.sh config/server-2.properties &
(4)创建一个拥有3个副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
(5)现在搭建了一个集群,怎么知道每个节点的信息呢?
运行“"describe topics”命令即可
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
输出内容
解释一下这些输出
第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为只有一个分区,所以下面就只加了一行。
leader:
本例中:节点1是作为leader运行。
负责处理消息的读和写,
leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
(6)向topic发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
(7)消费这些消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
(8)测试一下容错能力
Broker 1作为leader运行,现在kill掉它:
kill -9
另外一个节点被选做了leader,node 1 不再出现在 in-sync 副本列表中:
bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
看来Kafka的容错机制还是不错的。
常见问题
消费者的消费能力达不到生产者的生产能力如何处理?如何提前规避?
为什么kafka不支持读写分离
读写分离不能提高读性能
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
学习链接
http://www.aboutyun.com/thread-12882-1-1.html
http://blog.jobbole.com/75328/
http://www.cnblogs.com/likehua/p/3999538.html
Spring-kafka应用
在spring boot环境中使用,引入需要依赖的jar包(引入POM文件)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KafkaTemplate 主要实现kafka的什么功能?
快速使用KafkaTemplate的步骤
如何配置配置文件信息
怎么封装更快速使用
怎么封装更快速使用
KafkaTemplate使用的配置文件
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
KafkaTemplate封装运用
/**
* Copyright 2018 Desaysv Inc.
*/
package com.desaysv.rd.analysis.utils;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.desaysv.rd.analysis.entity.KafkaMsgBean;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
/**
* @Description: 功能描述
*
* @author Xiquan.Liu@desay-svautomotive.com on [2019年5月16日下午7:57:35]
* @Modified By: [修改人] on [修改日期] for [修改说明]
*
*/
@Component
public class KafkaUtils {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* @Description: 发送消息到kafka
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年5月17日上午8:57:23
* @param topic kafka的主题
* @param json 发送的内容
*/
public boolean sendMsg(String topic, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, json);
return true;
}
/**
* @Description: 发送消息到kafka
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年5月30日上午9:07:36
* @param topic kafka主题
* @param key 主键
* @param json 发送的内容
* @return
*/
public boolean sendMsg(String topic, String key, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, key, json);
return true;
}
/**
* @Description: 批量存储
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年6月10日下午5:10:32
* @param topic 存储的主题
* @param msgs
* @return
*/
public boolean sendBatchMsg(String topic, List<KafkaMsgBean> msgs) {
if (CollUtil.isEmpty(msgs)) {
return false;
}
msgs.forEach(msg -> {
sendMsg(topic, msg.getDeviceId(), msg.getMsgContent());
});
return true;
}
}
* Copyright 2018 Desaysv Inc.
*/
package com.desaysv.rd.analysis.utils;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.desaysv.rd.analysis.entity.KafkaMsgBean;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
/**
* @Description: 功能描述
*
* @author Xiquan.Liu@desay-svautomotive.com on [2019年5月16日下午7:57:35]
* @Modified By: [修改人] on [修改日期] for [修改说明]
*
*/
@Component
public class KafkaUtils {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* @Description: 发送消息到kafka
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年5月17日上午8:57:23
* @param topic kafka的主题
* @param json 发送的内容
*/
public boolean sendMsg(String topic, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, json);
return true;
}
/**
* @Description: 发送消息到kafka
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年5月30日上午9:07:36
* @param topic kafka主题
* @param key 主键
* @param json 发送的内容
* @return
*/
public boolean sendMsg(String topic, String key, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, key, json);
return true;
}
/**
* @Description: 批量存储
*
* @author Xiquan.Liu@desay-svautomotive.com
* @Date 2019年6月10日下午5:10:32
* @param topic 存储的主题
* @param msgs
* @return
*/
public boolean sendBatchMsg(String topic, List<KafkaMsgBean> msgs) {
if (CollUtil.isEmpty(msgs)) {
return false;
}
msgs.forEach(msg -> {
sendMsg(topic, msg.getDeviceId(), msg.getMsgContent());
});
return true;
}
}
KafkaListener运用
消费者
标识需要监听主题
标识需要监听主题
Spring的生命周期
•Spring对bean进行实例化,默认bean是单例;
•Spring对bean进行依赖注入;
•如果bean实现了BeanNameAware接口,spring将bean的id传给setBeanName()方法;
•如果bean实现了BeanFactoryAware接口,spring将调用setBeanFactory方法,将BeanFactory实例传进来;
•如果bean实现了ApplicationContextAware接口,它的setApplicationContext()方法将被调用,将应用上下文的引用传入到bean中;
•如果bean实现了BeanPostProcessor接口,它的postProcessBeforeInitialization方法将被调用;
•如果bean实现了InitializingBean接口,spring将调用它的afterPropertiesSet接口方法,类似的如果bean使用了init-method属性声明了初始化方法,该方法也会被调用;
•如果bean实现了BeanPostProcessor接口,它的postProcessAfterInitialization接口方法将被调用;
•此时bean已经准备就绪,可以被应用程序使用了,他们将一直驻留在应用上下文中,直到该应用上下文被销毁;
•若bean实现了DisposableBean接口,spring将调用它的distroy()接口方法。同样的,如果bean使用了destroy-method属性声明了销毁方法,则该方法被调用;
•Spring对bean进行依赖注入;
•如果bean实现了BeanNameAware接口,spring将bean的id传给setBeanName()方法;
•如果bean实现了BeanFactoryAware接口,spring将调用setBeanFactory方法,将BeanFactory实例传进来;
•如果bean实现了ApplicationContextAware接口,它的setApplicationContext()方法将被调用,将应用上下文的引用传入到bean中;
•如果bean实现了BeanPostProcessor接口,它的postProcessBeforeInitialization方法将被调用;
•如果bean实现了InitializingBean接口,spring将调用它的afterPropertiesSet接口方法,类似的如果bean使用了init-method属性声明了初始化方法,该方法也会被调用;
•如果bean实现了BeanPostProcessor接口,它的postProcessAfterInitialization接口方法将被调用;
•此时bean已经准备就绪,可以被应用程序使用了,他们将一直驻留在应用上下文中,直到该应用上下文被销毁;
•若bean实现了DisposableBean接口,spring将调用它的distroy()接口方法。同样的,如果bean使用了destroy-method属性声明了销毁方法,则该方法被调用;
Spring bean初使化与销毁说明
通过实现接口实现bean的初使化和销毁;
让Bean实现InitializingBean及DisposableBean接口。
通过注解实现实现bean的初使化和销毁;
通过在bean中定义@PostConstruct和@PreDestroy注解定义初始化及销毁方法。
BeanPostProcessor说明
BeanPostProcessor是一个接口,提供了postProcessBeforeInitialization 以及postProcessAfterInitialization两个方法,主要用于在Bean初始化前后做一些处理工作。 查看注释发现:
postProcessBeforeInitialization:在bean初始化之前被调用;
postProcessAfterInitialization:在bean初始化之后被调用
让Bean实现InitializingBean及DisposableBean接口。
通过注解实现实现bean的初使化和销毁;
通过在bean中定义@PostConstruct和@PreDestroy注解定义初始化及销毁方法。
BeanPostProcessor说明
BeanPostProcessor是一个接口,提供了postProcessBeforeInitialization 以及postProcessAfterInitialization两个方法,主要用于在Bean初始化前后做一些处理工作。 查看注释发现:
postProcessBeforeInitialization:在bean初始化之前被调用;
postProcessAfterInitialization:在bean初始化之后被调用
KafkaListener监听流程
KafkaListener监听流程
KafkaListener使用流程
在配置文件上配置kafka的信息;
使用注解@EnableKafka,书写java config;
使用注解@ KafkaListener,标明监听的主题,并且在该方法下,编写解析逻辑;
使用注解@EnableKafka,书写java config;
使用注解@ KafkaListener,标明监听的主题,并且在该方法下,编写解析逻辑;
KafkaListener具体运用示例
Kafka的性能调优
调优目标
高吞吐
Broker端
调大 num.replica.fetchers( Follower 副本用多少个线程来拉取消息)
调优GC参数避免FGC
Producer端
适当调大batch.size的值
适当增大linger.ms的值
设置compression.type为lz4或者zstd
设置ack为0或者1
设置retries为0
多线程共享Producer实例,则适当调大buffer.memory
Consumer端
采用多线程消费
增大fetch.min.size值
低延迟
调优层次
操作系统层
禁止atime mount -o noatime
选择合适的文件系统 ext4 或 XFS
swap 空间设置得比较小 sudo sysctl vm.swappiness=N
加大文件句柄数 ulimit -n
调大 vm.max_map_count
预留较大的页缓存
JVM
将JVM 堆大小设置成 6~8GB
使用 G1 收集器
大对象 增加 JVM 启动参数 -XX:+G1HeapRegionSize=N
Broker
即尽力保持客户端版本和 Broker 端版本一致
应用层
不要频繁地创建 Producer 和 Consumer 对象实例
用完及时关闭
合理利用多线程来改善性能
Producer如何优化打入速度 ?
跨数据中心的传输:
增加线程
提高 batch.size
增加更多 producer 实例
增加 partition 数
设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。
RocketMQ
RocketMQ的基本概念、组件、架构原理
图解RocketMQ
图解RocketMQ
消息模型(Message Model)
组(Group)
生产者组(Producer Group)
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
消费者组(Consumer Group)
Consumer Group
消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
多个机器可同属于一个消费组,例如库存服务4个机器。
宕机/加机器时,会进行Rebalance,重新分配消费者对应的MessageQueue
消息消费者/收件人(Consumer)
消息消费者(Consumer)负责消费消息
消息的消费者
需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
消费模式、消费消息、丰富的消息拉取模式
PULL
消息拉取
PullConsumer
pull
拉取式消费(Pull Consumer)
通过长轮询在没有消息时Hold住请求
ConsumeGroup大部分在OS cache中,读取性能很高
CommitLog数据很多,不可能全放在OS cache中,先读取cache中的,再读磁盘
while循环不断拉取
关于何时master让消费者去slave拉取消息
10w的消息,只消费了2w,还有8w待消费
master最多只能用10GB的OS cache,只能缓存5w条数据
还有3w就要去磁盘中拉取
此时他就会冉伟可能是master负载太高了,你去slave拉取吧
PUSH
PushConsumer
push
推动式消费(Push Consumer)
注册MessageListener监听器
监听器监听是否有消息待消费
消息模式、消费方式
集群模式
集群消费(Clustering)
集群消费
消费组中的一个机器可以拉取到消息
相同Consumer Group的每个Consumer实例平均分摊消息
广播模式
广播消费(Broadcasting)
广播消费
消费组中的每一个机器可以拉取到消息
相同Consumer Group的每个Consumer实例都接收全量的消息
消息过滤
简单消息过滤
Tag
高级消息过滤
Filter Server
使用 Java class上传作为过滤表达式是一个双刃剑
Filter Server是双刃剑
【好处】方便了应用的过滤操作且节省网卡资源
【坏处】带来了服务器端的安全风险
Consumer负载均衡方式
连续分配(默认)AllocateMessageQueueAveragely
轮流:AllocateMessageQueueAveragelyByCircle
通过配置:AllocateMessageQueueByConfig
一致性Hash:AllocateMessageQueueConsistentHash
指定一个broker的topic中的queue:AllocateMessageQueueByMachineRoom
按broker的机房就近:AllocateMachineRoomNearBy
消费者重试、消息重发、消息消费异常处理、重试策略及死信队列
重试策略
重试
消息失败重试
消费失败处理方式
消息发送失败如何处理
消息发送失败处理方式
originMsgId
消息id变化,但是原始id不变
默认最多重试16次
默认最多16次
16次重试失败,则放入死信队列
重试时间不对衰减,最多重试16次
如果在16次重试范围内消息处理无法成功,就需要自动进入死信队列
配置重试时间间隔
重试时间间隔可以进行如下配置:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
每次重试间隔如下:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
16次重试失败,则放入死信队列
根据使用场景,对死信队列中的消息,还是要一直不停的重试。
由消费组名称推出重试队列和死信队列
比如消费组名称是“VoucherConsumerGroup”
意思是优惠券的消费组
推出重试队列
名字为“%RETRY%VoucherConsumerGroup”的重试队列
推出死信队列
名称为“%DLQ%VoucherConsumerGroup”的死信队列
%DLQ%Test_group
场景
消费者消费消息时,db宕机,怎么办?返回success,这条数据就丢了,不返回?那要等多久
解决方案
死信队列
作用
用于处理无法被正常消费的消息
什么时候产生 ?
当一条消息初次消费失败,消息队列会自动进行消息重试;
达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,
此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
不再被正常消费
保存3天
面向消费者组
控制台 重发 重写消费者 单独消费
这个消息在rocket mq后台管理平台可以看见
后续可以开个后台线程,处理死信队列内的消息
返回RECONSUME_LATER状态
try-cache捕获异常,返回reconsume_later
如果对消息的处理有异常,可以返回RECONSUME_LATER状态,
即通知MQ将消息放到针对你这个消费组的consumeQueue创建的重试队列里,过一会再让消费者消费
消费者组内的重试队列
例如Test_group,重试队列%retry%test_group
意思是现在没法完成这批消息的处理,麻烦你稍后过段时间再次给我这批消息让我重试一下。
然后会把你这批消息放到你这个消费组的重试队列中去。
消费过程幂等
消息幂等
幂等
消费过程要做到幂等
消息去重
(即消费端去重)
去重
消息表主键冲突
消息重复的场景
系统间调用的重复机制
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
A->B,B系统处理慢导致超时,A系统重新发送请求导致重复
上述B系统后有个将消息放入MQ的操作
手动提交offset未完成
消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
MQ重新发送消息到同消息组其他消费者机器,导致消息重复
重试机制导致的问题
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
解决方案
produce
业务判断法
操作方法
向MQ发送消息时,先确认消息是否在MQ中
缺点
MQ虽然有这个查询功能,但是不建议使用,性能不好
msg.setKeys(orderId);
broker会基于这个key, hash创建一个索引,放到indexFile中
可以根据mq提供的命令,用这个id来查询消息是否存在
未来得及将消息成功写入redis,系统就宕机了,仍然会有重复消息的问题
基于redis的幂等机制
Redis记录是否已成功发送此条消息
consume
业务判断法
db中数据的状态翻转,查一下就知道是不是消费过了
消费速度慢的处理方式
1 提高消费并行度
提高消费并行度
2 批量方式消费
批量方式消费
3 跳过非重要消息
跳过非重要消息
4 优化每条消息消费过程
如何判断消费収生了堆积,offset计算
优化每条消息消费过程
消费速度慢处理方式
消费打印日志
消费打印日志
其他消费建议
消息生产者/寄件人(Producer)
消息生产者(Producer)负责生产消息
消息的生产者
需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
四种发送消息的方式
同步发送
同步
SendResult send = produce.send(msg)
同步发送
异步发送
异步
可以调整失败重试次数
produce.send(msg, new SendCallBack(){
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
@Override
public void onSuccess(){
}
@Override
public void onException(Throwable e){
}
})
异步发送
单向发送
单向
produce.SendOneWay(msg);
1.3选择oneway形式发送
某些场景选择oneway形式发送
单向发送
顺序发送
顺序消息
生产者发送消息到达broker是有序,不能使用多线程发送,需要顺序发送
写入broker的时候顺序写入,相同主体集中写入,选择同一个queue,MessageQueueSelector传入相同的hashKey
消费者消费的时候只能有一个线程
发送消息注意事项
发送顺序消息注意事项
一个应用尽可能用一个Topic,消息子类型用tags来标识
每个消息在业务局面的唯一标识码
日志务必要打印sendresult和key字段
对于消息不可丢失应用,务必要有消息重収机制
Producer负载均衡 、消息发送规则
SelectMessageQueueByHash
(默认)
自增轮询方式
默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。
SelectMessageQueueByRandom
随机选择一个队列
SelectMessageQueueByMachineRoom
返回空
自定义实现MessageQueueSelector
事务消息
开源版本并不支持事务消息
7.1事务消息概述
7.2事务消息机制
支持事务
(开源不支持)
事务消息
应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致
延迟消息、延时消息、延迟队列、定时消息
7.3延迟消息概述
7.4延迟消息机制
延迟消息、延时消息
应用场景
电商场景下,下订单15min未付款则关闭订单
针对此种场景,不能频繁扫描db,数据量太大
可以通过MQ的消息延迟机制,现将订单消息发送到MQ,设定15min后消费者可见这条消息
订单服务消费者获取到MQ消息,此时查询订单状态,若仍旧为未付款,则关闭订单
message.setDelayTimeLevel(3)也就是延迟级别,跟消息重发机制那个一样
发送延迟消息的核心,是设置消息的delayTimeLevel
默认支持一些延迟级别如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果设置为3,意思是延迟10s
如果设置为3,意思是延迟10s
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic
消息队列MessageQueue
Message Queue
为了提高性能和吞吐量,引入了Message Queue
一个Topic可以设置一个或多个Message Queue
这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息
commit log
message queue只是存储CommitLog中对应的位置信息
方便通过message queue,找到对应存储在CommitLog的消息
message queue本身并不存储消息
真正的消息存储会写在CommitLog的文件
生产者
NameServer集群
拉取Topic元数据
写入数据
均匀写入
Master Broker
Topic
MessageQueue
一个MessageQueue对应一个消费者机器处理
MessageQueue
Topic
MessageQueue
MessageQueue
故障无法写入
开启sendLatencyFaultEnable自动容错机制
未成功访问broker时,自动回避一段
Message Queue
默认8个队列
每隔30秒拉取NameServer上路由信息
通过tcp与name server建立长连接,定时拉取broker元数据
Broker(邮递员)
Broker概述
代理服务器(Broker Server)
Broker 角色
什么是Broker
Broker作用
Broker 负责存储消息
存储转发消息
负责消息的接收,存储,投递等功能
消息中转角色,负责存储消息、转发消息
Broker存储目录结构
Broker启动和停止流程
Broker重启对客户端的影响
Broker的配置
Broker 配置
Broker集群搭建
Broker支持集群化部署
Broker 使用指南
Broker是RocketMQ的核心
6.2 Broker存储机制
6.2.1 Broker消息存储结构
6.2.2 Broker消息存储机制
1.Broker消息存储的流程
2.内存映射机制与高效写磁盘
3.文件刷盘机制
6.2.3 Broker读写分离机制
Broker数据存储
所有topic都写入同一个文件中
为每一个消费者组存储消费topic最后一个offset单独存储(consume queue)
物理存储
commit log
comsume queue
index file
PageCache
零拷贝
内存映射(Memery Map)mmap
6.3 Broker CommitLog索引机制
6.3.1 索引的数据结构
6.3.2 索引的构建过程
1.创建Consume Queue和Index File
2.索引创建失败怎么办
6.3.3 索引如何使用
1.按照位点查消息
2.按照时间段查消息
3.按照key查询消息
6.4 Broker过期文件删除机制
6.4.1 CommitLog文件的删除过程
6.4.2 Consume Queue、Index File文件的删除过程
6.5 Broker主从同步机制
6.5.1 主从同步概述
每个borker可以有自己的副本slave。
6.5.2 主从同步流程
1.名词解释
2.配置数据同步流程
3.CommitLog数据同步流程
主从架构
Master Broker
Slave Broker
Master Broker
Slave Broker
Master Broker
Slave Broker
6.6 Broker的关机恢复机制
6.6.1 Broker关机恢复概述
6.6.2 Broker关机恢复流程
Broker 在启动时会初始化 abort、checkpoint 两个文件。正常关闭进程时会删除 abort文件,将 checkpoint 文件刷盘;异常关闭时,通常来不及删除abort 文件。由此,在重新启动Broker时会根据abort判断是否需要异常停止进程,而后恢复数据。
CommitLog消息顺序写入机制
样例: 接到消息
CommitLog
消息a
消息b
消息c
消息d
......
消息a
消息b
消息c
消息d
......
可以是很多log文件,每个文件限制1GB大小
采用PageCache写入+OS异步刷盘+磁盘顺序写的策略,提升性能,基本和使用内存差不多
即不直接写入磁盘,先写入OS的pageCache中,再定时异步刷入磁盘,顺序写入文件
相当于基于内存操作,有数据丢失风险
可以设定为同步刷盘,即强制写入磁盘后才返回ack,除非磁盘坏了,不然会导致吞吐量急剧下降
FlushDiskType
ConsumeQueue
对应MessageQueue Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件
$HOME/store/consumequeue/{topic}/{queueId}/{filename}
ConsumeQueue里面储存的是CommitLog的偏移量offset
ConsumeQueue同样是基于OS cache的
它的每条数据非常小,30w消息可能才5.7M
大部分都是放在OS cache中的,性能非常高
JMS中的Provider
Broker向所有的NameServer结点建立长连接,注册Topic信息。
每隔30秒发送心跳到NameServer中
中转消息,消息持久化
底层通信基于Netty
Netty NIO 、 Netty 引入
Produce
TCP长连接
端口
Broker
Reactor主线程
Reactor主线程
SocketChannel
长连接发送消息
SocketChannel
Reactor线程池
线程1
线程2
线程3
线程4
...
线程1
线程2
线程3
线程4
...
Worker线程池
线程1
线程2
线程3
线程4
...
线程1
线程2
线程3
线程4
...
mmap内存映射技术+pageCache实现性能读取
正常IO读取数据会经过两次内核态切换,两次数据拷贝
用户私有进程->内核IO缓冲区->磁盘
mmap内存映射技术,映射用户私有进程地址和磁盘数据地址到pageCache不需要从OS cache拷贝至用户私有进程
不能太大,1.5G到2G之间
写入同理,直接进入pageCache,然后异步刷盘
除此之外,还有内存预映射机制,文件预热来优化性能
名字服务/邮局(NameServer 、Namesrv)
什么是Namesrv ?概述
RocketMQ 服务发现(Name Server)
Namesrv
消息队列的协调者
Namesrv架构
作用
用于管理broker
临时保存、管理Topic路由信息
各个Namesrv节点是无状态的
Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
即每两个Namesrv节点之间不通信,互相不知道彼此的存在。
Namesrv核心数据结构和API
不使用ZK,而是NameServer
早期是zk后来改了
Namesrv和Zookeeper的区别
Namesrv实现了AP
可用性(Availability)
分区容错性(Partition tolerance)
Zookeeper实现了CP
一致性(Consistency)
分区容错性(Partition tolerance)
NameServer
无状态模式
broker向发心跳顺便带上所有的Topic信息
在Broker、生产者、消费者启动时,轮询全部配置的 Namesrv 节点,拉取路由信息。
支持集群化部署
Peer-to-peer
Broker会注册到NameServer,Producer和Consumer用NameServer来发现Broker
每隔10秒检查Broker的最新心跳时间,如果超过120S都没有发送心跳,则从路由中移除
Producer和Consumer每隔30秒拉取NameServer上的信息。ScheduleAtFixRate
Name Server是RocketMQ的寻址服务。
用于把Broker的路由信息做聚合。用户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。
Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。
Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。
5.3 RocketMQ的路由原理
注册
broker需要向每个NameServer注册
master和slave都需要注册
通过tcp长连接与broker通信
broker跟每个NameServer建立长连接
心跳机制
broker每30s向NameServer发送一次心跳
源码中,心跳即重新发送了一次注册
NameServer内部维护了一个ConcurrentHashMap存储注册的broker
NameServer每10s钟检查一次心跳
120s未接到心跳,则认为该broker宕机了
消息/邮件(Message、Message Queue)
主题/地区(Topic)
每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
可以根据tag之类过滤消息
一个topic中可能有上百万的数据
分布式存储在多个broker上
topic可以设置多个MessageQueue, 分布在不同的broker上
用来区分不同类型的消息,
发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。
消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
Offset
Message
Message 是消息的载体。
顺序消息
场景
多个consume消费消息,导致消息消费无序
解决方案
取模选择对应messageQueue+批量等待机制
首先要保证消息是有序进入MQ的
消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue
consume消费消息失败时,不能返回reconsume——later,这样会导致乱序
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
普通顺序消息(Normal Ordered Message)
严格顺序消息(Strictly Ordered Message)
RocketMQ部署架构和技术架构
RocketMQ实现的功能特性、特性(features)、特点、优点
基本特点
阿里出品
RocketMQ是阿里的开源消息中间件
久进沙场,非常可靠,几乎同时解决了kafka和Rabbitmq的缺陷
阿里巴巴的MQ中间件
RocketMQ是阿里巴巴开源的分布式消息中间件
Apache
现在是Apache的一个顶级项目
开源
很容易阅读他的源码,甚至修改他的源码
Java语言开发
基于java开发,符合国内大多数公司技术栈
由java语言开发
功能完备
性能非常好,能够撑住双十一的大流量,而且使用起来很简单。
在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。
可支持各种高级功能
1 订阅与发布
2 消息顺序
顺序消息
顺序消息
Hash取模
RocketMQ提供了MessageQueueSelector队列选择机制
顺序发送 顺序消费由 消费者保证
消费者是多线程
3 消息过滤
4 消息可靠性
Broker非正常关闭
Broker异常Crash
OS Crash
机器掉电,但是能立即恢复供电情况
机器无法开机(可能是cpu、主板、内存等关键设备损坏)
磁盘设备损坏
Broker异常Crash
OS Crash
机器掉电,但是能立即恢复供电情况
机器无法开机(可能是cpu、主板、内存等关键设备损坏)
磁盘设备损坏
5 至少一次
6 回溯消费
消息回溯
可查询
发送成功后返回consume_success
回溯消费
消费保证
7 事务消息
8 定时消息
9 消息重试
10 消息重投
11 流量控制
12 死信队列
集群、高可用
可以保证高可用性
高可用性
主从同步
意义
数据备份
高可用
提高性能
消费实时
数据同步
集群名字相同,连接到相同NameServer,brokerId=0代表master,1代表slave
刷盘类型
主从异步复制
主从同步双写
异步刷盘
同步刷盘
主从同步流程
从服务建立TCP连接主服务器,每隔5S向主服务器发送commitLog文件最大偏移量拉取还未同步消息
主服务器开启监听端口,监听从服务器发送过来的消息,解析并返回查找出未同步的消息给从服务器
客户端收到主服务器的消息后,将这批消息写入commitLog文件中,然后更新commitLog拉取偏移量,介者继续向主服务器拉取未同步消息
故障转移
Dledger集群搭建
高可用
集群
NameService 集群
Broker 主从 双主 双从
Consumer 自动切换
producer 链接两个Broker
刷盘
同步 超时会返回错误
异步 不返回
消息的主从复制
同步复制
异步复制
主从同步 异步刷盘
支持集群模式
多master
多master多slave异步复制
多master多slave双写
水平扩展
集群化部署
可以部署大规模的集群。
高吞吐量
性能很高
吞吐量很高,单机可达10万QPS以上
消息积压
亿级堆积
消息不丢失
支持通过配置保证数据不丢失
实时消息
丰富的消息机制
Rocket MQ提供给我们push|pull 两种消息拉取机制
push
推送消息->即MQ主动将消息推送给消费者
Rocket MQ的push底层实现其实是一次次的pull request
这就产生了一个问题,如果消费者实时感知消息,就需要通过不断的轮询进行消息的拉取
consumer不断轮询,一次pull request不到,马上就进行下一次pull request,这无疑是非常消耗性能的。
pull
拉取消息->即消费者通过轮询机制从broker中拉取消息
可能有消息积压的问题
长轮询机制
由客户端发起pull请求,服务端接收到客户端请求后,如果发现队列中没有消息,并不立即返回
而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息
如果有,则用现在连接将消息返回给客户端,如果一段时间内还是没有消息,则返回空
好处在于,其本质还是pull,所以消息处理的主动权还是在客户端手里,客户端可以根据自己的能力去做消息处理
而服务端持有请求一段时间的机制由很大程度避免了空拉取,减少资源浪费
但是这种机制也有一定的问题,当客户端数量过多时,服务端可能在某个时间段内需要持有过多的连接,服务端压力比较大
不过一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
支持读写分离
master根据自己的负载,向consume发送建议,是否从slave拉取消息
slave同步落后过多时,master建议consume只从master拉取消息
比如100w消息,slave才同步了96w
故障自动切换
slave挂掉对整体有点影响不过不大
master挂掉需要运维工程师手动调整配置,把slave切换成master,不支持自动主备切换
master-slave不是彻底的高可用,无法实现自动主备切换,version 4.5后,引入了Dledger,实现了高可用自动切换
消息零丢失
produce
事务消息机制->half-commit
流程
先发送一条half消息到MQ,看MQ是不是还活着
失败
一系列回滚操作,退款等
成功
说明MQ状态正常,继续做后续流程
执行本地逻辑,db插入等
本地方法失败
发送一个rollback消息到MQ,在内部op_topic中记录此条half消息为rollback状态
成功
继续走后续流程
提交commit到MQ
失败-订单系统感知
本地操作回滚,db数据状态回滚
MQ后台线程定时扫描half消息,长时间未commit/rollback则回调系统接口,若询问15次后仍未返回,则默认消息rollback了
查询db状态,已关闭则返回rollback给MQ,在内部op_topic中记录此条half消息为rollback状态
失败-订单系统认为已经成功
MQ后台线程定时扫描half消息,长时间未commit/rollback则回调订单接口,若询问15次后仍然未返回,则默认消息rollback了
查询db状态,已成功则返回commit给MQ,消息标记为commit并转入消费者topic中,此时消费者可见正常消费。
原理
initial
produce
发送消息->Topic(ConsumeTopic)
CommitLog
消息1
消息2
消息3
...
消息1
消息2
消息3
...
ConsumeQueue
(MessageQueue)
地址1
地址2
地址3
...
(MessageQueue)
地址1
地址2
地址3
...
consume
now
produce
发送消息->Topic(ConsumeTopic)
CommitLog
消息1
消息2
消息3
...
消息1
消息2
消息3
...
ConsumeQueue
(RMQ_SYS_TRANS_HALF_TOPIC对应的)
地址3
...
(RMQ_SYS_TRANS_HALF_TOPIC对应的)
地址3
...
ConsumeQueue(MessageQueue对应的)
地址1
地址2
地址3
...
地址1
地址2
地址3
...
consume
同步发送+不断重试机制
流程
先执行本地方法
本地方法执行完成后,同步调用MQ消息
MQ发送失败,重试三次
依旧失败的话回滚
kafka用的就是这种机制,但是这种机制仍然存在问题
MQ在本地事务之外
如果本地方法执行成功,但是发送MQ失败,在重试的时候,本地服务崩溃了就会导致本地方法已经执行完了,但是MQ没有收到消息,消息就丢了
MQ在本地事务之内
看似解决了问题,但重试会导致性能接口非常差,而且本地方法中可能还有写redis/es的操作,是无法通过事务回滚的。
broker
默认异步刷盘->同步刷盘
流程
initial
MQ接收到消息,写入CommitLog(OS cache)中,写入consumeQueue(OS cache)中
后台线程异步将os cache的数据刷入磁盘
消费者直接从os cache中拿到consumeQueue的偏移量offset对应的commitLog的消息,进行消费
如果此时broker宕机,消息就丢了,因为此时消息都在os cache中,未写入磁盘持久化
now
MQ接收到消息,写入commitLog(os cache)中,写入consumeQueue(os cache)中
强制要求同步刷入磁盘
flushDiskType由默认的ASYNC_FLUSH->SYNC_FLUSH
消费者直接从OS cache中拿到consumeQueue的偏移量offset对应的commitLog消息进行消费
数据落入磁盘中,宕机也不会丢失消息
master-slave
上述方案在磁盘坏了的时候,仍然会丢失消息,通过主从架构将消息备份到slave,多备份机制避免因磁盘损坏导致的消息丢失问题
consume
手动提交offset+自动故障转移
监听器监听消息
成功消费消息后,返回一个consume_commit标识
注意,消息消费不能异步
如果消息未消费完宕机,则本地事务回滚,未返回consume_commit标识给MQ
MQ感知到消费者机器宕机,且未完成消息消费,自动将消息发送给消费者组内的其他机器进行消费
消费者机器宕机时,MQ会执行rebalance, 重新分配messagequeue对应消费者机器
存储特点
零拷贝
零拷贝原理
mmap+write
文件系统
ext4
数据存储结构
消费队列服务
消息索引服务
事务状态服务
定时消息服务
RocketMQ的缺点
官方文档内容较为简单
官方文档相对简单。
RocketMQ的使用场景
适合广泛使用
国内很多一线互联网大厂都切换使用RocketMQ了,他们需要rocketMQ的高吞吐量,大规模部署能力,
还有各种高阶功能去支撑自己的各种业务场景,同时还可以根据自己的需求定制修改RocketMQ的源码。
解耦
降低系统耦合
异步
提高接收能力
异步化提升性能
流量削峰,削峰填谷
系统A发送过来的每秒1万请求是一个流量洪峰,然后MQ直接扛下来了,都存储在本地磁盘,
这个过程就是流量削峰的过程,瞬间把一个洪峰给削下来了,让系统B后续慢慢获取消息来处理。
并行
最终一致
RocketMQ关键机制的设计原理
1 消息存储
1.1 消息存储整体架构
(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
(2) ConsumeQueue
Consumer即可根据ConsumeQueue来查找待消费的消息
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
1.2 页缓存与内存映射
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。
1.3 消息刷盘
2 通信机制
2.1 Remoting通信类结构
2.2 协议设计与编解码
2.3 消息的通信方式和流程
2.4 Reactor多线程设计
3 消息过滤
4 负载均衡
4.1 Producer的负载均衡
4.2 Consumer的负载均衡
1、Consumer端的心跳包发送
2、Consumer端实现负载均衡的核心类—RebalanceImpl
5 事务消息
5.1 RocketMQ事务消息流程概要
5.2 RocketMQ事务消息设计
1.事务消息在一阶段对用户不可见
2.Commit和Rollback操作以及Op消息的引入
3.Op消息的存储和对应关系
4.Half消息的索引构建
5.如何处理二阶段失败的消息?
6 消息查询
6.1 按照MessageId查询消息
6.2 按照Message Key查询消息
RocketMQ源代码阅读
8.1 RocketMQ源代码结构概述
8.2 RocketMQ源代码编译
8.3如何阅读源代码
8.4源代码阅读范例:通过消息id查询消息
源码分析
IDEA源码9(RocketMQ)导入
目录解析
broker
RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程
client
RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面
common
一些公共的代码
dev
开发相关的一些信息
distribution
部署RocketMQ的一些东西,比如bin目录 ,conf目录,等等
example
RocketMQ的一些例子
filter
RocketMQ的一些过滤器的东西
logappender和logging
RocketMQ的日志打印相关的东西
namesrv
NameServer的源码
openmessaging
开放消息标准,这个可以先忽略
remoting
这个很重要,这里放的是RocketMQ的远程网络通信模块的代码,基于netty实现的
srvutil
一些工具类
store
消息在Broker上进行存储相关的一些源码
style、test、tools
这里放的是checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类
源码调试方法
在源码中打一些端点,观察rocketmq源码运行过程,然后在这个过程中,需要从rocketmq实际运行和使用的角度,去观察他的源码运行流程。概括来讲可以分为三点:场景驱动+通俗语言+大量画图。
场景驱动
按照我们平常使用RocketMQ的各种场景进行源码分析,在一个场景中把各种源码串联起来分析。
通俗语言
用通俗易懂的语言表达
大量画图
尽量多画图
NameServer源码调试
启动类
NameSrvStartup
关键类
NamesrvController
NamesrvController controller = createNamesrvController(args);
start()
initialize()
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.serverBootstrap= new ServerBootstrap();
remotingServer.start()
localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) 设置Netty服务器监听端口号,默认9876
最核心的是用于接收网络请求的初始化Netty网络服务器
Runtime类注册了一个JVM关闭时后的shutdown钩子
registerProcessor()
DefaultRequestProcessor处理broker注册请求
registerBroker()
RouteInfoManager这个数据管理组件里面有如何把一个broker机器的数据放入RouteInfoManager中维护的路由数据表的代码。
核心思路十分简单,就是用一些类似Map的数据结构,去存放Broker的路由数据。包括Broker的clusterName,brokerId,brokerName这些核心数据。
基于java并发包下ReadWriteLock进行读写锁加锁,因为在这里更新了那么多的内存Map数据结构,必须要加一个写锁,此时只能有一个线程更新他们。
配置类
NamesrvConfig
NettyServerConfig
结合NameServer启动日志观察其启动过程。
总结
初步了解了NameServer是如何启动的,了解到他最核心的就是基于Netty实现了一个网络服务器,然后监听9876端口,可以接收到Broker和客户端的网络请求。
Broker源码调试
启动类
BrokerStartup
createBrokerController()
创建初始化以及启动BrokerController这个核心组件,启动一个broker管理控制组件,让BrokerController去控制管理Broker在JVM进程中的一切行为。包括接收网络请求,包括管理磁盘上的消息数据,以及一代对后台线程的运行。
controller.getConfiguration().registerConfig(properties);
controller.initialize();
一种线程池是处理别人发送过来请求的
一种是后台定时调度任务的
关键类
BrokerController
if(null == System.getProperty()){
NettySystemConfig.socketSndBufSize=131072
}
NettySystemConfig.socketSndBufSize=131072
}
设置Netty网络通信相关变量,socket发送缓冲大小
场景驱动
启动时干了哪些事
Broker启动了,必然要去注册自己到NameServer去,所以BrokerOuterAPI这个组件是核心组件。
Broker启动后,必然有个网络服务器去接收别人的请求,此时NettyServer这个组件是必须知道的。
当你的NettyServer接收到网络请求后,需要有线程池来处理,你这里应该有个处理各种请求的线程池。
处理请求的线程池在处理每个请求的时候,需要各种核心功能组件的协调,比如写入消息到CommitLog,然后写入索引到indexfile和consumer queue文件里去,此时你需要一些messageStore之类的组件来配合。
此外,需要一些后台定时调度运行的线程来工作,比如发送定时心跳到NameServer去。
Broker如何把自己注册到NameServer
BrokeController.this.registerBrokeAll(true, false, brokeConfig.isForceRegister());
通过BrokerOuterAPI发送网络请求(通过请求头和请求体构成了一个请求,通过底层NettyClient发送)给所有的NameServer,把这个Broker注册上去。
深入探索Broker API发送请求过程
Broker和NameServer之间的网络连接是channel,通过基于Netty的Channel API可以发送注册的网络请求给NameServer即可。
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
配置类
NettyServerConfig
NettyClientConfig
BrokerConfig
MessageStoreConfig
总结
- BrokerController一旦初始化完成后,其实就准备好了Netty服务器,可以用于接收网络请求,然后准备好了处理各种请求的线程池,准备好了各种执行后台定时调度任务的线程池。同时也会在完成启动过程中向NameServer进行注册,以及保持心跳。
消息生产与消费配置,代码
RocketMQ生产经验、生产实战、生产环境其他功能、拓展
Dledger实现高可用
Dledger快速搭建(Quick Start) :
DledgerCommitLog代替Broker管理CommitLog
基于Raft协议选举Leader Broker
先发起一轮投票,都投自己
随机休眠,再次投票,先醒来的投自己,并发送投票给其他人,其他人醒来因为自己没投票,就会尊重发送投票的选择
基于Raft协议进行多副本同步
Leader Broker上的Dledger接到一条消息后,会先标记为uncommitted状态
通过DledgerServer组件将消息发送给Follow Broker的DledgerServer
收到消息的Follow Broker必须返回给Leader Broker一个ack
Leader Broker收到半数以上的Follow Broker返回的ack后,将这条消息标记为committed状态
Leader Broker崩溃的话怎么办
Dledger基于raft重新选举Leader,继续对外提供服务。
新选举出的Leader会对没完成数据同步的数据进行恢复性操作,保证数据不会丢失。
MQ限流方案
防止程序bug等导致疯狂写入MQ消息的情况
可采用令牌桶算法等来进行限制
避免因此种情况导致MQ集群挂掉
MQ迁移方案
双写+双读方案同步数据
消费历史消息
一个是从Topic的第一条数据开始消费。
CONSUME_FROM_LAST_OFFSET
一个是从最后一次消费过的消息之后开始消费。
CONSUME_FROM_FIRST_OFFSET
企业级的权限控制
权限管理(Auth Management)
配置
通过在broker端放一个acl权限文件,在里面规定好权限,哪个消费者对应哪个topic有什么操作权限等
消息轨迹追踪功能
生产环境消息轨迹追踪
消息轨迹指南(Message Trace):
broker
broker开启配置traceTopicEnable=true,存储消息轨迹追踪数据的。
开启上述选项后,启动这个broker的时候,自动创建出来一个topic,就是RMQ_SYS_TRACE_TOPIC
Broker端也会记录消息的轨迹数据,包括:消息存储的Toic,消息存储的位置,消息的key,消息的tags
producer/consumer
生产者开启代码
Producer会上报这些数据到RMQ_SYS_TRACE_TOPIC:Producer的信息,发送消息的时间,消息是否发送成功,发送消息的耗时。
consumer也会记录这些数据,包括:Consumer的信息,投递消息的时间,这是第几轮投递消息,消息消费是否成功,消费这条消息的耗时。
broker的配置文件里开启traceTopicEnable=true
启动broker时,会自动创建一个名为rmq_sys_trace_topic的topic存放消息轨迹记录
通过rocket-console上面的消息轨迹,创建查询任务,也可以看到消息的轨迹
秒杀系统技术方案、秒杀场景处理
CDN+Nginx,Lua脚本+Redis三级缓存处理静态页面
下单前加入答题模块,避免脚本疯狂调用接口以及错峰
独立部署一套秒杀响应的服务
redis控制扣减库存,快速响应
抢购完毕时,Lua脚本过滤无效请求
瞬时流量进入rocketmq削峰
难点分析
高并发的读
高并发的写
如何解决秒杀活动压力过大的问题
加数据库服务器方案
不靠谱,会导致公司服务器成本急剧飙升。
商品详情页系统的架构设计优化
页面数据静态化+多级缓存方案
把秒杀活动的商品详情页做成静态化的。即提前从数据库里把这个页面需要的数据都提取出来组装成一份静态数据放在别的地方,避免每次访问都要访问后端数据库。
CDN+Nginx+Redis多级缓存架构
第一级缓存
请求秒杀商品详情页数据时,从就近的cdn上加载,不需要每次请求都到上海机房。
Nginx基于lua脚本实现本地缓存
提前把秒杀商品详情页的数据放到Nginx中缓存,如果请求发送过来,可以从Nginx中直接加载缓存数据。不需要把请求转发到我们商品系统上去。
Nginx上存在缓存数据过期之类的问题,导致没找到我们需要的数据,此时可以由Nginx中的Lua脚本发送请求到Redis集群中加载我们提前放进去的秒杀商品数据。
如果还是没找到,把请求转发到商品详情页系统里加载即可
用答题的方案避免作弊以及延缓下单
在前端/客户端设置秒杀答题,错开大量人下单的时间,阻止作弊器刷单。
为秒杀独立出来一套订单系统,专门负责处理秒杀请求
如果秒杀下单请求和普通下单请求都由一套订单系统来承载,那么可能导致秒杀下单请求耗尽了订单系统资源,或者导致系统不稳定。然后后导致其他普通下单请求也出现问题。没法完成下单。所以我们一般会对订单系统部署两个集群,一个集群式秒杀订单系统集群,一个集群是普通订单系统集群。
基于redis实现下单时精准扣减库存,一旦库存扣减完则秒杀结束
一般会将每个秒杀商品的库存提前写入Redis中,然后在请求到来之后,直接对redis中库存进行扣减。
抢购完毕后提前过滤无效请求,大幅度消减转发到后端流量
在redis中库存被扣减完成后,说明后续其他请求没有必要发送到秒杀系统中了。因为商品已经被抢购完成了。此时我们可以让Nginx接收到后续请求的时候,直接把后续请求过滤掉。
比如一旦商品抢购完毕,可以在Zookeeper中写入一个秒杀完毕的标志位,然后ZK会反向通知Nginx中我们自己写的Lua脚本,通过Lua脚本后续在请求过来的时候直接过滤掉,不要向后转发了。
瞬时高并发下单请求进入RocketMQ进行削峰,订单系统慢慢拉取消息完成下单操作
如果判断发现通过redis完成了库存扣减,此时直接发送消息到RocketMQ中即可,让普通订单系统从RocketMQ中消费秒杀成功的消息进行常规性的流程处理即可,比如创建订单等等。此时的化,瞬间上万并发的压力会被RocketMQ轻松扛下来,然后普通订单系统可以根据自己的工作负载慢慢的从RocketMQ中拉取秒杀成功的消息,然后进行后续操作就可以了,不会对订单数据库造成过大的压力。
后续订单系统以每秒几千的速率慢慢处理,延迟个可能几十秒,这些下单请求就会处理完毕。
总结
避免直接基于数据库进行高并发库存扣减
架构优化的核心时独立出来一套系统专门处理,避免高并发请求落在mysql上。
后续占据99%的请求,直接在Nginx层面拦截掉
订单请求,写入RocketMQ进行削峰,让RocketMQ轻松抗下高并发压力,让订单系统慢慢消费和处理下单操作
提高消费者的吞吐量
可以设置consumer端的参数:consumeThreadMin,consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。
可以开启消费者批量消费的功能,设置consumeMessageBatchMaxSize参数,默认是1,设置的多一些,就会交给你的回调函数一批消息处理
MQ百万消息积压处理
一般如果消息不重要的话就在consume上直接释放掉
如果topic的messageQueue设置的比较多,比如设置了20个,consume实例只有4个,那么每个consume实例对应5个messageQueue,这个时候可以申请临时加机器,增加consume实例为20个,达到快速消费的目的
如果messageQueue设置的比较少,比如只设置了4个,那么这个时候就不能通过加consume机器来解决了,这时候就需要修改消费者代码了,不再消费者消费,而是把要消费的消息放到mq的另一个topic中,这个topic设置20个messageQueue,对应20个consume实例,进行消费
基于MQ的数据同步方案
介绍
Mysql Binlog同步系统,会监听Mysql数据库的Binlog,发送给你的系统,让你处理这些增删该操作日志
开源技术方案
阿里开源的Canal
Linkedin开源的Databus
数据同步
Produce向NameServer拉取broker信息
主从同步:slave从master拉取数据
consume从broker拉取数据消费
具体方案
采用Canal监听MYSQL binlog,然后直接发送到RocketMQ里
大数据团队的数据同步系统从RocketMQ种获取到Mysql Binlog,接着把其中的数据库操作还原到自己的数据库种就可以。
要解决的问题
MQ消息乱序,导致大数据存储中的数据都错乱了。
万一有顺序的消息处理失败了可以走重试队列吗?
处理不关注表的binlog,很浪费时间。
对应解决方法
业务id相同,必须保证进入同一个MessageQueue,可以采用取模的方法,用id对MessageQueue的数量进行取模。
这时候不能返回RECONSUME_LATER状态,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT,意思是先等一会,一会再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。
发送消息的时候,给消息设置tag和属性。消费消息的时候,则根据tag和属性过滤。
代码
过滤语法
(1)数值比较,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比较,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)逻辑符号 AND,OR,NOT;
(5)数值,比如:123,3.1415;
(6)字符,比如:'abc',必须用单引号包裹起来;
(7)NULL,特殊的常量
(8)布尔值,TRUE 或 FALSE
由于消费者系统故障导致的RocketMQ百万消息积压问题
如果消息允许丢失,就可以紧急修改消费者系统代码,在代码中对所有消息都获取到的就直接丢弃,不做任何处理,这样可以迅速的让积压在MQ的百万消息被处理掉,只不过处理方式是全部丢弃而已。
临时申请16台机器多部署16个消费者系统的实例,然后20个消费者同时消费,每个人消费一个MessageQueue的消息,此时你会发现消费的速度提高五倍,很快积压的百万消息都会被处理完毕。处理完成后,直接下线新加的机器。
如果messageQueue数量有限,只有4个,然后就没办法扩容消费者系统,因为加再多的消费者系统,还是只有4个messageQueue,没法并行消费,所以可以临时修改4个消费者的代码,让他们获取消息后不写nosql,而是直接把消息写入新的topic,这个速度很快,因为仅仅只是读写mq,然后新的messageQueue有20个,可以再部署20台临时追加的消费者系统,去消费新的topic后写入数据到NoSQL里去,这样子也可以迅速增加消费者的并行处理能力,使得用一个新的topic来允许更多的消费者系统并行处理。
金融级系统设计MQ崩溃高可用方案
try-catch机制,发送消息到MQ,失败的话捕获,重试3次,依旧失败则认为MQ集群崩溃
这个时候需要把消息有序的放到DB/NO SQL中,待MQ集群恢复正常后,再进行消费
注意:此方案保证消息写入有序
金融级的系统如何针对RocketMQ集群崩溃设计高可用方案
通常会在你发送消息到MQ的那个系统中设计高可用的降级方案,这个降级方案通常的思路是,需要在你发送消息到MQ代码里去try catch捕获异常,如果你发送消息到MQ有异常,此时你需要进行重试。
如果连续重试3次还是失败,说明此时可能就是你的MQ集群崩溃了,此时你必须把这条重要的消息写入本地存储中去,可以说是写入数据库中,也可以是写入到机器本地磁盘文件里,或者noSQL存储里,几种方式我们都做过。之后只要你不停的发送消息到MQ去,一旦发现MQ集群恢复了,你必须有一个后台线程可以把之前持久化存储的消息都查询出来,然后依次按照顺序发送到MQ集群里去,这样才能保证你的消息不会因为MQ奔溃而彻底丢失。
你把消息写入存储中暂存时,一定要保证他的顺序,比如按照顺序一条一条的写入本地磁盘文件去暂存消息。一旦集群故障了,你后续的所有写消息代码必须严格按照顺序把消息写入到本地磁盘文件里暂存,这个顺序是要严格保证的。只要有这个方案在,哪怕你的MQ集群突然崩溃了,你的系统也是不会丢失消息的。
给RocketMQ增加消息限流功能保证其高可用性
接收消息时,必须引入限流机制,可以采用令牌桶算法。
设计一套Kafka到RocketMQ的双写+双读技术方案
如果你要做MQ集群迁移,是不可能简单粗暴的,在某个时间点突然之间就说把所有的producer系统都停机了,然后更新他的代码,接着全部重新上线,然后所有的Producer系统都把消息写入到RocketMQ里去了。
一般来说,首先你要做到双写,也就是说你所有的Producer系统中,要引入一个双写的代码,让他同时往kafka和rocketmq中写入消息,然后多写几天,起码双写要持续1周左右,因为mq一般都是实时数据,里面数据最多保留一周。
当你双写持续一周过后,你会发现你的kafka和rocketmq里面的数据看起来几乎一模一样了,因为MQ也只保留近几天的数据。
但是光双写还是不够的,还需要同时进行双读,也就是说在你双写的同时,你所有的consumer系统都需要同时从kafka和rocketmq获取消息,分别用一摸一样的逻辑处理一遍,只不过从kafka获取到的消息还是走核心逻辑去处理,然后落库或者别的存储什么的,但是对于rocketmq用一样的逻辑处理,只是不落库。
你的consumer系统在同时从kafka和rocketmq进行消息读取的时候,需要统计每个mq当日读取和处理的消息数量,这点非常重要,同时对于rocketmq读取的消息处理之后的结果。可以写入临时存储中。
当你观察一段时间,发现双写和双读一段时间之后,如果所有的consumer系统通过对比发现,从kafka和rocketmq读取和处理的消息数量一致,同时处理之后得到的结果一致,此时就可以判断说当前kafka和rocketmq里的消息一致,可以计算出来的结果也是一致的。
这个时候就可以实施正式的切换了,可以停机Producer,再重新修改上线后,全部修改为仅仅写rocketmq这个时候数据也不会丢失,因为之前已经双写了一段时间后,然后所有的consumer系统可以全部下线后,修改代码再上限,全部基于rocketmq来获取消息,计算和处理,结果写入存储中。
灵活运用tags来过滤数据, Tag实现消息过滤
设置tag,consume进行消费的时候可以根据tag进行过滤
consume.subscribe("TopicOrderDbData","Table A || Table B")
consume.subscribe("TopicOrderDbData",MessageSelector.bySql("a > 5 and b = 'abc'"))
RocketMQ还是支持比较丰富的数据过滤语法的
1.数值比较,比如:>,>=,<,<=, BETWEEN, =;
2.字符比较,比如:=,<>, IN;
3.IS NULL或IS NOT NULL;
4.逻辑符号AND, OR, NOT;
5.数值, 比如:123,3.1415;
6.字符, 比如:'abc',必须用单引号包裹起来;
7.NULL,特殊的常量
8.布尔值,TRUE或者FALSE
基于消息key来定位消息是否丢失
mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANERCORD -k orderId
消息零丢失方案,消息零丢失方案的补充
消息发送零丢失
事务消息机制
先发送half消息给mq,如果失败,就无法与mq通信,则回滚本地事务。
如果成功,MQ会把half消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个topic对应的一个ConsumeQueue,此时则发送消息给生产者,消费者则完成本地任务。
如果更新本地任务失败(成功),发送rollback(commit)请求。通知删除half消息。本地可以写入支付失败记录,开启后台线程在MYSQL恢复之后,把订单状态更新为已关闭。
当mq系统没有收到half消息commit/rollback响应,mq后台会有定时任务,定时任务会扫描RMQ_SYS_TRANS_HALF_TOPIC的half消息,如果超过一定时间还是half消息,即mq没有收到commit/rollback操作,会回调生产者接口,(最多回调15次,如果15次后你没法告知他half消息的状态,就直接标记为rollback)查询half消息是否应该rockback还是commit,再做处理。
在RocketMQ执行rollback操作时,在内部OP_TOPIC中写入一条rollback OP记录到这个topic中,标记某个half消息是否rollback了。
如果生产者执行了commit操作,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态,接着需要把放在RMQ_SYS_TRANS_HALF_TOPI中half消息给写入到对应topic的ConsumeQueue里。然后消费者就可以进行消费了。
总结
如此复杂的机制(事务消息机制)可能导致整体性能比较差,而且吞吐量比较低,这时可以用同步发送消息+反复重试多次的方案去确保消息绝对投递到了MQ中,似乎还是不够的。
代码
异步刷盘调整为同步刷盘
以免将half topic写入对应topic时系统宕机造成消息丢失。
flushDiskType配置设置为SYNC_FLUSH,默认值时ASYNC_FLUSH,默认是异步刷盘。
通过主从架构模式避免磁盘故障导致的消息丢失
保证数据在Master Broker和Slave Broker上有多个副本的冗余。
消息消费零丢失
代码
消费者获取到一批消息后,回调监听器函数,处理这一批消息,处理完毕后,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意
接着提交这批消息的offset到broker中去。如果系统宕机,没有提交offset给broker,broker不会认为你已经处理完了这批消息,此时他其实会感知到consumer挂了。会把没处理完的消息交到其他消费者机器去进行处理,这种情况下,消息也绝对不会丢失。
总结
发送消息MQ零丢失
方案一(同步发送消息+反复多次重试)
方案二(事务消息机制),两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
MQ收到消息之后的零丢失
开启同步刷盘策略+主从架构同步机制,只要让一个Broker收到消息之后同步写入磁盘,同时复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会再弄丢消息。
消费消息的零丢失
采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,只要记住别采用多线程异步处理消息的方式即可。
不能有任何差错的核心链路采用消息零丢失方案,非核心链路采用同步发送消息+反复重试几次的方案。
RocketMQ企业最佳实践
生产配置以及压测
机器配置
NameServer
3台
8C16GB
Broker
3台
24C48GB
单集群->单master & 双slave
Produce
2台
4C8GB
Consume
2台
4C8GB
监控
rocket-console可视化监控平台
MQ监控
Zabbix || Open-Falcon
机器资源监控。 cpu || io || jvm
参数调整
操作系统
vm.overcommit_memory=1
把所有可用的物理内存都分配给你
vm.swappiness=10
尽量使用磁盘内存,不要放到swap中去
vm.max_max_count=655360
保证中间件可以开启足够多的线程
ulimit=1000000
最大文件连接数,保证网络通信和磁盘io
JVM
大体就是优化jvm相关参数,rocketmq默认是g1回收器
MQ核心参数
sendMessageThreadPoolNums=10
内部用来发送消息的线程数,可根据机器配置相应调整
压测
测试出最高负载
即MQ的TPS和机器的资源和负载之间取得一个平衡
需要关注的影响点
RocketMQ的tps和消息延时
cpu负载情况
内存使用率
jvm gc频率
磁盘io负载
网卡流量
sar -n DEV12
5 客户端配置
5.1 客户端寻址方式
5.2 客户端配置
客户端使用指南(配置)
ClientConfig
ClientConfig 为客户端的公共配置类
继承
DefaultMQAdminExt
DefaultMQProducer
DefaultMQPullConsumer
DefaultMQPushConsumer
配置
namesrvAddr
instanceName
RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。
persistConsumerOffsetInterval
由于持久化不是立刻持久化的,所以如果消费实例突然退出(如断点)、
或者触发了负载均衡分consue queue重排,有可能会有已经消费过的消费进度没有及时更新而导致重新投递。
故本配置值越小,重复的概率越低,但同时也会增加网络通信的负担。
vipChannelEnabled
broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。
其中一个的端口(配置端口-2)作为vip通道,客户端可以启用本设置项把发送消息此vip通道。
DefaultMQProducer
配置
producerGroup
createTopicKey
defaultTopicQueueNums
sendMsgTimeout
TransactionMQProducer
事务消息机制
事务生产者,截至至4.1,由于暂时事务回查功能缺失,整体并不完全可用
4.3可用了
DefaultMQPushConsumer
consumerGroup
messageModel
CLUSTERING //集群消费模式
BROADCASTING //广播消费模式
consumeFromWhere
consumeTimestamp
allocateMessageQueueStrategy
AllocateMessageQueueAveragely //取模平均
AllocateMessageQueueAveragelyByCircle //环形平均
AllocateMessageQueueByConfig // 按照配置,传入的messageQueueList
AllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行
AllocateMessageQueueConsistentHash //一致性哈希算法。用于解决“惊群效应”。
subscription
messageListener
offsetStore
若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore,不建议修改。
consumeThreadMin
consumeThreadMax
consumeConcurrentlyMaxSpan
并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控
注:只对并发消费(ConsumeMessageConcurrentlyService)生效
pullInterval
pullBatchSize
幂等去重策略
maxReconsumeTimes
一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列
注,这个值默认值虽然是-1,但是实际使用的时候默认并不是-1。按照消费是并行还是串行消费有所不同的默认值。
并行:默认16次
串行:默认无限大(Interge.MAX_VALUE)。
由于顺序消费的特性必须等待前面的消息成功消费才能消费后面的,默认无限大即一直不断消费直到消费完成。
consumeTimeout
消费的最长超时时间。默认
15分钟
如果消费超时,RocketMQ会等同于消费失败来处理
DefaultMQPullConsumer
registerTopics
allocateMessageQueueStrategy
6 系统配置
6.1 JVM选项
6.2 Linux内核参数
9.1 RocketMQ落地概述
9.2 RocketMQ集群管理
9.3 RocketMQ集群监控和报警
9.4 RocketMQ集群迁移
9.5 RocketMQ测试环境实践
9.6 RocketMQ接入实践
运维管理之集群搭建与部署(Operation): .
集群部署(Cluster Deployment):
4 种高可用 RocketMQ 集群搭建方案!
集群搭建
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
集群搭建
单Master模式
单master模式
多Master模式
多master模式
多Master多Slave模式-异步复制
多master多slave异步复制模式
多Master多Slave模式-同步双写
多master多slave同步双写模式
mqadmin管理工具
运维常见问题
RocketMQ的消费流程和最佳实践
3.1消费者概述
3.2消费者启动机制
1. DefaultLitePullConsumer
3.3消费者的Rebalance机制
1. 这里我们主要讲消费者实例在收到Broker通知后是怎么执行Rebalance的
2. 目前队列分配策略有以下5种实现方法:
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
AllocateMessageQueueAveragelyByCircle:环形分配策略。
AllocateMessageQueueByConfig:手动配置。
AllocateMessageQueueConsistentHash:一致性Hash分配。
AllocateMessageQueueByMachineRoom:机房分配策略。
3.4消费进度保存机制
从 3.1.2 节可知,在消费者启动时会同时启动位点管理器,那么位点具体是怎么管理的呢?RocketMQ 设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中。
3.5消费方式
1. Pull方式
2. Push 方式
3.5.1 Pull消费流程
3.6消息过滤
3.6.1 为什么要设计过滤功能
RocketMQ 设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带。
3.6.1 为什么要设计过滤功能
3.7消费者最佳实践总结
RocketMQ的生产者原理和最佳实践
2.1生产者原理
2.2生产者启动流程
2.3消息发送流程
2.4发送消息最佳实践
2.5生产者最佳实践总结
RocketMQ环境搭建
接下来我们先在linux平台下安装一个RocketMQ的服务
环境准备
下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
Linux 64位操作系统
64bit JDK 1.8+
64bit JDK 1.8+
安装RocketMQ
上传文件到Linux系统
[root@heima rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip
rocketmq-all-4.4.0-bin-release.zip
解压到安装目录
[root@heima src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@heima src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq
[root@heima src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq
启动RocketMQ
切换到安装目录
[root@heima rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md
benchmark bin conf lib LICENSE NOTICE README.md
启动NameServer
[root@heima rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log
[1] 1467
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log
启动Broker
# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
[root@heima rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
[root@heima rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log
测试RocketMQ
1 测试消息发送
[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Producer
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Producer
2 测试消息接收
[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Consumer
[root@heima rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
[root@heima rocketmq]# bin/mqshutdown broker
[root@heima rocketmq]# bin/mqshutdown namesrv
[root@heima rocketmq]# bin/mqshutdown namesrv
RocketMQ架构和部署最佳实践
物理部署
逻辑部署
4.1 RocketMQ架构
4.2常用的部署拓扑和部署实践
4.2.1 常用的拓扑图
4.2.2 同步复制、异步复制和同步刷盘、异步刷盘
4.2.3 部署实践
RocketMQ控制台安装
下载
# 在git上下载下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases
https://github.com/apache/rocketmq-externals/releases
修改配置文件
# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启
9876端口
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启
9876端口
打成jar包,并启动
# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar
mvn clean package -Dmaven.test.skip=true
# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar
访问控制台
子主题
样例
1) 基本样例
1.1加入依赖:
1.2消息发送
1、Producer端 发送同步消息
2、发送异步消息
3、单向发送消息
1.3消费消息
2) 顺序消息样例
■2.1顺序消息生产
sh bin/tools.sh org.apache.rocketmq.example.ordermessage.Producer
2.2顺序消费消息
sh bin/tools.sh org.apache.rocketmq.example.ordermessage.Consumer
3) 延时消息样例
3.1启动消费者等待传入订阅消息
3.2发送延时消息
3.3验证
■3.4延时消息的使用场景
3.5延时消息的使用限制
)4批量消息样例
■4.1发送批量消息
■4.2消息列表分割
5过滤消息样例
5.1基本语法
5.2使用样例
■1、生产者样例
2、消费者样例
6消息事务样例
6.1发送事务消息样例
■1、创建事务性生产者
2、实现事务的监听接口
6.2事务消息使用上的限制
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
7 Logappender样例
7.1 log4j样例
7.2 log4j2样例
7.3 logback样例
8 OpenMessaging样例
8.1 OMSProducer样例
8.2 OMSPullConsumer
8.3 OMSPushConsumer
常见面试题
RocketMQ的基本了解?
RocketMQ如何避免消息堆积?
RocketMQ如何保证消息的一致性?
可靠性
业内常用MQ
集群化支持
QPS-吞吐量
常用功能是否完备
参考书籍
《RocketMQ分布式消息中间件:核心原理与最佳实践》
RabbitMQ
RabbitMQ简介、概念
AMQP
Advanced Message Queue
高级消息队列协议
一个异步消息传递所使用应用层协议规范
为面向消息中间件设计
基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
AMQP 的出现其实也是应了广大人民群众的需求
虽然在同步消息通讯的世界里有很多公开标准,
(如 COBAR的 IIOP ,或者是 SOAP 等)
但是在异步消息处理中却不是这样,只有大企业有一些商业实现
(如微软的 MSMQ ,IBM 的 Websphere MQ 等)
因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ遵循 AMQP 协议
采用erlang语言开发
使用Erlang编写的
使用erlang语言开发
一个由 Erlang 语言开发的 AMQP 的开源实现
一个由erlang开发的AMQP的开源实现
RabbitMQ是采用erlang语言开发的,所以必须有erlang环境才可以运行
RabbitMQ的起源
RabbitMQ 最初起源于金融系统
RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。
该公司在2010年4月被SpringSource收购。
(VMWare的一个部门)
在2013年5月被并入Pivotal。
其实VMWare,Pivotal和EMC本质上是一家的。
不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。
重量级的消息队列
更适合企业级的开发
本身支持很多的协议:AMQP,XMPP, SMTP, STOMP
也正因如此,它非常重量级,更适合于企业级的开发。
其他特点
开源
一个开源的消息队列
一种消息中间件
RabbitMQ 是非常出名的消息中间件
用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
对路由,负载均衡或者数据持久化都有很好的支持
同时实现了Broker构架,这意味着消息在发送给客户端时,先在中心队列排队。
性能较好
适合于企业级的开发
RabbitMQ关键名词、RabbitMQ架构、工作模型
架构图
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Server(Broker)
表示消息队列服务器实体。
接收客户端连接,实现AMQP协议的消息队列和路由功能的进程
Broker(代理/中介)
就是rabbitmq服务,默认端口5672
Virtual Host (Vhost)
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
虚拟主机的概念,类似权限控制组
一个Virtual Host里可以有多个Exchange和Queue
Vhost(Exchange+Message Queue)
Exchange
Exchange(将消息路由给队列 )
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
交换机,通过绑定键(Binding Key)与Queue绑定
ExchangeType、Exchange 类型、路由方式
交换机类型决定了路由消息行为
Exchange分发消息时,根据类型的不同,分发策略有区别,目前共四种类型。
RabbitMQ中有四种类型Exchange、RabbitMQ五种队列实现
Fanout Exchange(扇型交换机、广播分发、发布订阅模式)
Fanout(广播分发)
Fanout exchange(扇型交换机)
将消息路由给绑定到它身上的所有队列
不需要绑定键,发送到所有绑定队列
Fanout:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
很像子网广播,每台子网内的主机都获得了一份复制的消息。
fanout 类型转发消息是最快的。
发布订阅模式(fanout)
添加依赖
工具类
生产者
消费者
邮件消费者
短信消费者
子主题
Direct Exchange(直连交换机、点对点(简单)的队列)
Direct exchange(直连交换机)
Direct键(routing key)分布:
绑定键和路由键完全匹配
Direct:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。它是完全匹配、单播的模式。
是根据消息携带的路由键(routing key)将消息投递给对应队列的
点对点(简单)的队列
添加依赖
工具类
生产者
消费者
子主题
Topic Exchange(主题交换机、模式匹配交换机、通配符模式交换机)
Topic 交换器
Topic exchange(主题交换机)
Topic交换器(模式匹配)
通配符模式Topics
Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
它同样也会识别两个通配符
符号“#”
#匹配0个或多个单词
# 0 个或者多个单词
符号“*”
* 不多不少一个单词
* 匹配不多不少一个单词
通配符模式Topics
添加依赖
工具类
生产者
消费者
邮件消费者
短信消费者
Topic exchange(主题交换机、模式匹配交换机、通配符模式交换机)
【性能差弃用】Headers exchange(头交换机)
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。
通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
headers 匹配 AMQP 消息的 header 而不是路由键,
此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
(很少使用)
工作(公平性)队列模式
工作(公平性)队列模式
添加依赖
工具类
生产者
消费者
路由模式Routing
路由模式Routing
添加依赖
工具类
生产者
消费者
邮件消费者
短信消费者
Binding
(消息队列和交换器之间的关联)
绑定,用于消息队列和交换器之间的关联。
一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
BindingKey
绑定关键字
将一个特定的Exchange和一个特定的Queue绑定起来
Message Queue
Queue
队列,存储消息
消息队列,用来保存消息直到发送给消费者。
它是消息的容器,也是消息的终点。
一个消息可投入一个或多个队列。
消息一直在队列里面,等待消费者连接到这个队列将其取走。
消息队列,用于存储还未被消费者消费的消息;
Message
消息,消息是不具名的,它由两部分组成
消息头
Header
由一系列的可选属性组成
由生产者添加的各种属性的集合
这些属性包括
routing-key(路由键)
priority(相对于其他消息的优先权)
优先级是多少
delivery-mode(指出该消息可能需要持久性存储)
由哪个Message Queue接收
Message是否被持久化
消息体
body
不透明的
真正需要发送的数据内容
Connection
网络连接,比如一个TCP连接。
生产者和消费者与Broker建立的TCP长连接
Channel
信道,多路复用连接中的一条独立的双向数据流通道。
信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。
因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
虚拟连接,减少TCP创建和销毁的性能损耗,api编程最主要接口
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
子主题
RabbitMQ优点、具体特点包括
官方文档很详细
可靠性
高可靠
Reliability
消息不丢失
可以保证数据不丢失。
RabbitMQ 使用一些机制来保证可靠性
持久化
传输确认
发布确认
可靠性投递
消息发送到rabbitmq服务器
Transaction(事务)模式
Confirm(确认)模式
普通确认模式
批量确认模式
异步确认模式
消息从交换机路由到队列
服务端重发给生产者
交换机路由到另一个备份的交换机
消息在队列中存储
队列持久化
交换机持久化
消息持久化
集群
消费者回调
调用生产者Api
发送响应消息给生产者
补偿机制
发送间隔
重发次数
消息幂等性
生成唯一业务ID,日志或者落库
最终一致
对账
消息的顺序性
一个消费者消费一个队列
灵活的路由
灵活的路由
Flexible Routing
在消息进入队列之前,通过 Exchange 来路由消息的。
对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。
针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
集群与高可用
集群
(Clustering)
集群与扩展性
集群化部署
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
为什么要做集群
高可用
负载均衡
集群模式
普通集群
镜像队列
如何支持集群
.erlang.cookie
rabbitmq节点类型
磁盘节点
内存节点
普通集群
镜像集群
高可用
高可用
高可用队列
Highly Available Queues
基于Docker 安装 HAproxy负载Keepalived
RabbitMQ 具备低时延、高可用的特点
也能保证高可用性,即集群部署时,部分机器宕机可以继续运行。
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
高可用架构
普通集群/镜像集群
多种协议
Multi-protocol
RabbitMQ 支持多种消息队列协议
本身支持很多的协议:AMQP,XMPP, SMTP, STOMP、MQTT
多语言客户端
多客户端支持
Many Clients
与Spring集成
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等
可以跨平台、跨语言使用。
管理界面
权限管理
可视化管理界面
简洁易用的可视化管理界面
Management UI
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
图解RabbitMQ 可视化管理界面
RabbitMQ 可视化管理界面
跟踪机制
Tracing
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制
Plugin System
插件系统
RabbitMQ 提供了许多内置插件,来从多方面进行扩展
RabbitMQ也可以编写自己的插件
支持功能较完备,支持部分高级功能,比如
死信队列
死信队列流程
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
消息流转图
消息什么时候变成死信
存放时机
消息被拒绝(basic.reject或basic.nack)并且requeue=false.
队列达到最大长度
生产者投递消息到MQ中,消息过期了
消费者多次消费该消息失败
变成了 “死信” 后被重新投递(publish)到另一个Exchange,
该Exchange 就是DLX,然后该Exchange 根据绑定规则转发到对应的队列上监听该队列 就可以重新消费
说白了就是没有被消费的消息换个地方重新被消费
如何使用死信交换机
配置类
正常消费者
死信消费者
死信队列服务部署
死信队列不能够和正常队列存放在同一个服务器中,应该分开服务器存放。
死信队列实际应用
基于MQ延迟队列实现订单30分钟自动超时
如果没有及时的消费者消费消息,生产者一直不断往队列服务器存放消息会导致消息堆积。
延迟队列
TTL+DLX实现
基于延迟队列插件实现
消息过期时间TTL
死信队列
消息重试
消费端限流
设置channel的prefetch count
服务端流控
内存控制
磁盘控制
RabbitMQ的缺点
吞吐量比较低,一般每秒几万级别,所以遇到特别高并发的情况,支撑起来很困难。
集群扩展很麻烦。
还有一个致命缺陷是开发语言是erlang,国内少有精通erlang语言的工程师,因此也没法阅读源码,甚至修改源码。
吞吐量较低
集群化加机器比较麻烦
erlang语言开发,不利于做二次开发和维护,不变阅读和修改源码做定制化
RabbitMQ的适用场景
适用于不需要太高吞吐量,不做定制化修改的中小型公司
一些不需要很高吞吐量,也不需要部署特别大规模集群,无需阅读和修改rabbitmq源码的中小型公司
RabbitMQ生产者消息确认机制
AMQP 事务机制
示例代码
Confirm 模式
示例代码
RabbitMQ消费者应答模式
自动
手动
//开启手动应答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
//执行手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
//执行手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
RabbitMQ如何保证消息不丢失
生产者 确保我们的生产者将消息投递到MQ成功 使用消息确认机制
消费者 确保我们的消费者消费消息成功 采用手动ack确认
MQ服务器端 需要将数据持久化到我们的硬盘
RabbitMQ的安装与基本使用
windows下安装
下载并安装erlang:http://www.erlang.org/download
配置erlang环境变量信息
新增环境变量ERLANG_HOME=erlang的安装地址
将%ERLANG_HOME%\bin加入到path中
下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。
配置erlang环境变量信息
新增环境变量ERLANG_HOME=erlang的安装地址
将%ERLANG_HOME%\bin加入到path中
下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。
基于Docker安装RabbitMQ
docker安装
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3-management
https://www.cnblogs.com/yufeng218/p/9452621.html
基本使用
单机安装
java原生api操作
UI管理界面
RabbitMQ的安装、基本使用与操作实践
SpringBoot整合RabbitMQ
Spring AMQP介绍
Spring AMQP核心组件
Spring集成RabbitMQ配置文件解读
Spring Boot集成RabbitMQ
配置类代码
生产者代码
消费者代码
参数解析
SpringBoot整合RabbitMQ实现消息队列技术1
添加依赖
常量类
添加配置类
springboot会自动创建交换机和队列
配置文件
生产者
邮件消费者
短信消费者
启动类
消费者如何保证消息幂等性,不被重复消费
使用全局MessageID判断消费方使用同一个,解决幂等性。
生产者
消费者
使用业务逻辑保证唯一(比如订单号码,保证一个订单号码只可能被插入一次数据库)
RabbitMQ签收模式配置
开启手动应答模式
消费者
SpringBoot整合RabbitMQ实现消息队列技术2
针对上面的场景,使用 Spring Boot ,结合 RabbitMQ 来具体实现下水果采购、配送的管理。
使用 Spring Initializr 创建项目
Spring Boot 版本选择 2.2.5 ,Group 为 com.imooc , Artifact 为 spring-boot-rabbitmq,
生成项目后导入 Eclipse 开发环境。
引入项目依赖
<!-- AMQP 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
引入 Web 项目依赖与 AMQP 消息队列依赖。
配置 RabbitMQ 连接信息
项目创建后,通过 applicaiton.properties 配置 RabbitMQ 的链接信息。
#地址
spring.rabbitmq.host=127.0.0.1
#端口 默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
sprng.rabbitmq.password=guest
spring.rabbitmq.host=127.0.0.1
#端口 默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
sprng.rabbitmq.password=guest
配置两个队列
首先配置两个队列,存储苹果采购消息、香蕉采购消息。
苹果采购消息队列appleQueue
香蕉采购消息队列bananaQueue
//消息队列配置类
@Configuration
public class RabbitConfig {
//苹果采购消息队列
@Bean
public Queue appleQueue() {
return new Queue("apple-queue");
}
//香蕉采购消息队列
@Bean
public Queue bananaQueue() {
return new Queue("banana-queue");
}
}
@Configuration
public class RabbitConfig {
//苹果采购消息队列
@Bean
public Queue appleQueue() {
return new Queue("apple-queue");
}
//香蕉采购消息队列
@Bean
public Queue bananaQueue() {
return new Queue("banana-queue");
}
}
配置交换机和绑定
如果消息直接发到队列的话,不够灵活, RabbitMQ 提供了交换机与绑定机制。
消息发送给交换机,交换机可以灵活地与队列进行绑定,这样消息就可以通过多种方式进入队列了。
//配置交换机
@Bean
TopicExchange exchangeTopic() {
return new TopicExchange("exchange-topic");
}
//交换机绑定苹果采购消息队列
@Bean
Binding bindAppleQueue() {
return BindingBuilder.bind(appleQueue()).to(exchangeTopic()).with("#.apple.#");
}
//交换机绑定香蕉采购消息队列
@Bean
Binding bindBananaQueue() {
return BindingBuilder.bind(bananaQueue()).to(exchangeTopic()).with("#.banana.#");
}
@Bean
TopicExchange exchangeTopic() {
return new TopicExchange("exchange-topic");
}
//交换机绑定苹果采购消息队列
@Bean
Binding bindAppleQueue() {
return BindingBuilder.bind(appleQueue()).to(exchangeTopic()).with("#.apple.#");
}
//交换机绑定香蕉采购消息队列
@Bean
Binding bindBananaQueue() {
return BindingBuilder.bind(bananaQueue()).to(exchangeTopic()).with("#.banana.#");
}
解释下交换机与绑定的运行机制
配置了一个交换机 exchangeTopic ,它可以接收消息。
交换机 exchangeTopic 绑定了两个队列 ,说明这两个队列在关注该交换机收到的消息。
appleQueue
bananaQueue
交换机 exchangeTopic 收到的消息到底会进入哪个队列呢
发现交换机的类型是 TopicExchange ,说明该交换机是话题交换机
队列应该是获取其感兴趣的话题相关的消息。
当 appleQueue 队列绑定到交换机时,with("#.apple.#") 就表示
appleQueue 关心的是 apple 相关的话题
而 bananaQueue 关心的是 banana 相关的话题。
所以可以推断出,消息在发送时,可以指定话题相关的信息,以便消息能被关注该话题的队列接收。
经过上面的分析,消息发送时通过携带话题信息,交换机会将该消息路由到关心该话题的队列中。
创建消费者
接下来,定义消息的消费者李四、赵五了。他俩分别关心苹果采购消息和香蕉采购消息。也就是监听苹果消息队列和香蕉消息队列。
//消息队列接收
@Component
public class RabbitReceiver {
//lisi负责监听apple-queue
@RabbitListener(queues = "apple-queue")
public void lisi(String msg) {
System.out.println("李四知道:" + msg);
}
//zhaowu负责监听banana-queue
@RabbitListener(queues = "banana-queue")
public void zhaowu(String msg) {
System.out.println("赵五知道:" + msg);
}
}
@Component
public class RabbitReceiver {
//lisi负责监听apple-queue
@RabbitListener(queues = "apple-queue")
public void lisi(String msg) {
System.out.println("李四知道:" + msg);
}
//zhaowu负责监听banana-queue
@RabbitListener(queues = "banana-queue")
public void zhaowu(String msg) {
System.out.println("赵五知道:" + msg);
}
}
测试
运行启动类,从 RabbitMQ 管理界面可以看到已生成指定名称的队列了。
RabbitMQ 已生成队列
此时定义一个控制器,用于发起测试,直接使用 rabbitTemplate 发送消息即可。
@RestController
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/test")
public void test() {
// 发送消息 参数分别为:交换机名称、路由键、消息内容
rabbitTemplate.convertAndSend("exchange-topic", "apple", "苹果来了10斤");
rabbitTemplate.convertAndSend("exchange-topic", "banana", "香蕉来了5斤");
rabbitTemplate.convertAndSend("exchange-topic", "apple.banana", "苹果来了8斤;香蕉来了20斤");
}
}
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/test")
public void test() {
// 发送消息 参数分别为:交换机名称、路由键、消息内容
rabbitTemplate.convertAndSend("exchange-topic", "apple", "苹果来了10斤");
rabbitTemplate.convertAndSend("exchange-topic", "banana", "香蕉来了5斤");
rabbitTemplate.convertAndSend("exchange-topic", "apple.banana", "苹果来了8斤;香蕉来了20斤");
}
}
convertAndSend() 方法的
第 1 个参数表示交换机
第 1 个消息会被 apple-queue 接收
第 2 个参数表示路由键(消息的话题)
第 2 个消息会被 banana-queue 接收
第 3 个是消息内容。
第 3 个消息会被两个队列接收。
启动项目,然后访问 http://127.0.0.1:8080/test ,控制台输出如下,验证成功。
赵五知道:香蕉来了5斤
李四知道:苹果来了10斤
赵五知道:苹果来了8斤;香蕉来了20斤
李四知道:苹果来了8斤;香蕉来了20斤
李四知道:苹果来了10斤
赵五知道:苹果来了8斤;香蕉来了20斤
李四知道:苹果来了8斤;香蕉来了20斤
RabbitMQ报错:connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=N
https://blog.csdn.net/pang_ping/article/details/111227552
实践经验总结
资源管理
配置文件与命名规范
对template进一步封装
消息落库+定时任务
生产环境运维监控
日志追踪
如何减少连接数
常见面试题
消息队列的作用与使用场景?
Channel 和 vhost 的作用是什么?
RabbitMQ 的消息有哪些路由方式?适合在什么业务场景使用?
交换机与队列、队列与消费者的绑定关系是什么样的?
无法被路由的消息,去了哪里?
消息在什么时候会变成 Dead Letter(死信)?
如果一个项目要从多个服务器接收消息,怎么做?如果一个项目要发送消息到多个服务器,怎么做?
RabbitMQ 如何实现延迟队列?
哪些情况会导致消息丢失?怎么解决?哪些情况会导致消息重复?怎么解决?
一个队列最多可以存放多少条消息?
可以用队列的 x-max-length 最大消息数来实现限流吗?例如秒杀场景。
如何提高消息的消费速率?
AmqpTemplate 和 RabbitTemplate 的区别?
如何动态地创建消费者监听队列?
Spring AMQP 中消息怎么封装?用什么转换?
如何保证消息的顺序性?
RabbitMQ 的集群节点类型?
如何保证 RabbitMQ 的高可用?
大量消息堆积怎么办?
MQ 选型分析?
设计一个 MQ,你的思路是什么?
ZeroMQ
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。
号称最快的消息队列系统,尤其针对大吞吐量的需求场景。
扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。
ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。
针对大吞吐量的需求场景,实现高级/复杂的队列。ZeroMQ提供非持久性的队列,如果宕机,数据会丢失
一款开源的消息队列软件
ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。
ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。
你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后就可以愉快的在应用程序之间发送消息了。
【缺点】但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。
应用场景
Storm
Storm是什么?
Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。
随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等
大数据实时处理解决方案(流计算)的应用日趋广泛
目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流。
Storm的底层
Twitter的Storm 0.9.0以前的版本中,默认使用ZeroMQ作为数据流的传输
Storm从0.9版本开始,同时支持ZeroMQ和Netty作为传输模块
SaltStack
Saltstack由Master和Minion构成,通过ZeroMQ进行通信
用于在 Minion 端与 Master 端建立系统通信桥梁。
SaltStack 采用 C/S 结构来对云环境内的服务器操作管理及配置管理。采用 C/S模式,Server端就是salt的master,Client端就是minion
Saltstack由Master和Minion构成,通过ZeroMQ进行通信。
Master是服务器端,表示一台服务器;Minion是客户端,表示有多台服务器。
在master上发送命令给符合条件的minion,minion就会执行相应的命令
SaltStack架构中的一种就是master > minion。
minion与master之间通过轻量级的ZeroMQ、ZMQ(消息队列)进行通信的
使用消息队列zeroMQ传输数据
从网上数据来看,SaltStack比ansible快大约40倍
底层使用ZeroMQ消息队列pub/sub方式通信
结合了轻量级的消息队列软件 ZeroMQ 与 Python 第三方模块构建。
号称世界上最快的消息队列ZeroMQ,使得SaltStack能快速在成千上万台机器上进行各种操作
Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。
虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。
Redis和RabbitMQ的入队和出队操作对比
各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。
实验表明
入队
当数据比较小时,Redis的性能要高于RabbitMQ
如果数据大小超过了10K,Redis则慢的无法忍受
出队
无论数据大小,Redis都表现出非常好的性能
而RabbitMQ的出队性能则远低于Redis。
支持MQ功能的基于key-value对的Nosql数据库,可以当轻量级的队列服务来使用
实验表明:入队时,当数据比较小时Redis的性能高于RabbitMQ,数据大小超过10k,Redis则慢的无法忍受;出对时,无论数据大小,Redis都表现出非常好的的性能,而RabbitMQ的性能则远低于Redis
Apache Kafka/Jafka
Jafka是kafka的一个升级版。
拥有非常好的性能,具有很多优良的特性,快速持久性;
高吞吐;
完全分布式;
自动实现负载均衡;
支持hadoop数据并行加载来统一在线和离线的消息
一个强大灵活的开源分布式发布-订阅消息系统框架
可灵活处理分布式,大并发等消息应用场景
但是在事务性方面,没有同类的中间件强大
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统
而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。
ActiveMQ
ActiveMQ是Apache下的一个子项目。
类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。
同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
以代理人和点对点的技术实现队列,适用少量代码就可以高效的实现高级应用场景
历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和springjms
轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
EMQ X
EMQ是什么?
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联⽹ MQTT 消息服务器。
Erlang/OTP 是出⾊的语⾔平台。
软实时 (Soft-Realtime)
低延时 (Low-Latency)
分布式 (Distributed)
MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联⽹消息协议。
EMQ X 设计⽬标是
实现⾼可靠,并⽀持承载海量物联⽹终端的 MQTT 连接,⽀持在海量物联⽹设备间低延时消息路由:
稳定承载⼤规模的 MQTT 客户端连接,单服务器节点⽀持 50 万到 100 万连接。
分布式节点集群,快速低延时的消息路由,单集群⽀持 1000 万规模的路由。
消息服务器内扩展,⽀持定制多种认证⽅式、⾼效存储消息到后端数据库。
完整物联⽹协议⽀持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议⽀持。
EMQ X 开源产品不⽀持服务器内部消息持久化,这是⼀个架构设计选择。
EMQ X 解决的核⼼问题是连接与路由。
EMQ X 开源版本重要核⼼功能
完整的 MQTT V3.1/V3.1.1 及 V5.0 协议规范⽀持
LDAP、Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
基于客户端 ID、IP 地址、⽤户名的访问控制 (ACL)
多服务器节点集群 (Cluster)
MQTT Broker 桥接⽀持及规则引擎
各种物联⽹协议⽀持,包括 MQTT-SN、CoAP、 LwM2M 等协议
EMQ X的共享订阅
对共享订阅的协议⽀持
此前标准⽆共享订阅的内容,共享订阅由各个软件⼚商⾃已定义,不具备通⽤性。
常⻅的 MQ 都有类似的共享订阅模式的分⽚机制
现状
子主题
在这种结构下,如果订阅节点发⽣故障,就会导致发布者的消息丢失(QoS 0)或者堆积在 Server 中(QoS 1, 2)。
⼀般情况下,解决这个问题的办法都是直接增加订阅节点,但这样⼜产⽣了⼤量的重复消息,
不仅浪费性能,在某些业务场景下,订阅节点还需要⾃⾏去重,进⼀步增加了业务的复杂度。
⽬前两种实现⽅式
基于选举机制(Raft 或 ETCD)实现多活,当前只有⼀个节点实际处理消息;
业务层实现消息分⽚,每个节点只处理某些设备的数据,实现难度⾼且难以维护
改进
利⽤中间件将有状态服务转化为⽆状态服务
不同的消息可以路由⾄不同的订阅⽅,能够实现消息分⽚,
并且如果当某⼀个订阅⽅出现宕机,对应的消息会分配给存活的订阅⽅,从⽽能够实现⾼可⽤。
共享策略
random
在所有共享订阅会话中随机选择⼀个发送消息
round_robin
轮询调度
按照订阅顺序轮流选择
sticky
使⽤ random 策略随机选择⼀个订阅会话,持续使⽤⾄该会话取消订阅或断开连接再重复这⼀流程
hash:
对发送者的 ClientID 进⾏ hash 操作,根据 hash 结果选择订阅会话
hash 的共享策略⾮常适合根据设备做分⽚,因为不同的⻋可以通过 ClientID 区分。
MQTT Broker
子主题
学习地址
https://www.emqx.com/zh
HiveMQ
Apache Pulsar
0 条评论
下一页