rocketmq源码(七)——HAService主从同步
2024-09-10 11:30:22 3 举报
在RocketMQ中,主从同步是通过HAService实现的。HAService是RocketMQ的核心组件之一,负责Broker与Broker之间的数据同步。在主从同步过程中,主Broker上的数据会实时复制到从Broker上,以保证数据的一致性和可靠性。HAService通过判断文件类型和修饰语来区分不同的数据文件,如CommitLog、ConsumerQueue等。这些文件的同步是RocketMQ实现高可用和负载均衡的关键。
作者其他创作
大纲/内容
设置slave提交的同步commitlog日志的offsetthis.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
true
等待消息到达broker被唤醒this.waitForRunning(10);
建立和维护master—slave端的NIOSOCKET连接
阻塞监听OP_WRITE写数据事件this.selector.select(1000);
start()
启动HaService流程
this.messageStore = new DefaultMessageStore
向slave发送需要同步的commitlog日志this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
this.processReadEvent();
往SocketChannel写入数据this.writeSocketService = new WriteSocketService(this.socketChannel);
start
HAConnection.this.slaveAckOffset = readOffset;HAConnection.this.slaveRequestOffset = readOffset;
this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
Thread.run()
this.serverSocketChannel.socket().setReuseAddress(true);
往Master发送同步数据请求this.socketChannel.write(this.reportOffset);
从commitLog日志中获取偏移量日志SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
HAService主从同步
while (!this.isStopped())
绑定监听端口this.serverSocketChannel.socket().bind(this.socketAddressListen);
new GroupTransferService()
boolean result = this.dispatchReadRequest();
this.haService.start();
new ReadSocketService
活跃事件列表Set<SelectionKey> selected = this.selector.selectedKeys();
this.readSocketService.start();
与slave节点建立的channelthis.socketChannel = socketChannel;
this.groupTransferService = new GroupTransferService();
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
isTimeToReportOffset是否发送同步请求
处理事件
this.haClient = new HAClient();
this.lastWriteOver = this.transferData();
this.connectMaster()连接到Master节点
this.haService = new HAService(this);
beginAccept()
同步间隔大于配置的发送时间,默认是5s(haSendHeartbeatInterval = 1000 * 5)return interval > HAService.this.defaultMessageStore.getMessageStoreConfig() .getHaSendHeartbeatInterval();
this.acceptSocketService.beginAccept();
this.serverSocketChannel = ServerSocketChannel.open();
BrokerController.initialize()
new AcceptSocketService()
设置非阻塞this.serverSocketChannel.configureBlocking(false);
this.selector = RemotingUtil.openSelector();
this.thread.start();
String addr = this.masterAddress.get();
与Master建立连接this.socketChannel = RemotingUtil.connect(socketAddress);
阻塞监听事件this.selector.select(1000);
BrokerController.start()
new
从SocketChannel读取数据this.readSocketService = new ReadSocketService(this.socketChannel);
haService.start()
阻塞监听OP_READ读事件this.selector.select(1000);
conn.start();
new HAClient()
开启异步线程this.acceptSocketService.start();this.thread.start();
阻塞监听上面注册的读事件OP_READthis.selector.select(1000);
AcceptSocketService
ReadSocketService
DefaultMessageStore.start()
HAService
new HAConnection
WriteSocketService
读取Master同步过来的commitLog数据int readSize = this.socketChannel.read(this.byteBufferRead);
HAConnection
this.doWaitTransfer();
加入集合,方便维护HAService.this.addConnection(conn);
设置当前同步commitlog进度this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
创建HaService流程
this.reportSlaveMaxOffset(this.currentReportedOffset);
boolean ok = this.processReadEvent();处理读事件
byte[] bodyData = byteBufferRead.array();
new WriteSocketService
this.socketAddressListen = new InetSocketAddress(port);
通知producer数据同步成功或超时req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
获取producer发送的提交数据请求for (CommitLog.GroupCommitRequest req : this.requestsRead)
读取slave发送的同步数据请求到byteBufferReadint readSize = this.socketChannel.read(this.byteBufferRead);
HAClient
this.defaultMessageStore = defaultMessageStore;
等待slave发送拉取commitlog日志请求时唤醒this.notifyTransferObject.waitForRunning(1000);
读取请求偏移量位置long readOffset = this.byteBufferRead.getLong(pos - 8);
GroupTransferService
this.selectMappedBufferResult = selectResult;
new HAService(this)
建立channelSocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
构建请求消息内容:请求拉取位置this.reportOffset.putLong(maxOffset);
唤醒reputMessageService线程,构建comsumqueue和Index文件this.reputMessageService.wakeup();
this.writeSocketService.start();
0 条评论
下一页