rocketmq源码(五)——生产者发送消息
2024-09-10 11:27:28 2 举报
在rocketmq源码中,生产者发送消息的过程可以分为以下几个关键步骤: 1. 构建消息:生产者首先需要创建一个Message实例,设置消息的主题(Topic)、标签(Tag)、键值对(Key-Value Pairs)等属性。此外,还需要设置消息的身体(Body),可以是字符串、字节数组等类型。 2. 发送消息:接下来,生产者需要调用发送消息的方法,将构建好的消息传递给rocketmq。在这个方法中,生产者需要设置一些参数,例如发送模式(同步或异步)、消息投递策略等。 3. 消息序列化:rocketmq将消息进行序列化,以便在网络上传输。序列化的方式可以是JSON、Protobuf等。 4. 消息存储:rocketmq将序列化后的消息存储在broker中。存储方式可以是文件存储、内存存储等。 5. 消息投递:rocketmq将消息投递给消费者。投递方式可以是推(Push)或拉(Pull)模式。 6. 消息确认:消费者接收到消息后,需要向rocketmq发送确认消息,表示已经成功接收到消息。如果消费者没有发送确认消息,rocketmq会在一定时间后重新投递消息。 以上就是rocketmq源码中生产者发送消息的主要过程。在这个过程中,涉及到了很多关键的技术点,例如消息序列化、存储、投递等,这些都是rocketmq高性能、高可靠的关键所在。
作者其他创作
大纲/内容
defaultMQProducer.send(Message msg)
发送消息线程提交任务后就可以直接返回了
it = this.responseTable.entrySet().iterator();
消息的键:键是消息的业务标识,可以用于唯一标识一条消息。在需要根据业务键进行消息查询或重试时,键非常有用。String keys
waitResponse等待服务端响应RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
异步调用InvokeCallback的回调方法responseFuture.executeInvokeCallback();
根据地址拿到或者创建Channel
ASYNC
new Message() 参数
发送消息sendKernelImpl
Oneway
消息的主题:生产者将消息发送到特定的主题,消费者订阅该主题以接收消息。String topic;
RemotingCommand response = responseFuture.getResponseCommand();
回调正常方法sendCallback.onSuccess(sendResult);
ASYNC异步
true
Message消息结构
消息的标志:一个整数,用于自定义消息的某些特性或状态。例如,可以用于标记消息的优先级或其他特殊属性。int flag;
sendMessageSync
同步发送必须得到服务端响应assert response != null;
defaultMQProducer.sendOneway(message)
sendMessageAsync
false
NettyRemotingClient客服端在启动时,会开启定时任务扫描responseTableNettyRemotingClient.this.scanResponseTable();
组装RemotingCommand消息结构,消息的code为SEND_MESSAGE
defaultMQProducer生产者发送消息
根据topic名称,查找topic具体信息,如:队列配置、读写权限、broker信息等数据 this.tryToFindTopicPublishInfo(msg.getTopic());
发送成功,设置responseFuture的状态为发送成功responseFuture.setSendRequestOK(true);
创建SendMessageRequestHeader请求
发送消息sendMessage
SYNC
f.isSuccess()
消息体:消息体是消息的实际内容,可以是任何二进制数据。生产者发送的实际数据存储在消息体中。byte[] body;
回调异常处理方法sendCallback.onException(e);
设置responseFuture的状态为发送失败responseFuture.setSendRequestOK(false);responseFuture.putResponse(null);
创建响应的异步处理器ResponseFuture
invokeCallback.operationComplete(this);
response != null
拿到异步任务线程池ExecutorService executor = this.getAsyncSenderExecutor();
发送请求,并接受服务端响应channel.writeAndFlush(request)
提交异步发送消息任务executor.submit
执行异步run
executeInvokeCallback(rf);
消息的标签:标签是主题下的二级分类,可以用于更细粒度的消息过滤。消费者可以根据标签来选择性地接收消息。String tags
executeInvokeCallback(responseFuture);
是否等待消息存储完成: 这个布尔值表示生产者在发送消息后是否等待消息成功存储到Brokerboolean waitStoreMsgOK
通过客服端发送RemotingCommand
拿到broker地址
根据不同的发送模式发送数据communicationMode
标识消息的事务IDtransactionId
发送请求channel.writeAndFlush(request)
ASYNC同步
请求完成后执行监听方法operationComplete
发送失败requestFail(opaque);
使用channel发送客服端消息
rep.release();it.remove();rfList.add(rep);
0 条评论
下一页