kube-apiserver的设计与实现

kube-apiserver 是 kubernetes 中的一个组件,它直接与 etcd 交互并控制 kubernetes 中核心资源的更改。它提供以下主要功能。

  • 提供Kubernetes API,包括鉴权授权、数据验证、集群状态变化,供客户端和其他组件调用。
  • 代理集群中的一些附加组件组件,例如 Kubernetes UI、metrics-server、npd 等。
  • 创建kubernetes服务,即提供apiserver的Service,kubernetes Service。
  • 不同版本之间的资源转换。

 

kube-apiserver 处理流程

kube-apiserver 与其他组件交互主要是通过对外提供 API 来实现的,可以通过调用 kube-apiserver 的接口$ curl -k https://<masterIP>:6443或者通过其提供的 swagger-ui来获取,主要有以下三个 API。

  • 核心群体:主要在/api/v1.
  • 命名组:其路径为/apis/$NAME/$VERSION.
  • 一些暴露系统状态的 API:如/metrics,/healthz等。

API 的 URL 大致由 组成/apis/group/version/namespaces/my-ns/myresource,其中 API 的结构大致如下图所示。

在了解了 kube-apiserver 的 API 之后,下一节将介绍 kube-apiserver 如何处理 API 请求。请求的完整流程如下图所示。

此处显示了 POST 请求的示例。当请求到达kube-apiserver时,kube-apiserver首先执行http过滤器链中注册的过滤器链,该过滤器链进行一系列过滤操作,主要是鉴权、鉴权等校验操作。当过滤器链完成后,请求被路由到相应的处理程序,该处理程序主要与 etcd 交互。处理程序中的主要操作如下

 

 

Decoder

kubernetes 中的大多数资源都会有一个internal version,因为一个资源在整个开发过程中可能有多个版本。例如,部署将具有extensions/v1beta1apps/v1. 为了避免问题,kube-apiserver 必须知道如何在每对版本之间进行转换(例如,v1⇔v1alpha1、v1⇔v1beta1、v1beta1⇔v1alpha1)。因此它使用了一个特殊的internal version,它是一个通用的版本,它包含了版本的所有字段并且具有所有版本的功能。解码器会先将 creater 对象转换为internal version,然后再转换为storage version,这是存储在 etcd 中时的另一个版本。

解码时,首先从HTTP路径中获取期望的版本,然后使用scheme创建一个与正确版本匹配的空对象,并使用JSON或protobuf解码器进行转换,在转换的第一步,如果用户省略了一些字段,解码器将它们设置为默认值。

 

 

Admission

解码完成后,您需要通过验证集群的全局约束并根据集群配置设置默认值来检查是否可以创建或更新对象。您可以在目录中看到所有 kube-apiserver 可以使用的全局约束插件k8s.io/kubernetes/plugin/pkg/admission。kube-apiserver 通过将-enable-admission-plugins参数设置为  -ValidatingAdmissionWebhookor -MutatingAdmissionWebhook 来启动。

 

 

Validation

主要检查对象中字段的合法性。

在handler中执行完以上操作后,会执行最后一个与etcd相关的操作,POST操作会将数据写入etcd。以上在handler中的主要处理流程如下图所示。

v1beta1 ⇒ internal ⇒    |    ⇒       |    ⇒  v1  ⇒ json/yaml ⇒ etcd
                     admission    validation

 

kube-apiserver 中的组件

kube-apiserver 由 3 个组件(Aggregator、KubeAPIServer、APIExtensionServer)组成,它们通过委托依次处理请求。

  • Aggregator:暴露类似七层负载均衡的功能,拦截用户请求转发到其他服务器,负责整个APIServer的Discovery功能。
  • KubeAPIServer:负责一些通用的请求处理、认证、鉴权等,以及处理每个内置资源的 REST 服务。
  • APIExtensionServer:主要处理CustomResourceDefinition(CRD)和CustomResource(CR)REST请求,是Delegation的最后一个环节,如果对应的CR不能处理,返回404。

Aggregator 和 APIExtensionsServer 分别对应扩展 APIServer 资源的两种主要方式,即 AA 和 CRD。

 

Aggregator 聚合器

Aggregator 通过将 APIServices 对象关联到一个 Service 来转发请求,而与之关联的 Service 类型进一步决定了请求转发的形式。聚合器由一个GenericAPIServer和一个控制器组成,控制器保持自己的状态。主要处理组下APIService资源的GenericAPIServer请求apiregistration.k8s.io

Aggregator 除了处理资源请求外,还包含几个控制器:

  • apiserviceRegistrationController:负责APIServices中资源的注册和删除。
  • availableConditionController:维护APIServices的可用性状态,包括其引用的Service是否可用等。
  • autoRegistrationController:用于维护 API 中存在的一组特定 APIService。
  • crdRegistrationController:负责将 CRD GroupVersions 自动注册到 APIServices 中。
  • openAPIAggregationController:将 APIServices 资源的更改同步到提供的 OpenAPI 文档。

Kubernetes 中的一些附加组件,例如 metrics-server 通过 Aggregator 的方式进行扩展,可以在真实环境中使用apiserver-builder工具轻松创建自定义资源作为 Aggregator 扩展。

 

 

启用 API 聚合

需要在 kube-apiserver 中添加以下配置以启用 API 聚合。

--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
--proxy-client-key-file=/etc/kubernetes/certs/proxy.key
--requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
--requestheader-allowed-names=aggregator
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User

 

 

KubeAPI 服务器

KubeAPIServer 主要提供 API Resource 操作的请求,为 kubernetes 中的许多 API 注册路由信息,暴露 RESTful API,向公众提供 kubernetes 服务。

它使集群内外的服务能够通过 RESTful API 操作 Kubernetes 中的资源。

 

 

APIExtensionServer

APIExtensionServer 是委托链的最后一层,是处理用户通过自定义资源定义定义的所有资源的资源服务器。

包含的控制器及其功能如下所示。

  • openapiController: 将 crd 资源的更改同步到提供的 OpenAPI 文档中,可以访问/openapi/v2.
  • crdController: 负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者都可以通过$ kubectl api-versions和来查看$ kubectl api-resources
  • namingController:检查 crd obj 中的命名冲突,可在 crd 中查看.status.conditions
  • establishingController: 检查 crd 是否处于正常状态,在 crd 中可用.status.conditions
  • nonStructuralSchemaController: 检查 crd obj 结构是否正常,在 crd 中可用.status.conditions
  • apiApprovalController: 检查 crd 是否遵循 kubernetes API 声明策略,在 crd 中可用.status.conditions
  • finalizingController: 一个类似于finalizes的函数,与CRs的移除有关。

    KubeAPI 服务器

    KubeAPIServer 主要提供 API Resource 操作的请求,为 kubernetes 中的许多 API 注册路由信息,暴露 RESTful API,向公众提供 kubernetes 服务。

    它使集群内外的服务能够通过 RESTful API 操作 Kubernetes 中的资源。

    APIExtensionServer

    APIExtensionServer 是委托链的最后一层,是处理用户通过自定义资源定义定义的所有资源的资源服务器。

    包含的控制器及其功能如下所示。

    • openapiController: 将 crd 资源的更改同步到提供的 OpenAPI 文档中,可以访问/openapi/v2.
    • crdController: 负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者都可以通过$ kubectl api-versions和来查看$ kubectl api-resources
    • namingController:检查 crd obj 中的命名冲突,可在 crd 中查看.status.conditions
    • establishingController: 检查 crd 是否处于正常状态,在 crd 中可用.status.conditions
    • nonStructuralSchemaController: 检查 crd obj 结构是否正常,在 crd 中可用.status.conditions
    • apiApprovalController: 检查 crd 是否遵循 kubernetes API 声明策略,在 crd 中可用.status.conditions
    • finalizingController: 一个类似于finalizes的函数,与CRs的移除有关。

 

kube-apiserver 启动过程分析

Kubernetes版本:v1.16

首先,我们分析一下kube-apiserver是如何启动的。Runkube-apiserver 也是通过它的方法启动主逻辑,在Run调用该方法之前,会解析命令行参数,设置默认值等。

 

Run

Run方法的主要逻辑是。

  • 调用CreateServerChain构建服务调用链,判断是否启动非安全http服务器。http服务器链包含apiserver要启动的三台服务器,以及为每台服务器注册相应资源的路由。
  • 调用server.PrepareRun准备运行的服务,主要完成OpenAPI路由的健康检查、生存检查和注册。
  • 调用prepared.Run以启动 https 服务器。

服务器使用委托模式进行初始化,基本的API Server、CustomResource和Aggregator服务通过DelegationTarget接口链接在一起,对外提供服务。

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:147

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    prepared, err := server.PrepareRun()
    if err != nil {
        return err
    }

    return prepared.Run(stopCh)
}

 

CreateServerChain

CreateServerChain是完成服务端初始化的方法,包含初始化,最后返回实例的所有过程APIExtensionsServer,初始化过程主要包括:http过滤链的配置,API Group的注册,http路径与handler的关联以及处理程序后端存储 etcd 的配置。主要逻辑如下。KubeAPIServerAggregatorServeraggregatorapiserver. APIAggregator

  • 调用创建KubeAPIServer需要的配置CreateKubeAPIServerConfig,主要是create master.Config,会调用buildGenericConfig生成genericConfig,其中包含apiserver核心配置的核心配置。
  • 确定是否启用了扩展 API 服务器并调用createAPIExtensionsConfig为其创建配置。apiExtensions server 是 kubeapiserver 中其他服务器的代理服务,例如 metric-server。
  • 调用createAPIExtensionsServer以创建 apiExtensionsServer 的实例。
  • 调用CreateKubeAPIServer以初始化 kubeAPIServer。
  • 调用createAggregatorConfig以创建 aggregatorServer 的配置并调用createAggregatorServer以初始化 aggregatorServer。
  • 配置并确定是否启动非安全http服务器。

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:165

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }
    // 1、为 kubeAPIServer 创建配置
    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err :=                                         CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig 
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers,        pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    
    // 3、初始化 APIExtensionsServer
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    // 4、初始化 KubeAPIServer
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    if err != nil {
        return nil, err
    }
    
    // 5、创建 AggregatorConfig
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.          ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    
    // 6、初始化 AggregatorServer
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        return nil, err
    }

    // 7、判断是否启动非安全端口的 http server
    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }
    return aggregatorServer, nil
}

 

创建KubeAPIServerConfig

主要目的CreateKubeAPIServerConfig是调用buildGenericConfig创建 genericConfig 并构建 master.Config 对象。

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:271

func CreateKubeAPIServerConfig(
    s completedServerRunOptions,
    nodeTunneler tunneler.Tunneler,
    proxyTransport *http.Transport,
) (......) {

    // 1、构建 genericConfig
    genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory,    lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
    if lastErr != nil {
        return
    }

    ......

    // 2、初始化所支持的 capabilities
    capabilities.Initialize(capabilities.Capabilities{
        AllowPrivileged: s.AllowPrivileged,
        PrivilegedSources: capabilities.PrivilegedSources{
            HostNetworkSources: []string{},
            HostPIDSources:     []string{},
            HostIPCSources:     []string{},
        },
        PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
    })

    // 3、获取 service ip range 以及 api server service IP
    serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange)
    if lastErr != nil {
        return
    }

    ......

    // 4、构建 master.Config 对象
    config = &master.Config{......}
    
    if nodeTunneler != nil {
        config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
    }
    if config.GenericConfig.EgressSelector != nil {
        config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
    }

    return
}

 

 

构建通用配置

主要逻辑如下。

  • 调用genericapiserver.NewConfig生成默认的genericConfig,genericConfig主要配置DefaultBuildHandlerChainDefaultBuildHandlerChain包含一系列用于认证、鉴权、鉴权的http过滤器链,以及一系列http过滤器链。
  • 调用master.DefaultAPIResourceConfigSource加载需要启用的API资源,在k8s.io/api代码目录下可以看到集群中所有的API资源,随着版本的迭代也会不断变化。
  • 为 genericConfig 中的一些字段设置默认值。
  • 调用completedStorageFactoryConfig.New创建 storageFactory,稍后会使用它为每个 API Resource 创建对应的 RESTStorage。

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:386

func buildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (......) {
    // 1、为 genericConfig 设置默认值
    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()

    if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
        return
    }
    ......

    genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(......)
    genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
    genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
        sets.NewString("watch", "proxy"),
        sets.NewString("attach", "exec", "proxy", "log", "portforward"),
    )

    kubeVersion := version.Get()
    genericConfig.Version = &kubeVersion

    storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
    storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig
    completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
    if err != nil {
        lastErr = err
        return
    }
    // 初始化 storageFactory
    storageFactory, lastErr = completedStorageFactoryConfig.New()
    if lastErr != nil {
        return
    }
    if genericConfig.EgressSelector != nil {
        storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
    }
    
    // 2、初始化 RESTOptionsGetter,后期根据其获取操作 Etcd 的句柄,同时添加 etcd 的健康检查方法
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }

    // 3、设置使用 protobufs 用来内部交互,并且禁用压缩功能
    genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
    
    genericConfig.LoopbackClientConfig.DisableCompression = true
		
    // 4、创建 clientset
    kubeClientConfig := genericConfig.LoopbackClientConfig
    clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
    if err != nil {
        lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
        return
    }
    versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

    // 5、创建认证实例,支持多种认证方式:请求 Header 认证、Auth 文件认证、CA 证书认证、Bearer token 认证、
    // ServiceAccount 认证、BootstrapToken 认证、WebhookToken 认证等
    genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s,                 clientgoExternalClient, versionedInformers)
    if err != nil {
        lastErr = fmt.Errorf("invalid authentication config: %v", err)
        return
    }

    // 6、创建鉴权实例,包含:Node、RBAC、Webhook、ABAC、AlwaysAllow、AlwaysDeny
    genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
    ......
		
    serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)

    authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)

    // 7、审计插件的初始化
    lastErr = s.Audit.ApplyTo(......)
    if lastErr != nil {
        return
    }

    // 8、准入插件的初始化
    pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
    if err != nil {
        lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
        return
    }
    err = s.Admission.ApplyTo(......)
    if err != nil {
        lastErr = fmt.Errorf("failed to initialize admission: %v", err)
    }

    return
}

以上分析重点是 KubeAPIServerConfig 的初始化。其他两个server config的初始化暂不详细分析,我们继续分析server的初始化。

 

 

createAPIExtensionsServer

APIExtensionsServer 最先被初始化,通过如下逻辑调用初始化apiextensionsConfig.Complete().New服务器createAPIExtensionsServer

  • 首先调用c.GenericConfig.New根据go-restful模式初始化Container,在c.GenericConfig.New调用NewAPIServerHandler中初始化handler,APIServerHandler是APIServer使用的Handler类型,包括go-restfuland non-go-restful,和一个你在两者之间选择的Director对象,go-restful用来处理注册的handler和non-go restful是用于处理不存在的处理程序,API URI 处理的选择过程是:FullHandlerChain-> Director -> {GoRestfulContainer, NonGoRestfulMux}. 在c.GenericConfig.New,installAPI中也调用添加路由信息包括//debug/*/metrics,/version等。这三种服务器都是通过调用c.GenericConfig.New初始化genericServer然后注册API来初始化的。
  • 调用s.GenericAPIServer.InstallAPIGroup注册路由中的API资源,这个方法的调用链很深,主要是注册要暴露给服务端的API资源,从而可以通过http接口对资源进行REST操作。其他几个服务器也在InstallAPI初始化期间执行相应的操作。
  • 初始化服务器中需要用到的控制器,主要是openapiControllercrdControllernamingControllerestablishingControllernonStructuralSchemaControllerapiApprovalControllerfinalizingController。
  • 将需要启动的控制器和通知器添加到 PostStartHook。

k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go:94

func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*  apiextensionsapiserver.CustomResourceDefinitions, error) {
    return apiextensionsConfig.Complete().New(delegateAPIServer)
}

 

k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:132

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    // 1、初始化 genericServer
    genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    s := &CustomResourceDefinitions{
        GenericAPIServer: genericServer,
    }

    // 2、初始化 APIGroup Info,APIGroup 指该 server 需要暴露的 API
    apiResourceConfig := c.GenericConfig.MergedResourceConfig
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
        storage := map[string]rest.Storage{}
        customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
        storage["customresourcedefinitions"] = customResourceDefintionStorage
        storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

        apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
    }
    if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
        ......
    }

    // 3、注册 APIGroup
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }

    // 4、初始化需要使用的 controller
    crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
    if err != nil {
        return nil, fmt.Errorf("failed to create clientset: %v", err)
    }
    s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
		
    ......
    establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().                    CustomResourceDefinitions(), crdClient.Apiextensions())
    crdHandler, err := NewCustomResourceDefinitionHandler(......)
    if err != nil {
        return nil, err
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

    crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),                 versionDiscoveryHandler, groupDiscoveryHandler)
    namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion().         CustomResourceDefinitions(), crdClient.Apiextensions())
    apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().      InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    finalizingController := finalizer.NewCRDFinalizer(
        s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
        crdClient.Apiextensions(),
        crdHandler,
    )
    var openapiController *openapicontroller.Controller
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
        openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
    }

    // 5、将 informer 以及 controller 添加到 PostStartHook 中
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
        s.Informers.Start(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
        ......
        go crdController.Run(context.StopCh)
        go namingController.Run(context.StopCh)
        go establishingController.Run(context.StopCh)
        go nonStructuralSchemaController.Run(5, context.StopCh)
        go apiApprovalController.Run(5, context.StopCh)
        go finalizingController.Run(5, context.StopCh)
        return nil
    })

    s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
        return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
            return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
        }, context.StopCh)
    })

    return s, nil
}

以上是APIExtensionsServer的初始化过程,其中最核心的方法是s.GenericAPIServer.InstallAPIGroup,也就是API的注册过程,三个服务器中API的注册过程是它的核心。

 

创建KubeAPIServer

本节继续分析 KubeAPIServer 的初始化,kubeAPIServerConfig.Complete().New调用这里CreateKubeAPIServer完成初始化操作。

 

kubeAPIServerConfig.Complete().New

主要逻辑如下。

  • 调用 GenericAPIServer 初始化 GenericAPIServer,c.GenericConfig.New上面分析了它的主要实现。
  • 判断是否支持日志相关路由,如果支持,添加/logs路由。
  • 调用m.InstallLegacyAPI将核心 API 资源添加到路由中,该路由对应 apiserver 作为以 . 开头的资源/api
  • 调用m.InstallAPIs以将扩展 API 资源添加到路由,在 apiserver 中是以 . 开头的资源/apis

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:214

func CreateKubeAPIServer(......) (*master.Master, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)

    return kubeAPIServer, nil
}

 

k8s.io/kubernetes/pkg/master/master.go:325

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
    ......
    // 1、初始化 GenericAPIServer
    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    // 2、注册 logs 相关的路由
    if c.ExtraConfig.EnableLogsSupport {
        routes.Logs{}.Install(s.Handler.GoRestfulContainer)
    }

    m := &Master{
        GenericAPIServer: s,
    }
    
    // 3、安装 LegacyAPI
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            ......
        }
        if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
            return nil, err
        }
    }
    restStorageProviders := []RESTStorageProvider{
        auditregistrationrest.RESTStorageProvider{},
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.  Authentication.APIAudiences},
        ......
    }
    // 4、安装 APIs
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
        return nil, err
    }

    if c.ExtraConfig.Tunneler != nil {
        m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
    }

    m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)

    return m, nil
}

 

m.InstallLegacyAPI

该方法的主要作用是将核心API注册到路由中,是apiserver初始化过程中最核心的方法之一,但是它的调用链很深,下面会深入分析。将API注册到路由的最终目的是提供外部RESTful API来操作对应的资源,注册API主要分为两步,第一步是为API中的每个资源初始化RESTStorage来操作变更对于后端存储中的数据,第二步是根据每个资源的动词构建对应的路由第二步是根据每个资源的动词构建对应的路由。的主要逻辑m.InstallLegacyAPI如下。

 

  • 调用legacyRESTStorageProvider.NewLegacyRESTStorage为 LegacyAPI 中的每个资源创建 RESTStorage。RESTStorage 的目的是对应每个资源的访问路径及其在后端存储中的操作。
  • 初始化bootstrap-controller并添加到 PostStartHook 中,bootstrap-controller是 apiserver 中的一个控制器,主要功能是创建一些系统需要的命名空间并创建 kubernetes 服务并定期触发相应的同步操作。apiserver 将在启动bootstrap-controller后通过调用 PostStartHook 来启动。
  • 为资源创建 RESTStorage 后,调用m.GenericAPIServer.InstallLegacyAPIGroup为 APIGroup 注册路由信息。方法的调用链InstallLegacyAPIGroup很深,主要是InstallLegacyAPIGroup --> installAPIResources --> InstallREST --> Install --> registerResourceHandlers,而最终的核心路由构建在registerResourceHandlers方法中,比较复杂,它的主要作用是确定哪些操作(如创建、更新等)可以通过通过上一步构建的REST Storage获取资源,并将对应的操作存储到action中,每个action对应一个标准最后,根据actions数组,每个action都会添加一个handler方法并注册到路由中,然后该路由将注册到网络服务。Web 服务最终将按照 go-restful 设计模式注册到容器中。

 

legacyRESTStorageProvider.NewLegacyRESTStorage和方法的细节m.GenericAPIServer.InstallLegacyAPIGroup将在后面的部分继续。

k8s.io/kubernetes/pkg/master/master.go:406

func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }

    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

InstallAPIs和的主要过程InstallLegacyAPI类似,为了篇幅,这里不再赘述。

 

创建聚合器服务器

AggregatorServer主要用于自定义聚合控制器,使CRD自动注册到集群。

主要逻辑如下。

  • 调用aggregatorConfig.Complete().NewWithDelegate以创建 aggregatorServer。
  • 初始化crdRegistrationControllerautoRegistrationControllercrdRegistrationController负责注册CRD,autoRegistrationController负责自动注册CRDcrdRegistrationController负责注册CRD,autoRegistrationController负责自动向apiserver注册相应的APIServices。
  • autoRegistrationController将和添加crdRegistrationController到 PostStartHook。

k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go:124

func createAggregatorServer(......) (*aggregatorapiserver.APIAggregator, error) {
    // 1、初始化 aggregatorServer
    aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    // 2、初始化 auto-registration controller
    apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
    if err != nil {
        return nil, err
    }
    autoRegistrationController := autoregister.NewAutoRegisterController(......)
    apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
    crdRegistrationController := crdregistration.NewCRDRegistrationController(......)
    err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
        go crdRegistrationController.Run(5, context.StopCh)
        go func() {
            if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
                crdRegistrationController.WaitForInitialSync()
            }
            autoRegistrationController.Run(5, context.StopCh)
        }()
        return nil
    })
    if err != nil {
        return nil, err
    }

    err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
        makeAPIServiceAvailableHealthCheck(
            "autoregister-completion",
            apiServices,
            aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
        ),
    )
    if err != nil {
        return nil, err
    }

    return aggregatorServer, nil
}

 

 

aggregatorConfig.Complete().NewWithDelegate

NewWithDelegate`是初始化aggregatorServer的方法,主要逻辑如下。

  • 调用c.GenericConfig.New初始化GenericAPIServer,上面已经分析了其内部主要功能。
  • 调用apiservicerest.NewRESTStorage为APIServices资源创建RESTStorage,RESTStorage的目的是对应每个资源的访问路径及其后端存储操作。
  • 调用s.GenericAPIServer.InstallAPIGroup为 APIGroup 注册路由信息。

k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:158

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
    openAPIConfig := c.GenericConfig.OpenAPIConfig
    c.GenericConfig.OpenAPIConfig = nil
    // 1、初始化 genericServer
    genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
    if err != nil {
        return nil, err
    }

    apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
    if err != nil {
        return nil, err
    }
    informerFactory := informers.NewSharedInformerFactory(
        apiregistrationClient,
        5*time.Minute, 
    )
    s := &APIAggregator{
        GenericAPIServer: genericServer,
        delegateHandler: delegationTarget.UnprotectedHandler(),
        ......
    }

    // 2、为 API 注册路由
    apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }
	
    // 3、初始化 apiserviceRegistrationController、availableController
    apisHandler := &apisHandler{
        codecs: aggregatorscheme.Codecs,
        lister: s.lister,
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
    apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
    availableController, err := statuscontrollers.NewAvailableConditionController(
       ......
    )
    if err != nil {
        return nil, err
    }

    // 4、添加 PostStartHook
    s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
        informerFactory.Start(context.StopCh)
        c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext)      error {
        go apiserviceRegistrationController.Run(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext)  error {
        go availableController.Run(5, context.StopCh)
        return nil
    })

    return s, nil
}

 

以上是对AggregatorServer初始化过程的分析,可以看出在创建APIExtensionsServer、KubeAPIServer和AggregatorServer的时候,模式是类似的,先调用c.GenericConfig.New根据模式go- New初始化Container go-restful,然后再创建RESTStorage为需要在服务器中注册的资源,最后将资源的APIGroup信息注册到路由中。

至此,CreateServerChain中的流程已经分析完毕,调用链如下图所示。

                    |--> CreateNodeDialer
                    |
                    |--> CreateKubeAPIServerConfig
                    |
CreateServerChain --|--> createAPIExtensionsConfig
                    |
                    |                                                                       |--> c.GenericConfig.New
                    |--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
                    |                                                                       |--> s.GenericAPIServer.InstallAPIGroup
                    |
                    |                                                                 |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
                    |                                                                 |
                    |--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
                    |                                                                 |
                    |                                                                 |--> m.InstallAPIs
                    |
                    |
                    |--> createAggregatorConfig
                    |
                    |                                                                             |--> c.GenericConfig.New
                    |                                                                             |
                    |--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
                                                                                                  |
                                                                                                  |--> s.GenericAPIServer.InstallAPIGroup

 

prepared.Run

Run方法首先调用CreateServerChain完成各个服务器的初始化,然后调用server.PrepareRun完成服务启动前的准备工作,最后调用该prepared.Run方法启动安全http服务器。

serverPrepare.Run主要完成健康检查、生存检查和OpenAPI路由注册,然后继续分析流程prepared.Run,其中s.NonBlockingRun调用完成启动。

k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:269

func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
    return s.runnable.Run(stopCh)
}

 

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316

func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    delayedStopCh := make(chan struct{})

    go func() {
        defer close(delayedStopCh)
        <-stopCh

        time.Sleep(s.ShutdownDelayDuration)
    }()

    // 调用 s.NonBlockingRun 完成启动流程
    err := s.NonBlockingRun(delayedStopCh)
    if err != nil {
        return err
    }

    // 当收到退出信号后完成一些收尾工作
    <-stopCh
    err = s.RunPreShutdownHooks()
    if err != nil {
        return err
    }

    <-delayedStopCh
    s.HandlerChainWaitGroup.Wait()
    return nil
}

 

 

s.NonBlockingRun

的主要逻辑s.NonBlockingRun如下。

  • 确定是否启动审计日志服务。
  • 调用s.SecureServingInfo.Serve以配置和启动 https 服务器。
  • 执行 postStartHooks。
  • 向 systemd 发送就绪信号。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
    auditStopCh := make(chan struct{})

    // 1、判断是否要启动审计日志
    if s.AuditBackend != nil {
        if err := s.AuditBackend.Run(auditStopCh); err != nil {
            return fmt.Errorf("failed to run the audit backend: %v", err)
        }
    }

    // 2、启动 https server
    internalStopCh := make(chan struct{})
    var stoppedCh <-chan struct{}
    if s.SecureServingInfo != nil && s.Handler != nil {
        var err error
        stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
        if err != nil {
            close(internalStopCh)
            close(auditStopCh)
            return err
        }
    }

    go func() {
        <-stopCh
        close(s.readinessStopCh)
        close(internalStopCh)
        if stoppedCh != nil {
            <-stoppedCh
        }
        s.HandlerChainWaitGroup.Wait()
        close(auditStopCh)
    }()

    // 3、执行 postStartHooks
    s.RunPostStartHooks(stopCh)

    // 4、向 systemd 发送 ready 信号
    if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
        klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
    }

    return nil
}

以上就是服务端的初始化和启动过程的分析,如前所述,服务端初始化过程中最重要的部分就是API Resource RESTStorage的初始化和路由的注册,因为流程是比较复杂,下面将分别讲述。

 

StorageFactory construction

如前所述,apiserver最终实现的handler对应的后端数据存储在一个Store结构体中。在这里,例如,以 开头的路由用于通过该方法为每个资源/api创建RESTStorage 。NewLegacyRESTStorageRESTStorage 是在 下定义的结构k8s.io/apiserver/pkg/registry/generic/registry/store.go,主要包含NewFunc返回资源特定信息、NewListFunc返回资源特定列表以及CreateStrategy资源创建、UpdateStrategy更新和DeleteStrategy删除。在里面NewLegacyRESTStorage,你可以看到创建了多个资源的 RESTStorage。

的调用链NewLegacyRESTStorageCreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage

 

 

NewLegacyRESTStorage

一个 API Group 下的所有资源都有自己的 REST 实现,所有的 Group 下k8s.io/kubernetes/pkg/registry都有一个 rest 目录,存放着对应资源的 RESTStorage。在该NewLegacyRESTStorage方法中,每个资源对应的storage会由NewRESTor生成NewStorage,这里以pod为例。

k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:102

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.  APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:                       legacyscheme.Scheme,
        ParameterCodec:               legacyscheme.ParameterCodec,
        NegotiatedSerializer:         legacyscheme.Codecs,
    }

    var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
    if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.                               IsVersionRegistered(policyGroupVersion) {
        var err error
        podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    }
    // 1、LegacyAPI 下的 resource RESTStorage 的初始化
    restStorage := LegacyRESTStorage{}

    podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    ......

    endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // 2、pod RESTStorage 的初始化
    podStorage, err := podstore.NewStorage(......)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    ......
		
    serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator. Interface, error) {
        ......
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
    }
    restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry

    var secondaryServiceClusterIPAllocator ipallocator.Interface
    if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
        ......
    }

    var serviceNodePortRegistry rangeallocation.RangeRegistry
    serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string)      (allocator.Interface, error) {
        ......
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
    }
    restStorage.ServiceNodePortAllocator = serviceNodePortRegistry

    controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    
    serviceRest, serviceRestProxy := servicestore.NewREST(......)
    
    // 3、restStorageMap 保存 resource http path 与 RESTStorage 对应关系
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        ......
        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    ......
}

 

 

substore.NewStorage

podstore.NewStorage是一种为 pod 生成存储的方法。该方法的主要作用是为 pod 创建后端存储,并最终返回一个 RESTStorage 对象,该对象调用store.CompleteWithOptions创建后端存储。

k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go:71

func NewStorage(......) (PodStorage, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        ......
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    pod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": pod.NodeNameTriggerFunc},
    }
    
    // 调用 store.CompleteWithOptions
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }
    statusStore := *store
    statusStore.UpdateStrategy = pod.StatusStrategy
    ephemeralContainersStore := *store
    ephemeralContainersStore.UpdateStrategy = pod.EphemeralContainersStrategy

    bindingREST := &BindingREST{store: store}
    
    // PodStorage 对象
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

 

可以看到最终返回的对象中对 pod 的不同操作都是一个 REST 对象。

genericregistry.Store对象自动集成到 REST 中,该store.CompleteWithOptions方法在 `genericregistry.xml 中初始化存储实例。

type REST struct {
    *genericregistry.Store
    proxyTransport http.RoundTripper
}

type BindingREST struct {
    store *genericregistry.Store
}
......

 

 

store.CompleteWithOptions

CompleteWithOptions 主要用于设置 store 中配置的一些默认值,并根据提供的选项更新 store,主要是初始化 store 的后端存储实例。

CompleteWithOptions方法内部,options.RESTOptions.GetRESTOptions方法被调用,最终返回generic.RESTOptions对象,该对象包含了etcd的一些配置,该generic.RESTOptions对象包含了etcd初始化的一些配置,数据序列化方法,以及一个存储。该StorageWithCacher-->NewRawStorage-->Create方法被顺序调用以创建最终的依赖后端存储。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1192

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    ......

    var isNamespaced bool
    switch {
    case e.CreateStrategy != nil:
        isNamespaced = e.CreateStrategy.NamespaceScoped()
    case e.UpdateStrategy != nil:
        isNamespaced = e.UpdateStrategy.NamespaceScoped()
    default:
        return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
    }
    ......

    // 1、调用 options.RESTOptions.GetRESTOptions 
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    if err != nil {
        return err
    }

    // 2、设置 ResourcePrefix 
    prefix := opts.ResourcePrefix
    if !strings.HasPrefix(prefix, "/") {
        prefix = "/" + prefix
    }
    
    if prefix == "/" {
        return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
    }
    
    if e.KeyRootFunc == nil && e.KeyFunc == nil {
        ......
    }

    keyFunc := func(obj runtime.Object) (string, error) {
        ......
    }

    // 3、以下操作主要是将 opts 对象中的值赋值到 store 对象中
    if e.DeleteCollectionWorkers == 0 {
        e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
    }

    e.EnableGarbageCollection = opts.EnableGarbageCollection
    if e.ObjectNameFunc == nil {
        ......
    }

    if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
            opts.StorageConfig,
            prefix,
            keyFunc,
            e.NewFunc,
            e.NewListFunc,
            attrFunc,
            options.TriggerFunc,
        )
        if err != nil {
            return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        if opts.CountMetricPollPeriod > 0 {
            stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
            previousDestroy := e.DestroyFunc
            e.DestroyFunc = func() {
                stopFunc()
                if previousDestroy != nil {
                    previousDestroy()
                }
            }
        }
    }

    return nil
}

 

options.RESTOptions是一个接口,要找到它的GetRESTOptions方法的实现就必须知道方法options.RESTOptions中初始化时对应CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo的实例,对应的实例RESTOptionsStorageFactoryRestOptionsFactory,所以 PodStoragegenericserver.Config.RESTOptionsGetter初始化时构造的store对象中的实际对象类型是StorageFactoryRestOptionsFactory,而其GetRESTOptions方法如下所示。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:           storageConfig,
        Decorator:               generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        // 调用 generic.StorageDecorator
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }

    return ret, nil
}

 

genericregistry.StorageWithCacher一个不同的方法被调用,它最终调用factory.Create来初始化存储实例,调用链是:genericregistry.StorageWithCacher --> generic. NewRawStorage --> factory.Create.

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case "etcd2":
        return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
    // 目前 k8s 只支持使用 etcd v3
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        return newETCD3Storage(c)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}

 

新ETCD3存储

其中newETCD3Storage,首先调用newETCD3Client创建etcd的客户端,最后使用官方的etcd客户端工具clientv3创建客户端。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:209

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
    if err != nil {
        return nil, nil, err
    }

    client, err := newETCD3Client(c.Transport)
    if err != nil {
        stopCompactor()
        return nil, nil, err
    }

    var once sync.Once
    destroyFunc := func() {
        once.Do(func() {
            stopCompactor()
            client.Close()
        })
    }
    transformer := c.Transformer
    if transformer == nil {
        transformer = value.IdentityTransformer
    }
    return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

至此,pod资源中store的构建分析基本完成,不同资源对应一个REST对象,依次引用该genericregistry.Store对象,最后初始化`genericregistry。在分析了 store 的初始化之后,还有一个重要的步骤是路由的注册。路由注册的主要过程是根据不同的动词为资源建立一个http路径,并将路径绑定到对应的handler上。

 

 

路由注册

legacyRESTStorageProvider.NewLegacyRESTStorage上面 RESTStorage的构造对应InstallLegacyAPI. 下面继续分析中m.GenericAPIServer.InstallLegacyAPIGroup方法的实现InstallLegacyAPI

k8s.io/kubernetes/pkg/master/master.go:406

func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }
    ......

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

for 的调用链m.GenericAPIServer.InstallLegacyAPIGroup很深,最终为 Group 下的每个 API 资源注册了 handler 和路由信息,即:m.GenericAPIServer. InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers. 其中几种方法的工作原理如下所示。

  • s.installAPIResources:为每个 API 资源调用添加一个路由到apiGroupVersion.InstallREST.
  • apiGroupVersion.InstallREST: 将restful.WebServic对象添加到容器中。
  • installer.Install: 返回最终restful.WebService对象

 

 

a.registerResourceHandlers

此方法实现从rest.Storage到的转换restful.Route。它首先确定 API Resource 支持的 REST 接口,然后为 REST 接口添加相应的处理程序,最后将其注册到路由中。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {       
    admit := a.group.Admit

    ......
   
    // 1、判断该 resource 实现了哪些 REST 操作接口,以此来判断其支持的 verbs 以便为其添加路由
    creater, isCreater := storage.(rest.Creater)
    namedCreater, isNamedCreater := storage.(rest.NamedCreater)
    lister, isLister := storage.(rest.Lister)
    getter, isGetter := storage.(rest.Getter)
    getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
    gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
    collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
    updater, isUpdater := storage.(rest.Updater)
    patcher, isPatcher := storage.(rest.Patcher)
    watcher, isWatcher := storage.(rest.Watcher)
    connecter, isConnecter := storage.(rest.Connecter)
    storageMeta, isMetadata := storage.(rest.StorageMetadata)
    storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
    if !isMetadata {
        storageMeta = defaultStorageMetadata{}
    }
    exporter, isExporter := storage.(rest.Exporter)
    if !isExporter {
        exporter = nil
    }

    ......
    
    // 2、为 resource 添加对应的 actions 并根据是否支持 namespace 
    switch {
    case !namespaceScoped:
        ......

        actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
        actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
        if getSubpath {
            actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
        }
        actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
        actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
        actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
        actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
        actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
        actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
    default:
        ......
        actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
        actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
        ......
    }

    // 3、根据 action 创建对应的 route
    kubeVerbs := map[string]struct{}{}
    reqScope := handlers.RequestScope{
        Serializer:      a.group.Serializer,
        ParameterCodec:  a.group.ParameterCodec,
        Creater:         a.group.Creater,
        Convertor:       a.group.Convertor,
        ......
    }
    ......
    // 4、从 rest.Storage 到 restful.Route 映射
    // 为每个操作添加对应的 handler
    for _, action := range actions {
        ......
        verbOverrider, needOverride := storage.(StorageMetricsOverride)
        switch action.Verb {
        case "GET": ......
        case "LIST":
        case "PUT":
        case "PATCH":
        // 此处以 POST 操作进行说明
        case "POST": 
            var handler restful.RouteFunction
            // 5、初始化 handler
            if isNamedCreater {
                handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
            } else {
                handler = restfulCreateResource(creater, reqScope, admit)
            }
            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
            article := GetArticleForNoun(kind, " ")
            doc := "create" + article + kind
            if isSubresource {
                doc = "create " + subresource + " of" + article + kind
            }
            // 6、route 与 handler 进行绑定
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            // 7、添加到路由中
            routes = append(routes, route)
        case "DELETE": 
        case "DELETECOLLECTION":
        case "WATCH":
        case "WATCHLIST":
        case "CONNECT":
        default:
    }
    ......
    return &apiResource, nil
}

 

 

restfulCreateNamedResource

restfulCreateNamedResource是POST操作的handler,最终会通过调用createHandler方法来完成。

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:1087

func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
    return createHandler(r, scope, admission, true)
}

 

创建处理程序

createHandler是向后端存储写入数据的方法,对资源的操作有权限控制,在里面会先执行and操作,然后调用该createHandler方法decoder完成资源的创建,在方法,最后将数据保存到后端存储。该操作执行 kube-apiserver 中的准入插件。admission-plugins 初始化为admissionChain in ,初始化调用链为,最后in将所有启用的插件包装成admissionChain ,这里要执行的admit 操作就是admission-plugins 中的admit 操作。admissioncreatecreatevalidateadmitCreateKubeAPIServerConfigCreateKubeAPIServerConfig --> buildGenericConfig --> s.Admission.ApplyTo --> a.GenericAdmission.ApplyTo --> a.Plugins.NewFromPluginsNewFromPlugins

调用的create方法createHandlergenericregistry.Store对象的方法。genericregistry.Store在 RESTStorage 的每个资源初始化时都会引入该对象中

所有操作createHandler都是本文开头提到的请求流程,如下。

v1beta1 ⇒ internal ⇒    |    ⇒       |    ⇒  v1  ⇒ json/yaml ⇒ etcd
                     admission    validation

 

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go:46

func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path})
        defer trace.LogIfLong(500 * time.Millisecond)
        ......

        gv := scope.Kind.GroupVersion()
        // 1、得到合适的SerializerInfo
        s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        // 2、找到合适的 decoder
        decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

        body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        ......

        defaultGVK := scope.Kind
        original := r.New()
        trace.Step("About to convert to expected version")
        // 3、decoder 解码
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        ......

        ae := request.AuditEventFrom(ctx)
        admit = admission.WithAudit(admit, ae)
        audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)

        userInfo, _ := request.UserFrom(ctx)


        if len(name) == 0 {
            _, name, _ = scope.Namer.ObjectName(obj)
        }
        // 4、执行 admit 操作,即执行 kube-apiserver 启动时加载的 admission-plugins,
        admissionAttributes := admission.NewAttributesRecord(......)
        if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
            err = mutatingAdmission.Admit(ctx, admissionAttributes, scope)
            if err != nil {
                scope.err(err, w, req)
                return
            }
        }

        ......
        // 5、执行 create 操作
        result, err := finishRequest(timeout, func() (runtime.Object, error) {
            return r.Create(
                ctx,
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                options,
            )
        })
        ......
    }
}

 

 

概括

本文分析了kube-apiserver的启动过程。kube-apiserver 包含三个服务器,分别是 KubeAPIServer、APIExtensionsServer 和 AggregatorServer,它们通过委托方式连接在一起。初始化过程类似,先为每个服务器创建对应的config,然后初始化http服务器,http服务器初始化过程就是先初始化GoRestfulContainer,然后安装服务器自带的API,安装API时先为每个API创建对应的后端存储RES安装API时,先为每个API Resource创建对应的后端存储RESTStorage,然后为每个API添加对应的handler API Resource 支持动词,并将handler注册到路由,最后将路由注册到webservice,RESTFul API的实现流程是启动流程的核心。启动过程的核心是 RESTFul API 的实现。至于kube-apiserver中鉴权鉴权等过滤器的实现、多版本资源转换、kubernetes服务实现等细节,我们会在后面的文章中继续分析。

 

 

发表评论