kubernetes 的 client-go 源码解析
2022-08-03 09:18:57 0 举报
AI智能生成
kubernetes 的 client-go 源码解析,关注我,后续将有更多的源码解析
作者其他创作
大纲/内容
说明
主要分析 informer 的机制
例子
拿 deployment_controller_manager 来解析
源码位置 pkg/controller/deployment/deployment_controller_test.go:269
分析
f := newFixture(t)
d := newDeployment()
metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"}
Image: "foo/bar"
期望的测试结果
f.expectCreateRSAction(rs)
f.expectUpdateDeploymentStatusAction(d)
f.expectUpdateDeploymentStatusAction(d)
f.run(testutil.GetKey(d, t))
c, informers, err := f.newController()
f.client = fake.NewSimpleClientset(f.objects...)
informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc())
defaultResync = 0
NewDeploymentController
入参
informers.Apps().V1().Deployments()
informers.Apps().V1().ReplicaSets()
informers.Core().V1().Pods()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
DeploymentController{}
client
eventRecorder
queue
dc.rsControl = controller.RealRSControl()
dInformer.Informer().AddEventHandler()
dInformer.Informer()
f.factory.InformerFor()
informer, exists := f.informers[informerType]
resyncPeriod, exists := f.customResync[informerType]
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
newFunc -> defaultInformer
ListWatch 对象
ListFunc
WatchFunc
&appsv1.Deployment{} 关注的的对象
resyncPeriod 重新同步周期
indexers 缓存对象
NewSharedIndexInformer() 创建 Informer
objectType
processor
cacheMutationDetector
indexer
AddEventHandlerWithResyncPeriod()
listener := newProcessListener()
s.processor.addListener(listener)
ResourceEventHandlerFuncs
dc.addDeployment
dc.updateDeployment
dc.deleteDeployment
rsInformer.Informer().AddEventHandler()
podInformer.Informer().AddEventHandler()
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
for _, d := range f.dLister {
informers.Apps().V1().Deployments().Informer().GetIndexer().Add(d)
}
informers.Apps().V1().Deployments().Informer().GetIndexer().Add(d)
}
for _, rs := range f.rsLister {
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
}
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
}
for _, pod := range f.podLister {
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
}
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
}
informers.Start(stopCh)
go informer.Run(stopCh)
sharedIndexInformer.Run()
fifo := NewDeltaFIFOWithOptions
KnownObjects:s.indexer 缓存对象
EmitDeltaTypeReplaced: true
cfg := &Config{}
Queue: fifo
ListerWatcher: s.listerWatcher
ObjectType: s.objectType 关注的对象,比如 Deployment
ShouldResync: s.processor.shouldResync
Process: s.HandleDeltas
WatchErrorHandler: s.watchErrorHandler 默认为 null
s.controller = New(cfg)
s.cacheMutationDetector.Run()
s.processor.run()
p.wg.Start(listener.run) 处理 Notification
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
p.handler.OnDelete(notification.oldObj)
p.wg.Start(listener.pop) 添加 Notification
nextCh <- notification
notificationToAdd, ok := <-p.addCh
s.controller.Run(stopCh)
r := NewReflector
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler)
store: c.config.Queue,
r.Run()
ListAndWatch(stopCh)
pager.List(context.Background(), options)
这里是支持分页查询的,但是分页查询必须重新执行 ListAndWatch 函数
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion() 获取 resourceVersion
resourceVersion = listMetaInterface.GetResourceVersion() 获取 resourceVersion
items, err := meta.ExtractList(list)
r.syncWith(items, resourceVersion) 全量同步
r.syncWith(items, resourceVersion) 全量同步
r.store.Replace(found, resourceVersion)
启动一个 reSync() 的 goroutine
r.ShouldResync == nil || r.ShouldResync()
r.store.Resync()
w, err := r.listerWatcher.Watch(options) 开始watch资源
r.watchHandler() 处理watch的资源
watch.Added
r.store.Add(event.Object)
watch.Modified
r.store.Update(event.Object)
watch.Deleted
r.store.Delete(event.Object)
watch.Bookmark
不处理
r.setLastSyncResourceVersion(newResourceVersion) 更新resourceVersion
r.watchErrorHandler(r, err)
c.processLoop
c.config.Queue.Pop() 从队列中弹出对象
PopProcessFunc(c.config.Process) 处理弹出的对象
c.syncDeployment(context.TODO(), deploymentName)
queueActionLocked(actionType DeltaType, obj interface{})
f.cond.Broadcast()
processDeltas
clientState 操作
clientState.Update(obj)
clientState.Add(obj)
clientState.Delete(obj)
handler 操作
handler.OnUpdate(old, obj)
handler.OnAdd(obj)
handler.OnDelete(obj)
更新本地缓存
0 条评论
下一页