RocketMQ存储机制
2022-12-27 23:35:15 4 举报
RocketMQ存储机制架构与源码
作者其他创作
大纲/内容
BrokerControoler.iniitialize()
两个线程FlushCommitLogService extends ServiceThread
waitPoint.await
构造方法
// 不断交替 写入列表 requestsWrite 和 读取列表 requestsRead // 类似于生产者和消费者 ,但是生产和消费队列分开,不断交替
获取消费分区初始值---topic+queueId CommitLog的Offset----fileFromOffset+ByteBuffer.position.....多个属性写入ByteBuffer
二级缓存[MappedFile.writeBuffer]isTransientStorePoolEnable
同步刷盘线程GroupCommitService
doCommit
同步刷盘
1-获取当前内存对象的写入位置(wrotePostion变量值),若写入位置没有超过文件大小则继续顺序写入2-由内存对象MappedByteBuffer创建一个指向同一块内存的ByteBuffer对象,并将内存对象的写入指针指向写入位置3-以font color=\"#d32f2f\
与运算<<右移。
this.onWaitEnd
获取groupCommitService
自定义CountDownnLatch
MPSC多生产者单消费者
通过MappedFileQueue获取MappedFile
锁
增加的重置latch的特性
GroupTransferService 继承于ServiceThread
可重入锁自旋锁
数据写入流程
run
public final boolean releaseShared(int arg) {//共享锁 if (tryReleaseShared(arg)) { doReleaseShared(); //唤醒所有等待的线程 return true; } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
同步复制的场景
异步刷盘
是否需要睡眠
计数器值不能重置
释放锁
消息存储CommitLog.putMessage(..)
刷盘线程
JUC的CountDownLatch
1-找到模板方法2-找到子类钩子方法的实现
有啥妙的?
DefaultAppendMessageCallback.doAppend(...)
子类钩子实现
自定义了自旋锁或重入锁
wakeup()
异步刷盘线程FlushRealTimeService
对锁进行抢占, public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
MappedFile.appendFile()
共享锁的释放
DefaultmessageStore.的构造方法调用
上锁
入口
创建一个映射文件队列-new MappedFileQueue(..)创建刷盘线程-new GroupCommitService/FlushRealTimeService()创建可重入锁/自旋锁
requestRequest记录了下一条消息要写入的位置,当前消息的尾巴? nextOffsetflushWhere=表示该位置的消息已经刷入磁盘
GroupCommitService --读写分离模式--onwatiEnd钩子回时,进行读队列与写队列的交换。
同步器-new CountDownLactch(2)==this.sync=new Sync(2)==setState(2)队列同步器,抢占不成功,就会等待=>doAcquireSharedInterruptibly(x);
二级缓存提交线程
构造GroupCommitRequest(result.getWroteOffset+result+getWroteBytes())//nextOffset到达CommitLog的最后面了。service.putRequest(req); //背后做了什么?boolean flushOk= request.waitForFlush(timeOut);//等待闭锁
waitForRunning()
CountDownLatch2
HAService调用GroupTransferService.putRequest(req)
根据需要设置延迟级别,topic系统标志
核心成员变量与方法
钩子实现
业务线程等待-waitFor?
文件队列
indexFile
主从复制handleHA
public void countDown() { sync.releaseShared(1); }
因为ServiceThread是单消费者的模式。重置的时候,等待队列里面没有等待线程在等待了
是否中断?
模板方法--定义在AQS中
waitPoint.reset
waitForRunning
CommitLog
ConsumeQueue
获取或创建最后一个映射文件MapedFileQueue.getLastMapedFile()
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}即判断getState()==0? 如果为0,则抢占成功,不为0则失败,排队。
可以执行定时任务+紧急任务周期性的任务执行-waitForRunning(interval)打开闭锁,使得具有执行紧急任务的特性-wakeUp()等待的时候,不会去耗费CPU的时间片?
0x1 <<1 0x1 <<20x1 <<4
this.swapRequests
有waitStoreMsgOK的标志同步消息会带有该属性
刷盘失败-则?
同步器里面的模板方法
数据刷盘handleDiskFlush
核心成员变量
ServiceThread线程模式
HAService.putRequest(new GroupCommitRequest(nextOffset))request.waitForFlush(timeOut)
收藏
收藏
0 条评论
下一页