rocketMQ-5-通信模块
2023-08-03 17:36:09 0 举报
为你推荐
查看更多
rocketMQ的异步通信模型,核心步骤详解,架构师必看!
作者其他创作
大纲/内容
EventLoopGroup eventLoopGroupBoss boss线程池,num=1
同步请求
NettyRemotingServer.start()远程通信服务端启动
发送消息步骤
提交给线程池
同步执行invokeSync
执行处理器
组装RemotingCommand初始化callback
NettyRemotingServer远程通信服务端
RemotingCommand(code=10)
DefaultEventExecutorGroup defaultEventExecutorGroupworker线程池,num=8
processSendResponse()处理发送返回
nameserver启动时
PullAPIWrapper.pullKernelImpl()核心拉取消息
判断请求是否为单向(oneway)
procuder生产者
CHECK_TRANSACTION_STATE 校验事务状态NOTIFY_CONSUMER_IDS_CHANGED 通知消费者ID变更,再平衡RESET_CONSUMER_CLIENT_OFFSET 重置消费者客户端偏移量GET_CONSUMER_STATUS_FROM_CLIENT 获取消费者状态GET_CONSUMER_RUNNING_INFO 获取消费者运行时信息CONSUME_MESSAGE_DIRECTLY 立即消费消息
将opaque存入ResponseFutrue中
RemotingCommand(code=11)
RocketMQ Remoting 异步通信流程
DefaultMQProducer生产者-发送消息
consumer消费者
NettyServerHandler通信服务端处理器
remotingClient.font color=\"ff0000\
EventLoopGroup eventLoopGroupSelector Selector线程池,num=3
失败回调invokeCallback.operationComplete()
根据broker Addr创建通信的channel
回调InvokeCallback.operationComplete(ResponseFuture responseFuture)
1.初始化处理器prepareSharableHandlers()
start()启动
font color=\"ff0000\
MQClientAPIImpl MQ客户端API实现类
PullMessageService拉消息服务pullMessage(pullRequest)定时拉取消息
processPullResponse()处理拉取消息返回
回调InvokeCallback.operationComplete(ResponseFuture responseFuture)->
pullMessage()构造RemotingCommand(code=11)->pullMessageAsync()异步发送
根据业务编码获取处理器
NettyEncoder 加密处理器
processorTable处理器HashMap表
命令模式
数据写入
NettyServerHandler.channelRead0
2.构造ServerBootstrap,初始化channel
各种业务处理器Handler
sendMessage()构造RemotingCommand(code=10)->sendMessageAsync()异步发送
读取Netty收到的消息并处理
broker
channel.writeAndFlush(request).addListener()请求写入channel,设置监听器
processRequest()
Channel通道类:NioServerSocketChannel
处理器执行后得到response结果
NettyRemotingClient远程通信客户端
EventLoopGroup执行线程组(boss+Selector)
在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理
异步请求
调用netty-channel的writeAndFlush()发送消息
NettyRemotingServer通信服务端
获取信号量
执行callback方法
NettyRemotingClient.font color=\"ff0000\
processMessageReceived()处理接收的消息
Semaphore.tryAcquire()同步获取信号量
ExecutorService publicExecutor公共线程池(关闭+回调),num=CPU核心数
支持的命令code#SendMessageProcessor发送消息处理器SEND_MESSAGE 10SEND_MESSAGE_V2SEND_BATCH_MESSAGECONSUMER_SEND_MSG_BACK#PullMessageProcessor拉取消息处理器PULL_MESSAGE 11#ReplyMessageProcessor重试消息处理器SEND_REPLY_MESSAGE SEND_REPLY_MESSAGE_V2#QueryMessageProcessor查询消息处理器QUERY_MESSAGEVIEW_MESSAGE_BY_ID#ClientManageProcessor 客户端管理处理器HEART_BEATUNREGISTER_CLIENTCHECK_CLIENT_CONFIG#ConsumerManageProcessor消费者管理处理器GET_CONSUMER_LIST_BY_GROUPUPDATE_CONSUMER_OFFSETQUERY_CONSUMER_OFFSET#EndTransactionProcessor完结事务处理器END_TRANSACTION
NettyRemotingAbstract.invokeAsyncImpl()
消费消息步骤
根据opaque获取ResponseFutrue
监听器ChannelFutureListener
异步执行invokeAsync
更新nameServer地址列表updateNameServerAddressList
NettyRemotingClient
1.启动时注册7种处理器
NettyClientHandler
Thread
获取nameServer地址列表getNameServerAddressList
读取netty收到的消息并处理
设置opaque到response中并发送到client
NettyConnectManageHandler连接管理处理器
MQClientInstance启动(生产者/消费者)
发送消息
构造结果返回:ResponseFuture
拉取消息
NettyRemotingServer
单向执行invokeOneway
依赖Syn实现(FairSync)Sync extends AbstractQueuedSynchronizer底层就是个AQS,默认65535个并发, 公平
根据broker addr构造channel
pullMessage(pullRequest)
一个 Reactor 主线程,负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。
远程通信客户端
broker启动时
NettyServerHandler
defaultRequestProcessor默认请求处理器
DefaultMQPushConsumerImpl
asyncProcessRequest()
defaultMQProducerImpl.sendDefaultImpl()->sendKernelImpl()核心发送方法
HandshakeHandler 握手处理器
doBeforeRpcHooks请求前置钩子
RemotingClient远程客户端接口
0 条评论
回复 删除
下一页