reflect k8s client API changes
parent 927f1cb06f
commit feef866164
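Reviewer context: this commit tracks the client-go v0.18 (Kubernetes 1.18) API break, in which every verb on the typed clients gained a leading context.Context argument and the option structs (GetOptions, CreateOptions, UpdateOptions, PatchOptions, DeleteOptions) became value parameters. A minimal before/after sketch, assuming client-go >= v0.18, a kubeconfig in the default location, and a placeholder pod named "demo":

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // build a clientset from ~/.kube/config
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // client-go < 0.18:
    //   pod, err := client.CoreV1().Pods("default").Get("demo", metav1.GetOptions{})
    // client-go >= 0.18: context first, options by value:
    pod, err := client.CoreV1().Pods("default").Get(context.TODO(), "demo", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println(pod.Name)
}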
@@ -3,6 +3,7 @@ package cluster
 // Postgres CustomResourceDefinition object i.e. Spilo
 
 import (
+	"context"
 	"database/sql"
 	"encoding/json"
 	"fmt"
@@ -88,7 +89,7 @@ type Cluster struct {
 	pgDb             *sql.DB
 	mu               sync.Mutex
 	userSyncStrategy spec.UserSyncer
-	deleteOptions    *metav1.DeleteOptions
+	deleteOptions    metav1.DeleteOptions
 	podEventsQueue   *cache.FIFO
 
 	teamsAPIClient teams.Interface
@@ -131,7 +132,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 			Services:  make(map[PostgresRole]*v1.Service),
 			Endpoints: make(map[PostgresRole]*v1.Endpoints)},
 		userSyncStrategy: users.DefaultUserSyncStrategy{},
-		deleteOptions:    &metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
+		deleteOptions:    metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
 		podEventsQueue:   podEventsQueue,
 		KubeClient:       kubeClient,
 	}
@@ -182,7 +183,8 @@ func (c *Cluster) setStatus(status string) {
 	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
 	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
 	// we should take advantage of it.
-	newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(c.Name, types.MergePatchType, patch, "status")
+	newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
+		context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
 	if err != nil {
 		c.logger.Errorf("could not update status: %v", err)
 		// return as newspec is empty, see PR654
@@ -1185,12 +1187,12 @@ func (c *Cluster) deleteClusterObject(
 
 func (c *Cluster) deletePatroniClusterServices() error {
 	get := func(name string) (spec.NamespacedName, error) {
-		svc, err := c.KubeClient.Services(c.Namespace).Get(name, metav1.GetOptions{})
+		svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
 		return util.NameFromMeta(svc.ObjectMeta), err
 	}
 
 	deleteServiceFn := func(name string) error {
-		return c.KubeClient.Services(c.Namespace).Delete(name, c.deleteOptions)
+		return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
 	}
 
 	return c.deleteClusterObject(get, deleteServiceFn, "service")
@@ -1198,12 +1200,12 @@ func (c *Cluster) deletePatroniClusterServices() error {
 
 func (c *Cluster) deletePatroniClusterEndpoints() error {
 	get := func(name string) (spec.NamespacedName, error) {
-		ep, err := c.KubeClient.Endpoints(c.Namespace).Get(name, metav1.GetOptions{})
+		ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
 		return util.NameFromMeta(ep.ObjectMeta), err
 	}
 
 	deleteEndpointFn := func(name string) error {
-		return c.KubeClient.Endpoints(c.Namespace).Delete(name, c.deleteOptions)
+		return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
 	}
 
 	return c.deleteClusterObject(get, deleteEndpointFn, "endpoint")
@@ -1211,12 +1213,12 @@ func (c *Cluster) deletePatroniClusterEndpoints() error {
 
 func (c *Cluster) deletePatroniClusterConfigMaps() error {
 	get := func(name string) (spec.NamespacedName, error) {
-		cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(name, metav1.GetOptions{})
+		cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
 		return util.NameFromMeta(cm.ObjectMeta), err
 	}
 
 	deleteConfigMapFn := func(name string) error {
-		return c.KubeClient.ConfigMaps(c.Namespace).Delete(name, c.deleteOptions)
+		return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
 	}
 
 	return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
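The deleteOptions field change above follows from the same release: Delete now takes metav1.DeleteOptions by value instead of a pointer, which is why the struct field and the New() initializer both drop the `&`. A short sketch of the new call shape, with hypothetical names, assuming client-go >= v0.18:

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// deleteService shows the post-0.18 Delete signature: context first,
// options by value. The propagation policy is still set via a pointer
// field inside the options struct.
func deleteService(client kubernetes.Interface, namespace, name string) error {
    policy := metav1.DeletePropagationForeground
    opts := metav1.DeleteOptions{PropagationPolicy: &policy}
    return client.CoreV1().Services(namespace).Delete(context.TODO(), name, opts)
}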
@@ -2,10 +2,11 @@ package cluster
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strings"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/remotecommand"
@@ -23,7 +24,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (
 		execErr bytes.Buffer
 	)
 
-	pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+	pod, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{})
 	if err != nil {
 		return "", fmt.Errorf("could not get pod info: %v", err)
 	}
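context.TODO() is used throughout this commit as a placeholder; call sites that want cancellation or deadlines can thread a real context through instead. A sketch of what that could look like, assuming client-go >= v0.18 (the helper name and timeout are illustrative):

package example

import (
    "context"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// getPodWithTimeout bounds the API call instead of passing context.TODO().
func getPodWithTimeout(client kubernetes.Interface, namespace, name string) (*v1.Pod, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel() // release the timer once the call returns
    return client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
}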
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"path"
@@ -914,11 +915,17 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 
 	if c.OpConfig.PodEnvironmentConfigMap != (pkgspec.NamespacedName{}) {
 		var cm *v1.ConfigMap
-		cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{})
+		cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get(
+			context.TODO(),
+			c.OpConfig.PodEnvironmentConfigMap.Name,
+			metav1.GetOptions{})
 		if err != nil {
 			// if not found, try again using the cluster's namespace if it's different (old behavior)
 			if k8sutil.ResourceNotFound(err) && c.Namespace != c.OpConfig.PodEnvironmentConfigMap.Namespace {
-				cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{})
+				cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(
+					context.TODO(),
+					c.OpConfig.PodEnvironmentConfigMap.Name,
+					metav1.GetOptions{})
 			}
 			if err != nil {
 				return nil, fmt.Errorf("could not read PodEnvironmentConfigMap: %v", err)
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 
@@ -17,7 +18,7 @@ func (c *Cluster) listPods() ([]v1.Pod, error) {
 		LabelSelector: c.labelsSet(false).String(),
 	}
 
-	pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(c.Namespace).List(context.TODO(), listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("could not get list of pods: %v", err)
 	}
@@ -30,7 +31,7 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 		LabelSelector: c.roleLabelsSet(false, role).String(),
 	}
 
-	pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(c.Namespace).List(context.TODO(), listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("could not get list of pods: %v", err)
 	}
@@ -73,7 +74,7 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error {
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
 
-	if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
+	if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
 		return err
 	}
 
@@ -183,7 +184,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 		eol bool
 	)
 
-	oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+	oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{})
 
 	if err != nil {
 		return fmt.Errorf("could not get pod: %v", err)
@@ -206,7 +207,9 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 	// we must have a statefulset in the cluster for the migration to work
 	if c.Statefulset == nil {
 		var sset *appsv1.StatefulSet
-		if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(),
+		if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(
+			context.TODO(),
+			c.statefulSetName(),
 			metav1.GetOptions{}); err != nil {
 			return fmt.Errorf("could not retrieve cluster statefulset: %v", err)
 		}
@@ -247,7 +250,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 
 // MigrateReplicaPod recreates pod on a new node
 func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName string) error {
-	replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+	replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(context.TODO(), podName.Name, metav1.GetOptions{})
 	if err != nil {
 		return fmt.Errorf("could not get pod: %v", err)
 	}
@@ -276,7 +279,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	defer c.unregisterPodSubscriber(podName)
 	stopChan := make(chan struct{})
 
-	if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
+	if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
 		return nil, fmt.Errorf("could not delete pod: %v", err)
 	}
 
@@ -300,7 +303,7 @@ func (c *Cluster) recreatePods() error {
 		LabelSelector: ls.String(),
 	}
 
-	pods, err := c.KubeClient.Pods(namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
 	if err != nil {
 		return fmt.Errorf("could not get the list of pods: %v", err)
 	}
@@ -349,7 +352,7 @@ func (c *Cluster) recreatePods() error {
 }
 
 func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
-	node, err := c.KubeClient.Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{})
+	node, err := c.KubeClient.Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
 	if err != nil {
 		return false, err
 	}
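List calls follow the same mechanical rewrite as Get and Delete: the ListOptions argument keeps its meaning, and only the leading context is new. A condensed sketch of the listPods/getRolePods pattern above, assuming client-go >= v0.18 (function name is illustrative):

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// listPodsByLabel mirrors the operator's listPods helpers:
// List now takes a context ahead of the ListOptions.
func listPodsByLabel(client kubernetes.Interface, namespace, selector string) ([]v1.Pod, error) {
    opts := metav1.ListOptions{LabelSelector: selector}
    pods, err := client.CoreV1().Pods(namespace).List(context.TODO(), opts)
    if err != nil {
        return nil, err
    }
    return pods.Items, nil
}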
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"strings"
@@ -80,7 +81,10 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
 	if err != nil {
 		return nil, fmt.Errorf("could not generate statefulset: %v", err)
 	}
-	statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec)
+	statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(
+		context.TODO(),
+		statefulSetSpec,
+		metav1.CreateOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -129,7 +133,7 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO
 	// should be good enough to not think about it here.
 	deployment, err := c.KubeClient.
 		Deployments(deploymentSpec.Namespace).
-		Create(deploymentSpec)
+		Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
 
 	if err != nil {
 		return nil, err
@@ -138,7 +142,7 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO
 	serviceSpec := c.generateConnPoolService(&c.Spec)
 	service, err := c.KubeClient.
 		Services(serviceSpec.Namespace).
-		Create(serviceSpec)
+		Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
 
 	if err != nil {
 		return nil, err
@@ -180,7 +184,7 @@ func (c *Cluster) deleteConnectionPool() (err error) {
 	options := metav1.DeleteOptions{PropagationPolicy: &policy}
 	err = c.KubeClient.
 		Deployments(c.Namespace).
-		Delete(deploymentName, &options)
+		Delete(context.TODO(), deploymentName, options)
 
 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool deployment was already deleted")
@@ -202,7 +206,7 @@ func (c *Cluster) deleteConnectionPool() (err error) {
 	// will be deleted.
 	err = c.KubeClient.
 		Services(c.Namespace).
-		Delete(serviceName, &options)
+		Delete(context.TODO(), serviceName, options)
 
 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool service was already deleted")
@@ -251,7 +255,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 	}
 
 	podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
-	masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(podName, metav1.GetOptions{})
+	masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(context.TODO(), podName, metav1.GetOptions{})
 	if err != nil {
 		return fmt.Errorf("could not get master candidate pod: %v", err)
 	}
@@ -350,9 +354,12 @@ func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*
 		return nil, fmt.Errorf("could not form patch for the statefulset metadata: %v", err)
 	}
 	result, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
+		context.TODO(),
 		c.Statefulset.Name,
 		types.MergePatchType,
-		[]byte(patchData), "")
+		[]byte(patchData),
+		metav1.PatchOptions{},
+		"")
 	if err != nil {
 		return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
 	}
@@ -380,9 +387,12 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	}
 
 	statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
+		context.TODO(),
 		c.Statefulset.Name,
 		types.MergePatchType,
-		patchData, "")
+		patchData,
+		metav1.PatchOptions{},
+		"")
 	if err != nil {
 		return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
 	}
@@ -414,7 +424,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	oldStatefulset := c.Statefulset
 
 	options := metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
-	err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options)
+	err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(context.TODO(), oldStatefulset.Name, options)
 	if err != nil {
 		return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err)
 	}
@@ -425,7 +435,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 
 	err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
-			_, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{})
+			_, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(context.TODO(), oldStatefulset.Name, metav1.GetOptions{})
 			if err2 == nil {
 				return false, nil
 			}
@@ -439,7 +449,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	}
 
 	// create the new statefulset with the desired spec. It would take over the remaining pods.
-	createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet)
+	createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(context.TODO(), newStatefulSet, metav1.CreateOptions{})
 	if err != nil {
 		return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err)
 	}
@@ -460,7 +470,7 @@ func (c *Cluster) deleteStatefulSet() error {
 		return fmt.Errorf("there is no statefulset in the cluster")
 	}
 
-	err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, c.deleteOptions)
+	err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions)
 	if err != nil {
 		return err
 	}
@@ -482,7 +492,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
 	c.setProcessName("creating %v service", role)
 
 	serviceSpec := c.generateService(role, &c.Spec)
-	service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec)
+	service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -509,9 +519,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 	if len(newService.ObjectMeta.Annotations) > 0 {
 		if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil {
 			_, err = c.KubeClient.Services(serviceName.Namespace).Patch(
+				context.TODO(),
 				serviceName.Name,
 				types.MergePatchType,
-				[]byte(annotationsPatchData), "")
+				[]byte(annotationsPatchData),
+				metav1.PatchOptions{},
+				"")
 
 			if err != nil {
 				return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
@@ -528,7 +541,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 	if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
 		newService.ResourceVersion = c.Services[role].ResourceVersion
 		newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP
-		svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService)
+		svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{})
 		if err != nil {
 			return fmt.Errorf("could not update service %q: %v", serviceName, err)
 		}
@@ -539,9 +552,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 	}
 
 	svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
-		serviceName.Name,
-		types.MergePatchType,
-		patchData, "")
+		context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "")
 	if err != nil {
 		return fmt.Errorf("could not patch service %q: %v", serviceName, err)
 	}
@@ -560,7 +571,7 @@ func (c *Cluster) deleteService(role PostgresRole) error {
 		return nil
 	}
 
-	if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil {
+	if err := c.KubeClient.Services(service.Namespace).Delete(context.TODO(), service.Name, c.deleteOptions); err != nil {
 		return err
 	}
 
@@ -584,7 +595,7 @@ func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) {
 	}
 	endpointsSpec := c.generateEndpoint(role, subsets)
 
-	endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
+	endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(context.TODO(), endpointsSpec, metav1.CreateOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("could not create %s endpoint: %v", role, err)
 	}
@@ -626,7 +637,7 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget,
 	podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
 	podDisruptionBudget, err := c.KubeClient.
 		PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
-		Create(podDisruptionBudgetSpec)
+		Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
 
 	if err != nil {
 		return nil, err
@@ -647,7 +658,7 @@ func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget
 
 	newPdb, err := c.KubeClient.
 		PodDisruptionBudgets(pdb.Namespace).
-		Create(pdb)
+		Create(context.TODO(), pdb, metav1.CreateOptions{})
 	if err != nil {
 		return fmt.Errorf("could not create pod disruption budget: %v", err)
 	}
@@ -665,7 +676,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {
 	pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)
 	err := c.KubeClient.
 		PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
-		Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
+		Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
 	if err != nil {
 		return fmt.Errorf("could not delete pod disruption budget: %v", err)
 	}
@@ -674,7 +685,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {
 
 	err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
-			_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(pdbName.Name, metav1.GetOptions{})
+			_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
 			if err2 == nil {
 				return false, nil
 			}
@@ -697,7 +708,8 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
 		return fmt.Errorf("there is no %s endpoint in the cluster", role)
 	}
 
-	if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(c.Endpoints[role].Name, c.deleteOptions); err != nil {
+	if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(
+		context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
 		return fmt.Errorf("could not delete endpoint: %v", err)
 	}
 
@@ -711,7 +723,7 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
 func (c *Cluster) deleteSecret(secret *v1.Secret) error {
 	c.setProcessName("deleting secret %q", util.NameFromMeta(secret.ObjectMeta))
 	c.logger.Debugf("deleting secret %q", util.NameFromMeta(secret.ObjectMeta))
-	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions)
+	err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions)
	if err != nil {
 		return err
 	}
@@ -736,7 +748,7 @@ func (c *Cluster) createLogicalBackupJob() (err error) {
 	}
 	c.logger.Debugf("Generated cronJobSpec: %v", logicalBackupJobSpec)
 
-	_, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(logicalBackupJobSpec)
+	_, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(context.TODO(), logicalBackupJobSpec, metav1.CreateOptions{})
 	if err != nil {
 		return fmt.Errorf("could not create k8s cron job: %v", err)
 	}
@@ -754,9 +766,12 @@ func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error {
 
 	// update the backup job spec
 	_, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch(
+		context.TODO(),
 		c.getLogicalBackupJobName(),
 		types.MergePatchType,
-		patchData, "")
+		patchData,
+		metav1.PatchOptions{},
+		"")
 	if err != nil {
 		return fmt.Errorf("could not patch logical backup job: %v", err)
 	}
@@ -768,7 +783,7 @@ func (c *Cluster) deleteLogicalBackupJob() error {
 
 	c.logger.Info("removing the logical backup job")
 
-	return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(c.getLogicalBackupJobName(), c.deleteOptions)
+	return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
 }
 
 // GetServiceMaster returns cluster's kubernetes master Service
@@ -818,11 +833,13 @@ func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *app
 	// worker at one time will try to update it chances of conflicts are
 	// minimal.
 	deployment, err := c.KubeClient.
-		Deployments(c.ConnectionPool.Deployment.Namespace).
-		Patch(
-			c.ConnectionPool.Deployment.Name,
-			types.MergePatchType,
-			patchData, "")
+		Deployments(c.ConnectionPool.Deployment.Namespace).Patch(
+		context.TODO(),
+		c.ConnectionPool.Deployment.Name,
+		types.MergePatchType,
+		patchData,
+		metav1.PatchOptions{},
+		"")
 	if err != nil {
 		return nil, fmt.Errorf("could not patch deployment: %v", err)
 	}
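Patch gained both the context and a metav1.PatchOptions argument, while subresources stay variadic at the end; the trailing "" in the hunks above explicitly targets the main resource. A sketch of the new shape, assuming client-go >= v0.18 (the annotation payload is illustrative):

package example

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
)

// patchStatefulSetAnnotation merge-patches a single annotation; the
// empty trailing subresource argument can simply be omitted.
func patchStatefulSetAnnotation(client kubernetes.Interface, namespace, name string) (*appsv1.StatefulSet, error) {
    patch := []byte(`{"metadata":{"annotations":{"example":"true"}}}`)
    return client.AppsV1().StatefulSets(namespace).Patch(
        context.TODO(), name, types.MergePatchType, patch, metav1.PatchOptions{})
}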
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
@@ -140,7 +141,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
 	)
 	c.setProcessName("syncing %s service", role)
 
-	if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err == nil {
+	if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil {
 		c.Services[role] = svc
 		desiredSvc := c.generateService(role, &c.Spec)
 		if match, reason := k8sutil.SameService(svc, desiredSvc); !match {
@@ -166,7 +167,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
 		return fmt.Errorf("could not create missing %s service: %v", role, err)
 	}
 	c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta))
-	if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err != nil {
+	if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err != nil {
 		return fmt.Errorf("could not fetch existing %s service: %v", role, err)
 	}
 }
@@ -181,7 +182,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
 	)
 	c.setProcessName("syncing %s endpoint", role)
 
-	if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err == nil {
+	if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil {
 		// TODO: No syncing of endpoints here, is this covered completely by updateService?
 		c.Endpoints[role] = ep
 		return nil
@@ -200,7 +201,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
 		return fmt.Errorf("could not create missing %s endpoint: %v", role, err)
 	}
 	c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta))
-	if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err != nil {
+	if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err != nil {
 		return fmt.Errorf("could not fetch existing %s endpoint: %v", role, err)
 	}
 }
@@ -213,7 +214,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 		pdb *policybeta1.PodDisruptionBudget
 		err error
 	)
-	if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
+	if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
 		c.PodDisruptionBudget = pdb
 		newPDB := c.generatePodDisruptionBudget()
 		if match, reason := k8sutil.SamePDB(pdb, newPDB); !match {
@@ -239,7 +240,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 		return fmt.Errorf("could not create pod disruption budget: %v", err)
 	}
 	c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
-	if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
+	if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
 		return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
 	}
 }
@@ -255,7 +256,7 @@ func (c *Cluster) syncStatefulSet() error {
 		podsRollingUpdateRequired bool
 	)
 	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
-	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
+	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
 	if err != nil {
 		if !k8sutil.ResourceNotFound(err) {
 			return fmt.Errorf("could not get statefulset: %v", err)
@@ -404,14 +405,14 @@ func (c *Cluster) syncSecrets() error {
 	secrets := c.generateUserSecrets()
 
 	for secretUsername, secretSpec := range secrets {
-		if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec); err == nil {
+		if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(context.TODO(), secretSpec, metav1.CreateOptions{}); err == nil {
 			c.Secrets[secret.UID] = secret
 			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
 			continue
 		}
 		if k8sutil.ResourceAlreadyExists(err) {
 			var userMap map[string]spec.PgUser
-			if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}); err != nil {
+			if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(context.TODO(), secretSpec.Name, metav1.GetOptions{}); err != nil {
 				return fmt.Errorf("could not get current secret: %v", err)
 			}
 			if secretUsername != string(secret.Data["username"]) {
@@ -434,7 +435,7 @@ func (c *Cluster) syncSecrets() error {
 				pwdUser.Origin == spec.RoleOriginInfrastructure {
 
 				c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
-				if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil {
+				if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(context.TODO(), secretSpec, metav1.UpdateOptions{}); err != nil {
 					return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
 				}
 			} else {
@@ -577,7 +578,7 @@ func (c *Cluster) syncLogicalBackupJob() error {
 	// sync the job if it exists
 
 	jobName := c.getLogicalBackupJobName()
-	if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err == nil {
+	if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), jobName, metav1.GetOptions{}); err == nil {
 
 		desiredJob, err = c.generateLogicalBackupJob()
 		if err != nil {
@@ -611,7 +612,7 @@ func (c *Cluster) syncLogicalBackupJob() error {
 		return fmt.Errorf("could not create missing logical backup job: %v", err)
 	}
 	c.logger.Infof("logical backup job %q already exists", jobName)
-	if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err != nil {
+	if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), jobName, metav1.GetOptions{}); err != nil {
 		return fmt.Errorf("could not fetch existing logical backup job: %v", err)
 	}
 }
@@ -696,7 +697,7 @@ func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup
 func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error {
 	deployment, err := c.KubeClient.
 		Deployments(c.Namespace).
-		Get(c.connPoolName(), metav1.GetOptions{})
+		Get(context.TODO(), c.connPoolName(), metav1.GetOptions{})
 
 	if err != nil && k8sutil.ResourceNotFound(err) {
 		msg := "Deployment %s for connection pool synchronization is not found, create it"
@@ -710,7 +711,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
 
 		deployment, err := c.KubeClient.
 			Deployments(deploymentSpec.Namespace).
-			Create(deploymentSpec)
+			Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
 
 		if err != nil {
 			return err
@@ -755,7 +756,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
 
 	service, err := c.KubeClient.
 		Services(c.Namespace).
-		Get(c.connPoolName(), metav1.GetOptions{})
+		Get(context.TODO(), c.connPoolName(), metav1.GetOptions{})
 
 	if err != nil && k8sutil.ResourceNotFound(err) {
 		msg := "Service %s for connection pool synchronization is not found, create it"
@@ -764,7 +765,7 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
 		serviceSpec := c.generateConnPoolService(&newSpec.Spec)
 		service, err := c.KubeClient.
 			Services(serviceSpec.Namespace).
-			Create(serviceSpec)
+			Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
 
 		if err != nil {
 			return err
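The sync functions above all follow a get-or-create shape; the only mechanical change in this commit is the added context and options arguments. A condensed sketch of that shape using the standard apierrors helper (the operator's own k8sutil.ResourceNotFound wraps the same check); names here are illustrative:

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// getOrCreateService fetches the service and creates it when missing.
func getOrCreateService(client kubernetes.Interface, desired *v1.Service) (*v1.Service, error) {
    svc, err := client.CoreV1().Services(desired.Namespace).Get(
        context.TODO(), desired.Name, metav1.GetOptions{})
    if apierrors.IsNotFound(err) {
        return client.CoreV1().Services(desired.Namespace).Create(
            context.TODO(), desired, metav1.CreateOptions{})
    }
    return svc, err
}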
@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"bytes"
+	"context"
 	"encoding/gob"
 	"encoding/json"
 	"fmt"
@@ -47,7 +48,7 @@ func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
 	// Temporary getting postgresql-operator secret from the NamespaceDefault
 	credentialsSecret, err := g.kubeClient.
 		Secrets(g.OAuthTokenSecretName.Namespace).
-		Get(g.OAuthTokenSecretName.Name, metav1.GetOptions{})
+		Get(context.TODO(), g.OAuthTokenSecretName.Name, metav1.GetOptions{})
 
 	if err != nil {
 		return "", fmt.Errorf("could not get credentials secret: %v", err)
@@ -278,7 +279,7 @@ func (c *Cluster) waitStatefulsetReady() error {
 		listOptions := metav1.ListOptions{
 			LabelSelector: c.labelsSet(false).String(),
 		}
-		ss, err := c.KubeClient.StatefulSets(c.Namespace).List(listOptions)
+		ss, err := c.KubeClient.StatefulSets(c.Namespace).List(context.TODO(), listOptions)
 		if err != nil {
 			return false, err
 		}
@@ -313,7 +314,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error {
 	}
 	podsNumber = 1
 	if !anyReplica {
-		pods, err := c.KubeClient.Pods(namespace).List(listOptions)
+		pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
 		if err != nil {
 			return err
 		}
@@ -327,7 +328,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error {
 		func() (bool, error) {
 			masterCount := 0
 			if !anyReplica {
-				masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption)
+				masterPods, err2 := c.KubeClient.Pods(namespace).List(context.TODO(), masterListOption)
 				if err2 != nil {
 					return false, err2
 				}
@@ -337,7 +338,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error {
 				}
 				masterCount = len(masterPods.Items)
 			}
-			replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption)
+			replicaPods, err2 := c.KubeClient.Pods(namespace).List(context.TODO(), replicaListOption)
 			if err2 != nil {
 				return false, err2
 			}
@@ -1,11 +1,12 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"strings"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -23,7 +24,7 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro
 		LabelSelector: c.labelsSet(false).String(),
 	}
 
-	pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions)
+	pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(context.TODO(), listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err)
 	}
@@ -38,7 +39,7 @@ func (c *Cluster) deletePersistentVolumeClaims() error {
 	}
 	for _, pvc := range pvcs {
 		c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta))
-		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil {
+		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil {
 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
 		}
 	}
@@ -78,7 +79,7 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
 				continue
 			}
 		}
-		pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
+		pv, err := c.KubeClient.PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
 		if err != nil {
 			return nil, fmt.Errorf("could not get PersistentVolume: %v", err)
 		}
@@ -143,7 +144,7 @@ func (c *Cluster) resizeVolumes(newVolume acidv1.Volume, resizers []volumes.Volu
 			c.logger.Debugf("filesystem resize successful on volume %q", pv.Name)
 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity
 			c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name)
-			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil {
+			if _, err := c.KubeClient.PersistentVolumes().Update(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
 				return fmt.Errorf("could not update persistent volume: %q", err)
 			}
 			c.logger.Debugf("successfully updated persistent volume %q", pv.Name)
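Update likewise gained a context and metav1.UpdateOptions; the PersistentVolume capacity write above is the value-typed version of the old call. A sketch, assuming client-go >= v0.18 (function name is illustrative):

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// growVolume bumps the recorded capacity of a PV and writes it back.
func growVolume(client kubernetes.Interface, pv *v1.PersistentVolume, newSize resource.Quantity) error {
    pv.Spec.Capacity[v1.ResourceStorage] = newSize
    _, err := client.CoreV1().PersistentVolumes().Update(
        context.TODO(), pv, metav1.UpdateOptions{})
    return err
}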
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"sync"
@@ -99,7 +100,7 @@ func (c *Controller) initOperatorConfig() {
 
 	if c.config.ConfigMapName != (spec.NamespacedName{}) {
 		configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace).
-			Get(c.config.ConfigMapName.Name, metav1.GetOptions{})
+			Get(context.TODO(), c.config.ConfigMapName.Name, metav1.GetOptions{})
 		if err != nil {
 			panic(err)
 		}
@@ -406,7 +407,7 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr
 
 	} else {
 
-		if _, err := c.KubeClient.Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
+		if _, err := c.KubeClient.Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}); err != nil {
 			c.logger.Fatalf("Could not find the watched namespace %q", namespace)
 		} else {
 			c.logger.Infof("Listenting to the specific namespace %q", namespace)
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -22,7 +23,7 @@ func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, e
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.KubeClient.Nodes().List(opts)
+	return c.KubeClient.Nodes().List(context.TODO(), opts)
 }
 
 func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
@@ -32,7 +33,7 @@ func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface,
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.KubeClient.Nodes().Watch(opts)
+	return c.KubeClient.Nodes().Watch(context.TODO(), opts)
 }
 
 func (c *Controller) nodeAdd(obj interface{}) {
@@ -87,7 +88,7 @@ func (c *Controller) attemptToMoveMasterPodsOffNode(node *v1.Node) error {
 	opts := metav1.ListOptions{
 		LabelSelector: labels.Set(c.opConfig.ClusterLabels).String(),
 	}
-	podList, err := c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(opts)
+	podList, err := c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts)
 	if err != nil {
 		c.logger.Errorf("could not fetch list of the pods: %v", err)
 		return err
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"fmt"
 
 	"time"
@@ -14,7 +15,8 @@ import (
 
 func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, configObjectName string) (*acidv1.OperatorConfiguration, error) {
 
-	config, err := c.KubeClient.AcidV1ClientSet.AcidV1().OperatorConfigurations(configObjectNamespace).Get(configObjectName, metav1.GetOptions{})
+	config, err := c.KubeClient.AcidV1ClientSet.AcidV1().OperatorConfigurations(configObjectNamespace).Get(
+		context.TODO(), configObjectName, metav1.GetOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("could not get operator configuration object %q: %v", configObjectName, err)
 	}
@@ -1,7 +1,9 @@
 package controller
 
 import (
-	"k8s.io/api/core/v1"
+	"context"
+
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
@@ -19,7 +21,7 @@ func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, er
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(opts)
+	return c.KubeClient.Pods(c.opConfig.WatchedNamespace).List(context.TODO(), opts)
 }
 
 func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
@@ -29,7 +31,7 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface,
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(opts)
+	return c.KubeClient.Pods(c.opConfig.WatchedNamespace).Watch(context.TODO(), opts)
 }
 
 func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event cluster.PodEvent) {
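The informer-facing list/watch wrappers keep their old signatures, since cache.ListWatch predates the context change, so the context is injected inside the closure. A sketch of wiring the same pattern into a ListWatch, assuming client-go >= v0.18:

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// podListWatch adapts the context-aware client to the context-free
// ListFunc/WatchFunc signatures that cache.ListWatch still expects.
func podListWatch(client kubernetes.Interface, namespace string) *cache.ListWatch {
    return &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Pods(namespace).List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
        },
    }
}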
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"strings"
@@ -43,7 +44,7 @@ func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.Postgresq
 	var pgList acidv1.PostgresqlList
 
 	// TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
-	list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(options)
+	list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(context.TODO(), options)
 	if err != nil {
 		c.logger.Errorf("could not list postgresql objects: %v", err)
 	}
@@ -535,7 +536,7 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
 func (c *Controller) createPodServiceAccount(namespace string) error {
 
 	podServiceAccountName := c.opConfig.PodServiceAccountName
-	_, err := c.KubeClient.ServiceAccounts(namespace).Get(podServiceAccountName, metav1.GetOptions{})
+	_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
 	if k8sutil.ResourceNotFound(err) {
 
 		c.logger.Infof(fmt.Sprintf("creating pod service account %q in the %q namespace", podServiceAccountName, namespace))
@@ -543,7 +544,7 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
 		// get a separate copy of service account
 		// to prevent a race condition when setting a namespace for many clusters
 		sa := *c.PodServiceAccount
-		if _, err = c.KubeClient.ServiceAccounts(namespace).Create(&sa); err != nil {
+		if _, err = c.KubeClient.ServiceAccounts(namespace).Create(context.TODO(), &sa, metav1.CreateOptions{}); err != nil {
 			return fmt.Errorf("cannot deploy the pod service account %q defined in the configuration to the %q namespace: %v", podServiceAccountName, namespace, err)
 		}
 
@@ -560,7 +561,7 @@ func (c *Controller) createRoleBindings(namespace string) error {
 	podServiceAccountName := c.opConfig.PodServiceAccountName
 	podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
 
-	_, err := c.KubeClient.RoleBindings(namespace).Get(podServiceAccountRoleBindingName, metav1.GetOptions{})
+	_, err := c.KubeClient.RoleBindings(namespace).Get(context.TODO(), podServiceAccountRoleBindingName, metav1.GetOptions{})
 	if k8sutil.ResourceNotFound(err) {
 
 		c.logger.Infof("Creating the role binding %q in the %q namespace", podServiceAccountRoleBindingName, namespace)
@@ -568,7 +569,7 @@ func (c *Controller) createRoleBindings(namespace string) error {
 		// get a separate copy of role binding
 		// to prevent a race condition when setting a namespace for many clusters
 		rb := *c.PodServiceAccountRoleBinding
-		_, err = c.KubeClient.RoleBindings(namespace).Create(&rb)
+		_, err = c.KubeClient.RoleBindings(namespace).Create(context.TODO(), &rb, metav1.CreateOptions{})
 		if err != nil {
 			return fmt.Errorf("cannot bind the pod service account %q defined in the configuration to the cluster role in the %q namespace: %v", podServiceAccountName, namespace, err)
 		}
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 
@@ -50,7 +51,7 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
 }
 
 func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefinition) error {
-	if _, err := c.KubeClient.CustomResourceDefinitions().Create(crd); err != nil {
+	if _, err := c.KubeClient.CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); err != nil {
 		if k8sutil.ResourceAlreadyExists(err) {
 			c.logger.Infof("customResourceDefinition %q is already registered and will only be updated", crd.Name)
 
@@ -58,7 +59,8 @@ func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefiniti
 			if err != nil {
 				return fmt.Errorf("could not marshal new customResourceDefintion: %v", err)
 			}
-			if _, err := c.KubeClient.CustomResourceDefinitions().Patch(crd.Name, types.MergePatchType, patch); err != nil {
+			if _, err := c.KubeClient.CustomResourceDefinitions().Patch(
+				context.TODO(), crd.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
 				return fmt.Errorf("could not update customResourceDefinition: %v", err)
 			}
 		} else {
@@ -69,7 +71,7 @@ func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefiniti
 	}
 
 	return wait.Poll(c.config.CRDReadyWaitInterval, c.config.CRDReadyWaitTimeout, func() (bool, error) {
-		c, err := c.KubeClient.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{})
+		c, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
 		if err != nil {
 			return false, err
 		}
@@ -115,7 +117,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (m
 
 	infraRolesSecret, err := c.KubeClient.
 		Secrets(rolesSecret.Namespace).
-		Get(rolesSecret.Name, metav1.GetOptions{})
+		Get(context.TODO(), rolesSecret.Name, metav1.GetOptions{})
 	if err != nil {
 		c.logger.Debugf("infrastructure roles secret name: %q", *rolesSecret)
 		return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err)
@@ -161,7 +163,8 @@ Users:
 	}
 
 	// perhaps we have some map entries with usernames, passwords, let's check if we have those users in the configmap
-	if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get(rolesSecret.Name, metav1.GetOptions{}); err == nil {
+	if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get(
+		context.TODO(), rolesSecret.Name, metav1.GetOptions{}); err == nil {
 		// we have a configmap with username - json description, let's read and decode it
 		for role, s := range infraRolesMap.Data {
 			roleDescr, err := readDecodedRole(s)
@@ -1,6 +1,7 @@
 package k8sutil
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 
@@ -237,7 +238,7 @@ func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason st
 	return true, ""
 }
 
-func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) {
+func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) {
 	if name != "infrastructureroles-test" {
 		return nil, fmt.Errorf("NotFound")
 	}
@@ -253,7 +254,7 @@ func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, er
 
 }
 
-func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigMap, error) {
+func (c *mockConfigMap) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.ConfigMap, error) {
 	if name != "infrastructureroles-test" {
 		return nil, fmt.Errorf("NotFound")
 	}
@@ -283,7 +284,7 @@ func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.D
 	return &mockDeploymentNotExist{}
 }
 
-func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
+func (mock *mockDeployment) Create(context.Context, *apiappsv1.Deployment, metav1.CreateOptions) (*apiappsv1.Deployment, error) {
 	return &apiappsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-deployment",
@@ -294,11 +295,11 @@ func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment
 	}, nil
 }
 
-func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error {
+func (mock *mockDeployment) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
 	return nil
 }
 
-func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
+func (mock *mockDeployment) Get(ctx context.Context, name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
 	return &apiappsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-deployment",
@@ -318,7 +319,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1
 	}, nil
 }
 
-func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) {
+func (mock *mockDeployment) Patch(ctx context.Context, name string, t types.PatchType, data []byte, opts metav1.PatchOptions, subres ...string) (*apiappsv1.Deployment, error) {
 	return &apiappsv1.Deployment{
 		Spec: apiappsv1.DeploymentSpec{
 			Replicas: Int32ToPointer(2),
@@ -329,7 +330,7 @@ func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, s
 	}, nil
 }
 
-func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
+func (mock *mockDeploymentNotExist) Get(ctx context.Context, name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
 	return nil, &apierrors.StatusError{
 		ErrStatus: metav1.Status{
 			Reason: metav1.StatusReasonNotFound,
@@ -337,7 +338,7 @@ func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*a
 	}
 }
 
-func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
+func (mock *mockDeploymentNotExist) Create(context.Context, *apiappsv1.Deployment, metav1.CreateOptions) (*apiappsv1.Deployment, error) {
 	return &apiappsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-deployment",
@@ -356,7 +357,7 @@ func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.Service
 	return &mockServiceNotExist{}
 }
 
-func (mock *mockService) Create(*v1.Service) (*v1.Service, error) {
+func (mock *mockService) Create(context.Context, *v1.Service, metav1.CreateOptions) (*v1.Service, error) {
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-service",
@@ -364,11 +365,11 @@ func (mock *mockService) Create(*v1.Service) (*v1.Service, error) {
 	}, nil
 }
 
-func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error {
+func (mock *mockService) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
 	return nil
 }
 
-func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
+func (mock *mockService) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Service, error) {
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-service",
@@ -376,7 +377,7 @@ func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service,
 	}, nil
 }
 
-func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) {
+func (mock *mockServiceNotExist) Create(context.Context, *v1.Service, metav1.CreateOptions) (*v1.Service, error) {
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-service",
@@ -384,7 +385,7 @@ func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) {
 	}, nil
 }
 
-func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
+func (mock *mockServiceNotExist) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Service, error) {
 	return nil, &apierrors.StatusError{
 		ErrStatus: metav1.Status{
 			Reason: metav1.StatusReasonNotFound,
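Hand-written mocks have to track the interface change as well, since they implement subsets of the generated client interfaces. A minimal sketch of a mock satisfying the new Get shape; the type and secret name here are hypothetical, not from the repository:

package example

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// fakeSecretGetter returns a canned secret; the ctx and options
// parameters are accepted only to match the v0.18 method signature.
type fakeSecretGetter struct{}

func (f *fakeSecretGetter) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) {
    if name != "expected-secret" {
        return nil, fmt.Errorf("NotFound")
    }
    return &v1.Secret{
        ObjectMeta: metav1.ObjectMeta{Name: name},
        Data:       map[string][]byte{"username": []byte("demo")},
    }, nil
}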
@@ -133,11 +133,11 @@ var requestsURLtc = []struct {
 }{
 	{
 		"coffee://localhost/",
-		fmt.Errorf(`Get coffee://localhost/teams/acid: unsupported protocol scheme "coffee"`),
+		fmt.Errorf(`Get "coffee://localhost/teams/acid": unsupported protocol scheme "coffee"`),
 	},
 	{
 		"http://192.168.0.%31/",
-		fmt.Errorf(`parse http://192.168.0.%%31/teams/acid: invalid URL escape "%%31"`),
+		fmt.Errorf(`parse "http://192.168.0.%%31/teams/acid": invalid URL escape "%%31"`),
 	},
 }
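The expected error strings in the teams API test change for a different reason than the rest of the commit: starting with Go 1.14, net/url (and therefore net/http) quotes the offending URL in its error messages. A quick way to see the new format:

package main

import (
    "fmt"
    "net/url"
)

func main() {
    _, err := url.Parse("http://192.168.0.%31/teams/acid")
    // Go >= 1.14 prints: parse "http://192.168.0.%31/teams/acid": invalid URL escape "%31"
    fmt.Println(err)
}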