From feef866164bacc70750bb431e909912c5fcd7779 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 27 Mar 2020 17:04:05 +0100 Subject: [PATCH] reflect k8s client API changes --- pkg/cluster/cluster.go | 20 ++++---- pkg/cluster/exec.go | 5 +- pkg/cluster/k8sres.go | 11 +++- pkg/cluster/pod.go | 21 ++++---- pkg/cluster/resources.go | 85 ++++++++++++++++++------------- pkg/cluster/sync.go | 33 ++++++------ pkg/cluster/util.go | 11 ++-- pkg/cluster/volumes.go | 11 ++-- pkg/controller/controller.go | 5 +- pkg/controller/node.go | 7 +-- pkg/controller/operator_config.go | 4 +- pkg/controller/pod.go | 8 +-- pkg/controller/postgresql.go | 11 ++-- pkg/controller/util.go | 13 +++-- pkg/util/k8sutil/k8sutil.go | 27 +++++----- pkg/util/teams/teams_test.go | 4 +- 16 files changed, 160 insertions(+), 116 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index dba67c142..45ee55300 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster // Postgres CustomResourceDefinition object i.e. Spilo import ( + "context" "database/sql" "encoding/json" "fmt" @@ -88,7 +89,7 @@ type Cluster struct { pgDb *sql.DB mu sync.Mutex userSyncStrategy spec.UserSyncer - deleteOptions *metav1.DeleteOptions + deleteOptions metav1.DeleteOptions podEventsQueue *cache.FIFO teamsAPIClient teams.Interface @@ -131,7 +132,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres Services: make(map[PostgresRole]*v1.Service), Endpoints: make(map[PostgresRole]*v1.Endpoints)}, userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}, + deleteOptions: metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}, podEventsQueue: podEventsQueue, KubeClient: kubeClient, } @@ -182,7 +183,8 @@ func (c *Cluster) setStatus(status string) { // we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ), // however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11) // we should take advantage of it. - newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(c.Name, types.MergePatchType, patch, "status") + newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch( + context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status") if err != nil { c.logger.Errorf("could not update status: %v", err) // return as newspec is empty, see PR654 @@ -1185,12 +1187,12 @@ func (c *Cluster) deleteClusterObject( func (c *Cluster) deletePatroniClusterServices() error { get := func(name string) (spec.NamespacedName, error) { - svc, err := c.KubeClient.Services(c.Namespace).Get(name, metav1.GetOptions{}) + svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) return util.NameFromMeta(svc.ObjectMeta), err } deleteServiceFn := func(name string) error { - return c.KubeClient.Services(c.Namespace).Delete(name, c.deleteOptions) + return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions) } return c.deleteClusterObject(get, deleteServiceFn, "service") @@ -1198,12 +1200,12 @@ func (c *Cluster) deletePatroniClusterServices() error { func (c *Cluster) deletePatroniClusterEndpoints() error { get := func(name string) (spec.NamespacedName, error) { - ep, err := c.KubeClient.Endpoints(c.Namespace).Get(name, metav1.GetOptions{}) + ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) return util.NameFromMeta(ep.ObjectMeta), err } deleteEndpointFn := func(name string) error { - return c.KubeClient.Endpoints(c.Namespace).Delete(name, c.deleteOptions) + return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions) } return c.deleteClusterObject(get, deleteEndpointFn, "endpoint") @@ -1211,12 +1213,12 @@ func (c *Cluster) deletePatroniClusterEndpoints() error { func (c *Cluster) deletePatroniClusterConfigMaps() error { get := func(name string) (spec.NamespacedName, error) { - cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(name, metav1.GetOptions{}) + cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) return util.NameFromMeta(cm.ObjectMeta), err } deleteConfigMapFn := func(name string) error { - return c.KubeClient.ConfigMaps(c.Namespace).Delete(name, c.deleteOptions) + return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions) } return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index 8dd6bd91d..8b5089b4e 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -2,10 +2,11 @@ package cluster import ( "bytes" + "context" "fmt" "strings" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" @@ -23,7 +24,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( execErr bytes.Buffer ) - pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) + pod, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("could not get pod info: %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 2c40bb0ba..c4919c62d 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "encoding/json" "fmt" "path" @@ -914,11 +915,17 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef if c.OpConfig.PodEnvironmentConfigMap != (pkgspec.NamespacedName{}) { var cm *v1.ConfigMap - cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{}) + cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get( + context.TODO(), + c.OpConfig.PodEnvironmentConfigMap.Name, + metav1.GetOptions{}) if err != nil { // if not found, try again using the cluster's namespace if it's different (old behavior) if k8sutil.ResourceNotFound(err) && c.Namespace != c.OpConfig.PodEnvironmentConfigMap.Namespace { - cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{}) + cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get( + context.TODO(), + c.OpConfig.PodEnvironmentConfigMap.Name, + metav1.GetOptions{}) } if err != nil { return nil, fmt.Errorf("could not read PodEnvironmentConfigMap: %v", err) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 095f859f0..9991621cc 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "fmt" "math/rand" @@ -17,7 +18,7 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { LabelSelector: c.labelsSet(false).String(), } - pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions) + pods, err := c.KubeClient.Pods(c.Namespace).List(context.TODO(), listOptions) if err != nil { return nil, fmt.Errorf("could not get list of pods: %v", err) } @@ -30,7 +31,7 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) { LabelSelector: c.roleLabelsSet(false, role).String(), } - pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions) + pods, err := c.KubeClient.Pods(c.Namespace).List(context.TODO(), listOptions) if err != nil { return nil, fmt.Errorf("could not get list of pods: %v", err) } @@ -73,7 +74,7 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error { ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) - if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { return err } @@ -183,7 +184,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { eol bool ) - oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) + oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get pod: %v", err) @@ -206,7 +207,9 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { // we must have a statefulset in the cluster for the migration to work if c.Statefulset == nil { var sset *appsv1.StatefulSet - if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), + if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get( + context.TODO(), + c.statefulSetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not retrieve cluster statefulset: %v", err) } @@ -247,7 +250,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { // MigrateReplicaPod recreates pod on a new node func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName string) error { - replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) + replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get pod: %v", err) } @@ -276,7 +279,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { defer c.unregisterPodSubscriber(podName) stopChan := make(chan struct{}) - if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { return nil, fmt.Errorf("could not delete pod: %v", err) } @@ -300,7 +303,7 @@ func (c *Cluster) recreatePods() error { LabelSelector: ls.String(), } - pods, err := c.KubeClient.Pods(namespace).List(listOptions) + pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions) if err != nil { return fmt.Errorf("could not get the list of pods: %v", err) } @@ -349,7 +352,7 @@ func (c *Cluster) recreatePods() error { } func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) { - node, err := c.KubeClient.Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{}) + node, err := c.KubeClient.Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) if err != nil { return false, err } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 2e02a9a83..c0c731ed8 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "fmt" "strconv" "strings" @@ -80,7 +81,10 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } - statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec) + statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create( + context.TODO(), + statefulSetSpec, + metav1.CreateOptions{}) if err != nil { return nil, err } @@ -129,7 +133,7 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO // should be good enough to not think about it here. deployment, err := c.KubeClient. Deployments(deploymentSpec.Namespace). - Create(deploymentSpec) + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) if err != nil { return nil, err @@ -138,7 +142,7 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO serviceSpec := c.generateConnPoolService(&c.Spec) service, err := c.KubeClient. Services(serviceSpec.Namespace). - Create(serviceSpec) + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return nil, err @@ -180,7 +184,7 @@ func (c *Cluster) deleteConnectionPool() (err error) { options := metav1.DeleteOptions{PropagationPolicy: &policy} err = c.KubeClient. Deployments(c.Namespace). - Delete(deploymentName, &options) + Delete(context.TODO(), deploymentName, options) if !k8sutil.ResourceNotFound(err) { c.logger.Debugf("Connection pool deployment was already deleted") @@ -202,7 +206,7 @@ func (c *Cluster) deleteConnectionPool() (err error) { // will be deleted. err = c.KubeClient. Services(c.Namespace). - Delete(serviceName, &options) + Delete(context.TODO(), serviceName, options) if !k8sutil.ResourceNotFound(err) { c.logger.Debugf("Connection pool service was already deleted") @@ -251,7 +255,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error { } podName := fmt.Sprintf("%s-0", c.Statefulset.Name) - masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(podName, metav1.GetOptions{}) + masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(context.TODO(), podName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get master candidate pod: %v", err) } @@ -350,9 +354,12 @@ func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (* return nil, fmt.Errorf("could not form patch for the statefulset metadata: %v", err) } result, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + context.TODO(), c.Statefulset.Name, types.MergePatchType, - []byte(patchData), "") + []byte(patchData), + metav1.PatchOptions{}, + "") if err != nil { return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) } @@ -380,9 +387,12 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error { } statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + context.TODO(), c.Statefulset.Name, types.MergePatchType, - patchData, "") + patchData, + metav1.PatchOptions{}, + "") if err != nil { return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err) } @@ -414,7 +424,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error { oldStatefulset := c.Statefulset options := metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy} - err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options) + err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(context.TODO(), oldStatefulset.Name, options) if err != nil { return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err) } @@ -425,7 +435,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error { err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - _, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{}) + _, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(context.TODO(), oldStatefulset.Name, metav1.GetOptions{}) if err2 == nil { return false, nil } @@ -439,7 +449,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error { } // create the new statefulset with the desired spec. It would take over the remaining pods. - createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) + createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(context.TODO(), newStatefulSet, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err) } @@ -460,7 +470,7 @@ func (c *Cluster) deleteStatefulSet() error { return fmt.Errorf("there is no statefulset in the cluster") } - err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, c.deleteOptions) + err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions) if err != nil { return err } @@ -482,7 +492,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) - service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) + service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return nil, err } @@ -509,9 +519,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error if len(newService.ObjectMeta.Annotations) > 0 { if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil { _, err = c.KubeClient.Services(serviceName.Namespace).Patch( + context.TODO(), serviceName.Name, types.MergePatchType, - []byte(annotationsPatchData), "") + []byte(annotationsPatchData), + metav1.PatchOptions{}, + "") if err != nil { return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) @@ -528,7 +541,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error if newServiceType == "ClusterIP" && newServiceType != oldServiceType { newService.ResourceVersion = c.Services[role].ResourceVersion newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP - svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService) + svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("could not update service %q: %v", serviceName, err) } @@ -539,9 +552,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error } svc, err = c.KubeClient.Services(serviceName.Namespace).Patch( - serviceName.Name, - types.MergePatchType, - patchData, "") + context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "") if err != nil { return fmt.Errorf("could not patch service %q: %v", serviceName, err) } @@ -560,7 +571,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { return nil } - if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Services(service.Namespace).Delete(context.TODO(), service.Name, c.deleteOptions); err != nil { return err } @@ -584,7 +595,7 @@ func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) { } endpointsSpec := c.generateEndpoint(role, subsets) - endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) + endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(context.TODO(), endpointsSpec, metav1.CreateOptions{}) if err != nil { return nil, fmt.Errorf("could not create %s endpoint: %v", role, err) } @@ -626,7 +637,7 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget, podDisruptionBudgetSpec := c.generatePodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). - Create(podDisruptionBudgetSpec) + Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) if err != nil { return nil, err @@ -647,7 +658,7 @@ func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). - Create(pdb) + Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create pod disruption budget: %v", err) } @@ -665,7 +676,7 @@ func (c *Cluster) deletePodDisruptionBudget() error { pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) err := c.KubeClient. PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(c.PodDisruptionBudget.Name, c.deleteOptions) + Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) if err != nil { return fmt.Errorf("could not delete pod disruption budget: %v", err) } @@ -674,7 +685,7 @@ func (c *Cluster) deletePodDisruptionBudget() error { err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(pdbName.Name, metav1.GetOptions{}) + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{}) if err2 == nil { return false, nil } @@ -697,7 +708,8 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { return fmt.Errorf("there is no %s endpoint in the cluster", role) } - if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(c.Endpoints[role].Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete( + context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil { return fmt.Errorf("could not delete endpoint: %v", err) } @@ -711,7 +723,7 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { func (c *Cluster) deleteSecret(secret *v1.Secret) error { c.setProcessName("deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) c.logger.Debugf("deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) - err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) + err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions) if err != nil { return err } @@ -736,7 +748,7 @@ func (c *Cluster) createLogicalBackupJob() (err error) { } c.logger.Debugf("Generated cronJobSpec: %v", logicalBackupJobSpec) - _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(logicalBackupJobSpec) + _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(context.TODO(), logicalBackupJobSpec, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create k8s cron job: %v", err) } @@ -754,9 +766,12 @@ func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error { // update the backup job spec _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + context.TODO(), c.getLogicalBackupJobName(), types.MergePatchType, - patchData, "") + patchData, + metav1.PatchOptions{}, + "") if err != nil { return fmt.Errorf("could not patch logical backup job: %v", err) } @@ -768,7 +783,7 @@ func (c *Cluster) deleteLogicalBackupJob() error { c.logger.Info("removing the logical backup job") - return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(c.getLogicalBackupJobName(), c.deleteOptions) + return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions) } // GetServiceMaster returns cluster's kubernetes master Service @@ -818,11 +833,13 @@ func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *app // worker at one time will try to update it chances of conflicts are // minimal. deployment, err := c.KubeClient. - Deployments(c.ConnectionPool.Deployment.Namespace). - Patch( - c.ConnectionPool.Deployment.Name, - types.MergePatchType, - patchData, "") + Deployments(c.ConnectionPool.Deployment.Namespace).Patch( + context.TODO(), + c.ConnectionPool.Deployment.Name, + types.MergePatchType, + patchData, + metav1.PatchOptions{}, + "") if err != nil { return nil, fmt.Errorf("could not patch deployment: %v", err) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index a7c933ae7..eb3835787 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "fmt" batchv1beta1 "k8s.io/api/batch/v1beta1" @@ -140,7 +141,7 @@ func (c *Cluster) syncService(role PostgresRole) error { ) c.setProcessName("syncing %s service", role) - if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err == nil { + if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) if match, reason := k8sutil.SameService(svc, desiredSvc); !match { @@ -166,7 +167,7 @@ func (c *Cluster) syncService(role PostgresRole) error { return fmt.Errorf("could not create missing %s service: %v", role, err) } c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) - if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err != nil { + if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %s service: %v", role, err) } } @@ -181,7 +182,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { ) c.setProcessName("syncing %s endpoint", role) - if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err == nil { + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil { // TODO: No syncing of endpoints here, is this covered completely by updateService? c.Endpoints[role] = ep return nil @@ -200,7 +201,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return fmt.Errorf("could not create missing %s endpoint: %v", role, err) } c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta)) - if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err != nil { + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %s endpoint: %v", role, err) } } @@ -213,7 +214,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { pdb *policybeta1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() if match, reason := k8sutil.SamePDB(pdb, newPDB); !match { @@ -239,7 +240,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not create pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } @@ -255,7 +256,7 @@ func (c *Cluster) syncStatefulSet() error { podsRollingUpdateRequired bool ) // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early. - sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) + sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get statefulset: %v", err) @@ -404,14 +405,14 @@ func (c *Cluster) syncSecrets() error { secrets := c.generateUserSecrets() for secretUsername, secretSpec := range secrets { - if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec); err == nil { + if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(context.TODO(), secretSpec, metav1.CreateOptions{}); err == nil { c.Secrets[secret.UID] = secret c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) continue } if k8sutil.ResourceAlreadyExists(err) { var userMap map[string]spec.PgUser - if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}); err != nil { + if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(context.TODO(), secretSpec.Name, metav1.GetOptions{}); err != nil { return fmt.Errorf("could not get current secret: %v", err) } if secretUsername != string(secret.Data["username"]) { @@ -434,7 +435,7 @@ func (c *Cluster) syncSecrets() error { pwdUser.Origin == spec.RoleOriginInfrastructure { c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) - if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { + if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(context.TODO(), secretSpec, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) } } else { @@ -577,7 +578,7 @@ func (c *Cluster) syncLogicalBackupJob() error { // sync the job if it exists jobName := c.getLogicalBackupJobName() - if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err == nil { + if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), jobName, metav1.GetOptions{}); err == nil { desiredJob, err = c.generateLogicalBackupJob() if err != nil { @@ -611,7 +612,7 @@ func (c *Cluster) syncLogicalBackupJob() error { return fmt.Errorf("could not create missing logical backup job: %v", err) } c.logger.Infof("logical backup job %q already exists", jobName) - if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err != nil { + if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), jobName, metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing logical backup job: %v", err) } } @@ -696,7 +697,7 @@ func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error { deployment, err := c.KubeClient. Deployments(c.Namespace). - Get(c.connPoolName(), metav1.GetOptions{}) + Get(context.TODO(), c.connPoolName(), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { msg := "Deployment %s for connection pool synchronization is not found, create it" @@ -710,7 +711,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) deployment, err := c.KubeClient. Deployments(deploymentSpec.Namespace). - Create(deploymentSpec) + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) if err != nil { return err @@ -755,7 +756,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) service, err := c.KubeClient. Services(c.Namespace). - Get(c.connPoolName(), metav1.GetOptions{}) + Get(context.TODO(), c.connPoolName(), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { msg := "Service %s for connection pool synchronization is not found, create it" @@ -764,7 +765,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) serviceSpec := c.generateConnPoolService(&newSpec.Spec) service, err := c.KubeClient. Services(serviceSpec.Namespace). - Create(serviceSpec) + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return err diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index dc1e93954..3c3dffcaf 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "context" "encoding/gob" "encoding/json" "fmt" @@ -47,7 +48,7 @@ func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) { // Temporary getting postgresql-operator secret from the NamespaceDefault credentialsSecret, err := g.kubeClient. Secrets(g.OAuthTokenSecretName.Namespace). - Get(g.OAuthTokenSecretName.Name, metav1.GetOptions{}) + Get(context.TODO(), g.OAuthTokenSecretName.Name, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("could not get credentials secret: %v", err) @@ -278,7 +279,7 @@ func (c *Cluster) waitStatefulsetReady() error { listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet(false).String(), } - ss, err := c.KubeClient.StatefulSets(c.Namespace).List(listOptions) + ss, err := c.KubeClient.StatefulSets(c.Namespace).List(context.TODO(), listOptions) if err != nil { return false, err } @@ -313,7 +314,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { } podsNumber = 1 if !anyReplica { - pods, err := c.KubeClient.Pods(namespace).List(listOptions) + pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions) if err != nil { return err } @@ -327,7 +328,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { func() (bool, error) { masterCount := 0 if !anyReplica { - masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) + masterPods, err2 := c.KubeClient.Pods(namespace).List(context.TODO(), masterListOption) if err2 != nil { return false, err2 } @@ -337,7 +338,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { } masterCount = len(masterPods.Items) } - replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption) + replicaPods, err2 := c.KubeClient.Pods(namespace).List(context.TODO(), replicaListOption) if err2 != nil { return false, err2 } diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index d92ae6258..a5bfe6c2d 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -1,11 +1,12 @@ package cluster import ( + "context" "fmt" "strconv" "strings" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,7 +24,7 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro LabelSelector: c.labelsSet(false).String(), } - pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions) + pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(context.TODO(), listOptions) if err != nil { return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err) } @@ -38,7 +39,7 @@ func (c *Cluster) deletePersistentVolumeClaims() error { } for _, pvc := range pvcs { c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) - if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { + if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil { c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) } } @@ -78,7 +79,7 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { continue } } - pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{}) + pv, err := c.KubeClient.PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("could not get PersistentVolume: %v", err) } @@ -143,7 +144,7 @@ func (c *Cluster) resizeVolumes(newVolume acidv1.Volume, resizers []volumes.Volu c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) pv.Spec.Capacity[v1.ResourceStorage] = newQuantity c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) - if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { + if _, err := c.KubeClient.PersistentVolumes().Update(context.TODO(), pv, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("could not update persistent volume: %q", err) } c.logger.Debugf("successfully updated persistent volume %q", pv.Name) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0ce0d026e..9c48b7ef2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,6 +1,7 @@ package controller import ( + "context" "fmt" "os" "sync" @@ -99,7 +100,7 @@ func (c *Controller) initOperatorConfig() { if c.config.ConfigMapName != (spec.NamespacedName{}) { configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). - Get(c.config.ConfigMapName.Name, metav1.GetOptions{}) + Get(context.TODO(), c.config.ConfigMapName.Name, metav1.GetOptions{}) if err != nil { panic(err) } @@ -406,7 +407,7 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr } else { - if _, err := c.KubeClient.Namespaces().Get(namespace, metav1.GetOptions{}); err != nil { + if _, err := c.KubeClient.Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}); err != nil { c.logger.Fatalf("Could not find the watched namespace %q", namespace) } else { c.logger.Infof("Listenting to the specific namespace %q", namespace) diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 8052458c3..be41b79ab 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -1,6 +1,7 @@ package controller import ( + "context" "fmt" "time" @@ -22,7 +23,7 @@ func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, e TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.Nodes().List(opts) + return c.KubeClient.Nodes().List(context.TODO(), opts) } func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) { @@ -32,7 +33,7 @@ func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.Nodes().Watch(opts) + return c.KubeClient.Nodes().Watch(context.TODO(), opts) } func (c *Controller) nodeAdd(obj interface{}) { @@ -87,7 +88,7 @@ func (c *Controller) attemptToMoveMasterPodsOffNode(node *v1.Node) error { opts := metav1.ListOptions{ LabelSelector: labels.Set(c.opConfig.ClusterLabels).String(), } - podList, err := c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(opts) + podList, err := c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts) if err != nil { c.logger.Errorf("could not fetch list of the pods: %v", err) return err diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index 970eef701..c9b3c5ea4 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -1,6 +1,7 @@ package controller import ( + "context" "fmt" "time" @@ -14,7 +15,8 @@ import ( func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, configObjectName string) (*acidv1.OperatorConfiguration, error) { - config, err := c.KubeClient.AcidV1ClientSet.AcidV1().OperatorConfigurations(configObjectNamespace).Get(configObjectName, metav1.GetOptions{}) + config, err := c.KubeClient.AcidV1ClientSet.AcidV1().OperatorConfigurations(configObjectNamespace).Get( + context.TODO(), configObjectName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("could not get operator configuration object %q: %v", configObjectName, err) } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 27fd6c956..0defe88b1 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,7 +1,9 @@ package controller import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -19,7 +21,7 @@ func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, er TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(opts) + return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts) } func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) { @@ -29,7 +31,7 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(opts) + return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(context.TODO(), opts) } func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event cluster.PodEvent) { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 5d48bac39..e81671c7d 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -1,6 +1,7 @@ package controller import ( + "context" "fmt" "reflect" "strings" @@ -43,7 +44,7 @@ func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.Postgresq var pgList acidv1.PostgresqlList // TODO: use the SharedInformer cache instead of quering Kubernetes API directly. - list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(options) + list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options) if err != nil { c.logger.Errorf("could not list postgresql objects: %v", err) } @@ -535,7 +536,7 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error { func (c *Controller) createPodServiceAccount(namespace string) error { podServiceAccountName := c.opConfig.PodServiceAccountName - _, err := c.KubeClient.ServiceAccounts(namespace).Get(podServiceAccountName, metav1.GetOptions{}) + _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { c.logger.Infof(fmt.Sprintf("creating pod service account %q in the %q namespace", podServiceAccountName, namespace)) @@ -543,7 +544,7 @@ func (c *Controller) createPodServiceAccount(namespace string) error { // get a separate copy of service account // to prevent a race condition when setting a namespace for many clusters sa := *c.PodServiceAccount - if _, err = c.KubeClient.ServiceAccounts(namespace).Create(&sa); err != nil { + if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil { return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err) } @@ -560,7 +561,7 @@ func (c *Controller) createRoleBindings(namespace string) error { podServiceAccountName := c.opConfig.PodServiceAccountName podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name - _, err := c.KubeClient.RoleBindings(namespace).Get(podServiceAccountRoleBindingName, metav1.GetOptions{}) + _, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace) @@ -568,7 +569,7 @@ func (c *Controller) createRoleBindings(namespace string) error { // get a separate copy of role binding // to prevent a race condition when setting a namespace for many clusters rb := *c.PodServiceAccountRoleBinding - _, err = c.KubeClient.RoleBindings(namespace).Create(&rb) + _, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err) } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 9b7dca063..511f02823 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -1,6 +1,7 @@ package controller import ( + "context" "encoding/json" "fmt" @@ -50,7 +51,7 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { } func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefinition) error { - if _, err := c.KubeClient.CustomResourceDefinitions().Create(crd); err != nil { + if _, err := c.KubeClient.CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); err != nil { if k8sutil.ResourceAlreadyExists(err) { c.logger.Infof("customResourceDefinition %q is already registered and will only be updated", crd.Name) @@ -58,7 +59,8 @@ func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefiniti if err != nil { return fmt.Errorf("could not marshal new customResourceDefintion: %v", err) } - if _, err := c.KubeClient.CustomResourceDefinitions().Patch(crd.Name, types.MergePatchType, patch); err != nil { + if _, err := c.KubeClient.CustomResourceDefinitions().Patch( + context.TODO(), crd.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { return fmt.Errorf("could not update customResourceDefinition: %v", err) } } else { @@ -69,7 +71,7 @@ func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefiniti } return wait.Poll(c.config.CRDReadyWaitInterval, c.config.CRDReadyWaitTimeout, func() (bool, error) { - c, err := c.KubeClient.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{}) + c, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -115,7 +117,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (m infraRolesSecret, err := c.KubeClient. Secrets(rolesSecret.Namespace). - Get(rolesSecret.Name, metav1.GetOptions{}) + Get(context.TODO(), rolesSecret.Name, metav1.GetOptions{}) if err != nil { c.logger.Debugf("infrastructure roles secret name: %q", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) @@ -161,7 +163,8 @@ Users: } // perhaps we have some map entries with usernames, passwords, let's check if we have those users in the configmap - if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get(rolesSecret.Name, metav1.GetOptions{}); err == nil { + if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get( + context.TODO(), rolesSecret.Name, metav1.GetOptions{}); err == nil { // we have a configmap with username - json description, let's read and decode it for role, s := range infraRolesMap.Data { roleDescr, err := readDecodedRole(s) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 75b99ec7c..3a397af7d 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,6 +1,7 @@ package k8sutil import ( + "context" "fmt" "reflect" @@ -237,7 +238,7 @@ func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason st return true, "" } -func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) { +func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { if name != "infrastructureroles-test" { return nil, fmt.Errorf("NotFound") } @@ -253,7 +254,7 @@ func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, er } -func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigMap, error) { +func (c *mockConfigMap) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.ConfigMap, error) { if name != "infrastructureroles-test" { return nil, fmt.Errorf("NotFound") } @@ -283,7 +284,7 @@ func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.D return &mockDeploymentNotExist{} } -func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { +func (mock *mockDeployment) Create(context.Context, *apiappsv1.Deployment, metav1.CreateOptions) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "test-deployment", @@ -294,11 +295,11 @@ func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment }, nil } -func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error { +func (mock *mockDeployment) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return nil } -func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { +func (mock *mockDeployment) Get(ctx context.Context, name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "test-deployment", @@ -318,7 +319,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1 }, nil } -func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { +func (mock *mockDeployment) Patch(ctx context.Context, name string, t types.PatchType, data []byte, opts metav1.PatchOptions, subres ...string) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ Spec: apiappsv1.DeploymentSpec{ Replicas: Int32ToPointer(2), @@ -329,7 +330,7 @@ func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, s }, nil } -func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { +func (mock *mockDeploymentNotExist) Get(ctx context.Context, name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { return nil, &apierrors.StatusError{ ErrStatus: metav1.Status{ Reason: metav1.StatusReasonNotFound, @@ -337,7 +338,7 @@ func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*a } } -func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { +func (mock *mockDeploymentNotExist) Create(context.Context, *apiappsv1.Deployment, metav1.CreateOptions) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "test-deployment", @@ -356,7 +357,7 @@ func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.Service return &mockServiceNotExist{} } -func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { +func (mock *mockService) Create(context.Context, *v1.Service, metav1.CreateOptions) (*v1.Service, error) { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "test-service", @@ -364,11 +365,11 @@ func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { }, nil } -func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { +func (mock *mockService) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return nil } -func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { +func (mock *mockService) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Service, error) { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "test-service", @@ -376,7 +377,7 @@ func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, }, nil } -func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { +func (mock *mockServiceNotExist) Create(context.Context, *v1.Service, metav1.CreateOptions) (*v1.Service, error) { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "test-service", @@ -384,7 +385,7 @@ func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { }, nil } -func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { +func (mock *mockServiceNotExist) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Service, error) { return nil, &apierrors.StatusError{ ErrStatus: metav1.Status{ Reason: metav1.StatusReasonNotFound, diff --git a/pkg/util/teams/teams_test.go b/pkg/util/teams/teams_test.go index 51bbcbc31..33d01b75b 100644 --- a/pkg/util/teams/teams_test.go +++ b/pkg/util/teams/teams_test.go @@ -133,11 +133,11 @@ var requestsURLtc = []struct { }{ { "coffee://localhost/", - fmt.Errorf(`Get coffee://localhost/teams/acid: unsupported protocol scheme "coffee"`), + fmt.Errorf(`Get "coffee://localhost/teams/acid": unsupported protocol scheme "coffee"`), }, { "http://192.168.0.%31/", - fmt.Errorf(`parse http://192.168.0.%%31/teams/acid: invalid URL escape "%%31"`), + fmt.Errorf(`parse "http://192.168.0.%%31/teams/acid": invalid URL escape "%%31"`), }, }