From ad190a312c9591e11ec62b95a2eb5ddf3476c407 Mon Sep 17 00:00:00 2001 From: zak905 Date: Wed, 22 Apr 2026 20:03:19 +0200 Subject: [PATCH] refactor(controller): use kubernetes informers provided by client-go --- pkg/controller/controller.go | 23 ++++------------------- pkg/controller/node.go | 22 ---------------------- pkg/controller/pod.go | 25 ------------------------- pkg/util/k8sutil/k8sutil.go | 2 ++ 4 files changed, 6 insertions(+), 66 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 13e4017c8..873254d5b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -26,6 +26,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + informers_core_v1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" @@ -401,16 +402,8 @@ func (c *Controller) initSharedInformers() { } // Pods - podLw := &cache.ListWatch{ - ListFunc: c.podListFunc, - WatchFunc: c.podWatchFunc, - } - - c.podInformer = cache.NewSharedIndexInformer( - podLw, - &v1.Pod{}, - constants.QueueResyncPeriodPod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + c.podInformer = informers_core_v1.NewPodInformer(c.KubeClient.Clientset, + c.opConfig.WatchedNamespace, constants.QueueResyncPeriodPod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.podAdd, @@ -419,15 +412,7 @@ func (c *Controller) initSharedInformers() { }) // Kubernetes Nodes - nodeLw := &cache.ListWatch{ - ListFunc: c.nodeListFunc, - WatchFunc: c.nodeWatchFunc, - } - - c.nodesInformer = cache.NewSharedIndexInformer( - nodeLw, - &v1.Node{}, - constants.QueueResyncPeriodNode, + informers_core_v1.NewNodeInformer(c.KubeClient.Clientset, constants.QueueResyncPeriodNode, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) c.nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 2836b4f7f..730a9133b 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -9,33 +9,11 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" "github.com/zalando/postgres-operator/pkg/cluster" "github.com/zalando/postgres-operator/pkg/util" ) -func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, error) { - opts := metav1.ListOptions{ - Watch: options.Watch, - ResourceVersion: options.ResourceVersion, - TimeoutSeconds: options.TimeoutSeconds, - } - - return c.KubeClient.Nodes().List(context.TODO(), opts) -} - -func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) { - opts := metav1.ListOptions{ - Watch: options.Watch, - ResourceVersion: options.ResourceVersion, - TimeoutSeconds: options.TimeoutSeconds, - } - - return c.KubeClient.Nodes().Watch(context.TODO(), opts) -} - func (c *Controller) nodeAdd(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 0defe88b1..1aaa307ea 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,12 +1,7 @@ package controller import ( - "context" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" "github.com/zalando/postgres-operator/pkg/cluster" "github.com/zalando/postgres-operator/pkg/spec" @@ -14,26 +9,6 @@ import ( "k8s.io/apimachinery/pkg/types" ) -func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) { - opts := metav1.ListOptions{ - Watch: options.Watch, - ResourceVersion: options.ResourceVersion, - TimeoutSeconds: options.TimeoutSeconds, - } - - return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts) -} - -func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) { - opts := metav1.ListOptions{ - Watch: options.Watch, - ResourceVersion: options.ResourceVersion, - TimeoutSeconds: options.TimeoutSeconds, - } - - return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(context.TODO(), opts) -} - func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event cluster.PodEvent) { c.clustersMu.RLock() cluster, ok := c.clusters[clusterName] diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c34faddd4..8515027ab 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -67,6 +67,7 @@ type KubernetesClient struct { zalandov1.FabricEventStreamsGetter RESTClient rest.Interface + Clientset *kubernetes.Clientset AcidV1ClientSet *zalandoclient.Clientset Zalandov1ClientSet *zalandoclient.Clientset } @@ -148,6 +149,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { return kubeClient, fmt.Errorf("could not get clientset: %v", err) } + kubeClient.Clientset = client kubeClient.PodsGetter = client.CoreV1() kubeClient.ServicesGetter = client.CoreV1() kubeClient.EndpointsGetter = client.CoreV1()