消息队列
2023-09-12 15:43:05 4 举报
AI智能生成
消息队列的思维导图
作者其他创作
大纲/内容
公共模块
序列化和反序列化,建立一个公共的类,利用objectOutputSteam(Input)实现
BinaryTool
本质都是操作文件,通过二进制字节流处理文件
toBytes()
fromBytes()
通信协议
BasicArguments
使用这个类表示方法的公共参数/辅助的字段,后续每个方法又会有一些不同的参数, 不同的参数再分别使用不同的子类来表示
BasicAckArguments
BasicConsumeArguments
BasicPublishArguments
BasicReturns
ExchangeDeclareArguments
ExchangeDeleteArguments
QueueBindArguments
QueueDeclareArguments
QueueDeleteArguments
QueueUnbindArguments
Request
Response
SubScribeReturns
参数
consumerTag
消费者标识
basicProperties
信息的基本组成
body
消息本体
ConsumerEnv
表示一个消费者完整的执行环境
MqException
自定义异常处理
Consumer
一个函数式接口(回调函数),收到消息后需要处理消息时调用的方法;具体如何消费看需求
客户端模块
通信管理
Channel
参数
每个通信都有自己的id
channelId
channel 是来自哪个连接的
connection
用来存储后续客户端收到的服务器的响应
ConcurrentHashMap<String, BasicReturns>
如果当前的 channel 定义了某个队列,就需要在此处记录下对应的回调是啥,当该队列的消息返回来就调用一下回调函数
这里约定,一个 Channel 只能有一个回调
消费者
Consumer
方法
构造方法
Channel(String channelId, Connection connection)
完成 channelId 和 connection 的初始化
createChannel
在这个方法中,和服务器进行交互,告知服务器,此处客户端创建了新的 channel
对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象
大致流程
先 new 出 BasicArguments 对象
这个对象下有两个参数:rid:表示一次请求/响应 的身份标识,可以把请求和响应对上; channelId
这个 rid 也就是 UUId,为了防止混淆,加一个 “R”前缀
将 channelId 和 Rid 设置进去,再将其转化为一个 字节数组
构造出一个请求,这个请求包含了设置创建一个 channel 、这个载荷的长度、载荷
构造出完整请求之后, 就可以发送这个请求
最后等待服务器响应
waitResult(String rid)
期望使用这个返回值来阻塞等待服务器响应
因为没有做到发出一个请求就立马返回至客户端,不是自己的响应不能直接接收;同时期可能存在多个响应
通过这个参数 rid 来判断是不是自己的响应
收到这个响应之后从哈希表中删除,否则会越积越多
putReturns
把所有线程都唤醒
被 Connection 下的 dispatchResponse 调用
close
关闭 channel,给服务器一个 type = 0x2 的请求
大致流程类似于 createChannel
BasicReturns basicReturns = waitResult(arguments.getRid()); 每个构造的请求,在等待响应时都会调用
exchangeDeclare
创建交换机
大致流程:
将一个交换机所需要的参数都设置好
其余流程也类似与 createChannel
exchangeDelete
删除交换机
queueDeclare
创建队列
也是将队列所需的参数都设置好,再进行构造请求
queueDelete
删除队列
queueBind
创建绑定
也是将绑定所需的参数都设置好,再进行构造请求
queueUnbind
解除绑定
basicConsume
订阅消息
大致流程:
先设置回调
判断执行回调的 consumer 是否为null,不为null 就抛异常
new 出 BasicConsumeArguments
设置 BasicConsumeArguments 的参数
构造请求
发出请求,并等待响应
BasicReturns basicReturns = waitResult(arguments.getRid()); 阻塞等待服务器响应
return basicReturns.isOk();
basicPublish
发送消息
大致流程:
new 出 QueueUnbindArguments
设置好其中的参数,再构造出请求
发出请求并等待响应
BasicReturns basicReturns = waitResult(arguments.getRid());
basicAck
确认消息
大致流程:
new 出 BasicAckArguments
设置好其中的参数,再构造出请求
发出请求并等待响应
BasicReturns basicReturns = waitResult(arguments.getRid());
关于其余参数设置的get、set 方法
通道管理
Connection
参数
一个连接需要管理多个 channel,使用一个哈希表,把若干个 channel 组织起来
channelMap
传输原始二进制数据
InputStream
OutputStream
确保数据以独立于平台的方式格式化
DataInputStream
DataOutputStream
套接字
Socket
线程池
ExecutorService
方法
构造方法
Connection(String host, int port)
将 host 和 port 给 Socket new 出套接字
创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理
获取到socket 中的输入输出流,并且创建出 DataInputStream 和 DataOutputStream
dispatchResponse
使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息
close
释放资源
writeRequest
构造并发送请求
主要时通过DataOutputSteam 写入请求(自定义约定的)
readResponse
读取响应
同样是通过DataInputSteam 读取自定义约定的请求
大致流程:
new 出Response
将这个 response 设置好类型和长度
new 出相同大小的 byte 数组
判断这个数组长度是否和请求长度相等
相等则将这个长度设置为有效载荷
createChannel
在 Connection 中创建一个 Channel
大致流程:
通过 UUID 作为ChannelId,为了防止与其他 Id 重复,给它加个“C”的前缀
创建完同时把它放入到存放 channel 的哈希表中
同时告诉服务器,创建了这个 channel,并且要处理创建失败的情况
ConnectionFactory
提供 brokerServer 的 ip 地址和 端口号
提供了 ip地址 和 端口号的get、set方法
还可以设置虚拟机的属性如:用户名、密码、虚拟机名字(未实现)
服务器模块(BrokerServer)
虚拟机控制模块(BrokerServer控制虚拟主机,从逻辑上对各个组件进行隔离)
内存管理(具体是否存入硬盘还得看是否需要持久化)
交换机管理
添加交换机(insertExchange())
删除交换机(deleteExchange())
获取交换机(getExchange())
队列管理
添加队列(insertQueue())
删除队列(deleteQueue())
获取队列(getQueue())
绑定管理
添加绑定(insertBinding())
删除绑定(deleteBinding())
获取绑定(根据交换机和队列)(getBinding())
消息管理
添加消息(addMessage())
根据id查找消息(getMessage())
删除消息(removeMessage())
发送消息(sendMessage())
获取消息(pollMessage())
加载队列中的消息(loadAllMessageFromQueue())
只有在程序启动时才会调用
待确认消息管理
获取队列中消息的个数(getMessageCount())
添加未确认消息(addMessageWaitAck())
确认消息(removeMessageWaitAck())
获取没被确认的消息(getMessageWaitAck())
将硬盘持久化的数据保存在内存中(recover())
封装
初始化
init()
硬盘
只需要new,底层已经初始化了
内存
需要自己手动初始化,底层没有初始化
封装交换机
插入交换机(insertExchange())
删除交换机(deleteExchange())
查询所有交换机(selectExchanges())
封装队列
插入队列(insertQueue())
删除队列(deleteQueue())
查询所有队列(selectAllQueue())
封装绑定
插入绑定(insertBinding())
删除绑定(deleteBinding())
查询所有绑定(selectAllBindings())
封装消息
发送消息(sendMessage())
删除消息(deleteMessage())
从队列中查询所有的消息(loadAllMessageFromQueue())
消息转发
不同的交换机有不同的转发规则
Direct:直接交换机
只发送指定的队列发送一次
给指定的队列发消息
Fanout:扇出交换机
只要存在的队列都要发送消息
直接给所有队列发送消息
Topic:主题交换机
bindingKey 和 RoutingKey 相互匹配时才可以发消息
设置了自定义转发规则,规则如下:
bindingKey
1. 数字, 字母, 下划线
2. 使用 . 分割成若干部分
3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
4.连着的 * 和 # 只有 *.* 合法,其他都不合法
routingKey
1. 数字, 字母, 下划线
2. 使用 . 分割成若干部分
Header:消息头交换机
规则复杂并且应用场景较少(未实现)
硬盘管理
数据库管理
初始化数据(init())
建库
建表
交换机
insertExchange()
deleteExchange()
selectExchanges()
队列
insertQueue()
deleteQueue()
selectAllQueue()
绑定
insertBinding()
deleteBinding()
selectAllBindings()
文件管理(主要是存消息)
消息内容
创建队列对应的文件和目录
createQueueFiles()
大致流程
先创建队列对应的消息目录
创建队列数据文件
创建消息统计文件
给消息统计文件, 设定初始值
删除队列的目录和文件
destroyQueueFiles()
大致流程
先删文件
再删目录
检查队列的目录和文件是否存在
checkFilesExits()
检查统计文件和数据队列
将消息添加到数据文件中
sendMessage()
加锁:存在多个用户端同时添加的问题
将数据转换为二进制再添加到文件中,具体通过调用自定义实现的转换类中的方法实现
删除一个消息
deleteMessage()
逻辑删除,把参数 isValid 改为0,只有当触发回收机制时才真正执行删除操作
读取消息存放到内存(一个链表)中
loadAllMessageFromQueue()
创建一个死循环,不断读取,读到文件末尾会抛出EOFException()异常
由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢 不涉及多线程操作文件.所以可以不用加锁
消息统计信息(作为一个内部类存放在文件管理类中,因为只有在处理消息文件才会用到,不必单独一个类)
读写文件信息个数
readStat
writeStat
垃圾回收
利用复制算法实现垃圾回收机制
触发回收
checkGC()
判断有效消息和无效消息的比例
执行回收
gc()
创建一个新文件,将旧文件中有效数据存至新文件中,再将旧文件删除,最后将新文件名改为旧文件名
API实现
调用下层API,同时为上层调用提供API
创建虚拟机(VirtualHost())
创建交换机(exchangeDeclare())
删除交换机(exchangeDelete())
创建队列(queueDeclare())
删除队列(queueDelete())
创建绑定(queueBind())
删除绑定(queueUnbind())
发送消息(basicPublish())
订阅消息(basicConsume())
消费者调用basicConsume就是订阅某个指定队列的消息,当队列收到消息之后,就需要把消息推送给他的订阅者
参数有如下几个
consumerTag
consumer
queueName
队列名
autoAck
消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.
consumer
是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda
内部方法sendMessage()
大致流程:
写内存、写硬盘、结束完唤醒消费者(所谓唤醒就是将消息存放令牌队列中put进去)
手动应答(basicAck())
就是手动将消息删除,删除硬盘、内存(消息中心和待确认的集合中的数据)中的数据
删硬盘
deleteMessage(queue,message)
删除消息中心
removeMessage(messageId)
删待确认集合
removeMessageWaitAck(queueName,messageId)
消息队列 的本体服务器
属性
ServerSocket
VirtualHost
当前考虑一个 BrokerServer 上只有一个 虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
sessions
使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)
此处的 key 是 channelId, value 为对应的 Socket 对象
ConcurrentHashMap<String, Socket>
ExecutorService
引入一个线程池, 来处理多个客户端的请求
runnable
引入一个 boolean 变量控制服务器是否继续运行
方法
构造方法
BrokerServer(int port)
设置客户端服务器套接字的端口号
start
启动方法
启动线程池
创建与客户端的连接
借助上面布尔值的参数,不断的循环,并且把处理连接的逻辑丢给这个线程池
stop
关闭服务器
停止线程池,就是将上面的布尔变量变为 false,循环结束
停止正在运行的线程
关闭 TCP 连接(socket)
私有方法
关于连接
writeResponse
将处理好的响应返回给客户端
通过 DataOutputSteam返回写入
写入 type、length、载荷,最后刷新一下
process
根据解析(将字节流转换成定义的结构)响应类型请求,计算响应
0x1
创建 channel
0x2
销毁 channel
0x3
创建 交换机
0x4
删除 交换机
0x5
创建 队列
0x6
删除 队列
0x7
创建 绑定
0x8
删除 绑定
0x9
将请求发送到指定的交换机
0xa
消费消息(订阅消息)
0xb
确认应答
0xc
表示服务器给客户端提供的消费数据
其余的皆为非法的
processConnection
处理一个客户端的连接,在这一个连接中, 可能会涉及到多个请求和响应
按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream
这个方法内部写了个死循环,不停的进行下列操作
1. 读取请求并解析
2. 根据请求计算响应
调用了 process
3. 把响应写回给客户端
直到客户端不发出请求,这里通过抛出异常的情况,结束循环
finally 中 关闭相关信息
一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉
调用了 clearClosedSession
clearClosedSession
遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉
大致流程:
不断地遍历 sessions 这个哈希表
判断 是否等于 clientSocket 这个待删除的 Socket
结果为真就添加到另一个待撒删除的哈希表
sessions 遍历结束,才进入删除阶段
一边遍历待删除集合,一边删除 sessions 集合中的 键值对
readResponse
读取请求,并解析
根据定义的结构提取相应的信息,大致就是 new 出 request,再构造 type、length 和 载荷 ,最后返回
DataBaseManager
本质上是一个 TCP 服务器
MessageFileManager
MemoryDataCenter
rabbitmq消息消费的一个主要流程
消费者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
消费者确认(ack)接收到的消息。
RabbitMQ 从队列中删除相应已经被确认的消息。
关闭信道。
为什么消息中间件不直接使用http协议
http大部分都是短连接,在实际的交互过程中,一个请求到响应很可能会被中断,中断以后也不会持久化,就会造成数据的丢失。这样就不适合在消息中间件中使用,因为消息中间件是一个长期的获取信息的过程,如果出现问题或故障,要进行持久化,目的是为了保证消息和数据的高可靠和高可用。
0 条评论
下一页