Kubernetes 控制器管理器的工作原理
文章目录
在 Kubernetes Master 节点中,有三个重要的组件:ApiServer、ControllerManager 和 Scheduler,它们共同负责整个集群的管理。在本文中,我们尝试梳理一下ControllerManager的工作流程和原理。
什么是控制器管理器
根据官方文档:kube-controller-manager 运行控制器,这是处理集群中常规任务的后台线程。
例如,当通过 Deployment 创建的 Pod 异常退出时,RS Controller 会接受并处理退出并创建新的 Pod 以保持预期的副本数。
几乎每个特定的资源都由特定的 Controller 管理以维持预期的状态,而 Controller Manager 的职责是聚合所有 Controller:
- 提供基础设施以降低控制器实现的复杂性
- 启动和维护控制器的正常运行时间
这样,Controller 保证集群中的资源保持在预期状态,Controller Manager 确保 Controller 保持在预期状态。
控制器工作流程
在我们解释 Controller Manager 如何为 Controller 提供基础架构和运行时环境之前,让我们先了解一下 Controller 工作流程是什么样的。
从高维的角度来看,ControllerManager主要提供了分发事件的能力,而不同的Controller只需要注册相应的Handler即可等待接收和处理事件。
以Deployment Controller为例,其中的NewDeploymentController
方法pkg/controller/deployment/deployment_controller.go
包括Event Handler的注册,对于Deployment Controller,只需要根据不同的事件实现不同的处理逻辑,就可以实现对相应资源的管理。
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, // This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, })
可以看到,在ControllerManager的帮助下,Controller的逻辑可以很纯粹的通过实现对应的EventHandler来完成,那么ControllerManager具体做了哪些工作呢?
控制器管理器架构
帮助 Controller Manager 进行事件分发的关键模块是 client-go,而更关键的模块之一是 informer。
kubernetes在github上提供了client-go的架构图,从中可以看出Controller是描述的下半部分(CustomController),而Controller Manager主要是完成的上半部分。
Informer Factory
从上图中可以看出,Informer 是一个非常关键的“桥梁”,所以对 Informer 的管理是 Controller Manager 做的第一件事。
由于每个 Informer 都与 Api Server 保持一个 watch long 连接,因此这个单实例工厂通过为所有 Controller 提供一个唯一的入口点来获取 Informer,从而确保每种类型的 Informer 仅实例化一次。
这个单例工厂的初始化逻辑。
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }
从上面的初始化逻辑可以看出,其中最重要的部分sharedInformerFactory
是名为 的map informers
,其中key是资源类型,value是关心该资源类型的Informer。每种类型的 Informer 只会被实例化一次并存储在map中。不同的 Controller 只有在需要相同的资源时才会得到相同的 Informer 实例。
对于Controller Manager来说,保持所有Informer正常工作是所有Controller正常工作的基本条件。通过这个sharedInformerFactory
map维护所有informer实例,所以sharedInformerFactory
也负责提供一个统一的启动入口。
// Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
Controller Manager启动时,最重要的是通过Start
这个工厂的方法运行所有的Informer。
Informer creation
以下是这些 Informer 的创建方式,Controller ManagerNewControllerInitializers
在cmd/kube-controller-manager/app/controllermanager.go
. 由于代码冗长,这里仅提供部署控制器的示例。
初始化部署控制器的逻辑startDeploymentController
在cmd/kube-controller-manager/app/apps.go
.
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] { return nil, false, nil } dc, err := deployment.NewDeploymentController( ctx.InformerFactory.Apps().V1().Deployments(), ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("deployment-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating Deployment controller: %v", err) } go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop) return nil, true, nil }
最关键的逻辑在 中deployment.NewDeploymentController
,实际上是创建了 Deployment Controller,创建函数的前三个参数分别是 Deployment、ReplicaSet 和 Pod 的 Informer。如您所见,Informer 的单例工厂提供了一个入口点,用于使用 ApiGroup 作为路径创建具有不同资源的 Informer。
但是,重要的是要注意这一点。Apps().V1().Deployments()
返回 type 的实例deploymentInformer
,但deploymentInformer
不是真正的 Informer(尽管它的 Informer 名称)。它只是一个模板类,其主要功能是为创建专注于部署的 Informer 提供模板作为特定资源。
// Deployments returns a DeploymentInformer. func (v *version) Deployments() DeploymentInformer { return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} }
创建 Informer 的真正逻辑在deploymentInformer.Informer()
( client-go/informers/apps/v1/deployment.go
) 中,是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传递给 Informer 工厂f.defaultInformer
的方法来创建只关注 Deployment 资源的 Informer。InformerFor
func (f *deploymentInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer) }
简要说明。
- 您可以通过 Informer 工厂获取特定类型的 Informer 模板类(即
deploymentInformer
本例中) Informer()
实际上,为特定资源创建 Informer 的是 Informer 模板类的方法。- 该
Informer()
方法只是通过InformerFor
Informer 工厂创建真正的 Informer
这里使用了模板方法(设计模式),虽然有点绕,但是可以参考下图梳理一下。理解它的关键是 Informer 的差异化创建逻辑委托给了模板类。
最后,命名的结构sharedIndexInformer
将被实例化,并实际承担 Informer 的职责。它也是注册到 Informer 工厂映射的实例。
Informer operation
由于真正的 Informer 实例是一个类型的对象sharedIndexInformer
,当 Informer 工厂启动时(通过执行Start
方法),它sharedIndexInformer
就是实际运行的。
sharedIndexInformer
是client-go中的一个组件,它的方法Run
有几十行,但是工作量很大。这是我们进入控制器管理器最有趣的部分的地方。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don't want any new listeners }() s.controller.Run(stopCh) }
启动逻辑sharedIndexInformer
做了几件事。
- 创建一个名为 的队列
fifo
。 - 创建并运行一个名为
controller
. - 开始了
cacheMutationDetector
。 - 开始了
processor
。
这些术语(或组件)在上一篇文章中没有提到,但这四件事是 Controller Manager 工作的核心,因此我将在下面逐一介绍。
sharedIndexInformer
是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(就像deploymentInformer
上面提到的)来创建一个特定于他们需要的 Informer。
sharedIndexInformer
包含一堆工具来完成 Informer 的工作,主要代码在client-go/tools/cache/shared_informer.go
. 它的创建逻辑也在其中。
// NewSharedIndexInformer creates a new instance for the listwatcher. func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer }
在创建逻辑中,有几点需要注意:
- processor:提供EventHandler注册和事件分发的功能
- indexer:提供资源缓存功能
- listerWatcher:由模板类提供,包含特定资源的List和Watch方法
- objectType:用于标记要关注的具体资源类型
- cacheMutationDetector:监控Informer的缓存
此外,它还包含DeltaFIFO
队列和controller
上面的启动逻辑中提到的,下面分别介绍。
处理器是 sharedIndexInformer 中一个非常有趣的组件。ControllerManager通过一个Informer单例工厂保证不同的Controller共享同一个Informer,但是不同的Controller在共享的Informer上注册了不同的Handler。
处理器是管理注册的Handler并将事件分发给不同的Handler的组件。
type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group }
sharedProcessor 工作的核心围绕着listeners
.
当我们向 Informer 注册一个 Handler 时,它最终会被转换为一个名为processorListener
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { ret := &processorListener{ nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, } ret.determineNextResync(now) return ret }
该实例主要包含两个通道和外部注册的 Handler 方法。此处实例化的processorListener
对象最终将被添加到sharedProcessor.listeners
列表中。
func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() p.addListenerLocked(listener) if p.listenersStarted { p.wg.Start(listener.run) p.wg.Start(listener.pop) } }
如图所示,Controller 中的 Handler 方法最终会添加到 Listener 中,Listener 会附加到sharedProcessor
前面说过,启动时sharedIndexInformer
会运行sharedProcessor
,启动的逻辑sharedProcessor
与这些有关listeners
。
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
可以看到,启动的时候会依次sharedProcessor
执行 的run
和pop
方法,所以现在来看这两个方法。listener
Starting the listener
由于监听器包含注册到Controller的Handler方法,所以监听器最重要的作用就是在事件发生时触发这些方法,并listener.run
不断从nextCh
通道中获取事件并执行相应的处理程序。
func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { // this gives us a few quick retries before a long pause and then a few more quick retries err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed return true, nil }) // the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) }
可以看到listener.run
不断从nextCh
通道中获取事件,但是通道中的事件是nextCh
从哪里来的呢?将listener.pop
事件放入nextCh
.
listener.pop
是一个非常聪明和有趣的逻辑。
func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } } }
listener
包含两个通道addCh
的原因nextCh
是:Informer 无法预测listener.handler
消耗事件的速度是否比产生事件的速度快,因此它添加了一个名为pendingNotifications
. 队列来保存没有被及时消费的事件。
pop
一方面,该方法不断获取最新事件,addCh
以确保生产者不会阻塞。然后它确定缓冲区是否存在,如果存在,则将事件添加到缓冲区,如果不存在,则尝试将其推送到nextCh
.
另一方面,它确定缓冲区中是否还有任何事件,如果还有库存,它会不断将其传递给nextCh
.
该pop
方法实现了一种带有缓冲区的分发机制,该缓冲区允许事件不断地从 传递addCh
到nextCh
。但是问题来了,这些addCh
事件是从哪里来的?
源代码非常简单,listener
有一个add
以事件为输入的方法,它将新事件推送到addCh
. 该add
方法由sharedProcessor
管理所有 s 的listener s
调用。
如上所述,sharedProcessor
负责管理所有的Handler和分发事件,但distribute
真正分发的是方法。
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }
到目前为止,我们对一个部分有了更清晰的了解:
- Controller 向 Informer 注册 Handler。
- Informer 通过
sharedProcessor
. - Informer 接收事件并通过
sharedProcessor.distribute
. - Controller由对应的Handler触发处理自己的逻辑
那么剩下的问题是 Informer 事件从何而来?
DeltaFIFO
在分析 Informer fetch 事件之前,需要提前告知的一个非常有趣的小工具fifo
是sharedIndexInformer.Run
.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
DeltaFIFO 是一个非常有趣的队列,其代码定义在client-go/tools/cache/delta_fifo.go
. 对于队列来说,最重要的肯定是 Add 和 Pop 方法。DeltaFIFO 提供了几种 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)来区分不同的方法,但最终都是执行queueActionLocked
。
// queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // If object is supposed to be deleted (last event is Deleted), // then we should ignore Sync events, because it would result in // recreation of this object. if actionType == Sync && f.willObjectBeDeletedLocked(id) { return nil } newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { // We need to remove this from our map (extra items in the queue are // ignored if they are not in the map). delete(f.items, id) } return nil }
该queueActionLocked
方法的第一个参数 actionType 是事件类型。
const ( Added DeltaType = "Added" // watch api 获得的创建事件 Updated DeltaType = "Updated" // watch api 获得的更新事件 Deleted DeltaType = "Deleted" // watch api 获得的删除事件 Sync DeltaType = "Sync" // 触发了 List Api,需要刷新缓存 )
事件类型和入队方式表明这是一个具有业务功能的队列,而不仅仅是“先进先出”,入队方式有两个非常巧妙的设计。
- 队列中的事件会先判断资源是否有未消费的事件,然后进行适当的处理。
- 如果 list 方法发现资源已经被删除,则不处理。
第二点比较容易理解,如果触发了列表请求,发现要处理的资源已经被删除了,那么就不需要再排队了。第一点需要和out of queue方法一起看。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, ErrFIFOClosed } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } }
DeltaFIFO 的Pop
方法有一个输入,即处理函数。当它从队列中出来时,DeltaFIFO会首先根据资源id获取资源所有的事件,然后交给handler函数。
工作流程如图所示。
一般来说,DeltaFIFO的queue方法首先判断资源是否已经在items
,如果是,则资源还没有被消费(仍然在排队),所以直接将事件追加到items[resource_id]
。如果不在 中items
,则items[resource_id]
创建 then 并将资源 id 附加到queue
.
DeltaFIFO out-of-queue 方法从 获取队列顶部的资源 id queue
,然后从 获取该资源的所有事件items
,最后调用该方法PopProcessFunc
传入的类型处理程序Pop
。
所以,DeltaFIFO 的特点是队列中的(资源的)事件,当它从队列中出来时,它获取队列中第一个资源的所有事件。这种设计确保不会因为某个资源疯狂地创建事件而导致饥饿,从而使其他资源没有机会被处理。
控制器 controller
DeltaFIFO 是一个非常重要的组件,唯一真正使它有价值的是 Informer controller
。
虽然 K8s 源代码确实使用了这个词controller
,但这controller
不是像部署控制器那样的资源控制器。相反,它是一个自上而下的事件控制器(从 API 服务器获取事件并将它们发送到 Informer 进行处理)。
职责controller
是双重的。
- 通过 List-Watch 从 Api Server 获取事件并将事件推送到 DeltaFIFO
HandleDeltas
以 的方法sharedIndexInformer
作为参数调用 DeltaFIFO 的 Pop 方法
定义controller
很简单,其核心是Reflector
。
type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock }
controllerr
的代码Reflector
比较繁琐但是很简单,就是通过listerWatcher
sharedIndexInformer
中定义的list-watch
,将获取到的事件推送到DeltaFIFO
中。
控制器启动后,启动Reflector
,然后执行processLoop
,这是一个死循环,不断从DeltaFIFO中读取资源事件,并交给sharedIndexInformer
(分配给config.Process)的HandleDeltas
方法。
func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
如果我们看一下 sharedIndexInformer 的 HandleDeltas 方法,我们可以看到整个事件消费过程是有效的。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
前面我们了解到,该processor.attribute
方法将事件分发给 all listeners
,并controller
使用Reflector
来从 ApiServer 中获取事件并放入队列中,然后通过processLoop
为要处理的资源从队列中取出事件,最后调用processor.attribute
via的HandleDeltas
方法sharedIndexInformer
。所有事件,最后processor.attribute
是通过 的HandleDeltas
方法调用的sharedIndexInformer
。
因此,我们可以按如下方式组织整个事件流。
Indexer
上面,我们整理了从事件接收到分发的所有逻辑,但是在sharedIndexInformer的HandleDeltas方法中,有一些逻辑比较有意思,就是所有的事件都是s.indexer
先更新再分发。
前文提到,Indexer是一个线程安全的存储,作为缓存来缓解资源控制器(Controller)查询资源时对ApiServer的压力。
当有事件更新时,会先刷新Indexer中的缓存,然后将事件分发给资源控制器,资源控制器会先从Indexer获取资源详情,从而减少对APIServer的不必要查询请求。
Indexer存储的具体实现在client-go/tools/cache/thread_safe_store.go中,数据存储在threadSafeMap
.
type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }
本质上,threadSafeMap
是一个带有读/写锁的映射,除此之外还可以定义索引,有趣的是由两个字段实现。
Indexers
是一个定义了多个索引函数的map,key是indexName,value是索引函数(计算资源的索引值)。Indices
保存索引值和数据key的映射关系,Indices
是一个两级的map,第一级的key是indexName,对应Indexers
并决定用什么方法计算索引值,value是一个保存关联的map “索引值-资源键”关联。
相关逻辑比较简单,如下图所示。
MutationDetector
更新数据的HandleDeltas
方法除了.sharedIndexInformer
s.indexer
s.cacheMutationDetector
开头提到,在sharedIndexInformer
启动的时候,也会启动一个cacheMutationDetector
来监控索引器的缓存。
因为 indexer 缓存实际上是一个指针,所以多个 Controller 访问 indexer 的缓存资源实际上得到的是同一个资源实例。如果一个Controller玩不好,修改了一个资源的属性,肯定会影响其他Controller的正确性。
当 Informer 接收到新事件时,MutationDetector 会保存指向资源的指针(索引器也是如此)和资源的深层副本。通过周期性地检查指针指向的资源是否与开头存储的深拷贝相匹配,我们就知道缓存的资源是否被修改过。
但是,是否启用监控会受到环境变量的影响KUBE_CACHE_MUTATION_DETECTOR
。如果未设置此环境变量,sharedIndexInformer
将实例化dummyMutationDetector
并且在启动后不执行任何操作。
如果KUBE_CACHE_MUTATION_DETECTOR
为true
,sharedIndexInformer 实例化defaultCacheMutationDetector
,它以 1s 的间隔执行缓存的定期检查,如果发现缓存被修改,则触发故障处理函数,如果未定义该函数,则触发恐慌。
概括
本文对ControllerManager进行了狭义的解释,因为它不包括具体的资源管理器(Controller),而只是解释了ControllerManager是如何“管理控制器”的。
可以看到ControllerManager做了很多工作来保证Controller可以只关注自己关心的事件,而这项工作的核心就是Informer。当您了解 Informer 如何与其他组件一起工作时,就会清楚控制器管理器为资源管理器铺平了道路。