DruidCoordinator源码解析
2025-01-24 17:18:30 2 举报
AI智能生成
DruidCoordinator源码解析
作者其他创作
大纲/内容
DruidCoordinatorConfig
在CliCoordinator中绑定,将配置中druid.coordinator开头的配置注入到该类
ZkPathsConfig
在ServerModule中绑定,将配置中druid.zk.paths开头的配置注入到该类
JacksonConfigManager
在JacksonConfigManagerModule中绑定依赖,JacksonConfigManager依赖其他,比如ObjectMapper在JacksonModule中实现了注入的类型,包括Json.class Smile.class,默认为Json.class,并在DefaultObjectMapper中实现了Jackson多态序列化
SegmentsMetadataManager
在SegmentsMetadataManagerProvider中通过Provider方式注入,实际由SqlSegmentsMetadataManagerProvider提供
ServerInventoryView
MetadataRuleManager
Provider<CuratorFramework>
ServiceEmitter
ScheduledExecutorFactory
IndexingServiceClient
LoadQueueTaskMaster
ServiceAnnouncer
DruidNode
CoordinatorCustomDutyGroups
BalancerStrategyFactory
LookupCoordinatorManager
CompactSegments
ZkEnablementConfig
核心代码流程
在生命周期管理的start方法内注册一个listener,监听coordinator选主后的逻辑,成为leader后调用becomeLeader方法,不再是leader时调用stopBeingLeader方法
项目在启动时会注入一个CuratorDruidLeaderSelector,启动zk客户端
coordLeaderSelector.registerListener 注册Listener中 createNewLeaderLatchWithListener(),在LeaderLatchListener中回调 becomeLeader 和stopBeingLeader方法
segmentsMetadataManager.startPollingDatabasePeriodically(); SegmentsMetadataManager将数据同步到Coordinator的内存中,MetadataSegmentView 数据在Broker的内存中
首先判断是否已经在周期性的从数据库中同步数据了,是则返回,否就继续
固定延迟线程执行createPollTaskForStartOrder(localStartOrder, periodicDatabasePoll)
poll() 采用jdbi以流式的形式查询metadata表;生成dataSourcesSnapshot
metadataRuleManager.start();
创建一个默认的元数据规则,创建的默认规则为ForeverLoadRule, 其中加载到_default_tier,副本数为2
然后按照配置进行定期拉取配置: poll()
lookupCoordinatorManager.start();
lookupManagementLoop()
serviceAnnouncer.announce(self);
该方法当前已废弃,通过在CLICoordinator中启动时通过Discovery模块
添加各种的DutyRunnable
makeHistoricalManagementDuties
LogUsedSegments
DEBUG日志开启的前提下,在日志中打印出来所有在used状态下的segment
UpdateCoordinatorStateAndPrepareCluster
prepareCurrentServers()
获取到所有server,过滤出来可以充当广播和副本的server, 转换为ImmutableDruidServer
startPeonsForNewServers(currentServers) 为每一个server启动一个LoadQueuePeon
从LoadQueueTaskMaster中取一个peon,然后启动,固定延迟执行doSegmentManagement()
将segmentsToDrop和segmentsToLoad合并,迭代处理batchSize个数据,加入到newRequests中
调用httpclient.go方法
NettyHttpClient.go
增加callback执行方法
onSuccess
handleResponseStatus(e.getRequest(), e.getStatus());
Load则从segmentsToLoad移除,Drop则从segmentsToDrop移除
onFailure
失败了就打日志
prepareCluster(params, currentServers)
因为currentServers为筛选后的节点,所以返回的DruidCluster中只有historical和bridge
segmentReplicantLookup = SegmentReplicantLookup.make segment的副本lookup
两层遍历,外层循环为按tier遍历historical
内层循环为按historical遍历segment,放入segmentInCluster
内层循环为按queue中的segment遍历,放入loadingSegment
RunRules
获取到cluster,从DatasourcesSnapshot中获取到overshadowedSegment
更新DruidCoordinatorRuntimeParams,加入replicatorThrottler
对所有状态为used的segment运行所有匹配的规则
遍历所有的DataSource,根据数据源的name获取到该数据源的所有的规则(这一步是为了拿到所有broadcast的数据源)
遍历所有的规则,如果是broadcast类型的规则就退出内层循环,将DataSource的name加入到broadcastDatasources中
遍历所有used状态的segment
如果是overshadowed segment则退出本次循环
遍历所有的规则,如果该条规则可以应用于segment上,rule.appliesTo(segment, now)
核心方法是Rules.eligibleForLoad(), 根据Interval或者Period来计算是否满足
rule.run(),里边有两个变量:targetReplicants为当前segment的目标副本数 currentReplicants为当前segment的当前副本数(包括已经load和正在loading之和)
LoadRule: 如果主副本已经存在或者正在load,则分配副分片;否则依次分配
assign()
assignReplicas() -> assignReplicasForTier()
判断是否需要分配;判断是否有可用的server来分配;判断是否满足限流条件;满足条件后开始分配:首先注册限流,然后loadSegment,load执行完成后移除掉注册(这里根据不同的load实现是不同的,http的方式会等待historical将segment load完成后返回,zk的方式只是将segment写到historical的loadQueue中)
assignPrimary() 分配主副本, 分配后分配副分片
assignReplicas() 继续分配其他副本
drop()
正在loading时不进行drop
tier中当前的副本数大于目标副本数,则进行dropForTier, drop分为从activeServers和decomissionServers分别drop
DropRule: coordinator.markSegmentAsUnused(segment); 将segment置为unused状态
UPDATE %s SET used=false WHERE id = :segmentID
BroadcastDistributionRule 获取到所有可以load broadcast类型segment的server
assign()
drop() 从所有server中的loadPeon中drop掉segment
UnloadUnusedSegments
遍历broadcastDataSource,初始化status为true
分别处理historical、broker、realtime中标记为unused的segments
遍历server中的dataSource,首先判断是否为broadcastDataSource,是则退出当前循环
按dataSource的segments遍历,dropSegment
MarkAsUnusedOvershadowedSegments
BalanceSegments,有四种balance策略,diskNormalized cost cachingCost random,默认为cost
获取集群中的historicals,按层tier进行遍历Map<String, NavigableSet<ServerHolder>> (historical节点是有分层tier的概念的,在实际中比如把热数据load到ssd机器,把冷数据load到sata机器)
按层开始进行balance,如果上次balance没有结束,则本次不进行
将该层的historical按照是否活着(decomminssion)分为两组, decommissioningServers和activeServers。 balance的一个功能就是将已经dead的historical上的segment移动到active的historical
计算出来该层所有的historical中一共有多少segment,如果是0则不需要balance
首先处理dead节点的segment,移动到active节点 balanceServers(params, decommissioningServers, activeServers, maxSegmentsToMoveFromDecommissioningNodes);
然后处理所有active节点上的平衡 balanceServers(params, activeServers, activeServers, maxGeneralSegmentsToMove);
balanceServers()
maxSegmentsToMove 最大要移动的segment数量 ; maxIterations 最大迭代次数;maxToLoad 配置的loadingQueue中最大数量
strategy.pickSegmentsToMove 根据配置的balance策略选择将要移动的segment
ReservoirSegmentSampler.getRandomBalancerSegmentHolders
broadcast类型的segment不做balance
遍历所有的segment,先决定移动k个,然后再随机加入若干个
开始循环进行移动选出来的segment, toMoveToWithLoadQueueCapacityAndNotServingSegment 这个表示移动到的目标机器,这个机器必须不是原来移动前的机器,同时它的loadQueue也得有容量
strategy.findNewSegmentHomeBalancer 从上述的目标机器中再根据配置策略找到一个目标机器
chooseBestServer()
线程池执行计算computeCost
空间不够或者正在loading segment的historical不参与分配
当前segment的cost加上即将load的segment的cost减去将被drop的segment的cost
同一个DataSource的要尽量在一起,更可能被同时查询
Interval更接近的要尽量在一起,更可能被同时查询
https://github.com/apache/druid/pull/2972里边有详细的描述
遍历选择的结果,找到得分最大的;如果得分是正无穷,说明没有合适的,不用分配;最后得分相同的话随机取一个
moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params)
一系列校验(segment不能为null,fromServer和toServer不能相同,元数据中能找到segment, LoadQueuePeon需要被创建,目标机器还能放的下)
loadSegment 先判断是否已经在load的队列中,然后执行
成功move++, 不成功unmoved++,循环次数超过maxIterations则停止
EmitClusterStatsAndMetrics
makeIndexingServiceDuties
LogUsedSegments
CoordinatorIndexingServiceDuty
KillUnusedSegments
获取到所有要kill掉unusedsegment的数据源
遍历数据源,满足运行条件时则调用indexingServiceClient的killUnusedSegments方法提交一个kill task
根据前缀、任务类型、数据源、时间间隔等生成任务ID
运行任务
调用/druid/indexer/v1/task提交一个POST请求,然后处理返回结果
KillStalePendingSegments
CompactSegments
makeMetadataStoreManagementDuties
CoordinatorMetadataStoreManagementDuty
KillSupervisors
KillAuditLog
KillRules
KillDatasourceMetadata
KillCompactionConfig
将每一个DutyRunnable丢到定时调度线程池运行
0 条评论
下一页