k8s排水命令源码阅读
kubectl
K8s 使用 cobra 作为命令行构建器(我觉得 cobra 不是很有用,文档也不清楚。)实际的处理逻辑pkg/kubectl/cmd/cmd.go
在cmd/kubectl/kubectl.go
... groups := templates.CommandGroups{ { Message: "Basic Commands (Beginner):", ... }, { Message: "Deploy Commands:", ... }, { Message: "Cluster Management Commands:", Commands: []*cobra.Command{ certificates.NewCmdCertificate(f, ioStreams), clusterinfo.NewCmdClusterInfo(f, ioStreams), top.NewCmdTop(f, ioStreams), drain.NewCmdCordon(f, ioStreams), drain.NewCmdUncordon(f, ioStreams), drain.NewCmdDrain(f, ioStreams), taint.NewCmdTaint(f, ioStreams), }, }, ... } groups.Add(cmds)
正如您在kubectl
所有子命令的入口点中看到的那样,drain
我们今天要查看的命令都是集群管理命令并且包含。
- cordon
- uncordon
- drain
cordon
让我们从命令开始,该cordon
命令的目的是将一个节点标记为不可调度,以防止 K8s 在进行节点维护时将资源调度到该节点。
func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { o := NewDrainCmdOptions(f, ioStreams) cmd := &cobra.Command{ Use: "cordon NODE", DisableFlagsInUseLine: true, Short: i18n.T("Mark node as unschedulable"), Long: cordonLong, Example: cordonExample, Run: func(cmd *cobra.Command, args []string) { cmdutil.CheckErr(o.Complete(f, cmd, args)) cmdutil.CheckErr(o.RunCordonOrUncordon(true)) }, } cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on") cmdutil.AddDryRunFlag(cmd) return cmd }
直接Run
看. cmdutil.CheckErr
_ o.Complete
_ o.RunCordonOrUncordon
的根本目的kubectl
是向 APIServer 发送相应的 HTTP 请求,并通过 和实现kubectl
一层封装,使各个子命令的实现统一简洁。一致和简洁。Builder
Visitor
// Builder provides convenience functions for taking arguments and parameters // from the command line and converting them to a list of resources to iterate // over using the Visitor interface. type Builder struct { ... } // Visitor lets clients walk a list of resources. type Visitor interface { Visit(VisitorFunc) error }
对应builder
的构造在o.Complete
:
... // 根据命令行参数构建 builder 实例 builder := f.NewBuilder(). WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). NamespaceParam(o.Namespace).DefaultNamespace(). ResourceNames("nodes", args...). SingleResourceType(). Flatten() if len(o.drainer.Selector) > 0 { builder = builder.LabelSelectorParam(o.drainer.Selector). ResourceTypes("nodes") } // builder.Do 返回带有 Visitor 的 Result 对象 r := builder.Do()
查看builder.Do()
下一步返回 Result 类型资源的操作。
func (b *Builder) Do() *Result { // 调用 visitorResult 返回 Result 类型 r := b.visitorResult() ... return r } ... func (b *Builder) visitorResult() *Result { ... // 跳过其他步骤,直接看最简单的通过 Name 来获取 Result if len(b.names) != 0 { return b.visitByName() } ... } ... func (b *Builder) visitByName() *Result { // 声明 Result 对象 result := &Result{ singleItemImplied: len(b.names) == 1, targetsSingleItems: true, } ... // 获取 K8s client client, err := b.getClient(mapping.GroupVersionKind.GroupVersion()) ... visitors := []Visitor{} for _, name := range b.names { info := &Info{ Client: client, Mapping: mapping, Namespace: selectorNamespace, Name: name, Export: b.export, } visitors = append(visitors, info) } // VisitorList 也实现了 Visit 接口,遍历执行 Visitor 的 Visit 方法 result.visitor = VisitorList(visitors) result.sources = visitors return result }
看完了如何获取Result类型的对象,我们来看看如何o.Complete
处理,传入一个VisitorFunc,Result的访问者都实现了Visit
接口,接口的作用Visit
就是接收VisitorFunc
并执行。接口的作用Visit
是接收VisitorFunc
并执行它。
return r.Visit(func(info *resource.Info, err error) error { ... }) ... func (v DecoratedVisitor) Visit(fn VisitorFunc) error { return v.visitor.Visit(func(info *Info, err error) error { ... for i := range v.decorators { if err := v.decorators[i](info, nil); err != nil { return err } } return fn(info, nil) }) }
接下来,看看有什么o.RunCordonOrUncordon
作用。
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error { cordonOrUncordon := "cordon" if !desired { cordonOrUncordon = "un" + cordonOrUncordon } // 通过 Visit 获取到的 nodeInfos 列表 for _, nodeInfo := range o.nodeInfos { ... gvk := nodeInfo.ResourceMapping().GroupVersionKind if gvk.Kind == "Node" { c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk) if updateRequired := c.UpdateIfRequired(desired); !updateRequired { ... } else { if o.drainer.DryRunStrategy != cmdutil.DryRunClient { ... // 修改对应节点的配置 err, patchErr := c.PatchOrReplace(o.drainer.Client, o.drainer.DryRunStrategy == cmdutil.DryRunServer) ... } } } ... } return nil } ... func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) { client := clientset.CoreV1().Nodes() oldData, err := json.Marshal(c.node) // 更新 node Spec 的 Unschedulable 字段 c.node.Spec.Unschedulable = c.desired newData, err := json.Marshal(c.node) // merge 数据,通过 diff 然后获取 patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node) if patchErr == nil { ... _, err = client.Patch(context.TODO(), c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions) } ... }
Drain
在Cordon之后,去Drain。
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { ... cmd := &cobra.Command{ ... Run: func(cmd *cobra.Command, args []string) { cmdutil.CheckErr(o.Complete(f, cmd, args)) cmdutil.CheckErr(o.RunDrain()) }, } ...
直接o.RunDrain
看,我们首先看到的是执行o.RunCordonOrUncordon
,将节点标记为不可调度,所以我之前写的博客其实是不正确的,如果要让节点下线,那就执行kubectl drain
。
func (o *DrainCmdOptions) RunDrain() error { if err := o.RunCordonOrUncordon(true); err != nil { return err } ... drainedNodes := sets.NewString() var fatal error for _, info := range o.nodeInfos { // 驱逐 Pod if err := o.deleteOrEvictPodsSimple(info); err == nil { drainedNodes.Insert(info.Name) printObj(info.Object, o.Out) } else { // 如果驱逐 Pod 失败,则显示对应的 Node 信息 if len(remainingNodes) > 0 { fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n") for _, nodeName := range remainingNodes { fmt.Fprintf(o.ErrOut, " %s\n", nodeName) } } break } } }
在deleteOrEvictPodsSimple
中,首先通过 Node name 获取对应的 Pod 信息,然后执行 eviction 动作。
func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error { list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name) ... if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil { ... } }
这里GetPodsForDeletion
执行一个过滤器,其中包含以下场景的过滤器,需要注意的是,这里的过滤场景是有严格顺序的。
func (d *Helper) makeFilters() []podFilter { return []podFilter{ // 被标记删除的 Pod(DeletionTimestamp 不为0) d.skipDeletedFilter, // 属于 DaemonSet 的 Pod d.daemonSetFilter, // mirror pod 其实就是 static pod, // 是我们在 /etc/kubernetes/manifests/ 中定义的由 kubelet 负责生命周期管理的 Pod // 在 `Annotations` 中会包含 `kubernetes.io/config.mirror` d.mirrorPodFilter, // 包含本地存储的 Pod,Pod 中的 Volume 字段不为空 d.localStorageFilter, // 不属于 replicate 的 pod,`Controlled By` 不为空的 pod d.unreplicatedFilter, } }
一旦获得过滤后的 Pod 列表,就会执行驱逐操作,从每个 Pod 的 goroutine 开始,并等待 Pod 驱逐完成。
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { returnCh := make(chan error, 1) ... ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout) defer cancel() for _, pod := range pods { go func(pod corev1.Pod, returnCh chan error) { for { ... select { case <-ctx.Done(): // 驱逐超时 returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout) return default: } // 驱逐 Pod 动作,最终执行 d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction) err := d.EvictPod(pod, policyGroupVersion) ... } ... params := waitForDeleteParams{ ... } // 等待驱逐动作完成 _, err := waitForDelete(params) if err == nil { returnCh <- nil } else { returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) } }(pod, returnCh) }
waitForDelete
如果没有立即完成,将ConditionFunc
进入循环,其中检测到 Pod 存在并且 ObjectMeta UID 已更改。WaitFor
ConditionFunc
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { stopCh := make(chan struct{}) defer close(stopCh) c := wait(stopCh) for { select { case _, open := <-c: ok, err := runConditionWithCrashProtection(fn) if err != nil { return err } if ok { return nil } if !open { return ErrWaitTimeout } case <-done: return ErrWaitTimeout } } }
概括
该kubectl drain
命令的实现非常简单,没有特别复杂的逻辑。K8s 能够做到这一点的一个重要原因是所有的动作都是声明性的,不需要在声明后等待执行完成后主动做脏事。在 Pod 驱逐的情况下,并非所有 Pod 都会被驱逐到其他节点,因此需要特别注意在节点下线之前检查是否有简单的 Pod 资源仍在运行,或者是否有任何 Pod 使用本地存储,或类似的。
我已经看到这种设计模式的组合有一段时间了,我正在寻找机会重新学习它们。