k8s排水命令源码阅读

kubectl

K8s 使用 cobra 作为命令行构建器(我觉得 cobra 不是很有用,文档也不清楚。)实际的处理逻辑pkg/kubectl/cmd/cmd.gocmd/kubectl/kubectl.go

   ...
groups := templates.CommandGroups{
	{
		Message: "Basic Commands (Beginner):",
		...
	},
	{
		Message: "Deploy Commands:",
		...
	},
	{
		Message: "Cluster Management Commands:",
		Commands: []*cobra.Command{
			certificates.NewCmdCertificate(f, ioStreams),
			clusterinfo.NewCmdClusterInfo(f, ioStreams),
			top.NewCmdTop(f, ioStreams),
			drain.NewCmdCordon(f, ioStreams),
			drain.NewCmdUncordon(f, ioStreams),
			drain.NewCmdDrain(f, ioStreams),
			taint.NewCmdTaint(f, ioStreams),
		},
	},
   ...
}
   groups.Add(cmds)

正如您在kubectl所有子命令的入口点中看到的那样,drain我们今天要查看的命令都是集群管理命令并且包含。

  • cordon
  • uncordon
  • drain

 

cordon

让我们从命令开始,该cordon命令的目的是将一个节点标记为不可调度,以防止 K8s 在进行节点维护时将资源调度到该节点。

func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)

	cmd := &cobra.Command{
		Use:                   "cordon NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Mark node as unschedulable"),
		Long:                  cordonLong,
		Example:               cordonExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunCordonOrUncordon(true))
		},
	}
	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}

直接Run看. cmdutil.CheckErro.Completeo.RunCordonOrUncordon的根本目的kubectl是向 APIServer 发送相应的 HTTP 请求,并通过 和实现kubectl一层封装,使各个子命令的实现统一简洁。一致和简洁。BuilderVisitor

 

// Builder provides convenience functions for taking arguments and parameters
// from the command line and converting them to a list of resources to iterate
// over using the Visitor interface.
type Builder struct {
    ...
}
// Visitor lets clients walk a list of resources.
type Visitor interface {
	Visit(VisitorFunc) error
}

 

对应builder的构造在o.Complete

...
    // 根据命令行参数构建 builder 实例
	builder := f.NewBuilder().
		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
		NamespaceParam(o.Namespace).DefaultNamespace().
		ResourceNames("nodes", args...).
		SingleResourceType().
		Flatten()

	if len(o.drainer.Selector) > 0 {
		builder = builder.LabelSelectorParam(o.drainer.Selector).
			ResourceTypes("nodes")
	}
    // builder.Do 返回带有 Visitor 的 Result 对象
    r := builder.Do()

 

查看builder.Do()下一步返回 Result 类型资源的操作。

func (b *Builder) Do() *Result {
    // 调用 visitorResult 返回 Result 类型
	r := b.visitorResult()
    ...
	return r
}
...
func (b *Builder) visitorResult() *Result {
    ...
	// 跳过其他步骤,直接看最简单的通过 Name 来获取 Result
	if len(b.names) != 0 {
		return b.visitByName()
    }
    ...
}
...
func (b *Builder) visitByName() *Result {
    // 声明 Result 对象
	result := &Result{
		singleItemImplied:  len(b.names) == 1,
		targetsSingleItems: true,
    }
    ...
    // 获取 K8s client
	client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
	...
	visitors := []Visitor{}
	for _, name := range b.names {
		info := &Info{
			Client:    client,
			Mapping:   mapping,
			Namespace: selectorNamespace,
			Name:      name,
			Export:    b.export,
		}
		visitors = append(visitors, info)
    }
    // VisitorList 也实现了 Visit 接口,遍历执行 Visitor 的 Visit 方法
	result.visitor = VisitorList(visitors)
	result.sources = visitors
	return result
}

 

看完了如何获取Result类型的对象,我们来看看如何o.Complete处理,传入一个VisitorFunc,Result的访问者都实现了Visit接口,接口的作用Visit就是接收VisitorFunc并执行。接口的作用Visit是接收VisitorFunc并执行它。

return r.Visit(func(info *resource.Info, err error) error {
		...
    })
...
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
        ...
		for i := range v.decorators {
			if err := v.decorators[i](info, nil); err != nil {
				return err
			}
		}
		return fn(info, nil)
	})
}

 

接下来,看看有什么o.RunCordonOrUncordon作用。

func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
	cordonOrUncordon := "cordon"
	if !desired {
		cordonOrUncordon = "un" + cordonOrUncordon
	}
    // 通过 Visit 获取到的 nodeInfos 列表
	for _, nodeInfo := range o.nodeInfos {
        ...
		gvk := nodeInfo.ResourceMapping().GroupVersionKind
		if gvk.Kind == "Node" {
			c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk)
			if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
				...
			} else {
				if o.drainer.DryRunStrategy != cmdutil.DryRunClient {
                    ...
                    // 修改对应节点的配置
					err, patchErr := c.PatchOrReplace(o.drainer.Client, o.drainer.DryRunStrategy == cmdutil.DryRunServer)
					...
				}
			}
        }
        ...
	}
	return nil
}
...
func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) {
	client := clientset.CoreV1().Nodes()
    oldData, err := json.Marshal(c.node)
    // 更新 node Spec 的 Unschedulable 字段
	c.node.Spec.Unschedulable = c.desired
	newData, err := json.Marshal(c.node)
    // merge 数据,通过 diff 然后获取
	patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node)
	if patchErr == nil {
		...
		_, err = client.Patch(context.TODO(), c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions)
	} 
	...
}

 

Drain

在Cordon之后,去Drain。

func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    ...
	cmd := &cobra.Command{
		...
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunDrain())
		},
    }
    ...

直接o.RunDrain看,我们首先看到的是执行o.RunCordonOrUncordon,将节点标记为不可调度,所以我之前写的博客其实是不正确的,如果要让节点下线,那就执行kubectl drain

 

func (o *DrainCmdOptions) RunDrain() error {
    if err := o.RunCordonOrUncordon(true); err != nil {
		return err
	}
    ...
	drainedNodes := sets.NewString()
	var fatal error
	for _, info := range o.nodeInfos {
        // 驱逐 Pod
		if err := o.deleteOrEvictPodsSimple(info); err == nil {
			drainedNodes.Insert(info.Name)
			printObj(info.Object, o.Out)
		} else {
			// 如果驱逐 Pod 失败,则显示对应的 Node 信息
			if len(remainingNodes) > 0 {
				fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
				for _, nodeName := range remainingNodes {
					fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
				}
			}
			break
		}
	}
}

deleteOrEvictPodsSimple中,首先通过 Node name 获取对应的 Pod 信息,然后执行 eviction 动作。

func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
	list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
    ...
	if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
        ...
	}
}

 

这里GetPodsForDeletion执行一个过滤器,其中包含以下场景的过滤器,需要注意的是,这里的过滤场景是有严格顺序的。

func (d *Helper) makeFilters() []podFilter {
	return []podFilter{
        // 被标记删除的 Pod(DeletionTimestamp 不为0)
        d.skipDeletedFilter,
        // 属于 DaemonSet 的 Pod
        d.daemonSetFilter,
        // mirror pod 其实就是 static pod,
        // 是我们在 /etc/kubernetes/manifests/ 中定义的由 kubelet 负责生命周期管理的 Pod
        // 在 `Annotations` 中会包含 `kubernetes.io/config.mirror` 
        d.mirrorPodFilter,
        // 包含本地存储的 Pod,Pod 中的 Volume 字段不为空
        d.localStorageFilter,
        // 不属于 replicate 的 pod,`Controlled By` 不为空的 pod
		d.unreplicatedFilter,
	}
}

 

一旦获得过滤后的 Pod 列表,就会执行驱逐操作,从每个 Pod 的 goroutine 开始,并等待 Pod 驱逐完成。

func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	returnCh := make(chan error, 1)
    ...
	ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
	defer cancel()
	for _, pod := range pods {
		go func(pod corev1.Pod, returnCh chan error) {
			for {
				...
				select {
				case <-ctx.Done():
					// 驱逐超时
					returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout)
					return
				default:
                }
                // 驱逐 Pod 动作,最终执行 d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction)
				err := d.EvictPod(pod, policyGroupVersion)
				...
			}
			...
			params := waitForDeleteParams{
				...
            }
            // 等待驱逐动作完成
			_, err := waitForDelete(params)
			if err == nil {
				returnCh <- nil
			} else {
				returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, returnCh)
    }

waitForDelete如果没有立即完成,将ConditionFunc进入循环,其中检测到 Pod 存在并且 ObjectMeta UID 已更改。WaitForConditionFunc

func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
	stopCh := make(chan struct{})
	defer close(stopCh)
	c := wait(stopCh)
	for {
		select {
		case _, open := <-c:
			ok, err := runConditionWithCrashProtection(fn)
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			if !open {
				return ErrWaitTimeout
			}
		case <-done:
			return ErrWaitTimeout
		}
	}
}

 

 

概括

kubectl drain命令的实现非常简单,没有特别复杂的逻辑。K8s 能够做到这一点的一个重要原因是所有的动作都是声明性的,不需要在声明后等待执行完成后主动做脏事。在 Pod 驱逐的情况下,并非所有 Pod 都会被驱逐到其他节点,因此需要特别注意在节点下线之前检查是否有简单的 Pod 资源仍在运行,或者是否有任何 Pod 使用本地存储,或类似的。

我已经看到这种设计模式的组合有一段时间了,我正在寻找机会重新学习它们。

发表评论