CSI如何运作的简要分析
最近一直在做CSI相关的工作,在开发的过程中,我觉得CSI的细节是相当繁琐的。通过整理CSI工作流程,可以加深对CSI的理解,与大家分享我对CSI的认识。
我将通过两篇文章介绍 CSI,第一篇将重点介绍 CSI 的基本组件和工作原理,基于 Kubernetes 作为 CSI 的 CO(Container Orchestration Systems)。第二篇将选取几个典型的CSI项目,分析具体实现。
CSI的基本组成部分
CSI 云提供商有两种类型,一种是树内类型,一种是树外类型。前者是运行在k8s核心组件内部的存储插件;后者是一个独立于 k8s 组件运行的存储插件。本文重点介绍树外类型的插件。
out-of-tree 类型的插件通过 gRPC 接口与 k8s 组件交互,k8s 提供了多个 SideCar 组件与 CSI 插件配合实现丰富的功能。对于out-of-tree插件,使用的组件分为SideCar组件和需要第三方实现的插件。
SideCar components
external-attacher
侦听 VolumeAttachment 对象并调用 CSI 驱动程序控制器服务的ControllerPublishVolume
和ControllerUnpublishVolume
接口以将卷附加到节点或从节点中删除卷。
如果存储系统需要附加/分离步骤,则需要此组件,因为 K8s 内部附加/分离控制器不直接调用 CSI 驱动程序接口。
external-provisioner
前提是PVC中指定的StorageClass的provisioner字段与CSI驱动Controller服务的CreateVolume
和接口的返回值相同。身份服务DeleteVolume
的接口返回相同的值。GetPluginInfo
一旦配置了新卷,K8s 将创建相应的 PV。
如果PVC绑定的PV的回收策略是delete,则external-provisioner组件监听PVC的删除,并调用DeleteVolume
CSI驱动Controller服务的接口。一旦卷删除成功,组件也会删除对应的 PV。
该组件还支持从快照创建数据源。如果 PVC 中指定了 Snapshot CRD 数据源,则组件SnapshotContent
在调用CreateVolume
接口时会通过对象获取快照的相关信息并将该内容传递给 CSI 驱动程序,CSI 驱动程序需要根据数据创建卷源快照。
external-resizer
监听 PVC 对象,如果用户在 PVC 对象上请求更多存储,组件调用NodeExpandVolume
CSI 驱动控制器服务的接口,用于扩展卷。
external-snapshotter
该组件与 Snapshot Controller 结合使用,后者根据集群中创建的 Snapshot 对象创建相应的 VolumeSnapshotContent,以及监听 VolumeSnapshotContent 对象的 external-snapshotter。当监听到 VolumeSnapshotContent 时,将相应的参数传递给 CSI 驱动 Controller 服务通过CreateSnapshotRequest
调用其CreateSnapshot
接口。该组件还负责调用DeleteSnapshot
和ListSnapshots
接口。
livenessprobe
负责监控 CSI driver 的健康状况并通过 Liveness Probe 机制上报给 k8s,并在 CSI driver 检测到异常时重启 pod。
node-driver-registrar
通过直接调用NodeGetInfo
CSI驱动Node服务的接口,通过kubelet的插件注册机制,将CSI驱动信息注册到对应节点的kubelet上。
external-health-monitor-controller
在 PVC 事件中,通过调用CSI 驱动控制器服务的ListVolumes
或接口来检查和报告 CSI 卷的健康状况。ControllerGetVolume
external-health-monitor-agent
通过调用NodeGetVolumeStats
CSI driver Node 服务的接口检查 CSI 卷的健康状况,并在 pod 的事件中上报。
第三方插件
第三方存储提供者(即SP,Storage Provider)需要实现两个插件,Controller负责卷管理,部署为StatefulSet,Node负责将卷挂载到pod中,是在每个节点中部署为 DaemonSet。
CSI 插件通过 Unix Domani Socket gRPC 与 kubelet 和 k8s 外部组件交互,gRPC 定义了 SP 需要实现的三组 RPC 接口,以便与 k8s 外部组件进行通信。三组接口分别是CSI Identity、CSI Controller、CSI Node,下面详细定义。
CSI 身份
CSI Identity用于提供CSI驱动的身份信息,需要Controller和Node共同实现。接口如下。
service Identity { rpc GetPluginInfo(GetPluginInfoRequest) returns (GetPluginInfoResponse) {} rpc GetPluginCapabilities(GetPluginCapabilitiesRequest) returns (GetPluginCapabilitiesResponse) {} rpc Probe (ProbeRequest) returns (ProbeResponse) {} }
GetPluginInfo
是强制的,node-driver-registrar组件会调用该接口将CSI驱动注册到kubelet;GetPluginCapabilities
用于表示CSI驱动主要提供哪些功能。
CSI 控制器
用于创建/删除卷、附加/分离卷、卷快照、卷扩展/收缩等。Controller插件需要实现这组接口。接口如下。
service Controller { rpc CreateVolume (CreateVolumeRequest) returns (CreateVolumeResponse) {} rpc DeleteVolume (DeleteVolumeRequest) returns (DeleteVolumeResponse) {} rpc ControllerPublishVolume (ControllerPublishVolumeRequest) returns (ControllerPublishVolumeResponse) {} rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest) returns (ControllerUnpublishVolumeResponse) {} rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest) returns (ValidateVolumeCapabilitiesResponse) {} rpc ListVolumes (ListVolumesRequest) returns (ListVolumesResponse) {} rpc GetCapacity (GetCapacityRequest) returns (GetCapacityResponse) {} rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest) returns (ControllerGetCapabilitiesResponse) {} rpc CreateSnapshot (CreateSnapshotRequest) returns (CreateSnapshotResponse) {} rpc DeleteSnapshot (DeleteSnapshotRequest) returns (DeleteSnapshotResponse) {} rpc ListSnapshots (ListSnapshotsRequest) returns (ListSnapshotsResponse) {} rpc ControllerExpandVolume (ControllerExpandVolumeRequest) returns (ControllerExpandVolumeResponse) {} rpc ControllerGetVolume (ControllerGetVolumeRequest) returns (ControllerGetVolumeResponse) { option (alpha_method) = true; } }
上面在k8s外部组件的介绍中提到,不同的组件针对不同的功能提供不同的接口。例如,CreateVolume
/DeleteVolume
可以与 external-provisioner 一起使用来创建/删除卷,ControllerPublishVolume
/ControllerUnpublishVolume
可以与 external-attacher 一起使用来附加/分离卷等。
CSI 节点
Node插件需要实现这一套接口,用于挂载/卸载卷、检查卷状态等,接口如下。
service Node { rpc NodeStageVolume (NodeStageVolumeRequest) returns (NodeStageVolumeResponse) {} rpc NodeUnstageVolume (NodeUnstageVolumeRequest) returns (NodeUnstageVolumeResponse) {} rpc NodePublishVolume (NodePublishVolumeRequest) returns (NodePublishVolumeResponse) {} rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest) returns (NodeUnpublishVolumeResponse) {} rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest) returns (NodeGetVolumeStatsResponse) {} rpc NodeExpandVolume(NodeExpandVolumeRequest) returns (NodeExpandVolumeResponse) {} rpc NodeGetCapabilities (NodeGetCapabilitiesRequest) returns (NodeGetCapabilitiesResponse) {} rpc NodeGetInfo (NodeGetInfoRequest) returns (NodeGetInfoResponse) {} }
NodeStageVolume
用于实现多个pod共享一个volume的功能,支持先将volume挂载到临时目录,再通过 挂载到pod NodePublishVolume
;NodeUnstageVolume
相反。
工作流程
下面看一下 Pod 安装卷的整个工作流程。该过程分为三个阶段:Provision/Delete、Attach/Detach 和 Mount/Unmount,但并非每个存储解决方案都会经历这三个阶段,例如 NFS 没有 Attach/Detach 阶段。
整个过程不仅涉及到上述组件的工作,还涉及到ControllerManager和kubelet的AttachDetachController和PVController组件,下面分别对Provision、Attach、Mount阶段进行详细分析。
Provision
我们先看Provision阶段,整个过程如上图所示。extenal-provisioner 和 PVController 都监视 PVC 资源。
- 当 PVController 观察到集群中创建了一个 PVC 时,它会判断是否存在与之匹配的 in-tree 插件,如果没有,则判断其存储类型为 out-of-tree,因此将 PVC 标注为
volume.beta.kubernetes.io/storage- provisioner={csi driver name}
. CreateVolume
当 extenal-provisioner 观察到 PVC 的注释 csi 驱动程序与它自己的 csi 驱动程序一致时,调用 CSI 控制器的接口。- 当
CreateVolume
CSI Controller 的接口返回成功时,extenal-provisioner 在集群中创建对应的 PV。 - 当 PVController 观察到集群中创建了 PV 时,它会将 PV 绑定到 PVC。
Attach
Attach阶段是指将一个卷附加到一个节点上,整个过程如上图所示。
- ADController 在 pod 被分派到节点并且正在使用 CSI 类型的 PV 时进行侦听,并调用内部树内 CSI 插件接口,该接口在集群中创建 VolumeAttachment 资源。
- external-attacher 组件监视要创建的 VolumeAttachment 资源并调用 CSI 控制器的
ControllerPublishVolume
接口。 - 当
ControllerPublishVolume
成功调用 CSI Controller 的接口时,external-attacher 将相应 VolumeAttachment 对象的 Attached 状态设置为 true。 - 当 ADController 观察到 VolumeAttachment 对象的 Attached 状态为真时,它会更新 ADController ActualStateOfWorld 的内部状态。
Mount
将卷挂载到 Pod 的最后一步涉及到 kubelet,整个过程简单描述为对应节点上的 kubelet 在创建 Pod 的过程中调用 CSI Node 插件进行挂载操作。以下是 kubelet 内部组件的细分。
首先,在syncPod
kubelet 创建 pod 的 main 函数中,kubelet 调用WaitForAttachAndMount
其子组件 volumeManager 的方法,等待卷挂载完成。
func (kl *Kubelet) syncPod(o syncPodOptions) error { ... // Volume manager will not mount volumes for terminated pods if !kl.podIsTerminated(pod) { // Wait for volumes to attach/mount if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err) return err } } ... }
volumeManager 包含两个组件:desiredStateOfWorldPopulator 和 reconciler,它们共同完成 Pod 中卷的挂载和卸载过程。整个过程如下。
desiredStateOfWorldPopulator 和 reconciler 具有生产者-消费者模型。在volumeManager中维护了两个队列(技术上是一个接口,但在这里充当一个队列),分别是DesiredStateOfWorld和前者维护当前节点中volume的期望状态;后者维护当前节点中卷的实际状态。
desiredStateOfWorldPopulator 在自己的循环中只做了两件事,一是从 kubelet 的 podManager 中获取新创建的 Pod 并将其需要挂载的卷的信息记录到 DesiredStateOfWorld 中;另一件事是从当前节点的 podManager 中获取正在挂载的卷的信息到 DesiredStateOfWorld 中。另一件事是从当前节点的 podManager 中获取已删除的 pod,并检查它们的卷是否在 ActualStateOfWorld 记录中,如果没有,则将它们从 DesiredStateOfWorld 中删除,从而确保 DesiredStateOfWorld 记录所有卷的期望状态在节点中。这可确保 DesiredStateOfWorld 记录节点中所有卷的所需状态。相关代码如下(为了精简逻辑,去掉了部分代码)。
// Iterate through all pods and add to desired state of world if they don't // exist but should func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { // Map unique pod name to outer volume name to MountedVolume. mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume) ... processedVolumesForFSResize := sets.NewString() for _, pod := range dswp.podManager.GetPods() { dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize) } } // processPodVolumes processes the volumes in the given pod and adds them to the // desired state of the world. func (dswp *desiredStateOfWorldPopulator) processPodVolumes( pod *v1.Pod, mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, processedVolumesForFSResize sets.String) { uniquePodName := util.GetUniquePodName(pod) ... for _, podVolume := range pod.Spec.Volumes { pvc, volumeSpec, volumeGidValue, err := dswp.createVolumeSpec(podVolume, pod, mounts, devices) // Add volume to desired state of world _, err = dswp.desiredStateOfWorld.AddPodToVolume( uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue) dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName) } }
协调者是消费者,它主要做三件事。
unmountVolumes()
: 遍历ActualStateOfWorld中的volume,判断是否在DesiredStateOfWorld中,如果不是,则调用CSI Node的接口进行unmount,并记录在ActualStateOfWorld中。mountAttachVolumes()
:从DesiredStateOfWorld中获取需要挂载的volume,调用CSI Node的接口进行挂载或扩容,并记录在ActualStateOfWorld中。unmountDetachDevices()
: 遍历ActualStateOfWorld中的volume,如果已经被attach了,但是没有pod在使用并且没有记录在DesiredStateOfWorld中,那么unmount/detach它。
我们以mountAttachVolumes()
一个例子来看看它是如何调用CSI Node接口的。
func (rc *reconciler) mountAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { ... } else if !volMounted || cache.IsRemountRequiredError(err) { // Volume is not mounted, or is already mounted, but requires remounting err := rc.operationExecutor.MountVolume( rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount) ... } else if cache.IsFSResizeRequiredError(err) { err := rc.operationExecutor.ExpandInUseVolume( volumeToMount.VolumeToMount, rc.actualStateOfWorld) ... } } }
mount 的执行是在 中完成的rc.operationExecutor
,看 operationExecutor 代码。
func (oe *operationExecutor) MountVolume( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error { ... var generatedOperations volumetypes.GeneratedOperations generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) // Avoid executing mount/map from multiple pods referencing the // same volume in parallel podName := nestedpendingoperations.EmptyUniquePodName return oe.pendingOperations.Run( volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) }
这个函数先构造executor函数,然后执行,再看构造函数。
func (og *operationGenerator) GenerateMountVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) mountVolumeFunc := func() volumetypes.OperationContext { // Get mounter plugin volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) volumeMounter, newMounterErr := volumePlugin.NewMounter( volumeToMount.VolumeSpec, volumeToMount.Pod, volume.VolumeOptions{}) ... // Execute mount mountErr := volumeMounter.SetUp(volume.MounterArgs{ FsUser: util.FsUserFrom(volumeToMount.Pod), FsGroup: fsGroup, DesiredSize: volumeToMount.DesiredSizeLimit, FSGroupChangePolicy: fsGroupChangePolicy, }) // Update actual state of world markOpts := MarkVolumeOpts{ PodName: volumeToMount.PodName, PodUID: volumeToMount.Pod.UID, VolumeName: volumeToMount.VolumeName, Mounter: volumeMounter, OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, VolumeGidVolume: volumeToMount.VolumeGidValue, VolumeSpec: volumeToMount.VolumeSpec, VolumeMountState: VolumeMounted, } markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts) ... return volumetypes.NewOperationContext(nil, nil, migrated) } return volumetypes.GeneratedOperations{ OperationName: "volume_mount", OperationFunc: mountVolumeFunc, EventRecorderFunc: eventRecorderFunc, CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"), } }
这里我们先去注册到kubelet的CSI的插件列表,找到对应的插件,然后执行volumeMounter.SetUp
,最后更新ActualStateOfWorld记录。这里,外部CSI插件为csiMountMgr,代码如下。
func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error { return c.SetUpAt(c.GetPath(), mounterArgs) } func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { csi, err := c.csiClientGetter.Get() ... err = csi.NodePublishVolume( ctx, volumeHandle, readOnly, deviceMountPath, dir, accessMode, publishContext, volAttribs, nodePublishSecrets, fsType, mountOptions, ) ... return nil }
可以看到,CSI Node NodePublishVolume
/ NodeUnPublishVolume
interface是由kubelet中volumeManager的csiMountMgr调用的。
概括
本文从三个方面来分析整个 CSI 系统:CSI 组件,CSI 接口,以及 Volume 挂载到 Pod 的过程。
CSI 是整个容器生态系统的标准存储接口,CO 通过 gRPC 与 CSI 插件通信。为了实现通用性,k8s 设计了很多外部组件来配合 CSI 插件实现不同的功能,从而保证了 k8s 内部逻辑的纯洁性和 CSI 插件的简洁性。