shareflow
2022-02-10 02:43:49 0 举报
登录查看完整内容
为你推荐
查看更多
抱歉,暂无相关内容
shareflow 源码流程
作者其他创作
大纲/内容
没有订阅者 = 0
丢弃buffer内的数据bufferSize--
SUSPEND
不符合
唤醒
slot.cont 赋值
bufferEndIndex小表示当前index已经是最后一位
发送者
唤醒所有resumes
数据订阅者
replayIndex如果在此时有新的订阅着,那么订阅者的index会设置为这个值
普通发送tryEmit
1
符合情况1.bufferCapacity =0 , 必然会走的逻辑2.bufferCapactity 0,部分订阅着还没处理历史值
nCollectors = 3 //用于表示当前订阅的数量
queued emitters产生于,buffer满了,但是onBufferOverflow = SUSPEND
构造参数replay //重新订阅,重发的数量bufferCapacity //保存数据的buffer 的容量onBufferOverflow //如果buffer 容量不够后的策略
循环
newBufferSize1 = (newBufferEndIndex - head).toInt()内部newBufferEndIndex变化,重新计算
同步方法
maxResumeCount >0
action
DROP_OLDEST
newBufferEndIndex++newReplayIndex++
replayCache ,重发缓存如果replay = 3重发缓存4个数据
index>head
订阅者resumes
nCollectors订阅者数量判断
ShareFlow源码流程详解
返回true
失败返回false
数据保存结构
唤醒队列内部可能包含发送等待队列和订阅者队列
获取最新的值
返回resume内部包含从头到尾需要唤醒的等待发送队列需要接受最新数据的订阅者
尝试将数据放置buffer内部tryEmitLocked
子类参数bufferSize //buffer内数据的量minCollectorIndex //多个订阅着的index,其中最小的indexbufferSize //buffer的数量queueSize //等待队列的数量
index < 0
maxResumeCount = queueSize
持有个oldIndex
查找所有需要等待唤醒的订阅着,放入resume
失败
遍历订阅着的slots,找到符合更新的slot
tryEmitLocked
bufferSize > replay
蓝色表示同步方法
存在订阅者
E
清除等待队列内的一些参数
内部
不需要重发
head也等于,如果有多个订阅着,那么最慢的订阅着的index = head
newMinCollectorIndex <= minCollectorIndex
2
得到新值
newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()span style=\"font-size: inherit;\
订阅着数量查看nCollectors == 0
slot.index +1
index == head
resume 空值
resume为空
通过index获取buffer内对应的值newValue
更新buffer列表的各类数据状态
queueSize == 0
5
bufferCapacity == 0 && queueSize > 0
查询所有可以发送数据的订阅者resumes
数据入buffer 值是 emitterqueueSize++
协程唤醒
清空入队列的数据
replaySize >replay
循环所有订阅者的index,和newMinCollectorIndex 比较拿到最小的minIndexnewMinCollectorIndex=minIndex
大于
数据入bufferbufferSize++
get(index || (size - 1))通过与运算,取值在一定的范围内循环
等待新的值等待唤醒
返回值 true
bufferSize >= bufferCapacity minCollectorIndex <= replayIndex
index<0
是否是取消唤醒
普通发送发送结果
订阅者resmume
newBufferEndIndex = bufferEndIndex
如果取值是Emitter,也就是队列内的参数则取其内部的value
协程发送者
挂起方法内部实现
onBufferOverflow类型判断
newMinCollectorIndex = newBufferEndIndex因为没有订阅者所以,表示内部订阅者的最小值就是bufferEndIndex
slot1
紫色表示协程唤醒关联
4
NO_VALUE
oldIndex > minCollectorIndex
表示存在等待队列,并且也有空闲的buffer
开始
newMinCollectorIndex ++
3
获取最新值
emit发送调用对外方法
表示没有存在等待队列的情况
挂起
开始方法
橙色表示数据订阅者和发送者
没有订阅者
符合说明存在等待队列有取消的情况
collect
符合
bufferCapacity > 0
成功
函数挂起,return false
tryEmitLocked 同步方法
是否需要唤醒的等待队列
将发送值,入等待队列,并挂起
结束
slot3
indexhead最早处理完的协程,等比较慢的协程和细节2相似返回-1
head + bufferSizebufferEndIndex
通过slot的引用协程,唤醒订阅着的协程
slots :
构建Emitter内部包含自己的位置index = head + totalSize等待发送的值 value协程上下文 cont 用于唤醒
不大于
订阅者数量nCollectors>0
DROP_LASTEST
返回空的resumes
后续只在bufferCapacity =0 的情况处理
返回拿到的newValue
需要重发
newQueueEndIndex = newBufferEndIndex + queueSize
临时head = headnewMinCollectorIndex = head + bufferSize
index < bufferEndIndex
等待队列数量 >0有等待数据返回index
head + totalSize
是否要重发replay ==0
符合说明这个订阅者持有的index是所有订阅者里面最小的
唤醒阻塞的发送等待队列,就是唤醒自己
bufferCapacity 0造成特性见文档 细节2返回-1
等待队列数量 = 0没有等待数据返回-1
符合情况发生在,多个订阅者,但是当前订阅者不是接受数据最慢的
value
获取最新的值 同步方法
buffered valuesbuffersize = 6
丢弃值,返回true
协程调用emit
等待唤醒方法 同步方法
通过allocateSlot 分配一个slot
slot获取index
SharedFlowSlotindex //用于表示当前slot 的indexcont // slot 保留协程的引用
父类参数nCollectors //订阅者的数量slots //保存所有的订阅着
slot2
resumes.isNotEmpty()
拿到一个需要唤醒的队列resumes
否
绿色表示内部方法的输入输出
说明结束订阅的情况
根据空闲buffer数量将等待队列里面的数据从头到尾放置buffer内,并且,将放置buffer内数据的协程上下文放置于 resume 内部,放置一个值内部newBufferEndIndex++
不符合oldIndex = minCollectorIndex
bufferCapacity == 0newReplayIndex < newQueueEndIndex buffer!!.getBufferAt(newReplayIndex) == NO_VALUE
返回false
临时数据赋值newReplayIndexnewMinCollectorIndexnewBufferEndIndexnewQueueEndIndex
index小表示还能取到buffer内的数据返回index
resumes = EMPTY_RESUMESresumes 是个空的数组
value = ?
持有一个oldIndex
不符合
bufferCapacity == 0
不符合EMPTY_RESUMES
红色表示api调用方法
订阅者列表
6
订阅者唤醒
更新控制器内部的所有index的值
获取当前slot内部的index
0 条评论
回复 删除
下一页