消息队列
2023-11-08 16:36:02 3 举报
AI智能生成
基于RabbitMQ的消息队列设计
作者其他创作
大纲/内容
上述我们规定使用vIrtualHostName+exchangeNamevIrtualHostName+queueName划分虚拟主机和他们内部子模块的从属关系,同时保证了不同的虚拟主机内部可以存在同名交换机队列等。
可以在数据内存管理模块中创建一个哈希表用来表示虚拟主机,同时可以进行持久化操作,便于重启恢复
根据数据结构可以增加虚拟主机的增删查改操作
虚拟主机管理
可以拓展用户名密码字段,验证用户身份进一步保证数据安全
用户管理
应⽤场景1:当消息在消费过程中出现异常,就会把消息投⼊到死信队列中,进行进一步处理
可以考虑设置⼀个死信队列,每⼀个队列都会绑定⼀个死信队列。
死信队列
拓展思路
创建函数式接口 Consumer
回调函数
队列每次以轮询的方式获取消费者,进行具体消费
消费环境
消费相关
回调函数用于消息的的业务处理
创建一个ByteArrayOutputStream()流对象,这个流对象相当于一个变长的字节数组
调用objectOutputStream.writeObject(object)
返回序列化结果byteArrayOutputStream.toByteArray()
序列化
创建一个ByteArrayInputStream(data)流对象
调用objectInputStream.readObject()将数据反序列化为Object对象
反序列化
这里使用Java标准库提供了序列化⽅案 ObjectInputStream 和 ObjectOutputStream
参与序列化的对象需要实现Serializable接口
序列化/反序列化
TCP
传输层协议
payload 表⽰这次⽅法调⽤的各种参数信息
请求
payload 表⽰这次⽅法调⽤的返回值
响应
自定义协议格式
0x1 创建 channel
0x2 关闭 channel
0x3 创建 exchange
0x4 销毁 exchange
0x5 创建 queue
0x6 销毁 queue
0x7 创建 binding
0x8 销毁 binding
0x9 发送 message
0xa 订阅 message
0xb 返回 ack
0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的.
type
请求类型 int type
请求载荷长度 int length
用于匹配请求/响应的标识: String rid
用于表示发送请求的channel: String channelId
定义参数⽗类 BasicArguments
请求载荷 byte[] payload
交换机名 exchangeName
交换机类型 exchangeType
持久化 durable
自动删除 autoDelete
额外参数 arguments
创建交换机 ExchangeDeclareArguments
删除交换机 ExchangeDeleteArguments
队列名 Queuename
是否独占 exclusive
创建队列 QueueDeclareArguments
队列名 queueName
删除队列 QueueDeleteArguments
绑定键 bindingKey
创建绑定 QueueBindArguments
删除绑定 QueueUnbindArguments
路由键 routingKey
消息属性 basicProperties
消息体 body
发送消息 BasicPublishArguments
消费者标识 consumerTag
自动应答 autoAck
在订阅消息这块并没有定义一个属性表示回调函数consumer,这是因为回调函数是不能通过网络进行传输的,这里我们会服务端写一个固定的回调换算表示将消息推送给客户端。
订阅消息 BasicConsumeArguments
消息Id messageId
应答消息 BasicAckArguments
定义子类(每个具体的VirtualHost方法)
父类子类均需要实现Serializable接口,它们都需要序列化为二进制数据填入payload
定义Request
响应类型 int type
响应载荷长度 int length
用于表示获取响应的channel: String channelId
表示这次API调用的结果:boolean ok
定义响应父类 BasicReturns
响应载荷 byte[] payload
消息主体 body
对于这个父类来说只需要派生一个子类用来表示服务器给客户端推送的消息. SubScribeReturns
定义Response
自定义应用层协议
通信协议
公共模块
直接交换机 Direct
扇出交换机 Fanout
主题交换机 Topic
交换机类型 type
交换机 Exchange
队列 MSGQueue
绑定 Binding
Binding 依赖于Exchange 和 MSGqueue
持久化 deliverMode
消息内容 body
定位属性 offsetBeg
定位属性 offsetEnd
是否有效 isValid(1:有效 0:无效)
消息 Message
这两个属性用于记录一条消息在硬盘中的起始位置和结束位置,便于定位到文件中指定消息位置
Broker 核心概念定义
这里使用 SQlite 而不是 MySQL,因为相比之下SQlite更轻量,消息队列作为一个中间件,选用 SQlite 更为合适
导入相关依赖
引入SQlite
创建Mapper接口
SQlite 连接配置
Mybatis 路径配置
添加相关配置
数据库已存在啥也不做
数据库不存在则创建数据库目录 ./data
不需要手动创建 meta.db,MyBatis 会帮我们创建
checkDBExists()
创建交换机表
创建队列表
创建绑定表
createTable()
添加一个匿名交换机
createDefaultData()
inti()
调用 Mapper 接口
数据库初始化
insertExchange(Exchange)
deleteExchange(String)
selectAllExchanges()
交换机
insertQueue(MSGQueue)
deleteQueue(String)
selectAllQueues()
队列
insertBinding(Binding)
deleteBinding(Binding)
selectAllBindings()
绑定
数据库管理 DataBaseManager
消息操作只设计简单的读写,不涉及复杂的查询,并且消息的数量会很多,使用数据库效率并不高。
String getQueueDirPath(String queueName)
获取指定队列消息文件所在路径
String getQueueStatPath(String queueName)
获取指定对列统计文件所在路径
String getQueueDataPath(String queueName)
获取指定队列数据文件所在路径
boolean checkFilesExits(String queueName)
检查队列对应的目录和文件是否存在
创建队列对应的消息目录
创建统计文件
创建数据文件
初始化统计文件(0\t0)
createQueueFiles(String queueName)
创建队列对应的文件和目录
先删除文件
后删除目录
destoryQueueFiles(String queueName)
删除队列对应的文件和目录
定义消息文件目录读取的辅助方法
分支主题
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开。在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
queue_data.txt:消息数据文件
queue_stat.txt:消息统计文件
每个队列中规定存在两个文件
规定消息存储格式
Stat 内部类:用于表示该队列的统计信息
判断文件是否存在
将stat文件信息读取到Stat对象中
Stat readStat(String queueName)
读取统计文件
将stat对象中的数据写入到stat文件中(覆盖写入)
刷新缓冲区
写入统计文件
统计文件的读写
校验写入文件目录是否存在
调用 BinaryTool.toBytes(message) 将消息对象序列化为二进制数据
计算offsetBeg 和 offsetEnd 便于定位消息在文件中的位置
调用writeInt()保证写入的是4个字节的消息长度
调用write()写入消息体
使用 DataOutputStream 进行写入(追加写)
有效数据个数+1
总数据个数+1
更新统计文件
在文件中添加消息
调用randomAccessFile.seek(int)调整光标位置
从光标位置读取指定长度数据
调用 BinaryTool.toObject() 将读取到的二进制数据反序列化为Message对象
将读取到的Message对象中的isValid置为无效,并调用BinaryTool.toBytes()重新序列化数据
调用randomAccessFile.seek(int)重新调整光标位置
将修改过的数据重新写入到原来的位置
使用 RandomAccessFile 它提供了随机访问文件的能力,可以在文件中任意位置进行读写操作
根据传入 Message 对象中的offsetBeg和offsetEnd 在文件中读取数据
删除文件中的指定消息(逻辑删除)
读取上述指定长度的数据作为消息的主体
校验实际读取长度是否和消息长度一致,避免格式错误
调用readInt()保证读取的是4个字节的消息长度
调用BinaryTool.toObject()将读取到的消息反序列化为Message对象
无效直接跳过
有效则设置消息的offsetBeg和offsetEnd并添加到返回链表中
判定一下这个消息对象是否为有效对象
使用DataOutPutStream读取到文件末尾会抛出异常:EOFException.借助异常结束循环
使用DataOutPutStream 进行死循环读取
LinkedList<Message> loadAllMessageFromFile(String queueName)
获取有效消息集合(服务器重启时执行)
数据文件的读写删
需要针对修改的队列进行加锁,涉及到多个客户端操作
根据有效消息与总消息的比例判断是否进行GC
boolean checkGC(String queueName)
GC条件
复制算法:遍历原有的消息数据⽂件,把所有的有效数据数据重新拷⻉⼀份到新的⽂件中,然后将新文件重命名为原文件名,最后把旧的⽂件直接删除掉。
GC策略
判断文件副本是否已存在(如果已存在说明上次GC程序出错)
创建一个文件副本,用于存储有效消息
调用loadAllMessageFromFile()方法,获取所有信息
调用writeInt()写入消息长度
调用write()写入消息内容
同样使用DataOutputStream进行写入操作
把有效消息写入到文件副本中
删除旧的data文件
将新data文件重命名
GC实现
需要对GC队列加锁
垃圾回收GC
文件管理 MessageFileManager
数据硬盘管理 DiskDataCenter
key表示交换机名
交换机结构
key表示队列名
队列结构
绑定结构
key表示消息Id
消息中心结构
消息队列关联结构
第一个key表示队列名,第二个key表示消息Id
待确认消息结构
内存数据结构设计
这里的基本数据结构使用的是ConcurrentHashMap而不是普通的哈希表,目的就是为了保证多线程下线程安全
insertExchange(Exchange exchange)
新增交换机
deleteExchange(String exchangeName)
删除交换机
Exchange getExchange(String exchangeName)
查询交换机
交换机内存管理
insertQueue(MSGQueue queue)
新增队列
deleteQueue(String queueName)
删除队列
MSGQueue getQueue(String queueName)
查询队列
队列内存管理
insertBinding(Binding binding)
新增绑定
deleteBinding(Binding binding)
删除绑定
查询指定交换机和指定队列的绑定
查询指定交换机的所有绑定
绑定内存管理
添加绑定依赖判定条件,为避免value覆盖此处需要以当前的 bindingMap 为锁对象进⾏加锁!对于删除绑定来说,不存在线程安全问题,因为哈希表不存在重复删除的问题。
addMessage(Message message)
新增消息
removeMessage(String messageId)
删除消息
Message getMessage(String messageId)
获取指定消息
消息中心内存管理
发送消息到指定队列
Message pollMessage(String queueName)
从指定队列中获取一条消息(头删)
int getMessageCount(String queueName)
获取指定队列的消息个数
消息队列关联结构内存管理
由于此时增加删除消息操作的是LinkedList,所以为保证线程安全,需要加锁
消息内存管理
指定队列待确认消息+1
指定队列待确认消息-1
获取指定队列中的指定待确认消息
待确认消息管理
父结构和子结构均为ConcurrentHashMap,不存在线程安全问题
清空所有内存数据
恢复所有交换机数据
恢复所有队列数据
恢复所有绑定数据
恢复所有消息数据
recovery(DiskDataCenter diskDataCenter)
硬盘数据恢复
这个方法主要用于重启时将持久化数据恢复到内存中。这里不涉及未确认消息恢复,一旦在等待ACK的过程中服务器重启,此时这些未确认的消息就被恢复成未取走的消息,由消费者重新消费.
数据内存管理 MemoryDataCenter
这里的虚拟主机名是预留字段,便于后续拓展虚拟主机管理模块
使用 虚拟主机名+交换机名/队列名 对数据进行命名,可以起到不同虚拟主机之间数据的隔离作用
String virtualhostName
虚拟主机名
DiskDataCenter diskDataCenter
硬盘实例
MemoryDataCenter memoryDataCenter
内存实例
数字、字母、下划线组成,由.进行分割形如:aaa.bbb.ccc
checkRoutingKey(String routingKey)
检验routingKey是否合法
数字、字母、下划线、*、#组成,由.分割并且*和#必须为独立的部分形如:aaa.*.bbb.#.CCC
checkBindingKey(String bindingKey)
检验bindingKey是否合法
Fanout交换机直接返回true
每个单词进行等值匹配,其中bindingKey中的通配符 * 表示匹配一个单词,# 表示零个或多个单词
Topic交换机规则判断(routingKey和bindingKey进行匹配)
Topic/Fanout转发规则
Router router
规则实例
BlockingQueue<String> tokenQueue
创建一个存放令牌的阻塞队列(存放内部消息不为空的队列名)
Thread scannerThread
创建一个扫描线程,持续扫描令牌队列,进行消费
ExecutorService workPool
创建一个线程池取处理consumer参数中的业务逻辑
初始化VirtualHost实例
实例化扫描线程,循环扫描令牌队列
根据队列名获取并判断队列是否有效有效,无效则抛异常
如果有效则调用consumeMessage()
ConsumerManager(VirtualHost virtualHost)
由于consumeMessage()内部操作List<ConsumerEnv>集合,这里需要加锁
创建ConsumerManager实例
notifyConsume(String queueName)
添加队列名到阻塞队列(公开)
获取并判断队列是否有效,无效则抛出异常
有效则封装ConsumerEnv对象,并将其添加到队列中的List<ConsumerEnv>列表中
获取当前队列的消息数量,如果消息数量为0啥也不做
这个步骤是很有必要的:1、如果生产者先上线,生产了一些消息后,消费者才上线,这样的话,上面的扫描线程拿到队列名后由于没有订阅者也无能为力,就直接把令牌中的队列名取走啥也不干。因此这里需要进行保底策略,如果出现上面的情况,就在这里进行消费。2、如果是消费者先上线,也就是先有订阅,再生产消息,那么这里的消息数量就为空,这段逻辑也就不在起作用了
如果消息数量不为0,为n,则调用n次consumeMessage(queue),消费存在的消息
这里同样涉及到操作List<ConsumerEnv>,需要加锁
给指定队列添加订阅者(公开)
使用轮询的方式从队列中取出一个订阅者,不存在订阅者,暂不消费
从内存中取消息,不存在消息,暂不消费
执行回调之前先将消息添加到待应答容器中
真正执行consumer回调函数
如果是自动应答,直接把消息删除即可(硬盘+消息中心内存数据+待确认内存数据)
如果是手动应答,这里就先不处理,等待后续消费者调用basicAck实现
使用线程池将消息传入到回调方法上
consumeMessage(MSGQueue queue)
消费(处理)消息(私有)
核心 API
ConsumerManager consumerManager
消费管理实例
构造方法中传入VirtualHost实例,内部调用VirtualHost实例中的getter方法,操作diskDataCenter和memoryDataCenter,也就是说ConsumerManager和VirtualHost共享同一份数据
核心字段
VirtualHost使用这两个字段实现对内部数据的管理
初始化虚拟机名
调用diskDataCenter.init()进行初始化
调用memoryDataCenter.recovery()恢复硬盘数据到内存
VirtualHost()
创建虚拟机
构造交换机名:虚拟主机名+交换机名
判断交换机是否已存在,存在则啥也不做
根据传入参数封装交换机对象
判断持久化:如果durable为true 写入数据库
将交换机写⼊内存
exchangeDeclare()
创建交换机
判断交换机是否存在,不存在则抛异常
删除硬盘上的数据
删除内存数据
exchangeDelete()
构造队列名:虚拟主机名+队列名
判断队列是否已存在,存在则啥也不做
根据传入参数封装队列对象
将队列写⼊内存
queueDeclare()
创建队列
判断队列是否存在,不存在则抛异常
queueDelete()
删除对列
构造交换机名、构造队列名
判断绑定是否已存在,存在则啥也不做
调用router.checkBindingKey()判断bindingKey是否合法,不合法抛异常
判断交换机是否存在,不存在抛异常
判断队列是否存在,不存在抛异常
判断是否持久化,如果交换机和队列的durable均为true写入数据库
将绑定写入内存
queueBind()
创建绑定
判断该绑定是否存在,如果不存在抛异常
删除硬盘数据(不管是否持久化都没关系)
queueUnbind()
此时无论绑定对应的交换机和队列是否存在都进行删除.因为有一种情况是如果先删除了队列/交换机,再删除绑定就会失败
调用router.checkRoutingKey()检测routingKey是否合法,不合法则抛异常
判断交换机是否存在,不能存在则抛异常
判断持久化,如果DeliverMode=2,调用diskDataCenter.sendMessage()
写入内存,调用memoryDataCenter.sendMessage()
调用consumerManager.notifyConsume()将队列名添加到阻塞令牌队列中
封装sendMessage()方法
构造队列名:交换机名+routingKey
根据传入参数封装消息对象
调用sendMessage()发送消息到指定队列
获取交换机的所有绑定,如果不存在任何绑定则抛异常
遍历该交换机对应的所有绑定,如果绑定的队列不存在则跳过这次遍历
调用router.route()判定这个消息是否能转发给该队列
调用sendMessage(),发送消息
扇出交换机 Fanout &主题交换机 Topic
判断交换机类型
basicPublish()
发送消息到指定交换机
basicConsume()
订阅指定队列的消息(在指定队列中添加订阅者)
查询队列,消息是否存在(包括消息中心和待确认集合),不存在则抛异常
删除消息中心和确认集合中的内存消息数据
basicAck()
手动应答
为保证多线程下线程安全,针对exchangeLocker对象加锁
为保证多线程下线程安全,针对queueLocker对象加锁
先后针对exchangeLocker对象、queueLocker对象进行加锁,保证加锁顺序一致,避免死锁
生产消费体系 ConsumerManager
VirtualHost
VirtualHost virtualHost
⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket.
是服务器⾃⾝的 socket
ServerSocket serverSocket
这个线程池⽤来处理响应
ExecutorService executorService
这个标志位⽤来控制服务器的运⾏停⽌
boolean runnable
核心属性
初始化serverSocket并绑定端口号
BrokerServer(int port)
创建BrokerServer
serverSocket.accept()
循环读取客户端连接
读取4个字节的请求类型
读取4个字节的载荷长度
读取length的载荷
校验实际读取长度和读取的长度是否一致
封装并返回Request对象
readRequest(dataInputStream)
读取请求并解析
对payload做一个初步解析(调用BinaryTool.toObject()先初步解析为BasicArgument,后续根据具体情况转为对应子类)
定义一个变量 ok 表示方法调用结果
sessions.put();
sessions.remove();
将payload进一步解析为ExchangeDeclareArguments
调用virtualHost.exchangeDeclare()
将payload进一步解析为ExchangeDeleteArguments
调用virtualHost.exchangeDelete()
将payload进一步解析为QueueDeclareArguments
调用virtualHost.queueDeclare()
将payload进一步解析为QueueDeleteArguments
调用virtualHost.queueDelete()
将payload进一步解析为QueueBindArguments
调用virtualHost.queueBind()
将payload进一步解析为QueueUnbindArguments
调用virtualHost.queueUnbind()
将payload进一步解析为BasicPublishArguments
调用virtualHost.basicPublish()
将payload进一步解析为BasicConsumeArguments
调用virtualHost.basicCosume()
这里的consumerTag是订阅者的channelId,根据channelId找到对应的订阅者socket
构造响应数据SubScribeReturns
调用BinaryTool.toBytes()将响应数据转换为二进制数据 payload
构造 0xc响应 Response,并调用writeResponse()写回客户端
传入一个固定的consumer回调函数,表示将消息推送给订阅者的客户端
将payload进一步解析为BasicAckArguments
调用virtualHost.basicAck()
根据request.getType()进行进一步解析
根据请求构造响应
写入4个字节的响应类型
写入4个字节的响应长度
写入length的载荷
将响应写回到客户端
由于涉及到多个部分的写入,为了避免写入单个数据不完整,这里需要加锁
内部使用一个循环进行具体的处理
processConnection(clientSocket)
借助EOFException关闭连接
将处理连接的过程交给线程池
start()
启动BrokerServer
将session表中value为clientSocket的session删除即可
clearClosedSession(Socket clientSocket)
客户端连接关闭时清理session(私有)
runnable = false
executorService.shutdownNow()
serverSocket.close()
stop()
停止BrokerServer
BrokerServer
服务器模块
String host
BrokerServer 的 ip
int port
BrokerServer 的 port
String virtualHost
String username
String password
暂不实现
Connection newConnection()
建⽴⼀个 tcp 连接
核心方法
ConnectionFactory
创建连接的⼯⼚类,创建出连接 Connection 对象
Socket socket
客⼾端持有的套接字
InputStream、OutputStream
DataOutputStream、DataInputStream
socket通信接口
来管理该连接中所有的 Channel
客⼾端这边执⾏⽤⼾回调的线程池
初始化socket和服务端建立连接
初始化通信接口
初始化线程池
readResponse()
响应读取
进行消息解析,调用BinaryTool.toObject()反序列化为SubScribeReturns
根据channelId或consumerTag在channelMap中找到对应的channel对象
执行channel对象内部的回调(交给线程池)
表示服务器推送的消息
response.getType=0xc
进行响应解析,调用BinaryTool.toObject()反序列化为basicReturns
根据channelId在channelMap中找到对应channel对象
将响应结果放到对应channel的存储响应结果的哈希表中channel.putReturns(basicReturns)
表示客户端发送请求获取到的响应
response.getType!=0xc
dispatchResponse(response)
实现响应的分发
创建一个扫描线程,负责不断地从socket中读取响应数据,分别交给对应的channel进行处理
创建一个Tcp连接
创建一个channel对象,并将其记录到当前Connection的channelMap中
调用channel中的方法在服务器端创建channel
如果服务器创建失败,需要将connection中的哈希表中删除
返回创建的Channel对象
createChannel()
创建一个channel对象
写4个字节的请求类型
写4个字节的请求长度
写指定长度的请求载荷
刷新缓冲区,保证实时性
writeRequest(Request request)
发送请求
读取4个字节的响应类型
读取4个字节的响应长度
读取指定长度的响应载荷
封装并返回Response对象
读取响应
executorService.shutdownNow();
channelMap.clear();
inputStream.close();
outputStream.close();
socket.close();
close()
关闭connection,释放资源
Connection
Tcp连接
channel标识 channelId
channel所属连接 connection
当前channel的回调函数consumer(一个channel只能有一个回调函数)
初始化channelId
初始化connection
Channel()
创建channel
使用循环持续不断的在basicReturnsMap中查询是否存在rid的响应,不存在则阻塞等待
存在则将basicReturnsMap中的响应对象删除,并返回响应结果basicReturns
waitResult(String rid)
channel发送请求后阻塞等待服务器的响应
putReturns(BasicReturns basicReturns)
向该channel对象对应的basicReturnsMap中添加一条响应结果,同时唤醒阻塞等待结果的channel对象
closeChannel()
1.封装载荷部分2.封装请求对象3.发送请求4.等待服务器响应5.返回结果
Channel
客户端模块
消息队列
0 条评论
回复 删除
下一页