Kubebuilder Advanced: Source Code Analysis
In the previous articles of this series we built a complete Operator, covering CRUD, finalizers (pre-delete handling), Status, Events, OwnerReference and WebHooks, so we have touched on most of the things that come up in Operator development. kubebuilder does a great deal of work for us, so that during development we basically only need to care about a single Reconcile function. Looked at from another angle, though, kubebuilder is still a black box to us, and that raises a number of questions:
- How does the Reconcile method get triggered?
- How are the different resources recognized?
- How does the whole thing work end to end?
- ...

Architecture
Let's start with the architecture diagram from the official documentation [1].
[architecture diagram]
- Process: the process is started via main.go. Normally a Controller runs as a single process; if you deploy it for high availability there will be several of them.
- Manager: every process has one Manager, the core component. It is responsible for exposing metrics, webhook certificates, initializing the shared cache, initializing the shared clients used to talk to the APIServer, and running all of the Controllers.
- Client: when we create, update or delete a resource we usually call the Client directly, and it talks to the APIServer.
- Cache: keeps the resources the Controllers care about in sync; its core is a GVK -> Informer mapping. Our Get and List calls normally read from the Cache.
- Controller: where the controller's business logic lives. A Manager can hold several Controllers, and we usually only need to implement the Reconcile method. The Predicate in the diagram is an event filter; it lets us drop events the Controller does not care about.
- WebHook: where admission control is implemented. There are two kinds of hooks: a MutatingAdmissionWebhook, which implements the Defaulter interface, and a ValidatingAdmissionWebhook, which implements the Validator interface.

Source Code Analysis
With the basic architecture in mind, let's start from the entry point, main.go, and see what kubebuilder has quietly been doing behind the scenes.
main.go
```go
// Flag binding and error-check code omitted
func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "97acaccf.lailin.xyz",
        // CertDir: "config/cert/", // point at the certificates manually for local testing
    })

    (&controllers.NodePoolReconciler{
        Client:   mgr.GetClient(),
        Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
        Scheme:   mgr.GetScheme(),
        Recorder: mgr.GetEventRecorderFor("NodePool"),
    }).SetupWithManager(mgr)

    (&nodesv1.NodePool{}).SetupWebhookWithManager(mgr)
    //+kubebuilder:scaffold:builder

    mgr.AddHealthzCheck("healthz", healthz.Ping)
    mgr.AddReadyzCheck("readyz", healthz.Ping)

    setupLog.Info("starting manager")
    mgr.Start(ctrl.SetupSignalHandler())
}
```

As you can see, main.go mostly does start-up work, including:
- Create a Manager
- Use that Manager to create a Controller
- Set up the WebHook
- Add health checks
- Start the Manager

Let's now follow the logic of the main function step by step.
NewManager
```go
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // Configuration initialization code omitted

    // Create the cache
    cache, err := options.NewCache(config, cache.Options{
        Scheme:    options.Scheme,     // the scheme passed in from main
        Mapper:    mapper,             // maps k8s APIs to Go types
        Resync:    options.SyncPeriod, // defaults to 10 hours; usually leave it as is
        Namespace: options.Namespace,  // the namespace to watch
    })

    // Create the clients that talk to the APIServer; reads and writes are separated
    clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
    apiReader, err := client.New(config, clientOptions)
    writeObj, err := options.ClientBuilder.
        WithUncached(options.ClientDisableCacheFor...).
        Build(cache, config, clientOptions)
    if options.DryRunClient {
        writeObj = client.NewDryRunClient(writeObj)
    }

    // Create the event recorder
    recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)

    // If high availability is needed, create the leader-election configuration
    leaderConfig := config
    if options.LeaderElectionConfig != nil {
        leaderConfig = options.LeaderElectionConfig
    }
    resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
        LeaderElection:             options.LeaderElection,
        LeaderElectionResourceLock: options.LeaderElectionResourceLock,
        LeaderElectionID:           options.LeaderElectionID,
        LeaderElectionNamespace:    options.LeaderElectionNamespace,
    })

    // Create the metrics and health-check listeners
    metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)

    // By default we have no extra endpoints to expose on metrics http server.
    metricsExtraHandlers := make(map[string]http.Handler)

    // Create health probes listener. This will throw an error if the bind
    // address is invalid or already in use.
    healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    if err != nil {
        return nil, err
    }

    // Finally, put all of this into the manager
    return &controllerManager{
        config:                  config,
        scheme:                  options.Scheme,
        cache:                   cache,
        fieldIndexes:            cache,
        client:                  writeObj,
        apiReader:               apiReader,
        recorderProvider:        recorderProvider,
        resourceLock:            resourceLock,
        mapper:                  mapper,
        metricsListener:         metricsListener,
        metricsExtraHandlers:    metricsExtraHandlers,
        logger:                  options.Logger,
        elected:                 make(chan struct{}),
        port:                    options.Port,
        host:                    options.Host,
        certDir:                 options.CertDir,
        leaseDuration:           *options.LeaseDuration,
        renewDeadline:           *options.RenewDeadline,
        retryPeriod:             *options.RetryPeriod,
        healthProbeListener:     healthProbeListener,
        readinessEndpointName:   options.ReadinessEndpointName,
        livenessEndpointName:    options.LivenessEndpointName,
        gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
        internalProceduresStop:  make(chan struct{}),
    }, nil
}
```

Creating the Cache
```go
func New(config *rest.Config, opts Options) (Cache, error) {
    opts, err := defaultOpts(config, opts)
    if err != nil {
        return nil, err
    }
    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
    return &informerCache{InformersMap: im}, nil
}
```

The main thing here is the call to NewInformersMap, which builds the Informer mapping.
```go
func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *InformersMap {
    return &InformersMap{
        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

        Scheme: scheme,
    }
}
```

NewInformersMap creates separate InformerMaps for structured, unstructured and metadata-only objects. All of them end up calling newSpecificInformersMap; the only difference is the createListWatcherFunc argument that each one passes in.
```go
func newSpecificInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string, createListWatcher createListWatcherFunc) *specificInformersMap {
    ip := &specificInformersMap{
        config:            config,
        Scheme:            scheme,
        mapper:            mapper,
        informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
        codecs:            serializer.NewCodecFactory(scheme),
        paramCodec:        runtime.NewParameterCodec(scheme),
        resync:            resync,
        startWait:         make(chan struct{}),
        createListWatcher: createListWatcher,
        namespace:         namespace,
    }
    return ip
}
```

newSpecificInformersMap is similar to the regular InformersMap, except that it does not implement the WaitForCacheSync method.
Taking the structured variant, createStructuredListWatch, as an example: it mainly returns a ListWatch object that is later used to build a SharedIndexInformer.
```go
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
    if err != nil {
        return nil, err
    }
    listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
    listObj, err := ip.Scheme.New(listGVK)
    if err != nil {
        return nil, err
    }

    // TODO: the functions that make use of this ListWatch should be adapted to
    // pass in their own contexts instead of relying on this fixed one here.
    ctx := context.TODO()

    // Create a new ListWatch for the obj
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            res := listObj.DeepCopyObject()
            isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
            return res, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            // Watch needs to be set to true separately
            opts.Watch = true
            isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
        },
    }, nil
}
```

To sum up: the cache builds a set of InformerMaps that establish the GVK -> Informer mapping, and each Informer uses its ListWatch functions to List and Watch the corresponding GVK.
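To see that mapping from the outside, here is a minimal sketch (not from the article) of asking the Manager's cache for the informer behind a type and hanging an extra event handler on it. The helper name and the nodesv1 import path are assumptions; GetInformer and AddEventHandler come from controller-runtime's Cache/Informer interfaces of roughly this era.

```go
// attachNodePoolHandler is a hypothetical helper, not part of the article's code.
// It fetches the shared informer that the cache keeps for the NodePool type
// (the GVK -> Informer lookup described above) and registers one more handler.
// toolscache is "k8s.io/client-go/tools/cache".
func attachNodePoolHandler(ctx context.Context, mgr ctrl.Manager) error {
    inf, err := mgr.GetCache().GetInformer(ctx, &nodesv1.NodePool{})
    if err != nil {
        return err
    }
    inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            // every Add event produced by the ListWatch above ends up here
        },
    })
    return nil
}
```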
Creating the Client
```go
func New(config *rest.Config, options Options) (Client, error) {
    if config == nil {
        return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
    }

    // Init a scheme if none provided
    if options.Scheme == nil {
        options.Scheme = scheme.Scheme
    }

    // Init a Mapper if none provided
    if options.Mapper == nil {
        var err error
        options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
        if err != nil {
            return nil, err
        }
    }

    clientcache := &clientCache{
        config: config,
        scheme: options.Scheme,
        mapper: options.Mapper,
        codecs: serializer.NewCodecFactory(options.Scheme),

        structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta),
        unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
    }

    rawMetaClient, err := metadata.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
    }

    c := &client{
        typedClient: typedClient{
            cache:      clientcache,
            paramCodec: runtime.NewParameterCodec(options.Scheme),
        },
        unstructuredClient: unstructuredClient{
            cache:      clientcache,
            paramCodec: noConversionParamCodec{},
        },
        metadataClient: metadataClient{
            client:     rawMetaClient,
            restMapper: options.Mapper,
        },
        scheme: options.Scheme,
        mapper: options.Mapper,
    }

    return c, nil
}
```

Two clients are created: one for reads and one for writes. The read client is backed by the cache described above; only the write client talks to the APIServer directly.
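As a consequence, reads that must not be stale should bypass the cache. A small sketch of the difference, using the Manager's two readers (the ConfigMap and key here are just illustrative, and mgr is the Manager from main.go):

```go
// Sketch: cached reads vs. reads that go straight to the APIServer.
cached := mgr.GetClient()    // Get/List answered from the informer cache
direct := mgr.GetAPIReader() // Get/List that always hits the APIServer

key := types.NamespacedName{Namespace: "default", Name: "example"}
var cm corev1.ConfigMap
_ = cached.Get(ctx, key, &cm) // cheap, but may lag slightly behind the APIServer
_ = direct.Get(ctx, key, &cm) // always fresh, at the cost of APIServer load
```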
Controller
Next, let's look at how the core piece, the Controller, is initialized and how it does its work.
```go
if err = (&controllers.NodePoolReconciler{
    Client:   mgr.GetClient(),
    Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
    Scheme:   mgr.GetScheme(),
    Recorder: mgr.GetEventRecorderFor("NodePool"),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "NodePool")
    os.Exit(1)
}
```

The code in main.go just fills in the Controller struct and then calls SetupWithManager.
```go
// SetupWithManager sets up the controller with the Manager.
func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&nodesv1.NodePool{}).
        Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
        Complete(r)
}
```

As covered earlier, SetupWithManager uses the builder pattern to declare which objects we want to watch; only events for those objects will trigger our Reconcile logic. The Complete call at the end ultimately invokes the Build method.
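If you want to filter events at this level, the same builder also accepts predicates. Here is a hedged sketch, not the article's actual controller, assuming a controller-runtime version that ships builder.WithPredicates and predicate.GenerationChangedPredicate:

```go
// setupWithFilters is a hypothetical variant of SetupWithManager that only
// enqueues NodePool events when .spec changed (i.e. the generation was bumped).
func (r *NodePoolReconciler) setupWithFilters(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&nodesv1.NodePool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
        Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
        Complete(r)
}
```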
```go
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
    // Parameter validation omitted

    // Set the Config
    blder.loadRestConfig()

    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {
        return nil, err
    }

    // Set the Watch
    if err := blder.doWatch(); err != nil {
        return nil, err
    }

    return blder.ctrl, nil
}
```

Build mainly calls two methods: doController and doWatch.
```go
func (blder *Builder) doController(r reconcile.Reconciler) error {
    ctrlOptions := blder.ctrlOptions
    if ctrlOptions.Reconciler == nil {
        ctrlOptions.Reconciler = r
    }

    // Retrieve the GVK from the object we are reconciling
    // to prepopulate logger information, and to optionally generate a default name.
    gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
    if err != nil {
        return err
    }

    // Setup the logger.
    if ctrlOptions.Log == nil {
        ctrlOptions.Log = blder.mgr.GetLogger()
    }
    ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

    // Build the controller and return.
    blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
    return err
}
```

doController mainly initializes a Controller, passing in the Reconciler we implemented and a name derived from our GVK.
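The ctrlOptions that doController hands to newController are the same options you can set from your own code through the builder, for example to raise the number of reconcile workers. A sketch, assuming the WithOptions builder method and controller.Options shape of recent controller-runtime releases (the function name is hypothetical):

```go
// setupWithMoreWorkers is a hypothetical variant of SetupWithManager that tunes
// the controller.Options which doController eventually passes to newController.
func (r *NodePoolReconciler) setupWithMoreWorkers(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&nodesv1.NodePool{}).
        WithOptions(controller.Options{
            MaxConcurrentReconciles: 4, // number of worker goroutines started later in Controller.Start
        }).
        Complete(r)
}
```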
```go
func (blder *Builder) doWatch() error {
    // Reconcile type
    typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
    if err != nil {
        return err
    }
    src := &source.Kind{Type: typeForSrc}
    hdler := &handler.EnqueueRequestForObject{}
    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        return err
    }

    // Watches the managed types
    for _, own := range blder.ownsInput {
        typeForSrc, err := blder.project(own.object, own.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForOwner{
            OwnerType:    blder.forInput.object,
            IsController: true,
        }
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, own.predicates...)
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }
    }

    // Do the watch requests
    for _, w := range blder.watchesInput {
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, w.predicates...)

        // If the source of this watch is of type *source.Kind, project it.
        if srckind, ok := w.src.(*source.Kind); ok {
            typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
            if err != nil {
                return err
            }
            srckind.Type = typeForSrc
        }
        if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
            return err
        }
    }
    return nil
}
```

doWatch sets up the watches on the resources we care about. In blder.ctrl.Watch(src, hdler, allPredicates...), allPredicates are the event filters: an event is passed on to the EventHandler hdler only when every predicate returns true. This is also where the handler gets registered on the Informer.
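To make the predicate idea concrete, here is a short sketch of a hand-written filter that could be passed into Watch (or into the builder): it only lets Node update events through when the labels actually changed. The variable name is mine, not from the article; event and predicate are the controller-runtime pkg/event and pkg/predicate packages.

```go
// Sketch: a predicate that drops Node updates whose labels did not change.
var nodeLabelsChanged = predicate.Funcs{
    UpdateFunc: func(e event.UpdateEvent) bool {
        if e.ObjectOld == nil || e.ObjectNew == nil {
            return false
        }
        // Only hand the event to the EventHandler when the labels differ.
        return !reflect.DeepEqual(e.ObjectOld.GetLabels(), e.ObjectNew.GetLabels())
    },
}
```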
Startup
```go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

    // Used to signal that all goroutines have exited.
    stopComplete := make(chan struct{})
    defer close(stopComplete)

    // ......

    // Used to collect errors
    cm.errChan = make(chan error)

    // Start the metrics server if metrics are enabled
    if cm.metricsListener != nil {
        go cm.serveMetrics()
    }

    // Start the health-check server
    if cm.healthProbeListener != nil {
        go cm.serveHealthProbes()
    }

    go cm.startNonLeaderElectionRunnables()

    go func() {
        if cm.resourceLock != nil {
            err := cm.startLeaderElection()
            if err != nil {
                cm.errChan <- err
            }
        } else {
            // Treat not having leader election enabled the same as being elected.
            close(cm.elected)
            go cm.startLeaderElectionRunnables()
        }
    }()

    // Decide whether we need to exit
    select {
    case <-ctx.Done():
        // We are done
        return nil
    case err := <-cm.errChan:
        // Error starting or running a runnable
        return err
    }
}
```

Whether or not this instance is the leader, the Controllers end up being started via startRunnable.
```go
func (cm *controllerManager) startNonLeaderElectionRunnables() {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.waitForCache(cm.internalCtx)

    // Start the non-leaderelection Runnables after the cache has synced
    for _, c := range cm.nonLeaderElectionRunnables {
        // Controllers block, but we want to return an error if any have an error starting.
        // Write any Start errors to a channel so we can return them
        cm.startRunnable(c)
    }
}
```

That in turn calls the Controller's Start method.
```go
// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
    // A Controller can only be started once
    c.mu.Lock()
    if c.Started {
        return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
    }

    // Set the internal context.
    c.ctx = ctx

    // Create the work queue
    c.Queue = c.MakeQueue()
    defer c.Queue.ShutDown()

    err := func() error {
        defer c.mu.Unlock()
        defer utilruntime.HandleCrash()

        // Start the event sources and try to wait for the caches
        for _, watch := range c.startWatches {
            c.Log.Info("Starting EventSource", "source", watch.src)
            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        // Start the Controller
        c.Log.Info("Starting Controller")
        for _, watch := range c.startWatches {
            syncingSource, ok := watch.src.(source.SyncingSource)
            if !ok {
                continue
            }
            if err := syncingSource.WaitForSync(ctx); err != nil {
                // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
                // Leaving it here because that could happen in the future
                err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                c.Log.Error(err, "Could not wait for Cache to sync")
                return err
            }
        }

        // All the watches have been started, we can reset the local slice.
        //
        // We should never hold watches more than necessary, each watch source can hold a backing cache,
        // which won't be garbage collected if we hold a reference to it.
        c.startWatches = nil

        if c.JitterPeriod == 0 {
            c.JitterPeriod = 1 * time.Second
        }

        // Launch workers to process resources
        c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
        ctrlmetrics.WorkerCount.WithLabelValues(c.Name).
            Set(float64(c.MaxConcurrentReconciles))
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go wait.UntilWithContext(ctx, func(ctx context.Context) {
                // Check the queue for events we care about; if there are any,
                // trigger our reconcile logic
                for c.processNextWorkItem(ctx) {
                }
            }, c.JitterPeriod)
        }

        c.Started = true
        return nil
    }()
    if err != nil {
        return err
    }

    <-ctx.Done()
    c.Log.Info("Stopping workers")
    return nil
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.Queue.Get()
    if shutdown {
        // Stop working
        return false
    }

    // We call Done here so the workqueue knows we have finished
    // processing this item. We also must remember to call Forget if we
    // do not want this work item being re-queued. For example, we do
    // not call Forget if a transient error occurs, instead the item is
    // put back on the workqueue and attempted again after a back-off
    // period.
    defer c.Queue.Done(obj)

    ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
    defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

    c.reconcileHandler(ctx, obj)
    return true
}
```

Summary
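reconcileHandler is where the result of our Reconcile decides what happens to the queue item: on an error or Requeue the item is re-added with rate limiting, on RequeueAfter it comes back after the delay, and otherwise Forget drops it. A hedged sketch of what that means from the Reconciler's side (behaviour as of the controller-runtime versions around the one discussed here):

```go
// Sketch: how the return value of Reconcile maps onto workqueue behaviour.
func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // return ctrl.Result{}, err                              -> requeued with exponential back-off
    // return ctrl.Result{Requeue: true}, nil                 -> requeued (rate limited)
    // return ctrl.Result{RequeueAfter: time.Minute}, nil     -> requeued after one minute
    // return ctrl.Result{}, nil                              -> Forget; the item leaves the queue
    return ctrl.Result{}, nil
}
```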
So the trigger path of the Reconcile method is: the Informers in the Cache pick up resource change events, and a producer/consumer pipeline then feeds those events into the Reconcile method we implemented.
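Stripped of everything controller-runtime adds around it, that producer/consumer core is just a client-go workqueue. A minimal, self-contained sketch of the same pattern (the key string is only an example):

```go
package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Producer side: the informer's event handler would call Add for every event.
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add("default/nodepool-sample") // in controller-runtime this is a reconcile.Request

    // Consumer side: roughly what processNextWorkItem does, minus metrics and error handling.
    item, shutdown := q.Get()
    if !shutdown {
        fmt.Println("reconciling", item) // this is where Reconcile would be called
        q.Done(item)
        q.Forget(item) // clear the rate-limiting history once handled successfully
    }
    q.ShutDown()
}
```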
Kubebuilder is an excellent framework for Operator development. It not only dramatically simplifies writing an Operator, it also makes full use of Go interfaces to leave plenty of room for extension. That is worth learning from: if our own business frameworks reached this level, I would call that a success.
References
[1] Architecture: https://master.book.kubebuilder.io/architecture.html
This article is reposted from the WeChat public account "mohuishou". Please contact the mohuishou account before reposting.
Original article: https://lailin.xyz/post/operator-09-kubebuilder-code.html