refactor(controller): use kubernetes informers provided by client-go
This commit is contained in:
parent
e1713705f4
commit
ad190a312c
|
|
@ -26,6 +26,7 @@ import (
|
|||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
informers_core_v1 "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
|
@ -401,16 +402,8 @@ func (c *Controller) initSharedInformers() {
|
|||
}
|
||||
|
||||
// Pods
|
||||
podLw := &cache.ListWatch{
|
||||
ListFunc: c.podListFunc,
|
||||
WatchFunc: c.podWatchFunc,
|
||||
}
|
||||
|
||||
c.podInformer = cache.NewSharedIndexInformer(
|
||||
podLw,
|
||||
&v1.Pod{},
|
||||
constants.QueueResyncPeriodPod,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
c.podInformer = informers_core_v1.NewPodInformer(c.KubeClient.Clientset,
|
||||
c.opConfig.WatchedNamespace, constants.QueueResyncPeriodPod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
|
||||
c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.podAdd,
|
||||
|
|
@ -419,15 +412,7 @@ func (c *Controller) initSharedInformers() {
|
|||
})
|
||||
|
||||
// Kubernetes Nodes
|
||||
nodeLw := &cache.ListWatch{
|
||||
ListFunc: c.nodeListFunc,
|
||||
WatchFunc: c.nodeWatchFunc,
|
||||
}
|
||||
|
||||
c.nodesInformer = cache.NewSharedIndexInformer(
|
||||
nodeLw,
|
||||
&v1.Node{},
|
||||
constants.QueueResyncPeriodNode,
|
||||
informers_core_v1.NewNodeInformer(c.KubeClient.Clientset, constants.QueueResyncPeriodNode,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
|
||||
c.nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
|
|
|
|||
|
|
@ -9,33 +9,11 @@ import (
|
|||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
||||
"github.com/zalando/postgres-operator/pkg/cluster"
|
||||
"github.com/zalando/postgres-operator/pkg/util"
|
||||
)
|
||||
|
||||
func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, error) {
|
||||
opts := metav1.ListOptions{
|
||||
Watch: options.Watch,
|
||||
ResourceVersion: options.ResourceVersion,
|
||||
TimeoutSeconds: options.TimeoutSeconds,
|
||||
}
|
||||
|
||||
return c.KubeClient.Nodes().List(context.TODO(), opts)
|
||||
}
|
||||
|
||||
func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
|
||||
opts := metav1.ListOptions{
|
||||
Watch: options.Watch,
|
||||
ResourceVersion: options.ResourceVersion,
|
||||
TimeoutSeconds: options.TimeoutSeconds,
|
||||
}
|
||||
|
||||
return c.KubeClient.Nodes().Watch(context.TODO(), opts)
|
||||
}
|
||||
|
||||
func (c *Controller) nodeAdd(obj interface{}) {
|
||||
node, ok := obj.(*v1.Node)
|
||||
if !ok {
|
||||
|
|
|
|||
|
|
@ -1,12 +1,7 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
||||
"github.com/zalando/postgres-operator/pkg/cluster"
|
||||
"github.com/zalando/postgres-operator/pkg/spec"
|
||||
|
|
@ -14,26 +9,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) {
|
||||
opts := metav1.ListOptions{
|
||||
Watch: options.Watch,
|
||||
ResourceVersion: options.ResourceVersion,
|
||||
TimeoutSeconds: options.TimeoutSeconds,
|
||||
}
|
||||
|
||||
return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts)
|
||||
}
|
||||
|
||||
func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
|
||||
opts := metav1.ListOptions{
|
||||
Watch: options.Watch,
|
||||
ResourceVersion: options.ResourceVersion,
|
||||
TimeoutSeconds: options.TimeoutSeconds,
|
||||
}
|
||||
|
||||
return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(context.TODO(), opts)
|
||||
}
|
||||
|
||||
func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event cluster.PodEvent) {
|
||||
c.clustersMu.RLock()
|
||||
cluster, ok := c.clusters[clusterName]
|
||||
|
|
|
|||
|
|
@ -67,6 +67,7 @@ type KubernetesClient struct {
|
|||
zalandov1.FabricEventStreamsGetter
|
||||
|
||||
RESTClient rest.Interface
|
||||
Clientset *kubernetes.Clientset
|
||||
AcidV1ClientSet *zalandoclient.Clientset
|
||||
Zalandov1ClientSet *zalandoclient.Clientset
|
||||
}
|
||||
|
|
@ -148,6 +149,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
|||
return kubeClient, fmt.Errorf("could not get clientset: %v", err)
|
||||
}
|
||||
|
||||
kubeClient.Clientset = client
|
||||
kubeClient.PodsGetter = client.CoreV1()
|
||||
kubeClient.ServicesGetter = client.CoreV1()
|
||||
kubeClient.EndpointsGetter = client.CoreV1()
|
||||
|
|
|
|||
Loading…
Reference in New Issue