rocketmq源码之broker处理生产者消息
2022-09-20 14:47:58 0 举报
rocketmq源码之broker处理生产者消息
作者其他创作
大纲/内容
合并刷盘和同步salve的结果
service.putRequest(request);
Container
处理请求分支
构建msgInner,为存盘做准备
asyncPutMessage(msg)
waitForPutResult(asyncPutMessage(msg));
构建一个RequestTask来提交任务
写commitLog日志
pair.getObject1().rejectRequest()
8、sysFlag(4bytes)
14、PreparedTransactionOffset(8bytes)
this.commitLog.asyncPutMessage(msg);
同步发送的存盘
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
1
16、topicLenth(1byte)+topicData
DefaultFlushManager
run
追加数据
13、reconsumeTimes(4bytes)
SendMessageProcessor#sendMessage
4、queueId(4bytes)
将消息发给slave并同步等待ack
9、BornTimestamp(8bytes)
CommitLog#doAppend
flushResultFuture.thenCombine
17、propertiesLength(2bytes)+propertiesData
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
return request.future();
needAckNums是否需要收到slave的ack才返回
异步
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
10、bornHost
pair.getObject2().submit(requestTask);
使用给定的值返回
channelRead0
waitForPutResult()
拒绝请求的几种情况:1、从broker正在故障转移,还不能代替主broker,并且当前是从broker2、系统页缓存繁忙,或者缓冲池不足
处理单挑消息
加入requestsWrite并唤醒刷盘线程
赋值给MessageExtBrokerInner的encoderBuffer
7、physicalOffset(8bytes)
commitLogService.wakeup();
对消息进行编码
2、magicCode(4bytes)
flushDiskWatcher.add(request);
HAService haService = this.defaultMessageStore.getHaService();
NettyServerHandler
是否拒绝请求
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
刷盘
5、flag(4bytes)
SendMessageProcessor#processRequest
这里pair.getObject1()就是SendMessageProcessor实例
同步
将消息编码成这种结构赋值给MessageExtEncoder.byteBuf
6、queueOffset(8bytes)
1、msgLen(4bytes)
2、如果消息处理完毕需要进行刷盘和同步salve
case REQUEST_COMMAND:
监控刷盘是否超时
waitPoint.countDown(); // notify
从processorTable中获取处理器
3、bodyCRC(4bytes)
返回结果
同步给slave
Broker处理数据的入口先分析同步发送
15、bodylength(4bytes)+bodyData
11、StoreTimestamp(8bytes)
12、storeHostAddress
0 条评论
下一页