Kubernetes 控制器管理器的工作原理

在 Kubernetes Master 节点中,有三个重要的组件:ApiServer、ControllerManager 和 Scheduler,它们共同负责整个集群的管理。在本文中,我们尝试梳理一下ControllerManager的工作流程和原理。

什么是控制器管理器

根据官方文档:kube-controller-manager 运行控制器,这是处理集群中常规任务的后台线程。

例如,当通过 Deployment 创建的 Pod 异常退出时,RS Controller 会接受并处理退出并创建新的 Pod 以保持预期的副本数。

几乎每个特定的资源都由特定的 Controller 管理以维持预期的状态,而 Controller Manager 的职责是聚合所有 Controller:

  1. 提供基础设施以降低控制器实现的复杂性
  2. 启动和维护控制器的正常运行时间

这样,Controller 保证集群中的资源保持在预期状态,Controller Manager 确保 Controller 保持在预期状态。

 

 

控制器工作流程

在我们解释 Controller Manager 如何为 Controller 提供基础架构和运行时环境之前,让我们先了解一下 Controller 工作流程是什么样的。

从高维的角度来看,ControllerManager主要提供了分发事件的能力,而不同的Controller只需要注册相应的Handler即可等待接收和处理事件。

以Deployment Controller为例,其中的NewDeploymentController方法pkg/controller/deployment/deployment_controller.go包括Event Handler的注册,对于Deployment Controller,只需要根据不同的事件实现不同的处理逻辑,就可以实现对相应资源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
	DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	DeleteFunc: dc.deletePod,
})

可以看到,在ControllerManager的帮助下,Controller的逻辑可以很纯粹的通过实现对应的EventHandler来完成,那么ControllerManager具体做了哪些工作呢?

 

 

控制器管理器架构

帮助 Controller Manager 进行事件分发的关键模块是 client-go,而更关键的模块之一是 informer。

kubernetes在github上提供了client-go的架构图,从中可以看出Controller是描述的下半部分(CustomController),而Controller Manager主要是完成的上半部分。

 

 

Informer Factory

从上图中可以看出,Informer 是一个非常关键的“桥梁”,所以对 Informer 的管理是 Controller Manager 做的第一件事。

由于每个 Informer 都与 Api Server 保持一个 watch long 连接,因此这个单实例工厂通过为所有 Controller 提供一个唯一的入口点来获取 Informer,从而确保每种类型的 Informer 仅实例化一次。

这个单例工厂的初始化逻辑。

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

 

从上面的初始化逻辑可以看出,其中最重要的部分sharedInformerFactory是名为 的map informers,其中key是资源类型,value是关心该资源类型的Informer。每种类型的 Informer 只会被实例化一次并存储在map中。不同的 Controller 只有在需要相同的资源时才会得到相同的 Informer 实例。

对于Controller Manager来说,保持所有Informer正常工作是所有Controller正常工作的基本条件。通过这个sharedInformerFactorymap维护所有informer实例,所以sharedInformerFactory也负责提供一个统一的启动入口。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

Controller Manager启动时,最重要的是通过Start这个工厂的方法运行所有的Informer。

 

 

Informer creation

以下是这些 Informer 的创建方式,Controller ManagerNewControllerInitializerscmd/kube-controller-manager/app/controllermanager.go. 由于代码冗长,这里仅提供部署控制器的示例。

初始化部署控制器的逻辑startDeploymentControllercmd/kube-controller-manager/app/apps.go.

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}

最关键的逻辑在 中deployment.NewDeploymentController,实际上是创建了 Deployment Controller,创建函数的前三个参数分别是 Deployment、ReplicaSet 和 Pod 的 Informer。如您所见,Informer 的单例工厂提供了一个入口点,用于使用 ApiGroup 作为路径创建具有不同资源的 Informer。

但是,重要的是要注意这一点。Apps().V1().Deployments() 返回 type 的实例deploymentInformer,但deploymentInformer不是真正的 Informer(尽管它的 Informer 名称)。它只是一个模板类,其主要功能是为创建专注于部署的 Informer 提供模板作为特定资源。

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

 

创建 Informer 的真正逻辑在deploymentInformer.Informer()client-go/informers/apps/v1/deployment.go) 中,是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传递给 Informer 工厂f.defaultInformer的方法来创建只关注 Deployment 资源的 Informer。InformerFor

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

简要说明。

  1. 您可以通过 Informer 工厂获取特定类型的 Informer 模板类(即deploymentInformer本例中)
  2. Informer()实际上,为特定资源创建 Informer 的是 Informer 模板类的方法。
  3. Informer()方法只是通过InformerForInformer 工厂创建真正的 Informer

这里使用了模板方法(设计模式),虽然有点绕,但是可以参考下图梳理一下。理解它的关键是 Informer 的差异化创建逻辑委托给了模板类

最后,命名的结构sharedIndexInformer将被实例化,并实际承担 Informer 的职责。它也是注册到 Informer 工厂映射的实例。

 

 

Informer operation

由于真正的 Informer 实例是一个类型的对象sharedIndexInformer,当 Informer 工厂启动时(通过执行Start方法),它sharedIndexInformer就是实际运行的。

sharedIndexInformer是client-go中的一个组件,它的方法Run有几十行,但是工作量很大。这是我们进入控制器管理器最有趣的部分的地方。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

启动逻辑sharedIndexInformer做了几件事。

  1. 创建一个名为 的队列fifo
  2. 创建并运行一个名为controller.
  3. 开始了cacheMutationDetector
  4. 开始了processor

这些术语(或组件)在上一篇文章中没有提到,但这四件事是 Controller Manager 工作的核心,因此我将在下面逐一介绍。

 

 

sharedIndexInformer

sharedIndexInformer是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(就像deploymentInformer上面提到的)来创建一个特定于他们需要的 Informer。

sharedIndexInformer包含一堆工具来完成 Informer 的工作,主要代码在client-go/tools/cache/shared_informer.go. 它的创建逻辑也在其中。

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

在创建逻辑中,有几点需要注意:

  1. processor:提供EventHandler注册和事件分发的功能
  2. indexer:提供资源缓存功能
  3. listerWatcher:由模板类提供,包含特定资源的List和Watch方法
  4. objectType:用于标记要关注的具体资源类型
  5. cacheMutationDetector:监控Informer的缓存

此外,它还包含DeltaFIFO队列和controller上面的启动逻辑中提到的,下面分别介绍。

 

 

sharedProcessor

处理器是 sharedIndexInformer 中一个非常有趣的组件。ControllerManager通过一个Informer单例工厂保证不同的Controller共享同一个Informer,但是不同的Controller在共享的Informer上注册了不同的Handler。

处理器是管理注册的Handler并将事件分发给不同的Handler的组件。

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

sharedProcessor 工作的核心围绕着listeners.

当我们向 Informer 注册一个 Handler 时,它最终会被转换为一个名为processorListener

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

 

该实例主要包含两个通道和外部注册的 Handler 方法。此处实例化的processorListener对象最终将被添加到sharedProcessor.listeners列表中。

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

 

如图所示,Controller 中的 Handler 方法最终会添加到 Listener 中,Listener 会附加到sharedProcessor

前面说过,启动时sharedIndexInformer会运行sharedProcessor,启动的逻辑sharedProcessor与这些有关listeners

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看到,启动的时候会依次sharedProcessor执行 的runpop方法,所以现在来看这两个方法。listener

 

 

Starting the listener

由于监听器包含注册到Controller的Handler方法,所以监听器最重要的作用就是在事件发生时触发这些方法,并listener.run不断从nextCh通道中获取事件并执行相应的处理程序。

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

可以看到listener.run不断从nextCh通道中获取事件,但是通道中的事件是nextCh从哪里来的呢?将listener.pop事件放入nextCh.

listener.pop是一个非常聪明和有趣的逻辑。

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

 

listener包含两个通道addCh的原因nextCh是:Informer 无法预测listener.handler消耗事件的速度是否比产生事件的速度快,因此它添加了一个名为pendingNotifications. 队列来保存没有被及时消费的事件。

pop一方面,该方法不断获取最新事件,addCh以确保生产者不会阻塞。然后它确定缓冲区是否存在,如果存在,则将事件添加到缓冲区,如果不存在,则尝试将其推送到nextCh.

另一方面,它确定缓冲区中是否还有任何事件,如果还有库存,它会不断将其传递给nextCh.

pop方法实现了一种带有缓冲区的分发机制,该缓冲区允许事件不断地从 传递addChnextCh。但是问题来了,这些addCh事件是从哪里来的?

源代码非常简单,listener有一个add以事件为输入的方法,它将新事件推送到addCh. 该add方法由sharedProcessor管理所有 s 的listener s调用。

 

 

如上所述,sharedProcessor负责管理所有的Handler和分发事件,但distribute真正分发的是方法。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

到目前为止,我们对一个部分有了更清晰的了解:

  1. Controller 向 Informer 注册 Handler。
  2. Informer 通过sharedProcessor.
  3. Informer 接收事件并通过sharedProcessor.distribute.
  4. Controller由对应的Handler触发处理自己的逻辑

那么剩下的问题是 Informer 事件从何而来?

 

 

DeltaFIFO

在分析 Informer fetch 事件之前,需要提前告知的一个非常有趣的小工具fifosharedIndexInformer.Run.

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO 是一个非常有趣的队列,其代码定义在client-go/tools/cache/delta_fifo.go. 对于队列来说,最重要的肯定是 Add 和 Pop 方法。DeltaFIFO 提供了几种 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)来区分不同的方法,但最终都是执行queueActionLocked

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}

 

queueActionLocked方法的第一个参数 actionType 是事件类型。

const (
	Added   DeltaType = "Added"   // watch api 获得的创建事件
	Updated DeltaType = "Updated" // watch api 获得的更新事件
	Deleted DeltaType = "Deleted" // watch api 获得的删除事件
	Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)

 

事件类型和入队方式表明这是一个具有业务功能的队列,而不仅仅是“先进先出”,入队方式有两个非常巧妙的设计。

  1. 队列中的事件会先判断资源是否有未消费的事件,然后进行适当的处​​理。
  2. 如果 list 方法发现资源已经被删除,则不处理。

第二点比较容易理解,如果触发了列表请求,发现要处理的资源已经被删除了,那么就不需要再排队了。第一点需要和out of queue方法一起看。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

 

DeltaFIFO 的Pop方法有一个输入,即处理函数。当它从队列中出来时,DeltaFIFO会首先根据资源id获取资源所有的事件,然后交给handler函数。

工作流程如图所示。

一般来说,DeltaFIFO的queue方法首先判断资源是否已经在items,如果是,则资源还没有被消费(仍然在排队),所以直接将事件追加到items[resource_id]。如果不在 中items,则items[resource_id]创建 then 并将资源 id 附加到queue.

DeltaFIFO out-of-queue 方法从 获取队列顶部的资源 id queue,然后从 获取该资源的所有事件items,最后调用该方法PopProcessFunc传入的类型处理程序Pop

所以,DeltaFIFO 的特点是队列中的(资源的)事件,当它从队列中出来时,它获取队列中第一个资源的所有事件。这种设计确保不会因为某个资源疯狂地创建事件而导致饥饿,从而使其他资源没有机会被处理。

 

 

控制器 controller

DeltaFIFO 是一个非常重要的组件,唯一真正使它有价值的是 Informer controller

虽然 K8s 源代码确实使用了这个词controller,但这controller不是像部署控制器那样的资源控制器。相反,它是一个自上而下的事件控制器(从 API 服务器获取事件并将它们发送到 Informer 进行处理)。

职责controller是双重的。

  1. 通过 List-Watch 从 Api Server 获取事件并将事件推送到 DeltaFIFO
  2. HandleDeltas以 的方法sharedIndexInformer作为参数调用 DeltaFIFO 的 Pop 方法

定义controller很简单,其核心是Reflector

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

 

controllerr的代码Reflector比较繁琐但是很简单,就是通过listerWatcher sharedIndexInformer中定义的list-watch,将获取到的事件推送到DeltaFIFO中。

控制器启动后,启动Reflector,然后执行processLoop,这是一个死循环,不断从DeltaFIFO中读取资源事件,并交给sharedIndexInformer(分配给config.Process)的HandleDeltas方法。

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

 

如果我们看一下 sharedIndexInformer 的 HandleDeltas 方法,我们可以看到整个事件消费过程是有效的。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

 

前面我们了解到,该processor.attribute方法将事件分发给 all listeners,并controller使用Reflector来从 ApiServer 中获取事件并放入队列中,然后通过processLoop为要处理的资源从队列中取出事件,最后调用processor.attributevia的HandleDeltas方法sharedIndexInformer。所有事件,最后processor.attribute是通过 的HandleDeltas方法调用的sharedIndexInformer

因此,我们可以按如下方式组织整个事件流。

 

Indexer

上面,我们整理了从事件接收到分发的​​所有逻辑,但是在sharedIndexInformer的HandleDeltas方法中,有一些逻辑比较有意思,就是所有的事件都是s.indexer先更新再分发。

前文提到,Indexer是一个线程安全的存储,作为缓存来缓解资源控制器(Controller)查询资源时对ApiServer的压力。

当有事件更新时,会先刷新Indexer中的缓存,然后将事件分发给资源控制器,资源控制器会先从Indexer获取资源详情,从而减少对APIServer的不必要查询请求。

Indexer存储的具体实现在client-go/tools/cache/thread_safe_store.go中,数据存储在threadSafeMap.

type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

 

本质上,threadSafeMap是一个带有读/写锁的映射,除此之外还可以定义索引,有趣的是由两个字段实现。

  1. Indexers是一个定义了多个索引函数的map,key是indexName,value是索引函数(计算资源的索引值)。
  2. Indices保存索引值和数据key的映射关系,Indices是一个两级的map,第一级的key是indexName,对应Indexers并决定用什么方法计算索引值,value是一个保存关联的map “索引值-资源键”关联。

相关逻辑比较简单,如下图所示。

MutationDetector

更新数据的HandleDeltas方法除了.sharedIndexInformers.indexers.cacheMutationDetector

开头提到,在sharedIndexInformer启动的时候,也会启动一个cacheMutationDetector来监控索引器的缓存。

因为 indexer 缓存实际上是一个指针,所以多个 Controller 访问 indexer 的缓存资源实际上得到的是同一个资源实例。如果一个Controller玩不好,修改了一个资源的属性,肯定会影响其他Controller的正确性。

当 Informer 接收到新事件时,MutationDetector 会保存指向资源的指针(索引器也是如此)和资源的深层副本。通过周期性地检查指针指向的资源是否与开头存储的深拷贝相匹配,我们就知道缓存的资源是否被修改过。

但是,是否启用监控会受到环境变量的影响KUBE_CACHE_MUTATION_DETECTOR。如果未设置此环境变量,sharedIndexInformer 将实例化dummyMutationDetector并且在启动后不执行任何操作。

如果KUBE_CACHE_MUTATION_DETECTORtrue,sharedIndexInformer 实例化defaultCacheMutationDetector,它以 1s 的间隔执行缓存的定期检查,如果发现缓存被修改,则触发故障处理函数,如果未定义该函数,则触发恐慌。

 

 

概括

本文对ControllerManager进行了狭义的解释,因为它不包括具体的资源管理器(Controller),而只是解释了ControllerManager是如何“管理控制器”的。

可以看到ControllerManager做了很多工作来保证Controller可以只关注自己关心的事件,而这项工作的核心就是Informer。当您了解 Informer 如何与其他组件一起工作时,就会清楚控制器管理器为资源管理器铺平了道路。

 

 

发表评论