Mq---ActiveMq,RabbitMq,Zookeepr
2021-05-10 17:08:27 1 举报
AI智能生成
消息中间件?原理是什么?消息机制什么?知其然,知其所以然
作者其他创作
大纲/内容
Activemq
架构
消息机制
建立连接
Server--Mq打开端口
client--客户端通过端口建立连接
client---把收到的socket放入阻塞队列
client---handler监听阻塞的是否有数据
client--生成transport用来处理链路的状态
生成消息处理链条,生成TcpTransport
建立链路完成
关闭连接
双心跳
Broker心跳
Client心跳
定时器
每隔一段固定的时间调用一次
ReacCheck
WriteCheck
消息模式
(2种)
(2种)
Queue模式
点对点,一个消息一个消费者
离线消费者,可收到积压的消息
Topic模式
普通订阅
不区分消费者
根据广播发送
持久订阅
区分消费者
离线消费者,可收到积压的消息
异常消息处理,消息重试极致
https://www.cnblogs.com/shamo89/p/8092714.html
kafka
架构
broker
一台kafka机器就是一个broker;
消息模式
(共2种)
(共2种)
点对点模式
发布订阅模式
特殊说明
消费后会记录偏移量offset;
它存在在topic中,该topic为:__consumer_offsets
它存在在topic中,该topic为:__consumer_offsets
删除文件可配置参数:
log.retention.hours=16
log.retention.bytes=1g
log.retention.hours=16
log.retention.bytes=1g
存储机制
存储
规则
topic
一个topic可以有多个partition
一个topic可以有多个partition
partition 0
segment
partition 1
segment
工作流程
为了数据安全: partition不允许,leader和follow在一起;
为了数据安全: partition不允许,leader和follow在一起;
文件
两个落地文件: .index 与.log
partition 0
以分区0为例;
他的巧妙就在这位了防止单个log过大,拆分了多个子文件
以分区0为例;
他的巧妙就在这位了防止单个log过大,拆分了多个子文件
segement 0
落地文件:
00000000000000009014.index 索引信息, 14代表offset
00000000000000009014.log 数据信息 14代表offset
落地文件:
00000000000000009014.index 索引信息, 14代表offset
00000000000000009014.log 数据信息 14代表offset
segement 1
落地文件:
00000000000000009015.index
00000000000000009015.log
落地文件:
00000000000000009015.index
00000000000000009015.log
存储示意图
可靠性
发送的可靠性
producer发送消息到topic的partition的时候,需要向producer发送ack,不然会重新发送
副本可靠性
ISR
in-sync replica set
同步副本集
in-sync replica set
同步副本集
同步成功
每个broker的partion都会有一个ISR,当producer write的时候,;
主partition,等待副本isr告诉主是否成功的消息;
如果其中一个isr失败就被移除;
主partition,等待副本isr告诉主是否成功的消息;
如果其中一个isr失败就被移除;
同步失败
HW( high watemark)
维护了hw这个记录了最后一次的offset
维护了hw这个记录了最后一次的offset
副本重启的时候从offset开始追数据,一旦追上就可以重新加入到ISR
ISR
partion消费策略
RoundRobin 轮询
解决消费不均衡的问题
Range 默认
consumer自己订阅 topic的partition
RabbitMq
消息架构
消息机制
Provider提供者
创建会话
队列&交换机绑定
创建路由关键字
发送消息
Consumer消费者
创建会话
绑定队列
A:实现消费(结束)
B:实现消费(结束)
队列监听
autoAck=true
自动回复mq
autoAck=false
编程自行回复
消息模式
(共4种)
(共4种)
Fanout 扇形
即广播模式
即广播模式
没有对应关系,谁绑定了我这个交换机,我就发送消息给谁
简单粗暴
Direct 直连
绑定了我这个交换机,还得指定一个routing_key用它来绑定一个队列
适用于有消息优先级的场景
Topic 主题
(用的最多)
(用的最多)
绑定了我这个交换机,还得指定一个routing_key的正则表达式,用正则来匹配队列
适用于同一业务对象的所有数据,多属性下发
比如说,你有个用户中心,
需要用户中心有订单属性,物流属性需要下发可以用它
比如说,你有个用户中心,
需要用户中心有订单属性,物流属性需要下发可以用它
Provider
1、主题Topic Key String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
2、交换机key String ROUTINGKEY_EMAIL="inform.#.email.#";
3、队列 key String QUEUE_INFORM_EMAIL = "queue_inform_email";
Consumer
1、队列key String QUEUE_INFORM_EMAIL = "queue_inform_email";
Headers 首部
不需要routing_key,消息用hash结构,
适用于,规则不只是字符串的
Zookeeper
架构
Leader
更新系统状态
Follower
接收请求
存储结构
每一个节点能存储1MB数据
一致性
Server
每个Server都保存一份相同的数据副本,Client无论连接哪一个都是一致的
Paxos
高度容错性,一致性算法
主要介绍
RabbitMq
activemq
kafka
Zookeeper
0 条评论
下一页