rocketmq源码(六)——Broker端消息处理
2024-09-10 11:29:13 7 举报
Broker端消息处理是RocketMQ中的核心功能之一。RocketMQ中的Broker负责接收、存储和转发消息。在Broker端,消息处理主要包括以下几个步骤: 1. 接收消息:Broker接收来自生产者或者消费者的消息,这些消息可能是同步或者异步发送的。接收消息时,Broker需要检查消息的合法性,如消息的大小、格式等。 2. 存储消息:Broker将接收到的消息存储在磁盘或者内存中。存储消息时,Broker需要保证消息的顺序性和一致性。RocketMQ采用CopyOnWrite策略,确保消息的可靠性和性能。 3. 转发消息:Broker将接收到的消息转发给相应的消费者。在转发消息时,Broker需要根据消息的Topic和队列信息,找到相应的消费者,并将消息发送给消费者。 4. 消息过滤:Broker可以根据预先设定的过滤规则,对消息进行过滤,以满足业务需求。 5. 消息查询:Broker支持消息查询功能,用户可以根据消息ID、Topic等信息查询消息。 Broker端消息处理涉及到多种文件类型,包括配置文件、日志文件、数据文件等。其中,配置文件主要包括broker.conf和users.properties等,用于配置Broker的运行参数和访问控制策略。日志文件主要包括broker.log等,记录Broker的运行情况和错误信息。数据文件主要包括commitlog、consumequeue、index等,用于存储消息的元数据和内容。 在Broker端消息处理过程中,需要使用多种修饰语来描述消息的状态和处理过程。例如,uncommitted表示消息尚未写入到磁盘;flushed表示消息已经写入到磁盘;filtered表示消息已被过滤;expired表示消息已过期等。通过这些修饰语,可以方便地跟踪和管理消息的处理过程。
作者其他创作
大纲/内容
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
开启线程任务:刷盘操作flushCommitLogService.start();
唤醒线程,进行刷盘操作service.wakeup();
异步刷盘this.flushCommitLogService = new FlushRealTimeService();
Netty服务处理请求入口NettyServerHandler.channelRead0()
false
true
处理完消息后响应response
DefaultMessageStore.this.putMessagePositionInfo(request);
SendMessageProcessor
mappedFile为空或者写满时创建新的mappedFilemappedFile = this.mappedFileQueue.getLastMappedFile(0);
this.doFlush(1);
processor处理器在broker启动时注册到Table的
向客服端响应请求结果ctx.writeAndFlush(response);
ReputMessageService
this.indexService.load(lastExitOK);
创建MappedFileQueuethis.mappedFileQueue = new MappedFileQueue()
result = cq.flush(flushConsumeQueueLeastPages);
this.commitLog.start();
File dir = new File(this.storePath); File[] files = dir.listFiles();
校验MessageStore状态this.checkStoreStatus();
写队列添加刷盘请求this.requestsWrite.add(request);
this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName());
DefaultMessageStore.this.doDispatch(dispatchRequest);
this.indexService = new IndexService(this);
CommitLog
this.doReput();
负责将消费队列中的数据刷新到磁盘this.flushConsumeQueueService = new FlushConsumeQueueService();
将上一步的ByteBuffer赋值到msg的encodedBuff中msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
添加到mappedFiles集合中this.mappedFiles.add(mappedFile);
indexService.buildIndex(request);
根据请求code拿到对应的处理器和处理线程matched = this.processorTable.get(cmd.getCode());
this.commitLog.asyncPutMessage(msg)
FlushConsumeQueueService
while (!this.isStopped())
按照commitLog日志文件名排序files.sort(Comparator.comparing(File::getName));
this.waitForRunning(interval);
flushCommitLogTimed== true
topic配置校验TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
同步刷盘GroupCommitService
同步刷盘等待日志写入被唤醒,最多等待10msthis.waitForRunning(10);
ConsumeQueue和index存盘this.reputMessageService = new ReputMessageService();
回调callback方法callback(RemotingCommand response)
File dir = new File(this.storePath);File[] ls = dir.listFiles();
this.flushConsumeQueueService.start();
更加Offset找到MappedFileMappedFile mappedFile =findMappedFileByOffset
tables = DefaultMessageStore.this.consumeQueueTable;
将消息编码到ByteBuffer中putMessageThreadLocal.getEncoder().encode(msg);
new DefaultMessageStore()构造函数
this.loadConsumeQueue();
run()
是否是同步刷盘== FlushDiskType.SYNC_FLUSH
new CommitLog(this);
service.putRequest(request);
刷新磁盘result = cq.flush(flushConsumeQueueLeastPages);
找到最后一个queue文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
return topicConfig;
校验messageStore 没有停止slave 不能写数据磁盘是否可写 页缓存是否忙
异步刷新间隔Thread.sleep(interval);
result
创建ByteBufferByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
rocketmq 自动创建topic逻辑
写入内存中映射的文件缓冲区mappedFile.appendMessage(this.byteBufferIndex.array())
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
asyncPutMessage
responseCallback.callback(response);
异步刷新等待方式flushCommitLogTimed:true: sleep方式阻塞线程等待false: 使用CountDownLatch2自定义方式控制线程唤醒
处理CommitLog提交日志this.commitLog = new CommitLog(this);
mappedFileQueue
// 获取最后一个提交日志文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
broker初始化时调用mappedFileQueue.load()
消息结构转换,方便后续对消息的处理SendMessageRequestHeader requestHeader = parseRequestHeader(request);
开启线程刷盘IndexService.this.flush(flushThisFile);
dispatcher.dispatch(req);
MappedFile
new MappedFileQueue()
TopicConfig topicConfig =selectTopicConfig()
创建topiccreateTopicInSendMessageMethod
组装内部消息格式MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
load()加载Index索引文件
找到ConsumeQueueConsumeQueue cq =findConsumeQueue()
for (File file : files)
校验topic长度最大127消息的properties的key和value的长度最大为32767
校验请求消息this.checkMessage(msg)
broker服务启动时调用DefaultMessageStore.start()
写入切片缓冲区byteBuffer.put(preEncodeBuffer);
以defaultTopicConfig为模版创建自己的topic,前提是设置了autoCreateTopicEnable为truetopicConfig = new TopicConfig(topic);
run():ConsumeQueue文件刷盘
doLoad(Arrays.asList(ls))
CopyOnWriteArrayList<MappedFile> mappedFiles
同步还是异步刷盘==FlushDiskType.SYNC_FLUSH?
处理消息putMessageResult =this.brokerController.getMessageStore().asyncPutMessage(msgInner);
null == topicConfig
Thread.sleep(1);
AllocateMappedFileService allocateMappedFileService
持久化topic配置this.persist();
Broker处理producer端发送的消息(sync同步发送为例)
int mappedFileSize
唤醒刷盘线程waitPoint.countDown();
BuildConsumeQueue
cq.putMessagePositionInfoWrapper()
load()
条件等待唤醒this.waitForRunning(interval);
mappedFile.flush(flushLeastPages);
CommitLog.this.mappedFileQueue.flush(0);
this.commitLog.load();
得到默认的TBW102自动创建topic的配置TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
将内存映射缓冲区中的数据强制写入磁盘this.mappedByteBuffer.force();
null == mappedFile || mappedFile.isFull()
indexService
TopicConfigManager
同步刷盘this.flushCommitLogService = new GroupCommitService();
MappedByteBuffer.slice() 是 Java NIO 中的一个方法,用于创建一个新的 MappedByteBuffer,该新缓冲区与原缓冲区共享相同的内存区域,但具有不同的位置、限制和标记。slice() 方法通常用于在同一个内存映射文件的不同部分之间创建视图。
indexFile =getAndCreateLastIndexFile
获取配置的等待超时时间,默认为500int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
asyncPutMessage(msgInner);
拿出之前生成好的msg的ByteBufferByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
commitLog日志路径String storePath;
this.indexFileList.add(f);
this.reputMessageService.start();
processRequest
DefaultMessageStore
IndexFile indexFile = retryGetAndCreateIndexFile();
管理内存映射文件分配的服务this.allocateMappedFileService = new AllocateMappedFileService(this);
run()构建ComsumQueue和Index文件内容
appendMessage()
for (ConsumeQueue cq : maps.values())
提交刷盘this.doCommit();
生成responsefinal RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
异步刷盘FlushRealTimeService
0 条评论
下一页