kube-apiserver的设计与实现
文章目录
kube-apiserver 是 kubernetes 中的一个组件,它直接与 etcd 交互并控制 kubernetes 中核心资源的更改。它提供以下主要功能。
- 提供Kubernetes API,包括鉴权授权、数据验证、集群状态变化,供客户端和其他组件调用。
- 代理集群中的一些附加组件组件,例如 Kubernetes UI、metrics-server、npd 等。
- 创建kubernetes服务,即提供apiserver的Service,kubernetes Service。
- 不同版本之间的资源转换。
kube-apiserver 处理流程
kube-apiserver 与其他组件交互主要是通过对外提供 API 来实现的,可以通过调用 kube-apiserver 的接口$ curl -k https://<masterIP>:6443
或者通过其提供的 swagger-ui来获取,主要有以下三个 API。
- 核心群体:主要在
/api/v1
. - 命名组:其路径为
/apis/$NAME/$VERSION
. - 一些暴露系统状态的 API:如
/metrics
,/healthz
等。
API 的 URL 大致由 组成/apis/group/version/namespaces/my-ns/myresource
,其中 API 的结构大致如下图所示。
在了解了 kube-apiserver 的 API 之后,下一节将介绍 kube-apiserver 如何处理 API 请求。请求的完整流程如下图所示。
此处显示了 POST 请求的示例。当请求到达kube-apiserver时,kube-apiserver首先执行http过滤器链中注册的过滤器链,该过滤器链进行一系列过滤操作,主要是鉴权、鉴权等校验操作。当过滤器链完成后,请求被路由到相应的处理程序,该处理程序主要与 etcd 交互。处理程序中的主要操作如下
Decoder
kubernetes 中的大多数资源都会有一个internal version
,因为一个资源在整个开发过程中可能有多个版本。例如,部署将具有extensions/v1beta1
, apps/v1
. 为了避免问题,kube-apiserver 必须知道如何在每对版本之间进行转换(例如,v1⇔v1alpha1、v1⇔v1beta1、v1beta1⇔v1alpha1)。因此它使用了一个特殊的internal version
,它是一个通用的版本,它包含了版本的所有字段并且具有所有版本的功能。解码器会先将 creater 对象转换为internal version
,然后再转换为storage version
,这是存储在 etcd 中时的另一个版本。
解码时,首先从HTTP路径中获取期望的版本,然后使用scheme创建一个与正确版本匹配的空对象,并使用JSON或protobuf解码器进行转换,在转换的第一步,如果用户省略了一些字段,解码器将它们设置为默认值。
Admission
解码完成后,您需要通过验证集群的全局约束并根据集群配置设置默认值来检查是否可以创建或更新对象。您可以在目录中看到所有 kube-apiserver 可以使用的全局约束插件k8s.io/kubernetes/plugin/pkg/admission
。kube-apiserver 通过将-enable-admission-plugins
参数设置为 -ValidatingAdmissionWebhook
or -MutatingAdmissionWebhook
来启动。
Validation
主要检查对象中字段的合法性。
在handler中执行完以上操作后,会执行最后一个与etcd相关的操作,POST操作会将数据写入etcd。以上在handler中的主要处理流程如下图所示。
v1beta1 ⇒ internal ⇒ | ⇒ | ⇒ v1 ⇒ json/yaml ⇒ etcd admission validation
kube-apiserver 中的组件
kube-apiserver 由 3 个组件(Aggregator、KubeAPIServer、APIExtensionServer)组成,它们通过委托依次处理请求。
- Aggregator:暴露类似七层负载均衡的功能,拦截用户请求转发到其他服务器,负责整个APIServer的Discovery功能。
- KubeAPIServer:负责一些通用的请求处理、认证、鉴权等,以及处理每个内置资源的 REST 服务。
- APIExtensionServer:主要处理CustomResourceDefinition(CRD)和CustomResource(CR)REST请求,是Delegation的最后一个环节,如果对应的CR不能处理,返回404。
Aggregator 和 APIExtensionsServer 分别对应扩展 APIServer 资源的两种主要方式,即 AA 和 CRD。
Aggregator 聚合器
Aggregator 通过将 APIServices 对象关联到一个 Service 来转发请求,而与之关联的 Service 类型进一步决定了请求转发的形式。聚合器由一个GenericAPIServer
和一个控制器组成,控制器保持自己的状态。主要处理组下APIService资源的GenericAPIServer
请求apiregistration.k8s.io
。
Aggregator 除了处理资源请求外,还包含几个控制器:
apiserviceRegistrationController
:负责APIServices中资源的注册和删除。availableConditionController
:维护APIServices的可用性状态,包括其引用的Service是否可用等。autoRegistrationController
:用于维护 API 中存在的一组特定 APIService。crdRegistrationController
:负责将 CRD GroupVersions 自动注册到 APIServices 中。openAPIAggregationController
:将 APIServices 资源的更改同步到提供的 OpenAPI 文档。
Kubernetes 中的一些附加组件,例如 metrics-server 通过 Aggregator 的方式进行扩展,可以在真实环境中使用apiserver-builder工具轻松创建自定义资源作为 Aggregator 扩展。
启用 API 聚合
需要在 kube-apiserver 中添加以下配置以启用 API 聚合。
--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt --proxy-client-key-file=/etc/kubernetes/certs/proxy.key --requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt --requestheader-allowed-names=aggregator --requestheader-extra-headers-prefix=X-Remote-Extra- --requestheader-group-headers=X-Remote-Group --requestheader-username-headers=X-Remote-User
KubeAPI 服务器
KubeAPIServer 主要提供 API Resource 操作的请求,为 kubernetes 中的许多 API 注册路由信息,暴露 RESTful API,向公众提供 kubernetes 服务。
它使集群内外的服务能够通过 RESTful API 操作 Kubernetes 中的资源。
APIExtensionServer
APIExtensionServer 是委托链的最后一层,是处理用户通过自定义资源定义定义的所有资源的资源服务器。
包含的控制器及其功能如下所示。
openapiController
: 将 crd 资源的更改同步到提供的 OpenAPI 文档中,可以访问/openapi/v2
.crdController
: 负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者都可以通过$ kubectl api-versions
和来查看$ kubectl api-resources
。namingController
:检查 crd obj 中的命名冲突,可在 crd 中查看.status.conditions
。establishingController
: 检查 crd 是否处于正常状态,在 crd 中可用.status.conditions
。nonStructuralSchemaController
: 检查 crd obj 结构是否正常,在 crd 中可用.status.conditions
。apiApprovalController
: 检查 crd 是否遵循 kubernetes API 声明策略,在 crd 中可用.status.conditions
。finalizingController
: 一个类似于finalizes的函数,与CRs的移除有关。
KubeAPI 服务器
KubeAPIServer 主要提供 API Resource 操作的请求,为 kubernetes 中的许多 API 注册路由信息,暴露 RESTful API,向公众提供 kubernetes 服务。
它使集群内外的服务能够通过 RESTful API 操作 Kubernetes 中的资源。
APIExtensionServer
APIExtensionServer 是委托链的最后一层,是处理用户通过自定义资源定义定义的所有资源的资源服务器。
包含的控制器及其功能如下所示。
openapiController
: 将 crd 资源的更改同步到提供的 OpenAPI 文档中,可以访问/openapi/v2
.crdController
: 负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者都可以通过$ kubectl api-versions
和来查看$ kubectl api-resources
。namingController
:检查 crd obj 中的命名冲突,可在 crd 中查看.status.conditions
。establishingController
: 检查 crd 是否处于正常状态,在 crd 中可用.status.conditions
。nonStructuralSchemaController
: 检查 crd obj 结构是否正常,在 crd 中可用.status.conditions
。apiApprovalController
: 检查 crd 是否遵循 kubernetes API 声明策略,在 crd 中可用.status.conditions
。finalizingController
: 一个类似于finalizes的函数,与CRs的移除有关。
kube-apiserver 启动过程分析
Kubernetes版本:v1.16
首先,我们分析一下kube-apiserver是如何启动的。Run
kube-apiserver 也是通过它的方法启动主逻辑,在Run
调用该方法之前,会解析命令行参数,设置默认值等。
Run
该Run
方法的主要逻辑是。
- 调用
CreateServerChain
构建服务调用链,判断是否启动非安全http服务器。http服务器链包含apiserver要启动的三台服务器,以及为每台服务器注册相应资源的路由。 - 调用
server.PrepareRun
准备运行的服务,主要完成OpenAPI
路由的健康检查、生存检查和注册。 - 调用
prepared.Run
以启动 https 服务器。
服务器使用委托模式进行初始化,基本的API Server、CustomResource和Aggregator服务通过DelegationTarget接口链接在一起,对外提供服务。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:147
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error { server, err := CreateServerChain(completeOptions, stopCh) if err != nil { return err } prepared, err := server.PrepareRun() if err != nil { return err } return prepared.Run(stopCh) }
CreateServerChain
CreateServerChain
是完成服务端初始化的方法,包含初始化,最后返回实例的所有过程APIExtensionsServer
,初始化过程主要包括:http过滤链的配置,API Group的注册,http路径与handler的关联以及处理程序后端存储 etcd 的配置。主要逻辑如下。KubeAPIServer
AggregatorServer
aggregatorapiserver. APIAggregator
- 调用创建KubeAPIServer需要的配置
CreateKubeAPIServerConfig
,主要是createmaster.Config
,会调用buildGenericConfig
生成genericConfig,其中包含apiserver核心配置的核心配置。 - 确定是否启用了扩展 API 服务器并调用
createAPIExtensionsConfig
为其创建配置。apiExtensions server 是 kubeapiserver 中其他服务器的代理服务,例如 metric-server。 - 调用
createAPIExtensionsServer
以创建 apiExtensionsServer 的实例。 - 调用
CreateKubeAPIServer
以初始化 kubeAPIServer。 - 调用
createAggregatorConfig
以创建 aggregatorServer 的配置并调用createAggregatorServer
以初始化 aggregatorServer。 - 配置并确定是否启动非安全http服务器。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:165
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) { nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions) if err != nil { return nil, err } // 1、为 kubeAPIServer 创建配置 kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) if err != nil { return nil, err } // 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount, serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)) if err != nil { return nil, err } // 3、初始化 APIExtensionsServer apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err } // 4、初始化 KubeAPIServer kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook) if err != nil { return nil, err } // 5、创建 AggregatorConfig aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig. ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer) if err != nil { return nil, err } // 6、初始化 AggregatorServer aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers) if err != nil { return nil, err } // 7、判断是否启动非安全端口的 http server if insecureServingInfo != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { return nil, err } } return aggregatorServer, nil }
创建KubeAPIServerConfig
主要目的CreateKubeAPIServerConfig
是调用buildGenericConfig
创建 genericConfig 并构建 master.Config 对象。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:271
func CreateKubeAPIServerConfig( s completedServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport *http.Transport, ) (......) { // 1、构建 genericConfig genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport) if lastErr != nil { return } ...... // 2、初始化所支持的 capabilities capabilities.Initialize(capabilities.Capabilities{ AllowPrivileged: s.AllowPrivileged, PrivilegedSources: capabilities.PrivilegedSources{ HostNetworkSources: []string{}, HostPIDSources: []string{}, HostIPCSources: []string{}, }, PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) // 3、获取 service ip range 以及 api server service IP serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange) if lastErr != nil { return } ...... // 4、构建 master.Config 对象 config = &master.Config{......} if nodeTunneler != nil { config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial } if config.GenericConfig.EgressSelector != nil { config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup } return }
构建通用配置
主要逻辑如下。
- 调用
genericapiserver.NewConfig
生成默认的genericConfig,genericConfig主要配置DefaultBuildHandlerChain
,DefaultBuildHandlerChain
包含一系列用于认证、鉴权、鉴权的http过滤器链,以及一系列http过滤器链。 - 调用
master.DefaultAPIResourceConfigSource
加载需要启用的API资源,在k8s.io/api
代码目录下可以看到集群中所有的API资源,随着版本的迭代也会不断变化。 - 为 genericConfig 中的一些字段设置默认值。
- 调用
completedStorageFactoryConfig.New
创建 storageFactory,稍后会使用它为每个 API Resource 创建对应的 RESTStorage。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:386
func buildGenericConfig( s *options.ServerRunOptions, proxyTransport *http.Transport, ) (......) { // 1、为 genericConfig 设置默认值 genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource() if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil { return } ...... genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(......) genericConfig.OpenAPIConfig.Info.Title = "Kubernetes" genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck( sets.NewString("watch", "proxy"), sets.NewString("attach", "exec", "proxy", "log", "portforward"), ) kubeVersion := version.Get() genericConfig.Version = &kubeVersion storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd) if err != nil { lastErr = err return } // 初始化 storageFactory storageFactory, lastErr = completedStorageFactoryConfig.New() if lastErr != nil { return } if genericConfig.EgressSelector != nil { storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup } // 2、初始化 RESTOptionsGetter,后期根据其获取操作 Etcd 的句柄,同时添加 etcd 的健康检查方法 if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return } // 3、设置使用 protobufs 用来内部交互,并且禁用压缩功能 genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" genericConfig.LoopbackClientConfig.DisableCompression = true // 4、创建 clientset kubeClientConfig := genericConfig.LoopbackClientConfig clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) if err != nil { lastErr = fmt.Errorf("failed to create real external clientset: %v", err) return } versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) // 5、创建认证实例,支持多种认证方式:请求 Header 认证、Auth 文件认证、CA 证书认证、Bearer token 认证、 // ServiceAccount 认证、BootstrapToken 认证、WebhookToken 认证等 genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers) if err != nil { lastErr = fmt.Errorf("invalid authentication config: %v", err) return } // 6、创建鉴权实例,包含:Node、RBAC、Webhook、ABAC、AlwaysAllow、AlwaysDeny genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers) ...... serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers) authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig) // 7、审计插件的初始化 lastErr = s.Audit.ApplyTo(......) if lastErr != nil { return } // 8、准入插件的初始化 pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver) if err != nil { lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err) return } err = s.Admission.ApplyTo(......) if err != nil { lastErr = fmt.Errorf("failed to initialize admission: %v", err) } return }
以上分析重点是 KubeAPIServerConfig 的初始化。其他两个server config的初始化暂不详细分析,我们继续分析server的初始化。
createAPIExtensionsServer
APIExtensionsServer 最先被初始化,通过如下逻辑调用初始化apiextensionsConfig.Complete().New
服务器createAPIExtensionsServer
。
- 首先调用
c.GenericConfig.New
根据go-restful
模式初始化Container,在c.GenericConfig.New
调用NewAPIServerHandler
中初始化handler,APIServerHandler是APIServer使用的Handler类型,包括go-restful
andnon-go-restful
,和一个你在两者之间选择的Director对象,go-restful
用来处理注册的handler和non-go restful
是用于处理不存在的处理程序,API URI 处理的选择过程是:FullHandlerChain-> Director -> {GoRestfulContainer, NonGoRestfulMux}
. 在c.GenericConfig.New
,installAPI
中也调用添加路由信息包括/
,/debug/*
,/metrics
,/version
等。这三种服务器都是通过调用c.GenericConfig.New
初始化genericServer然后注册API来初始化的。 - 调用
s.GenericAPIServer.InstallAPIGroup
注册路由中的API资源,这个方法的调用链很深,主要是注册要暴露给服务端的API资源,从而可以通过http接口对资源进行REST操作。其他几个服务器也在InstallAPI
初始化期间执行相应的操作。 - 初始化服务器中需要用到的控制器,主要是
openapiController
,crdController
,namingController
,establishingController
,nonStructuralSchemaController
,apiApprovalController
,finalizingControlle
r。 - 将需要启动的控制器和通知器添加到 PostStartHook。
k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go:94
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (* apiextensionsapiserver.CustomResourceDefinitions, error) { return apiextensionsConfig.Complete().New(delegateAPIServer) }
k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:132
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) { // 1、初始化 genericServer genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget) if err != nil { return nil, err } s := &CustomResourceDefinitions{ GenericAPIServer: genericServer, } // 2、初始化 APIGroup Info,APIGroup 指该 server 需要暴露的 API apiResourceConfig := c.GenericConfig.MergedResourceConfig apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs) if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) { storage := map[string]rest.Storage{} customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter) storage["customresourcedefinitions"] = customResourceDefintionStorage storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage) apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage } if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) { ...... } // 3、注册 APIGroup if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { return nil, err } // 4、初始化需要使用的 controller crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig) if err != nil { return nil, fmt.Errorf("failed to create clientset: %v", err) } s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute) ...... establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions()) crdHandler, err := NewCustomResourceDefinitionHandler(......) if err != nil { return nil, err } s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler) namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions()) nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions()) apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions(). InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions()) finalizingController := finalizer.NewCRDFinalizer( s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions(), crdHandler, ) var openapiController *openapicontroller.Controller if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) { openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions()) } // 5、将 informer 以及 controller 添加到 PostStartHook 中 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error { s.Informers.Start(context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error { ...... go crdController.Run(context.StopCh) go namingController.Run(context.StopCh) go establishingController.Run(context.StopCh) go nonStructuralSchemaController.Run(5, context.StopCh) go apiApprovalController.Run(5, context.StopCh) go finalizingController.Run(5, context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error { return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil }, context.StopCh) }) return s, nil }
以上是APIExtensionsServer的初始化过程,其中最核心的方法是s.GenericAPIServer.InstallAPIGroup,也就是API的注册过程,三个服务器中API的注册过程是它的核心。
创建KubeAPIServer
本节继续分析 KubeAPIServer 的初始化,kubeAPIServerConfig.Complete().New
调用这里CreateKubeAPIServer
完成初始化操作。
kubeAPIServerConfig.Complete().New
主要逻辑如下。
- 调用 GenericAPIServer 初始化 GenericAPIServer,
c.GenericConfig.New
上面分析了它的主要实现。 - 判断是否支持日志相关路由,如果支持,添加
/logs
路由。 - 调用
m.InstallLegacyAPI
将核心 API 资源添加到路由中,该路由对应 apiserver 作为以 . 开头的资源/api
。 - 调用
m.InstallAPIs
以将扩展 API 资源添加到路由,在 apiserver 中是以 . 开头的资源/apis
。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:214
func CreateKubeAPIServer(......) (*master.Master, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) if err != nil { return nil, err } kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook) return kubeAPIServer, nil }
k8s.io/kubernetes/pkg/master/master.go:325
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) { ...... // 1、初始化 GenericAPIServer s, err := c.GenericConfig.New("kube-apiserver", delegationTarget) if err != nil { return nil, err } // 2、注册 logs 相关的路由 if c.ExtraConfig.EnableLogsSupport { routes.Logs{}.Install(s.Handler.GoRestfulContainer) } m := &Master{ GenericAPIServer: s, } // 3、安装 LegacyAPI if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.ExtraConfig.StorageFactory, ProxyTransport: c.ExtraConfig.ProxyTransport, ...... } if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil { return nil, err } } restStorageProviders := []RESTStorageProvider{ auditregistrationrest.RESTStorageProvider{}, authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig. Authentication.APIAudiences}, ...... } // 4、安装 APIs if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { return nil, err } if c.ExtraConfig.Tunneler != nil { m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes()) } m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook) return m, nil }
m.InstallLegacyAPI
该方法的主要作用是将核心API注册到路由中,是apiserver初始化过程中最核心的方法之一,但是它的调用链很深,下面会深入分析。将API注册到路由的最终目的是提供外部RESTful API来操作对应的资源,注册API主要分为两步,第一步是为API中的每个资源初始化RESTStorage来操作变更对于后端存储中的数据,第二步是根据每个资源的动词构建对应的路由第二步是根据每个资源的动词构建对应的路由。的主要逻辑m.InstallLegacyAPI
如下。
- 调用
legacyRESTStorageProvider.NewLegacyRESTStorage
为 LegacyAPI 中的每个资源创建 RESTStorage。RESTStorage 的目的是对应每个资源的访问路径及其在后端存储中的操作。 - 初始化
bootstrap-controller
并添加到 PostStartHook 中,bootstrap-controller
是 apiserver 中的一个控制器,主要功能是创建一些系统需要的命名空间并创建 kubernetes 服务并定期触发相应的同步操作。apiserver 将在启动bootstrap-controller
后通过调用 PostStartHook 来启动。 - 为资源创建 RESTStorage 后,调用
m.GenericAPIServer.InstallLegacyAPIGroup
为 APIGroup 注册路由信息。方法的调用链InstallLegacyAPIGroup
很深,主要是InstallLegacyAPIGroup --> installAPIResources --> InstallREST --> Install --> registerResourceHandlers
,而最终的核心路由构建在registerResourceHandlers
方法中,比较复杂,它的主要作用是确定哪些操作(如创建、更新等)可以通过通过上一步构建的REST Storage获取资源,并将对应的操作存储到action中,每个action对应一个标准最后,根据actions数组,每个action都会添加一个handler方法并注册到路由中,然后该路由将注册到网络服务。Web 服务最终将按照 go-restful 设计模式注册到容器中。
legacyRESTStorageProvider.NewLegacyRESTStorage
和方法的细节m.GenericAPIServer.InstallLegacyAPIGroup
将在后面的部分继续。
k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error { legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) if err != nil { return fmt.Errorf("Error building core storage: %v", err) } controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient()) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("Error in registering group versions: %v", err) } return nil }
InstallAPIs
和的主要过程InstallLegacyAPI
类似,为了篇幅,这里不再赘述。
创建聚合器服务器
AggregatorServer
主要用于自定义聚合控制器,使CRD自动注册到集群。
主要逻辑如下。
- 调用
aggregatorConfig.Complete().NewWithDelegate
以创建 aggregatorServer。 - 初始化
crdRegistrationController
和autoRegistrationController
。crdRegistrationController
负责注册CRD,autoRegistrationController
负责自动注册CRDcrdRegistrationController
负责注册CRD,autoRegistrationController
负责自动向apiserver注册相应的APIServices。 autoRegistrationController
将和添加crdRegistrationController
到 PostStartHook。
k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go:124
func createAggregatorServer(......) (*aggregatorapiserver.APIAggregator, error) { // 1、初始化 aggregatorServer aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) if err != nil { return nil, err } // 2、初始化 auto-registration controller apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig) if err != nil { return nil, err } autoRegistrationController := autoregister.NewAutoRegisterController(......) apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController) crdRegistrationController := crdregistration.NewCRDRegistrationController(......) err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { go crdRegistrationController.Run(5, context.StopCh) go func() { if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") { crdRegistrationController.WaitForInitialSync() } autoRegistrationController.Run(5, context.StopCh) }() return nil }) if err != nil { return nil, err } err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks( makeAPIServiceAvailableHealthCheck( "autoregister-completion", apiServices, aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), ), ) if err != nil { return nil, err } return aggregatorServer, nil }
aggregatorConfig.Complete().NewWithDelegate
NewWithDelegate`是初始化aggregatorServer的方法,主要逻辑如下。
- 调用
c.GenericConfig.New
初始化GenericAPIServer,上面已经分析了其内部主要功能。 - 调用
apiservicerest.NewRESTStorage
为APIServices资源创建RESTStorage,RESTStorage的目的是对应每个资源的访问路径及其后端存储操作。 - 调用
s.GenericAPIServer.InstallAPIGroup
为 APIGroup 注册路由信息。
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:158
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { openAPIConfig := c.GenericConfig.OpenAPIConfig c.GenericConfig.OpenAPIConfig = nil // 1、初始化 genericServer genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget) if err != nil { return nil, err } apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig) if err != nil { return nil, err } informerFactory := informers.NewSharedInformerFactory( apiregistrationClient, 5*time.Minute, ) s := &APIAggregator{ GenericAPIServer: genericServer, delegateHandler: delegationTarget.UnprotectedHandler(), ...... } // 2、为 API 注册路由 apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { return nil, err } // 3、初始化 apiserviceRegistrationController、availableController apisHandler := &apisHandler{ codecs: aggregatorscheme.Codecs, lister: s.lister, } s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s) availableController, err := statuscontrollers.NewAvailableConditionController( ...... ) if err != nil { return nil, err } // 4、添加 PostStartHook s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { informerFactory.Start(context.StopCh) c.GenericConfig.SharedInformerFactory.Start(context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { go apiserviceRegistrationController.Run(context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { go availableController.Run(5, context.StopCh) return nil }) return s, nil }
以上是对AggregatorServer初始化过程的分析,可以看出在创建APIExtensionsServer、KubeAPIServer和AggregatorServer的时候,模式是类似的,先调用c.GenericConfig.New
根据模式go- New
初始化Container go-restful
,然后再创建RESTStorage为需要在服务器中注册的资源,最后将资源的APIGroup信息注册到路由中。
至此,CreateServerChain中的流程已经分析完毕,调用链如下图所示。
|--> CreateNodeDialer | |--> CreateKubeAPIServerConfig | CreateServerChain --|--> createAPIExtensionsConfig | | |--> c.GenericConfig.New |--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --| | |--> s.GenericAPIServer.InstallAPIGroup | | |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage | | |--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI | | | |--> m.InstallAPIs | | |--> createAggregatorConfig | | |--> c.GenericConfig.New | | |--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage | |--> s.GenericAPIServer.InstallAPIGroup
prepared.Run
该Run
方法首先调用CreateServerChain
完成各个服务器的初始化,然后调用server.PrepareRun
完成服务启动前的准备工作,最后调用该prepared.Run
方法启动安全http服务器。
serverPrepare.Run
主要完成健康检查、生存检查和OpenAPI
路由注册,然后继续分析流程prepared.Run
,其中s.NonBlockingRun
调用完成启动。
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:269
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error { return s.runnable.Run(stopCh) }
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := make(chan struct{}) go func() { defer close(delayedStopCh) <-stopCh time.Sleep(s.ShutdownDelayDuration) }() // 调用 s.NonBlockingRun 完成启动流程 err := s.NonBlockingRun(delayedStopCh) if err != nil { return err } // 当收到退出信号后完成一些收尾工作 <-stopCh err = s.RunPreShutdownHooks() if err != nil { return err } <-delayedStopCh s.HandlerChainWaitGroup.Wait() return nil }
s.NonBlockingRun
的主要逻辑s.NonBlockingRun
如下。
- 确定是否启动审计日志服务。
- 调用
s.SecureServingInfo.Serve
以配置和启动 https 服务器。 - 执行 postStartHooks。
- 向 systemd 发送就绪信号。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { auditStopCh := make(chan struct{}) // 1、判断是否要启动审计日志 if s.AuditBackend != nil { if err := s.AuditBackend.Run(auditStopCh); err != nil { return fmt.Errorf("failed to run the audit backend: %v", err) } } // 2、启动 https server internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { var err error stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) close(auditStopCh) return err } } go func() { <-stopCh close(s.readinessStopCh) close(internalStopCh) if stoppedCh != nil { <-stoppedCh } s.HandlerChainWaitGroup.Wait() close(auditStopCh) }() // 3、执行 postStartHooks s.RunPostStartHooks(stopCh) // 4、向 systemd 发送 ready 信号 if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil { klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) } return nil }
以上就是服务端的初始化和启动过程的分析,如前所述,服务端初始化过程中最重要的部分就是API Resource RESTStorage的初始化和路由的注册,因为流程是比较复杂,下面将分别讲述。
StorageFactory construction
如前所述,apiserver最终实现的handler对应的后端数据存储在一个Store结构体中。在这里,例如,以 开头的路由用于通过该方法为每个资源/api
创建RESTStorage 。NewLegacyRESTStorage
RESTStorage 是在 下定义的结构k8s.io/apiserver/pkg/registry/generic/registry/store.go
,主要包含NewFunc
返回资源特定信息、NewListFunc
返回资源特定列表以及CreateStrategy
资源创建、UpdateStrategy
更新和DeleteStrategy
删除。在里面NewLegacyRESTStorage
,你可以看到创建了多个资源的 RESTStorage。
的调用链NewLegacyRESTStorage
是CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage
。
NewLegacyRESTStorage
一个 API Group 下的所有资源都有自己的 REST 实现,所有的 Group 下k8s.io/kubernetes/pkg/registry
都有一个 rest 目录,存放着对应资源的 RESTStorage。在该NewLegacyRESTStorage
方法中,每个资源对应的storage会由NewREST
or生成NewStorage
,这里以pod为例。
k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:102
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver. APIGroupInfo, error) { apiGroupInfo := genericapiserver.APIGroupInfo{ PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""), VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, Scheme: legacyscheme.Scheme, ParameterCodec: legacyscheme.ParameterCodec, NegotiatedSerializer: legacyscheme.Codecs, } var podDisruptionClient policyclient.PodDisruptionBudgetsGetter if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme. IsVersionRegistered(policyGroupVersion) { var err error podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } } // 1、LegacyAPI 下的 resource RESTStorage 的初始化 restStorage := LegacyRESTStorage{} podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds())) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } ...... endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } // 2、pod RESTStorage 的初始化 podStorage, err := podstore.NewStorage(......) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } ...... serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator. Interface, error) { ...... }) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) } restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry var secondaryServiceClusterIPAllocator ipallocator.Interface if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil { ...... } var serviceNodePortRegistry rangeallocation.RangeRegistry serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { ...... }) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err) } restStorage.ServiceNodePortAllocator = serviceNodePortRegistry controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } serviceRest, serviceRestProxy := servicestore.NewREST(......) // 3、restStorageMap 保存 resource http path 与 RESTStorage 对应关系 restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "pods/attach": podStorage.Attach, "pods/status": podStorage.Status, "pods/log": podStorage.Log, "pods/exec": podStorage.Exec, "pods/portforward": podStorage.PortForward, "pods/proxy": podStorage.Proxy, ...... "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate), } ...... }
substore.NewStorage
podstore.NewStorage
是一种为 pod 生成存储的方法。该方法的主要作用是为 pod 创建后端存储,并最终返回一个 RESTStorage 对象,该对象调用store.CompleteWithOptions
创建后端存储。
k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go:71
func NewStorage(......) (PodStorage, error) { store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &api.Pod{} }, NewListFunc: func() runtime.Object { return &api.PodList{} }, ...... } options := &generic.StoreOptions{ RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": pod.NodeNameTriggerFunc}, } // 调用 store.CompleteWithOptions if err := store.CompleteWithOptions(options); err != nil { return PodStorage{}, err } statusStore := *store statusStore.UpdateStrategy = pod.StatusStrategy ephemeralContainersStore := *store ephemeralContainersStore.UpdateStrategy = pod.EphemeralContainersStrategy bindingREST := &BindingREST{store: store} // PodStorage 对象 return PodStorage{ Pod: &REST{store, proxyTransport}, Binding: &BindingREST{store: store}, LegacyBinding: &LegacyBindingREST{bindingREST}, Eviction: newEvictionStorage(store, podDisruptionBudgetClient), Status: &StatusREST{store: &statusStore}, EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore}, Log: &podrest.LogREST{Store: store, KubeletConn: k}, Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport}, Exec: &podrest.ExecREST{Store: store, KubeletConn: k}, Attach: &podrest.AttachREST{Store: store, KubeletConn: k}, PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k}, }, nil }
可以看到最终返回的对象中对 pod 的不同操作都是一个 REST 对象。
该genericregistry.Store
对象自动集成到 REST 中,该store.CompleteWithOptions
方法在 `genericregistry.xml 中初始化存储实例。
type REST struct { *genericregistry.Store proxyTransport http.RoundTripper } type BindingREST struct { store *genericregistry.Store } ......
store.CompleteWithOptions
CompleteWithOptions 主要用于设置 store 中配置的一些默认值,并根据提供的选项更新 store,主要是初始化 store 的后端存储实例。
在CompleteWithOptions
方法内部,options.RESTOptions.GetRESTOptions
方法被调用,最终返回generic.RESTOptions
对象,该对象包含了etcd的一些配置,该generic.RESTOptions
对象包含了etcd初始化的一些配置,数据序列化方法,以及一个存储。该StorageWithCacher-->NewRawStorage-->Create
方法被顺序调用以创建最终的依赖后端存储。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1192
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { ...... var isNamespaced bool switch { case e.CreateStrategy != nil: isNamespaced = e.CreateStrategy.NamespaceScoped() case e.UpdateStrategy != nil: isNamespaced = e.UpdateStrategy.NamespaceScoped() default: return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String()) } ...... // 1、调用 options.RESTOptions.GetRESTOptions opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource) if err != nil { return err } // 2、设置 ResourcePrefix prefix := opts.ResourcePrefix if !strings.HasPrefix(prefix, "/") { prefix = "/" + prefix } if prefix == "/" { return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix) } if e.KeyRootFunc == nil && e.KeyFunc == nil { ...... } keyFunc := func(obj runtime.Object) (string, error) { ...... } // 3、以下操作主要是将 opts 对象中的值赋值到 store 对象中 if e.DeleteCollectionWorkers == 0 { e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers } e.EnableGarbageCollection = opts.EnableGarbageCollection if e.ObjectNameFunc == nil { ...... } if e.Storage.Storage == nil { e.Storage.Codec = opts.StorageConfig.Codec var err error e.Storage.Storage, e.DestroyFunc, err = opts.Decorator( opts.StorageConfig, prefix, keyFunc, e.NewFunc, e.NewListFunc, attrFunc, options.TriggerFunc, ) if err != nil { return err } e.StorageVersioner = opts.StorageConfig.EncodeVersioner if opts.CountMetricPollPeriod > 0 { stopFunc := e.startObservingCount(opts.CountMetricPollPeriod) previousDestroy := e.DestroyFunc e.DestroyFunc = func() { stopFunc() if previousDestroy != nil { previousDestroy() } } } } return nil }
options.RESTOptions
是一个接口,要找到它的GetRESTOptions
方法的实现就必须知道方法options.RESTOptions
中初始化时对应CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo
的实例,对应的实例RESTOptions
是StorageFactoryRestOptionsFactory
,所以 PodStoragegenericserver.Config.RESTOptionsGetter
初始化时构造的store对象中的实际对象类型是StorageFactoryRestOptionsFactory
,而其GetRESTOptions
方法如下所示。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { storageConfig, err := f.StorageFactory.NewConfig(resource) if err != nil { return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) } ret := generic.RESTOptions{ StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, EnableGarbageCollection: f.Options.EnableGarbageCollection, ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, } if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) if err != nil { return generic.RESTOptions{}, err } cacheSize, ok := sizes[resource] if !ok { cacheSize = f.Options.DefaultWatchCacheSize } // 调用 generic.StorageDecorator ret.Decorator = genericregistry.StorageWithCacher(cacheSize) } return ret, nil }
在genericregistry.StorageWithCacher
一个不同的方法被调用,它最终调用factory.Create
来初始化存储实例,调用链是:genericregistry.StorageWithCacher --> generic. NewRawStorage --> factory.Create
.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { switch c.Type { case "etcd2": return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) // 目前 k8s 只支持使用 etcd v3 case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: return newETCD3Storage(c) default: return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } }
新ETCD3存储
其中newETCD3Storage
,首先调用newETCD3Client
创建etcd的客户端,最后使用官方的etcd客户端工具clientv3创建客户端。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:209
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) if err != nil { return nil, nil, err } client, err := newETCD3Client(c.Transport) if err != nil { stopCompactor() return nil, nil, err } var once sync.Once destroyFunc := func() { once.Do(func() { stopCompactor() client.Close() }) } transformer := c.Transformer if transformer == nil { transformer = value.IdentityTransformer } return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil }
至此,pod资源中store的构建分析基本完成,不同资源对应一个REST对象,依次引用该genericregistry.Store
对象,最后初始化`genericregistry。在分析了 store 的初始化之后,还有一个重要的步骤是路由的注册。路由注册的主要过程是根据不同的动词为资源建立一个http路径,并将路径绑定到对应的handler上。
路由注册
legacyRESTStorageProvider.NewLegacyRESTStorage
上面 RESTStorage的构造对应InstallLegacyAPI
. 下面继续分析中m.GenericAPIServer.InstallLegacyAPIGroup
方法的实现InstallLegacyAPI
。
k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error { legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) if err != nil { return fmt.Errorf("Error building core storage: %v", err) } ...... if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("Error in registering group versions: %v", err) } return nil }
for 的调用链m.GenericAPIServer.InstallLegacyAPIGroup
很深,最终为 Group 下的每个 API 资源注册了 handler 和路由信息,即:m.GenericAPIServer. InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
. 其中几种方法的工作原理如下所示。
s.installAPIResources
:为每个 API 资源调用添加一个路由到apiGroupVersion.InstallREST
.apiGroupVersion.InstallREST
: 将restful.WebServic
对象添加到容器中。installer.Install
: 返回最终restful.WebService
对象
a.registerResourceHandlers
此方法实现从rest.Storage
到的转换restful.Route
。它首先确定 API Resource 支持的 REST 接口,然后为 REST 接口添加相应的处理程序,最后将其注册到路由中。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) { admit := a.group.Admit ...... // 1、判断该 resource 实现了哪些 REST 操作接口,以此来判断其支持的 verbs 以便为其添加路由 creater, isCreater := storage.(rest.Creater) namedCreater, isNamedCreater := storage.(rest.NamedCreater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter) updater, isUpdater := storage.(rest.Updater) patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata) storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider) if !isMetadata { storageMeta = defaultStorageMetadata{} } exporter, isExporter := storage.(rest.Exporter) if !isExporter { exporter = nil } ...... // 2、为 resource 添加对应的 actions 并根据是否支持 namespace switch { case !namespaceScoped: ...... actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister) actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater) actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter) actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList) actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter) if getSubpath { actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter) } actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater) actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher) actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter) actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher) actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter) actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath) default: ...... actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister) actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater) actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter) actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList) actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter) ...... } // 3、根据 action 创建对应的 route kubeVerbs := map[string]struct{}{} reqScope := handlers.RequestScope{ Serializer: a.group.Serializer, ParameterCodec: a.group.ParameterCodec, Creater: a.group.Creater, Convertor: a.group.Convertor, ...... } ...... // 4、从 rest.Storage 到 restful.Route 映射 // 为每个操作添加对应的 handler for _, action := range actions { ...... verbOverrider, needOverride := storage.(StorageMetricsOverride) switch action.Verb { case "GET": ...... case "LIST": case "PUT": case "PATCH": // 此处以 POST 操作进行说明 case "POST": var handler restful.RouteFunction // 5、初始化 handler if isNamedCreater { handler = restfulCreateNamedResource(namedCreater, reqScope, admit) } else { handler = restfulCreateResource(creater, reqScope, admit) } handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler) article := GetArticleForNoun(kind, " ") doc := "create" + article + kind if isSubresource { doc = "create " + subresource + " of" + article + kind } // 6、route 与 handler 进行绑定 route := ws.POST(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...). Returns(http.StatusOK, "OK", producedObject). Returns(http.StatusCreated, "Created", producedObject). Returns(http.StatusAccepted, "Accepted", producedObject). Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil { return nil, err } addParams(route, action.Params) // 7、添加到路由中 routes = append(routes, route) case "DELETE": case "DELETECOLLECTION": case "WATCH": case "WATCHLIST": case "CONNECT": default: } ...... return &apiResource, nil }
restfulCreateNamedResource
restfulCreateNamedResource
是POST操作的handler,最终会通过调用createHandler
方法来完成。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:1087
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request) } } func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc { return createHandler(r, scope, admission, true) }
创建处理程序
createHandler
是向后端存储写入数据的方法,对资源的操作有权限控制,在里面会先执行and操作,然后调用该createHandler
方法decoder
完成资源的创建,在方法,最后将数据保存到后端存储。该操作执行 kube-apiserver 中的准入插件。admission-plugins 初始化为admissionChain in ,初始化调用链为,最后in将所有启用的插件包装成admissionChain ,这里要执行的admit 操作就是admission-plugins 中的admit 操作。admission
create
create
validate
admit
CreateKubeAPIServerConfig
CreateKubeAPIServerConfig --> buildGenericConfig --> s.Admission.ApplyTo --> a.GenericAdmission.ApplyTo --> a.Plugins.NewFromPlugins
NewFromPlugins
调用的create方法createHandler
是genericregistry.Store
对象的方法。genericregistry.Store
在 RESTStorage 的每个资源初始化时都会引入该对象中
所有操作createHandler
都是本文开头提到的请求流程,如下。
v1beta1 ⇒ internal ⇒ | ⇒ | ⇒ v1 ⇒ json/yaml ⇒ etcd admission validation
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go:46
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path}) defer trace.LogIfLong(500 * time.Millisecond) ...... gv := scope.Kind.GroupVersion() // 1、得到合适的SerializerInfo s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer) if err != nil { scope.err(err, w, req) return } // 2、找到合适的 decoder decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion) body, err := limitedReadBody(req, scope.MaxRequestBodyBytes) if err != nil { scope.err(err, w, req) return } ...... defaultGVK := scope.Kind original := r.New() trace.Step("About to convert to expected version") // 3、decoder 解码 obj, gvk, err := decoder.Decode(body, &defaultGVK, original) ...... ae := request.AuditEventFrom(ctx) admit = admission.WithAudit(admit, ae) audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer) userInfo, _ := request.UserFrom(ctx) if len(name) == 0 { _, name, _ = scope.Namer.ObjectName(obj) } // 4、执行 admit 操作,即执行 kube-apiserver 启动时加载的 admission-plugins, admissionAttributes := admission.NewAttributesRecord(......) if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) { err = mutatingAdmission.Admit(ctx, admissionAttributes, scope) if err != nil { scope.err(err, w, req) return } } ...... // 5、执行 create 操作 result, err := finishRequest(timeout, func() (runtime.Object, error) { return r.Create( ctx, name, obj, rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope), options, ) }) ...... } }
概括
本文分析了kube-apiserver的启动过程。kube-apiserver 包含三个服务器,分别是 KubeAPIServer、APIExtensionsServer 和 AggregatorServer,它们通过委托方式连接在一起。初始化过程类似,先为每个服务器创建对应的config,然后初始化http服务器,http服务器初始化过程就是先初始化GoRestfulContainer
,然后安装服务器自带的API,安装API时先为每个API创建对应的后端存储RES安装API时,先为每个API Resource创建对应的后端存储RESTStorage,然后为每个API添加对应的handler API Resource 支持动词,并将handler注册到路由,最后将路由注册到webservice,RESTFul API的实现流程是启动流程的核心。启动过程的核心是 RESTFul API 的实现。至于kube-apiserver中鉴权鉴权等过滤器的实现、多版本资源转换、kubernetes服务实现等细节,我们会在后面的文章中继续分析。