rocketmq源码之生产者发送消息流程
2022-09-20 14:46:30 0 举报
rocketmq源码之生产者发送消息流程
作者其他创作
大纲/内容
mQClientFactory.start();
发送之前需要再次计算超时时间
this.mQClientFactory.getMQClientAPIImpl().sendMessage
先从本地获取
this.remotingClient.start();
发送之前计算一下超时时间(设置的超时时间减去发送之前花费的时间)
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage
处理消息状态
this.defaultMQProducerImpl.send(msg);
扩展点:实现接口MessageQueueSelector,重写select方法(作为入参赋值给生产者属性)可以自定义选择mq
responseFuture.executeInvokeCallback();
看下NettyClientHandler的逻辑
同步,异步发送有重试,缺省是重试两次
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
operationComplete
获取brokerName和BrokerAddress(选择master)
topicEndPointsTable:获取brokerNamebrokerAddrTable:获取brokerAddress
sendCallback.onSuccess(sendResult);
携带发送的方式:同步
response.getCode()
否调用自定义的回调函数入参就是返回的结果
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
看下socket返回response是如何调用回调函数的
DefaultMQProducer#SendResult send(Message msg)
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
构建SendResult返回
从responseTable中获取responseFuture
扩展点:消息检查。实现接口CheckForbiddenHook。 加入checkForbiddenHookList中,从而触发这个检查
这里实现了同步重试机制
设置VIPchannel
case ASYNC:
channel.writeAndFlush(request)
DefaultMQProducer#start()
唤醒调用countDownLatch.await的线程
处理返回结果
这里取到SendResult
调用new InvokeCallback()#operationComplete
this.mQClientAPIImpl.start();
if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; sysFlag |= compressType.getCompressionFlag(); msgBodyCompressed = true; }
发送消息
可以扩展
其实先调用InvokeCallbackWrapper#operationComplete
同步发送
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
生产者启动
选择不同于上次broker进行发送
case SYNC:
异步主要看下回调机制
处理业务的handlerNettyClientHandler
扩展点:通过设置参数com.rocketmq.sendMessageWithVIPChannel=ip:port可以设置指定的broker来处理消息
延迟消息判断
if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); }
等待消息返回
case RESPONSE_COMMAND:
构造消息请求头
case SYNC:同步发送
如果本地topicPublishInfoTable是空则从namesrv获取
hasCheckForbiddenHook()
1、this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() --->需要自己设置为true,默认是false;2、RemotingException:重试3、MQClientException:重试4、MQBrokerException:重试 需要判断this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
消息检查
this.hasSendMessageHook()
responseFuture.putResponse(cmd);
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
invokeCallback.operationComplete(this);
executeInvokeCallback(responseFuture);
this.responseCommand = responseCommand;this.countDownLatch.countDown();
response==null
生产者内部实现是defaultMQProducerImpl
释放信号量:异步发送通过信号量来控制异步请求的最大数量,保护系统内存
对于remotingClient.invokeAsync调用抛出的异常也会重试
this.start(true);
调用responseFuture成员变量countDownLatch的countDown()
携带发送超时时间
启动netty客户端
处理response
异步发送
final ResponseFuture responseFuture = responseTable.get(opaque);
消息绑定topic
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
选择MessageQueue
调用remotingClient同步发送的API
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
调用回调前提responseFuture封装了回调
responseFuture.release();
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
这里的发送模式是ASYNC
事务消息的标志设置?
查找地址使用httpClient
回调对象传给了responseFuture
压缩消息大于4K的消息默认压缩
开启一个线程去发送消息
msg.setTopic(withNamespace(msg.getTopic()));
四个地方能重试
利用栅栏屏障让线程等待
发送消息之前触发的钩子函数可以自定义
NettyClientHandler#channelRead0
1、批量消息不压缩2、this.defaultMQProducer.getCompressMsgBodyOverHowmuch():compressMsgBodyOverHowmuch = 1024 * 4 压缩阈值
入参个数不同异步发送还需要带重试次数
构建responseFuture
是对于response==null(肯定会重试)的情况提供了三种重试
同步请求
发送之前获取topic信息
this.defaultMQProducerImpl.start();
0 条评论
下一页