FileCoin资源调度
2023-06-16 18:02:14 0 举报
AI智能生成
FileCoin资源调度
作者其他创作
大纲/内容
Manager资源
store
存储系统
localWorker
处理本地服务
scheduler
调度调度器
remoteWorker
支持远程服务处理
Manager事务
Manager负责给每一个存储模块sectorIndex 这标志了数据存储的扇区编号
manager通过访问JsonRPC接口来实现对remote worker的管理
使用manager的fetchHandle API接口来实现文件的传输
specs-storage.Prover/Sealer/Storage实现sector的证明、封装以及存储
远程的remote Worker的资源调度也由本地的scheduler控制
每个连接到Manager的Worker会和Manager同步它的内存/CPU以及显存的信息
Scheduler在接受到新的请求时,会针对请求(Task)的类型以及资源的需求,从当前Worker中挑选最合适的Worker进行请求的处理
sector-storage
Worker
Sechduler
Manager
schedule调度模块
resource.go
此模块描述了运行时资源分配情况以及性能参数。
cbor_gen.go
做各种角色的JSON包和cbor之间的转换 。编码和解码的集合,对象分别是 Call、WorkStae、 WorkID
sched.go
进行资源调度和分配的核心模块 是scheduler调度器方法的主要定义部分
sched_resource.go
分配器-计算机资源方面进行查询判断操作的包
sche_worker.go
woker分配数据结构。统揽。定义了schedWorker以及此结构体的一众方法 , Woker资源调度的具体实现
scheduler worker manager
resource.go
重要函数方法:
func (r Resources) Thr\ads(wcpus uint64) uint64
线程资源分配函数
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
资源情况映射表
Percent of threads to allocate to parallel tasks
描述了运行时资源分配情况以及性能参数
cbor_gen.go
重要方法函数:
func (t *Call) MarshalCBOR(w io.Writer) error
将调用信息的JSON数据转换成cbor简明二进制展现
func (t *Call) UnmarshalCBOR(r io.Reader) error
将cbor数据转换成数据结构打包的JSON数据
func (t *WorkState) MarshalCBOR(w io.Writer) error
func (t *WorkState) UnmarshalCBOR(r io.Reader) error
func (t *WorkID) MarshalCBOR(w io.Writer) error
func (t *WorkID) UnmarshalCBOR(r io.Reader) error
做各种角色的JSON包和cbor之间的转换 。编码和解码的集合,对象分别是 Call、WorkStae、 WorkID
sched.go
进行资源调度和分配的核心模块 是scheduler调度器方法的主要定义部分
schedWindow
用于描述调度窗口,存放了worker的请求数组
SchedPriorityKey
优先级类型
context.Context
用context表示JSON数据格式类型
WorkerAction
表示一个匿名函数
scheduler
调度器,用于分配计算机资源给不同的任务
activeResources
计算机目前的活跃资源
schedWindow
资源分配窗口 显示当前计算机的活跃资源以及矿工提出的请求的序列
schedWindowRequest
矿工请求处理窗口
workerRequest
矿工具体的请求的数据结构
workerResponse
错误响应信息的数据结构
SchedDiagRequestInfo
分配诊断请求信息结构体。包含了扇区ID 、封装任务类型以及该调度信息的优先级
SchedDiagInfo
资源诊断信息结构体,包含了请求信息的集合以及打开的窗口的集合
getPriority
从context中获取任务的优先级
WithPriority
给任务赋上优先级
func (r *workerRequest) respond(err error)1
处理worker的错误信息 打包成error类并返会
func newScheduler() *scheduler
创建一个新的调度器 返回该调度器的指针
func (sh *scheduler) Schedule
调度器的成员函数 ,开始进入监听状态等待矿工的请求
func (sh *scheduler) runSched()
进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。
func (sh *scheduler) trySched()
尝试进行资源分配的核心过程函数。
func (sh *scheduler) schedClose()
关闭调度处理的函数,遍历矿工列表,清理为他们分配的资源,释放矿工所占用的窗口
func (sh *scheduler) Info(ctx context.Context) (interface{}, error)
获取调度器信息的函数
func (sh *scheduler) Close(ctx context.Context) error
关闭调度器函数 关闭失败则返回报错信息
workerHandle
这个结构体根据目前的资源情况去调度分配 对sector、Windows、activeResource进行操作
sched_resource.go
分配器-计算机资源方面进行查询判断操作的包
func (a *activeResources) withResources
对worker分配资源的入口函数 并把它所需要的资源加入activeResource活动资源的结构体中
func (a *activeResources) add(wr storiface.WorkerResources, r Resources)
把worker所需要的资源加入到activeResource中的具体实现
func (a *activeResources) free(wr storiface.WorkerResources, r Resources)
释放woker所占的资源 并删除冲activeResource中删除对应的资源信息
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool
查询目前的activeResource是否足够该woker所请求的资源 返回布尔值 true表示成功 false表示不能分配
func (a *activeResources) utilization(wr storiface.WorkerResources) float64
查看目前活跃资源当中CPU占用率 最小内存空间和最大内存空间占用率 返回三个当中一个最大的,活动资源的利用率
func (wh *workerHandle) utilization() float64
返回woker占用的所有活动资源的占用率之和
sched_worker.go
woker分配数据结构。统揽。定义了schedWorker以及此结构体的一众方法 , Woker资源调度的具体实现
schedWorker
用于对Worker进行资源分配的结构体,schedWorker结构 包含了一个资源调度器 wokerer信息查询 workerID 分配窗口的通道 请求窗口数量
workHandle
schedWorker通过workHandle来对worker进行管理 资源分配。workerHandle处理结构体:
scheduler
这是核心的调度器数据结构的定义, 调度器的workers大映射表是从WorkerID到workerHandle解决方案
func (sh *scheduler) runWorker(ctx context.Context, w Worker) error
context类型的参数用于读取其其中的信息以对后续的worker调度进行设置
func (sw *schedWorker) handleWorker()
处理矿工需求的实现函数
读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。 空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle)
worker清空函数,关闭worker的manager管理器以及worker,用内置close()函数关闭manager。通过删除调度器中打开窗口中的wokerID窗口,从而删除worker。
func (sw *schedWorker) disable(ctx context.Context) error
使得context所表示的内容无效化。先在主调度器的线程中等待清理程序,再等待清理程序完成以使其无效化,最终清空活动窗口,使得请求窗口清0
func (sw *schedWorker) checkSession(ctx context.Context) bool
对session进行检查
func (sw *schedWorker) requestWindows() bool
检查所有的请求窗口
func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool)
等待窗口的更新或者是worker任务的完成
func (sw *schedWorker) workerCompactWindows()
把老窗口中的任务挪动到新窗口当中,将worker和窗口绑定在一起
func (sw *schedWorker) processAssignedWindows()
将任务分配到目前所有的活动窗口中
func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error
开始执行一个任务的入口函数
Manager.go
模块文件:Manager.go\Manager_calltracker.go
Manager 结构体
包含manger的各种信息,结构体中包含了本地存储的接口、远程存储信息、本地存储类、Scheduler调度器以及storiface.CallID到WorkerID和chan result的映射等。
result结构体
存储结果信息,有两个参数:interface{}结构体和Error出错信息类。
sealerConfig结构体
参数中含有能并行获取的数量限制以及一系列布尔型的变量,用于配置。 布尔型变量中包含了:AllowCommit 允许提交/AllowAddPiece 允许增加piece/AllowPreCommit1 允许预提交1/AllowPreCommit2 允许预提交2/AllowUnseal 允许解封扇区。
func New(...) (*Manager, error)
New这个函数负责创建一个新的manager,执行过程大致为:
1.首先新建一个本地的存储点,如果创建失败则跳出函数返回错误信息
2.随后新建一个provider,如果创建失败则跳出函数返回错误信息
3.新建一个线上(远距离)的存储点
4.将新的manager信息放入参数m中,设置m的基本参数并运行
5.新建任务,并且设置任务的基本参数 也就是sealerConfig中的参数
6.增加工人,如果增加失败返回错误信息
7.新建manager完成
1.首先新建一个本地的存储点,如果创建失败则跳出函数返回错误信息
2.随后新建一个provider,如果创建失败则跳出函数返回错误信息
3.新建一个线上(远距离)的存储点
4.将新的manager信息放入参数m中,设置m的基本参数并运行
5.新建任务,并且设置任务的基本参数 也就是sealerConfig中的参数
6.增加工人,如果增加失败返回错误信息
7.新建manager完成
addLocalStorage
这函数功能是增加当地存储空间,会报三种错误1.增加储存空间错误2.已经存在该存储空间错误3.存储空间配置错误
AddWorker
增加worker,调用sched文件中的runWork函数。开始进行worker请求处理,如果在其他线程中存在解决方案则直接返回,否则开始处理(这个地方的请求是增加Worker 并且传递过去了Worker类型的参数
ServerHTTP
提供远程的HTTP服务
schedNop
空计划指令,什么都不进行操作
schedFetch
获取计划,经理通过等待worker获得计划
readPiece
从指定扇区中取出一部分数据,通过调用函数tryReadUnsealedPiece()来尝试读取扇区中已经被解封的数据
tryReadUnsealedPiece
尝试读取没封装的数据
AddPiece
在sector中新建文件存储片区,执行步骤:1.判锁2.查看是否有存在的片区,如果有则分配,如果没有则新建3.录入片区的一系列信息并返回
SealPreCommit1 SealPreCommit2 SealCommit1 SealCommit2
执行上述这一系列操作
FinalizeSector
这个模块就是将需要存储的信息传递给manager并且释放worker中扇区和缓存中无用的信息。新建选择其,并调用manager中的scheduler对象的方法,为worker分配资源。
ReleaseUnsealed
释放没有被封装的(函数里也没有具体的实现)只有一句提示“即将执行释放操作”
Remove
移除封装或者未封装的扇区,或者移除缓冲区
一系列Return函数
代码复用,返回是否执行成功的结果
FinalizeSector
FinalizeSector
深入理解存储与管理
相关模块
localworker
remoteworker
Manager
Sehcduler
Store
README图解
模块1 schedule调度模块
resource.go模块
cbor_gen.go模块
资源分配路径: runSchend() -> trySched() [schedWindow] runSched()工作过程: 进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。 第一阶段进入select监听状态,如果有请求加入则把请求加入请求队列中,如果有请求窗口进入则把窗口分配到已打开窗口的数组当中。当调度器关闭的时候则退出第一状态。 第二状态是在在初始化和调度器开始进行分配动作[比如workerChange workerDisable等情况发生]之后开始,同样是进入循环监听状态如果此时scheduler没有任何改变则退出第二监听状态。 之后对数据进行处理,对请求窗口等信息保存的对应的容器中。然后调用trySched()函数正式进行资源分配
trySched()工作过程: 尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
为每个调度队列中的任务找到能处理他们的可容纳窗口 1.1 创建解决任务的窗口容量的列表 acceptableWindws slice 1.2 根据任务选择器的性能为窗口排序
再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
把被调度后的窗口提交给Worker
资源调度关闭路径: schedClose() -> workerCleanup() scheduler调度器管理workers,schedClose()函数遍历所有的worker对每个worker执行对象的方法workerCleanup()来清空worker占用的资源并将其从数组中删除实现关闭。
activeSource 资源分配的实现路径: withResources() -> [add free等接口函数](获得简单信息的函数)
trySched()工作过程: 尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
为每个调度队列中的任务找到能处理他们的可容纳窗口 1.1 创建解决任务的窗口容量的列表 acceptableWindws slice 1.2 根据任务选择器的性能为窗口排序
再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
把被调度后的窗口提交给Worker
资源调度关闭路径: schedClose() -> workerCleanup() scheduler调度器管理workers,schedClose()函数遍历所有的worker对每个worker执行对象的方法workerCleanup()来清空worker占用的资源并将其从数组中删除实现关闭。
activeSource 资源分配的实现路径: withResources() -> [add free等接口函数](获得简单信息的函数)
sched.go模块
sched_resource.go模块
模块的主要功能:资源分配与调度 模块文件:resource.go/cbor_gen.go/sched.go/sched_resource.go/sche_worker.go
模块2 Manager模块
Manager.go
func (wh *workerHandle) utilization() ->func (a *activeResources) utilization workHandle结构体调用活跃资源的方法来获得当前资源的占用情况
worker资源调度路径:
func (sh *scheduler) runWorker() -> func (sw *schedWorker) go sw.handleWorker() 工作过程:->func (sw *schedWorker) workerCompactWindows() ->func (sw *schedWorker) processAssignedWindows()
runWorker()该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数。 在存储好schedWorker worker资源调度清单之后 执行go sw.handleWorker(),开启一个goroutinue在后台并行执行。 handleWorker是一个专门处理worker调度的方法,他是处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
worker_tracked worker跟踪部分路径:
SealPreCommit1() SealPreCommit2() SealCommit1()...等函数调用track方法 ->func (wt *workTracker) track -> 匿名函数
worker资源调度路径:
func (sh *scheduler) runWorker() -> func (sw *schedWorker) go sw.handleWorker() 工作过程:->func (sw *schedWorker) workerCompactWindows() ->func (sw *schedWorker) processAssignedWindows()
runWorker()该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数。 在存储好schedWorker worker资源调度清单之后 执行go sw.handleWorker(),开启一个goroutinue在后台并行执行。 handleWorker是一个专门处理worker调度的方法,他是处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
worker_tracked worker跟踪部分路径:
SealPreCommit1() SealPreCommit2() SealCommit1()...等函数调用track方法 ->func (wt *workTracker) track -> 匿名函数
manager_calltracker.go
模块文件:Manager.go\Manager_calltracker.go
模块3 Worker工作者模块
worker_calltracker.go
worker_local.go
模块文件:worker_ calltracker.go worker_local.go worker_tracked.go
模块4 Seletor 选择器模块
selector_alloc.go
selector_existing.go
selector模块没有特别的调度线路,仅仅提供了三种选择器的函数方法以供调用。
selector_task.
模块关键包含:selector_alloc.go selector_existing.go selector_task.go
杂项 零散的模块
Sector-Storage Scheduler模块层次包含情况
0 条评论
下一页