消息队列
2023-11-08 16:36:02 3 举报
AI智能生成
基于RabbitMQ的消息队列设计
作者其他创作
大纲/内容
拓展思路
虚拟主机管理
上述我们规定使用
vIrtualHostName+exchangeName
vIrtualHostName+queueName
划分虚拟主机和他们内部子模块的从属关系,同时保证了不同的虚拟主机内部可以存在同名交换机队列等。
可以在数据内存管理模块中创建一个哈希表用来表示虚拟主机,同时可以进行持久化操作,便于重启恢复
根据数据结构可以增加虚拟主机的增删查改操作
用户管理
可以拓展用户名密码字段,验证用户身份进一步保证数据安全
死信队列
可以考虑设置⼀个死信队列,每⼀个队列都会绑定⼀个死信队列。
应⽤场景1:当消息在消费过程中出现异常,就会把消息投⼊到死信队列中,进行进一步处理
应用场景2:可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列,进行进一步处理
公共模块
消费相关
回调函数
创建函数式接口 Consumer
handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body)
消费环境
消费者是以队列为维度来订阅消息的,⼀个队列可以有多个消费者,在队列维护一个消费者列表,列表中的每一个元素表示一个完整的消费环境ConsumerEnv,包括consumerTag消费者标识、queueName所属队列名、autoAck是否自动应答、consumer消费回调函数
队列每次以轮询的方式获取消费者,进行具体消费
序列化/反序列化
这里使用Java标准库提供了序列化⽅案 ObjectInputStream 和 ObjectOutputStream
序列化
创建一个ByteArrayOutputStream()流对象,这个流对象相当于一个变长的字节数组
调用objectOutputStream.writeObject(object)
返回序列化结果byteArrayOutputStream.toByteArray()
反序列化
创建一个ByteArrayInputStream(data)流对象
调用objectInputStream.readObject()将数据反序列化为Object对象
通信协议
传输层协议
TCP
自定义应用层协议
自定义协议格式
请求
payload 表⽰这次⽅法调⽤的各种参数信息
响应
payload 表⽰这次⽅法调⽤的返回值
定义Request
请求类型 int type
请求载荷长度 int length
请求载荷 byte[] payload
定义参数⽗类 BasicArguments
用于匹配请求/响应的标识:
String rid
用于表示发送请求的channel:
String channelId
定义Response
响应类型 int type
响应载荷长度 int length
响应载荷 byte[] payload
定义响应父类 BasicReturns
用于匹配请求/响应的标识:
String rid
用于表示获取响应的channel:
String channelId
表示这次API调用的结果:
boolean ok
服务器模块
Broker 核心概念定义
交换机 Exchange
交换机名 exchangeName
交换机类型 type
直接交换机 Direct
扇出交换机 Fanout
主题交换机 Topic
持久化 durable
自动删除 autoDelete
额外参数 arguments
队列 MSGQueue
队列名 Queuename
持久化 durable
自动删除 autoDelete
是否独占 exclusive
额外参数 arguments
绑定 Binding
交换机名 exchangeName
队列名 queueName
绑定键 bindingKey
消息 Message
消息属性 basicProperties
消息Id messageId
路由键 routingKey
持久化 deliverMode
消息内容 body
定位属性 offsetBeg
定位属性 offsetEnd
是否有效 isValid(1:有效 0:无效)
数据硬盘管理 DiskDataCenter
数据库管理 DataBaseManager
引入SQlite
导入相关依赖
这里使用 SQlite 而不是 MySQL,因为相比之下SQlite更轻量,消息队列作为一个中间件,选用 SQlite 更为合适
创建Mapper接口
添加相关配置
SQlite 连接配置
Mybatis 路径配置
数据库初始化
inti()
checkDBExists()
数据库已存在啥也不做
数据库不存在则创建数据库目录 ./data
不需要手动创建 meta.db,MyBatis 会帮我们创建
createTable()
创建交换机表
创建队列表
创建绑定表
createDefaultData()
添加一个匿名交换机
交换机
insertExchange(Exchange)
deleteExchange(String)
selectAllExchanges()
队列
insertQueue(MSGQueue)
deleteQueue(String)
selectAllQueues()
绑定
insertBinding(Binding)
deleteBinding(Binding)
selectAllBindings()
文件管理 MessageFileManager
消息操作只设计简单的读写,不涉及复杂的查询,并且消息的数量会很多,使用数据库效率并不高。
定义消息文件目录读取的辅助方法
获取指定队列消息文件所在路径
String getQueueDirPath(String queueName)
获取指定对列统计文件所在路径
String getQueueStatPath(String queueName)
获取指定队列数据文件所在路径
String getQueueDataPath(String queueName)
检查队列对应的目录和文件是否存在
boolean checkFilesExits(String queueName)
创建队列对应的文件和目录
createQueueFiles(String queueName)
创建队列对应的消息目录
创建统计文件
创建数据文件
初始化统计文件(0\t0)
删除队列对应的文件和目录
destoryQueueFiles(String queueName)
先删除文件
后删除目录
规定消息存储格式
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开。在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
分支主题
每个队列中规定存在两个文件
queue_data.txt:消息数据文件
分支主题
queue_stat.txt:消息统计文件
分支主题
统计文件的读写
Stat 内部类:用于表示该队列的统计信息
读取统计文件
Stat readStat(String queueName)
判断文件是否存在
将stat文件信息读取到Stat对象中
写入统计文件
writeStat(String queueName,Stat stat)
判断文件是否存在
将stat对象中的数据写入到stat文件中(覆盖写入)
刷新缓冲区
数据文件的读写删
在文件中添加消息
sendMessage(MSGQueue msgQueue, Message message)
校验写入文件目录是否存在
调用 BinaryTool.toBytes(message) 将消息对象序列化为二进制数据
计算offsetBeg 和 offsetEnd 便于定位消息在文件中的位置
使用 DataOutputStream 进行写入(追加写)
调用writeInt()保证写入的是4个字节的消息长度
调用write()写入消息体
更新统计文件
有效数据个数+1
总数据个数+1
删除文件中的指定消息(逻辑删除)
deleteMessage(MSGQueue msgQueue,Message message)
根据传入 Message 对象中的offsetBeg和offsetEnd 在文件中读取数据
使用 RandomAccessFile 它提供了随机访问文件的能力,可以在文件中任意位置进行读写操作
调用randomAccessFile.seek(int)调整光标位置
从光标位置读取指定长度数据
调用 BinaryTool.toObject() 将读取到的二进制数据反序列化为Message对象
将读取到的Message对象中的isValid置为无效,并调用BinaryTool.toBytes()重新序列化数据
调用randomAccessFile.seek(int)重新调整光标位置
将修改过的数据重新写入到原来的位置
获取有效消息集合(服务器重启时执行)
LinkedList<Message> loadAllMessageFromFile(String queueName)
使用DataOutPutStream 进行死循环读取
读取上述指定长度的数据作为消息的主体
校验实际读取长度是否和消息长度一致,避免格式错误
调用readInt()保证读取的是4个字节的消息长度
调用BinaryTool.toObject()将读取到的消息反序列化为Message对象
判定一下这个消息对象是否为有效对象
无效直接跳过
有效则设置消息的offsetBeg和offsetEnd并添加到返回链表中
使用DataOutPutStream读取到文件末尾会抛出异常:EOFException.借助异常结束循环
垃圾回收GC
GC条件
boolean checkGC(String queueName)
根据有效消息与总消息的比例判断是否进行GC
GC策略
复制算法:遍历原有的消息数据⽂件,把所有的有效数据数据重新拷⻉⼀份到新的⽂件中,然后将新文件重命名为原文件名,最后把旧的⽂件直接删除掉。
分支主题
GC实现
创建一个文件副本,用于存储有效消息
判断文件副本是否已存在(如果已存在说明上次GC程序出错)
调用loadAllMessageFromFile()方法,获取所有信息
把有效消息写入到文件副本中
同样使用DataOutputStream进行写入操作
调用writeInt()写入消息长度
调用write()写入消息内容
删除旧的data文件
将新data文件重命名
更新统计文件
数据内存管理 MemoryDataCenter
内存数据结构设计
交换机结构
ConcurrentHashMap<String, Exchange>
key表示交换机名
队列结构
ConcurrentHashMap<String, MSGQueue>
key表示队列名
绑定结构
ConcurrentHashMap<String, HashMap<String, Binding>>
第⼀个key是交换机名, 第⼆个key是队列名
消息中心结构
ConcurrentHashMap<String, Message>
key表示消息Id
消息队列关联结构
ConcurrentHashMap<String, LinkedList<Message>>
key表示队列名
待确认消息结构
ConcurrentHashMap<String, ConcurrentHashMap<String, Message>>
第一个key表示队列名,第二个key表示消息Id
交换机内存管理
新增交换机
insertExchange(Exchange exchange)
删除交换机
deleteExchange(String exchangeName)
查询交换机
Exchange getExchange(String exchangeName)
队列内存管理
新增队列
insertQueue(MSGQueue queue)
删除队列
deleteQueue(String queueName)
查询队列
MSGQueue getQueue(String queueName)
绑定内存管理
新增绑定
insertBinding(Binding binding)
删除绑定
deleteBinding(Binding binding)
查询指定交换机和指定队列的绑定
Binding getBinding(String exchangeName,String queueName)
查询指定交换机的所有绑定
ConcurrentHashMap<String, Binding> getBindings(String exchangName)
消息内存管理
消息中心内存管理
新增消息
addMessage(Message message)
删除消息
removeMessage(String messageId)
获取指定消息
Message getMessage(String messageId)
消息队列关联结构内存管理
发送消息到指定队列
sendMessage(String queueName,Message message)
从指定队列中获取一条消息(头删)
Message pollMessage(String queueName)
获取指定队列的消息个数
int getMessageCount(String queueName)
待确认消息管理
指定队列待确认消息+1
addMessageWaitAsk(String queueName,Message message)
指定队列待确认消息-1
removeMessageWaitAsk(String queueName,String messageId)
获取指定队列中的指定待确认消息
Message getMessageWaitAsk(String queueName,String messageId)
硬盘数据恢复
recovery(DiskDataCenter diskDataCenter)
清空所有内存数据
恢复所有交换机数据
恢复所有队列数据
恢复所有绑定数据
恢复所有消息数据
VirtualHost
核心字段
虚拟主机名
String virtualhostName
这里的虚拟主机名是预留字段,便于后续拓展虚拟主机管理模块
使用 虚拟主机名+交换机名/队列名 对数据进行命名,可以起到不同虚拟主机之间数据的隔离作用
硬盘实例
DiskDataCenter diskDataCenter
内存实例
MemoryDataCenter memoryDataCenter
规则实例
Router router
检验routingKey是否合法
checkRoutingKey(String routingKey)
数字、字母、下划线组成,由.进行分割
形如:aaa.bbb.ccc
检验bindingKey是否合法
checkBindingKey(String bindingKey)
数字、字母、下划线、*、#组成,由.分割
并且*和#必须为独立的部分
形如:aaa.*.bbb.#.CCC
Topic/Fanout转发规则
route(ExchangeType exchangeType,Binding binding,Message message)
Fanout交换机直接返回true
Topic交换机规则判断(routingKey和bindingKey进行匹配)
每个单词进行等值匹配,其中bindingKey中的通配符 * 表示匹配一个单词,# 表示零个或多个单词
消费管理实例
ConsumerManager consumerManager
创建一个存放令牌的阻塞队列(存放内部消息不为空的队列名)
BlockingQueue<String> tokenQueue
创建一个扫描线程,持续扫描令牌队列,进行消费
Thread scannerThread
创建一个线程池取处理consumer参数中的业务逻辑
ExecutorService workPool
核心 API
创建ConsumerManager实例
ConsumerManager(VirtualHost virtualHost)
初始化VirtualHost实例
实例化扫描线程,循环扫描令牌队列
根据队列名获取并判断队列是否有效有效,无效则抛异常
如果有效则调用consumeMessage()
添加队列名到阻塞队列(公开)
notifyConsume(String queueName)
给指定队列添加订阅者(公开)
addConsumerEnv(String consumerTag, String queueName, boolean autoAsk, Consumer consumer)
获取并判断队列是否有效,无效则抛出异常
有效则封装ConsumerEnv对象,并将其添加到队列中的List<ConsumerEnv>列表中
获取当前队列的消息数量,如果消息数量为0啥也不做
如果消息数量不为0,为n,则调用n次consumeMessage(queue),消费存在的消息
这个步骤是很有必要的:
1、如果生产者先上线,生产了一些消息后,消费者才上线,这样的话,上面的扫描线程拿到队列名后由于没有订阅者也无能为力,就直接把令牌中的队列名取走啥也不干。因此这里需要进行保底策略,如果出现上面的情况,就在这里进行消费。
2、如果是消费者先上线,也就是先有订阅,再生产消息,那么这里的消息数量就为空,这段逻辑也就不在起作用了
消费(处理)消息(私有)
consumeMessage(MSGQueue queue)
使用轮询的方式从队列中取出一个订阅者,不存在订阅者,暂不消费
从内存中取消息,不存在消息,暂不消费
使用线程池将消息传入到回调方法上
执行回调之前先将消息添加到待应答容器中
真正执行consumer回调函数
如果是自动应答,直接把消息删除即可(硬盘+消息中心内存数据+待确认内存数据)
如果是手动应答,这里就先不处理,等待后续消费者调用basicAck实现
核心 API
创建虚拟机
VirtualHost()
初始化虚拟机名
调用diskDataCenter.init()进行初始化
调用memoryDataCenter.recovery()恢复硬盘数据到内存
创建交换机
exchangeDeclare()
构造交换机名:虚拟主机名+交换机名
判断交换机是否已存在,存在则啥也不做
根据传入参数封装交换机对象
判断持久化:如果durable为true 写入数据库
将交换机写⼊内存
删除交换机
exchangeDelete()
构造交换机名:虚拟主机名+交换机名
判断交换机是否存在,不存在则抛异常
删除硬盘上的数据
删除内存数据
创建队列
queueDeclare()
构造队列名:虚拟主机名+队列名
判断队列是否已存在,存在则啥也不做
根据传入参数封装队列对象
判断持久化:如果durable为true 写入数据库
将队列写⼊内存
删除对列
queueDelete()
构造队列名:虚拟主机名+队列名
判断队列是否存在,不存在则抛异常
删除硬盘上的数据
删除内存数据
创建绑定
queueBind()
构造交换机名、构造队列名
判断绑定是否已存在,存在则啥也不做
调用router.checkBindingKey()判断bindingKey是否合法,不合法抛异常
判断交换机是否存在,不存在抛异常
判断队列是否存在,不存在抛异常
判断是否持久化,如果交换机和队列的durable均为true写入数据库
将绑定写入内存
删除绑定
queueUnbind()
构造交换机名、构造队列名
判断该绑定是否存在,如果不存在抛异常
删除硬盘数据(不管是否持久化都没关系)
删除内存数据
发送消息到指定交换机
basicPublish()
调用router.checkRoutingKey()检测routingKey是否合法,不合法则抛异常
判断交换机是否存在,不能存在则抛异常
封装sendMessage()方法
判断持久化,如果DeliverMode=2,调用diskDataCenter.sendMessage()
写入内存,调用memoryDataCenter.sendMessage()
调用consumerManager.notifyConsume()将队列名添加到阻塞令牌队列中
判断交换机类型
直接交换机 Direct
构造队列名:交换机名+routingKey
判断队列是否存在,不存在抛异常
根据传入参数封装消息对象
调用sendMessage()发送消息到指定队列
扇出交换机 Fanout &
主题交换机 Topic
获取交换机的所有绑定,如果不存在任何绑定则抛异常
遍历该交换机对应的所有绑定,如果绑定的队列不存在则跳过这次遍历
调用router.route()判定这个消息是否能转发给该队列
根据传入参数封装消息对象
调用sendMessage(),发送消息
订阅指定队列的消息(在指定队列中添加订阅者)
basicConsume()
构造队列名:虚拟主机名+队列名
调用consumerManager.addConsumerEnv()将参数consumerTag,queueName, autoAsk, consumer传入,在指定队列中添加一个消费者.
手动应答
basicAck()
查询队列,消息是否存在(包括消息中心和待确认集合),不存在则抛异常
删除硬盘上的数据
删除消息中心和确认集合中的内存消息数据
BrokerServer
核心属性
VirtualHost virtualHost
表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.当前程序只考虑⼀个虚拟主机的情况
ConcurrentHashMap<String, Socket> sessions
⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket.
ServerSocket serverSocket
是服务器⾃⾝的 socket
ExecutorService executorService
这个线程池⽤来处理响应
boolean runnable
这个标志位⽤来控制服务器的运⾏停⽌
创建BrokerServer
BrokerServer(int port)
初始化serverSocket并绑定端口号
启动BrokerServer
start()
循环读取客户端连接
serverSocket.accept()
将处理连接的过程交给线程池
processConnection(clientSocket)
内部使用一个循环进行具体的处理
读取请求并解析
readRequest(dataInputStream)
读取4个字节的请求类型
读取4个字节的载荷长度
读取length的载荷
校验实际读取长度和读取的长度是否一致
封装并返回Request对象
根据请求构造响应
process(request,clientSocket)
对payload做一个初步解析(调用BinaryTool.toObject()先初步解析为BasicArgument,后续根据具体情况转为对应子类)
定义一个变量 ok 表示方法调用结果
根据request.getType()进行进一步解析
0x1 创建 channel
sessions.put();
0x2 关闭 channel
sessions.remove();
0x3 创建 exchange
将payload进一步解析为ExchangeDeclareArguments
调用virtualHost.exchangeDeclare()
0x4 销毁 exchange
将payload进一步解析为ExchangeDeleteArguments
调用virtualHost.exchangeDelete()
0x5 创建 queue
将payload进一步解析为QueueDeclareArguments
调用virtualHost.queueDeclare()
0x6 销毁 queue
将payload进一步解析为QueueDeleteArguments
调用virtualHost.queueDelete()
0x7 创建 binding
将payload进一步解析为
QueueBindArguments
调用virtualHost.queueBind()
0x8 销毁 binding
将payload进一步解析为QueueUnbindArguments
调用virtualHost.queueUnbind()
0x9 发送 message
将payload进一步解析为BasicPublishArguments
调用virtualHost.basicPublish()
0xa 订阅 message
将payload进一步解析为BasicConsumeArguments
调用virtualHost.basicCosume()
传入一个固定的consumer回调函数,表示将消息推送给订阅者的客户端
这里的consumerTag是订阅者的channelId,根据channelId找到对应的订阅者socket
构造响应数据SubScribeReturns
调用BinaryTool.toBytes()将响应数据转换为二进制数据 payload
构造 0xc响应 Response,并调用writeResponse()写回客户端
0xb 返回 ack
将payload进一步解析为
BasicAckArguments
调用virtualHost.basicAck()
将响应写回到客户端
writeResponse(dataOutputStream,response)
写入4个字节的响应类型
写入4个字节的响应长度
写入length的载荷
客户端连接关闭时清理session(私有)
clearClosedSession(Socket clientSocket)
将session表中value为clientSocket的session删除即可
停止BrokerServer
stop()
runnable = false
executorService.shutdownNow()
serverSocket.close()
客户端模块
创建连接的⼯⼚类,创建出连接 Connection 对象
ConnectionFactory
核心属性
BrokerServer 的 ip
String host
BrokerServer 的 port
int port
⽤⼾认证和多虚拟主机, ⽤⼾名密码
String virtualHost
String username
String password
核心方法
建⽴⼀个 tcp 连接
Connection newConnection()
Tcp连接
Connection
核心属性
客⼾端持有的套接字
Socket socket
socket通信接口
InputStream、OutputStream
DataOutputStream、DataInputStream
来管理该连接中所有的 Channel
ConcurrentHashMap<String,Channel> channelMap
客⼾端这边执⾏⽤⼾回调的线程池
ExecutorService executorService
核心方法
创建一个Tcp连接
Connection(String host,int post)
初始化socket和服务端建立连接
初始化通信接口
初始化线程池
创建一个扫描线程,负责不断地从socket中读取响应数据,分别交给对应的channel进行处理
响应读取
readResponse()
实现响应的分发
dispatchResponse(response)
response.getType=0xc
表示服务器推送的消息
进行消息解析,调用BinaryTool.toObject()反序列化为SubScribeReturns
根据channelId或consumerTag在channelMap中找到对应的channel对象
执行channel对象内部的回调(交给线程池)
response.getType!=0xc
表示客户端发送请求获取到的响应
进行响应解析,调用BinaryTool.toObject()反序列化为basicReturns
根据channelId在channelMap中找到对应channel对象
将响应结果放到对应channel的存储响应结果的哈希表中channel.putReturns(basicReturns)
创建一个channel对象
createChannel()
创建一个channel对象,并将其记录到当前Connection的channelMap中
调用channel中的方法在服务器端创建channel
如果服务器创建失败,需要将connection中的哈希表中删除
返回创建的Channel对象
发送请求
writeRequest(Request request)
写4个字节的请求类型
写4个字节的请求长度
写指定长度的请求载荷
刷新缓冲区,保证实时性
读取响应
readResponse()
读取4个字节的响应类型
读取4个字节的响应长度
读取指定长度的响应载荷
校验实际读取长度和读取的长度是否一致
封装并返回Response对象
关闭connection,释放资源
close()
executorService.shutdownNow();
channelMap.clear();
inputStream.close();
outputStream.close();
socket.close();
Channel
核心属性
channel标识 channelId
channel所属连接 connection
当前channel的响应结果 ConcurrentHashMap<String, BasicReturns> basicReturnsMap
当前channel的回调函数consumer(一个channel只能有一个回调函数)
核心方法
创建channel
Channel()
初始化channelId
初始化connection
channel发送请求后阻塞等待服务器的响应
waitResult(String rid)
使用循环持续不断的在basicReturnsMap中查询是否存在rid的响应,不存在则阻塞等待
存在则将basicReturnsMap中的响应对象删除,并返回响应结果basicReturns
向该channel对象对应的basicReturnsMap中添加一条响应结果,同时唤醒阻塞等待结果的channel对象
putReturns(BasicReturns basicReturns)
发送请求
0x1 创建 channel
createChannel()
0x2 关闭 channel
closeChannel()
0x3 创建 exchange
exchangeDeclare()
0x4 销毁 exchange
exchangeDelete()
0x5 创建 queue
queueDeclare()
0x6 销毁 queue
queueDelete()
0x7 创建 binding
queueBind()
0x8 销毁 binding
queueUnbind()
0x9 发送 message
basicPublish()
0xa 订阅 message
basicConsume()
0xb 返回 ack
basicAck()
0 条评论
下一页