From 4d936fc3a194e485ae592bdcc20710d4dd04f9fd Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 17 Jul 2017 15:21:06 +0200 Subject: [PATCH 01/15] use proper ParameterCodec --- pkg/controller/postgresql.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index a9ceba3c2..f5cd8e6f0 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -12,7 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/pkg/api" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/cluster" @@ -39,7 +39,7 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec req := c.RestClient.Get(). RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). + VersionedParams(&options, scheme.ParameterCodec). FieldsSelectorParam(fields.Everything()) object, err := req.Do().Get() @@ -92,7 +92,7 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { req := c.RestClient.Get(). RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). + VersionedParams(&options, scheme.ParameterCodec). 
FieldsSelectorParam(fields.Everything()) return req.Watch() } From 3165106317fc8bcaaac5204be7140ee07662ef54 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 17 Jul 2017 17:07:16 +0200 Subject: [PATCH 02/15] use get parameter for k8s resource watch url --- pkg/controller/postgresql.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index f5cd8e6f0..0c068fe20 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -90,8 +90,9 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec } func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { + options.Watch = true req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). + RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). VersionedParams(&options, scheme.ParameterCodec). FieldsSelectorParam(fields.Everything()) return req.Watch() From 129bcd7bd61dc3126f27453a59b73d8b9a4054fd Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 17 Jul 2017 17:08:40 +0200 Subject: [PATCH 03/15] rest client: use interface instead of structure pointer --- pkg/cluster/cluster.go | 2 +- pkg/controller/controller.go | 2 +- pkg/util/k8sutil/k8sutil.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 994abf222..028d2b123 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -37,7 +37,7 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { KubeClient *kubernetes.Clientset //TODO: move clients to the better place? 
- RestClient *rest.RESTClient + RestClient rest.Interface RestConfig *rest.Config TeamsAPIClient *teams.API OpConfig config.Config diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0091d70e4..6112ec6b9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -20,7 +20,7 @@ import ( type Config struct { RestConfig *rest.Config KubeClient *kubernetes.Clientset - RestClient *rest.RESTClient + RestClient rest.Interface TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 2239fbde2..f3be75bfd 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -38,7 +38,7 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } -func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { +func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} c.APIPath = constants.K8sAPIPath c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} From 2161a0816ca02f61905f1f3d2c6969ae40da175a Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 18 Jul 2017 12:24:15 +0200 Subject: [PATCH 04/15] use scheme package instead of api --- pkg/util/k8sutil/k8sutil.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index f3be75bfd..75181d24c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -10,7 +10,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -41,7 +41,7 @@ func ResourceNotFound(err error) bool { func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { c.GroupVersion = 
&schema.GroupVersion{Version: constants.K8sVersion} c.APIPath = constants.K8sAPIPath - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} + c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} schemeBuilder := runtime.NewSchemeBuilder( func(scheme *runtime.Scheme) error { @@ -57,7 +57,7 @@ func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { ) return nil }) - if err := schemeBuilder.AddToScheme(api.Scheme); err != nil { + if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil { return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) } From d7e9142fc749ab6ecc695546396b159a4e106f93 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 12:37:06 +0200 Subject: [PATCH 05/15] proper names for constants; some clean up for log messages --- cmd/main.go | 6 +-- pkg/cluster/cluster.go | 32 +++++++-------- pkg/cluster/filesystems.go | 2 +- pkg/cluster/k8sres.go | 2 +- pkg/cluster/pod.go | 14 +++---- pkg/cluster/resources.go | 52 ++++++++++++------------ pkg/cluster/sync.go | 8 ++-- pkg/cluster/util.go | 12 +++--- pkg/cluster/volumes.go | 22 +++++----- pkg/controller/pod.go | 2 +- pkg/controller/postgresql.go | 32 +++++++-------- pkg/controller/util.go | 11 +++-- pkg/spec/postgresql_test.go | 6 +-- pkg/spec/types_test.go | 2 +- pkg/util/constants/kubernetes.go | 4 +- pkg/util/constants/roles.go | 2 +- pkg/util/constants/thirdpartyresource.go | 7 ++-- pkg/util/filesystems/ext234.go | 2 +- pkg/util/k8sutil/k8sutil.go | 2 +- pkg/util/users/users.go | 8 ++-- pkg/util/util_test.go | 4 +- pkg/util/volumes/ebs.go | 12 +++--- 22 files changed, 122 insertions(+), 122 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 5a0cbedbe..d8e189b23 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -47,17 +47,17 @@ func init() { func ControllerConfig() *controller.Config { restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) if err != nil { - 
log.Fatalf("Can't get REST config: %s", err) + log.Fatalf("Can't get REST config: %v", err) } client, err := k8sutil.KubernetesClient(restConfig) if err != nil { - log.Fatalf("Can't create client: %s", err) + log.Fatalf("Can't create client: %v", err) } restClient, err := k8sutil.KubernetesRestClient(restConfig) if err != nil { - log.Fatalf("Can't create rest client: %s", err) + log.Fatalf("Can't create rest client: %v", err) } return &controller.Config{ diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 028d2b123..bff10235f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -137,7 +137,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } if err != nil { - c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) + c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err) } } @@ -180,7 +180,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create endpoint: %v", err) } - c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) for _, role := range []PostgresRole{Master, Replica} { if role == Replica && !c.Spec.ReplicaLoadBalancer { @@ -190,7 +190,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { @@ -207,12 +207,12 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } - c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("statefulset %q has been successfully 
created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") if err = c.waitStatefulsetPodsReady(); err != nil { - c.logger.Errorf("Failed to create cluster: %s", err) + c.logger.Errorf("Failed to create cluster: %v", err) return err } c.logger.Infof("pods are ready") @@ -233,7 +233,7 @@ func (c *Cluster) Create() error { err = c.listResources() if err != nil { - c.logger.Errorf("could not list resources: %s", err) + c.logger.Errorf("could not list resources: %v", err) } return nil @@ -243,7 +243,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match //TODO: improve comparison match = true if c.Service[role].Spec.Type != service.Spec.Type { - return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", + return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", role, service.Spec.Type, c.Service[role].Spec.Type) } oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges @@ -259,7 +259,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] if oldDNSAnnotation != newDNSAnnotation { - return false, fmt.Sprintf("new %s service's '%s' annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) + return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) } return true, "" @@ -290,7 +290,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { - c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Warnf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) return &compareStatefulsetResult{} } // 
In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through @@ -333,12 +333,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the current one", name)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name)) } } @@ -405,7 +405,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.logger.Debugf("Cluster update from version %s to %s", + c.logger.Debugf("Cluster update from version %q to %q", c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) /* Make sure we update when this function exists */ @@ -431,7 +431,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { if err != nil { return fmt.Errorf("could not create new %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta)) } } // only proceed further if both old and new load balancer were present @@ -446,7 +446,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { 
c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("could not update %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) + c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) } } @@ -471,11 +471,11 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } } //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted - c.logger.Infof("statefulset '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) } if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison - c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed", + c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed", c.Spec.PgVersion, newSpec.Spec.PgVersion) //TODO: rewrite pg version in tpr spec } diff --git a/pkg/cluster/filesystems.go b/pkg/cluster/filesystems.go index e68e5614c..65e7048c7 100644 --- a/pkg/cluster/filesystems.go +++ b/pkg/cluster/filesystems.go @@ -39,5 +39,5 @@ func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizer return err } - return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) + return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %q", fsType) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index cebef874a..2ee3aa521 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -199,7 +199,7 @@ PATRONI_INITDB_PARAMS: } result, err := json.Marshal(config) if err != nil { - c.logger.Errorf("Cannot convert spilo configuration into JSON: %s", err) + c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err) return "" } return string(result) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 
3040bc7e9..b3e490a4c 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -35,11 +35,11 @@ func (c *Cluster) deletePods() error { for _, obj := range pods { podName := util.NameFromMeta(obj.ObjectMeta) - c.logger.Debugf("Deleting pod '%s'", podName) + c.logger.Debugf("Deleting pod %q", podName) if err := c.deletePod(podName); err != nil { - c.logger.Errorf("could not delete pod '%s': %s", podName, err) + c.logger.Errorf("could not delete pod %q: %v", podName, err) } else { - c.logger.Infof("pod '%s' has been deleted", podName) + c.logger.Infof("pod %q has been deleted", podName) } } if len(pods) > 0 { @@ -107,7 +107,7 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { if err := c.waitForPodLabel(ch); err != nil { return err } - c.logger.Infof("pod '%s' is ready", podName) + c.logger.Infof("pod %q is ready", podName) return nil } @@ -136,7 +136,7 @@ func (c *Cluster) recreatePods() error { } if err := c.recreatePod(pod); err != nil { - return fmt.Errorf("could not recreate replica pod '%s': %v", util.NameFromMeta(pod.ObjectMeta), err) + return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } } if masterPod.Name == "" { @@ -144,10 +144,10 @@ func (c *Cluster) recreatePods() error { } else { //TODO: do manual failover //TODO: specify master, leave new master empty - c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) + c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) if err := c.recreatePod(masterPod); err != nil { - return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) + return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 6b7cbc0ee..1ee9bd61c 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -61,7 +61,7 @@ func (c *Cluster) loadResources() error { 
continue } c.Secrets[secret.UID] = &secrets.Items[i] - c.logger.Debugf("secret loaded, uid: %s", secret.UID) + c.logger.Debugf("secret loaded, uid: %q", secret.UID) } statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions) @@ -80,19 +80,19 @@ func (c *Cluster) loadResources() error { func (c *Cluster) listResources() error { if c.Statefulset != nil { - c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) + c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) } for _, obj := range c.Secrets { - c.logger.Infof("Found secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } if c.Endpoint != nil { - c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) + c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) } for role, service := range c.Service { - c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) + c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) } pods, err := c.listPods() @@ -101,7 +101,7 @@ func (c *Cluster) listResources() error { } for _, obj := range pods { - c.logger.Infof("Found pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } pvcs, err := c.listPersistentVolumeClaims() @@ -110,7 +110,7 @@ func (c *Cluster) listResources() error { } for _, obj := range pvcs { - c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } return nil @@ -129,7 +129,7 @@ func (c *Cluster) createStatefulSet() 
(*v1beta1.StatefulSet, error) { return nil, err } c.Statefulset = statefulSet - c.logger.Debugf("Created new statefulset '%s', uid: %s", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) + c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) return statefulSet, nil } @@ -144,7 +144,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { patchData, err := specPatch(newStatefulSet.Spec) if err != nil { - return fmt.Errorf("could not form patch for the statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err) } statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( @@ -152,7 +152,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) } c.Statefulset = statefulSet @@ -174,7 +174,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error options := meta_v1.DeleteOptions{OrphanDependents: &orphanDepencies} if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil { - return fmt.Errorf("could not delete statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err) } // make sure we clear the stored statefulset status if the subsequent create fails. c.Statefulset = nil @@ -194,7 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error // create the new statefulset with the desired spec. It would take over the remaining pods. 
createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) if err != nil { - return fmt.Errorf("could not create statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err) } // check that all the previous replicas were picked up. if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && @@ -216,7 +216,7 @@ func (c *Cluster) deleteStatefulSet() error { if err != nil { return err } - c.logger.Infof("statefulset '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) c.Statefulset = nil if err := c.deletePods(); err != nil { @@ -270,12 +270,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error } err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) if err != nil { - return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not delete service %q: %v", serviceName, err) } c.Endpoint = nil svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) if err != nil { - return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not create service %q: %v", serviceName, err) } c.Service[role] = svc if role == Master { @@ -283,7 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) if err != nil { - return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) + return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) } c.Endpoint = ep } @@ -299,13 +299,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error 
[]byte(annotationsPatchData), "") if err != nil { - return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) } } patchData, err := specPatch(newService.Spec) if err != nil { - return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( @@ -313,7 +313,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch service '%s': %v", serviceName, err) + return fmt.Errorf("could not patch service %q: %v", serviceName, err) } c.Service[role] = svc @@ -330,7 +330,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { if err != nil { return err } - c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) c.Service[role] = nil return nil } @@ -359,7 +359,7 @@ func (c *Cluster) deleteEndpoint() error { if err != nil { return err } - c.logger.Infof("endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) + c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) c.Endpoint = nil return nil @@ -376,7 +376,7 @@ func (c *Cluster) applySecrets() error { if err != nil { return fmt.Errorf("could not get current secret: %v", err) } - c.logger.Debugf("secret '%s' already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) + c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { secretUsername = constants.SuperuserKeyName userMap = 
c.systemUsers @@ -393,10 +393,10 @@ func (c *Cluster) applySecrets() error { continue } else { if err != nil { - return fmt.Errorf("could not create secret for user '%s': %v", secretUsername, err) + return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) } c.Secrets[secret.UID] = secret - c.logger.Debugf("Created new secret '%s', uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID) + c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) } } @@ -404,12 +404,12 @@ func (c *Cluster) applySecrets() error { } func (c *Cluster) deleteSecret(secret *v1.Secret) error { - c.logger.Debugf("Deleting secret '%s'", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) if err != nil { return err } - c.logger.Infof("secret '%s' has been deleted", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Infof("secret %q has been deleted", util.NameFromMeta(secret.ObjectMeta)) delete(c.Secrets, secret.UID) return err diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 625eb34f5..2b796b511 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -95,7 +95,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err != nil { return fmt.Errorf("could not create missing %s service: %v", role, err) } - c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) + c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) return nil } @@ -110,7 +110,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err := c.updateService(role, desiredSvc); err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + c.logger.Infof("%s service 
%q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } @@ -122,7 +122,7 @@ func (c *Cluster) syncEndpoint() error { if err != nil { return fmt.Errorf("could not create missing endpoint: %v", err) } - c.logger.Infof("Created missing endpoint '%s'", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) return nil } @@ -151,7 +151,7 @@ func (c *Cluster) syncStatefulSet() error { if err != nil { return fmt.Errorf("cluster is not ready: %v", err) } - c.logger.Infof("Created missing statefulset '%s'", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) if !rollUpdate { return nil } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index fb78d90ec..1f34fbfc2 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -77,11 +77,11 @@ func metadataAnnotationsPatch(annotations map[string]string) string { func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { if isUpdate { - c.logger.Infof("statefulset '%s' has been changed", + c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("statefulset '%s' is not in the desired state and needs to be updated", + c.logger.Infof("statefulset %q is not in the desired state and needs to be updated", util.NameFromMeta(old.ObjectMeta), ) } @@ -89,18 +89,18 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate if len(reasons) > 0 { for _, reason := range reasons { - c.logger.Infof("Reason: %s", reason) + c.logger.Infof("Reason: %q", reason) } } } func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { if isUpdate { - c.logger.Infof("%s service '%s' has been changed", + c.logger.Infof("%s service %q has been changed", role, util.NameFromMeta(old.ObjectMeta), ) } else { - 
c.logger.Infof("%s service '%s is not in the desired state and needs to be updated", + c.logger.Infof("%s service %q is not in the desired state and needs to be updated", role, util.NameFromMeta(old.ObjectMeta), ) } @@ -127,7 +127,7 @@ func (c *Cluster) getOAuthToken() (string, error) { Get(c.OpConfig.OAuthTokenSecretName.Name, meta_v1.GetOptions{}) if err != nil { - c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName) + c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) return "", fmt.Errorf("could not get credentials secret: %v", err) } data := credentialsSecret.Data diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 9889916d1..03115d96b 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -36,7 +36,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { return err } for _, pvc := range pvcs { - c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) + c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) } @@ -63,10 +63,10 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { if lastDash > 0 && lastDash < len(pvc.Name)-1 { pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) if err != nil { - return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) + return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) } if int32(pvcNumber) > lastPodIndex { - c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) + c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name) continue } } @@ -119,22 +119,22 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, 
resizers []volumes.Volume if err != nil { return err } - c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) + c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { - return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) + return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err) } - c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) + c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) podName := getPodNameFromPersistentVolume(pv) if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { - return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) + return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) } - c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) + c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) pv.Spec.Capacity[v1.ResourceStorage] = newQuantity - c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) + c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { - return fmt.Errorf("could not update persistent volume: %s", err) + return fmt.Errorf("could not update persistent volume: %q", err) } - c.logger.Debugf("successfully updated persistent volume %s", pv.Name) + c.logger.Debugf("successfully updated persistent volume %q", pv.Name) } } if len(pvs) > 0 && totalCompatible == 0 { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 306e0f8e0..690d0f431 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -107,7 +107,7 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { c.clustersMu.RUnlock() if ok { - c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster 
channel", event.EventType, event.PodName, event.ClusterName) + c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) cluster.ReceivePodEvent(event) } case <-stopCh: diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 0c068fe20..c458ba78e 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -120,11 +120,11 @@ func (c *Controller) processEvent(obj interface{}) error { switch event.EventType { case spec.EventAdd: if clusterFound { - logger.Debugf("Cluster '%s' already exists", clusterName) + logger.Debugf("Cluster %q already exists", clusterName) return nil } - logger.Infof("Creation of the '%s' cluster started", clusterName) + logger.Infof("Creation of the %q cluster started", clusterName) stopCh := make(chan struct{}) cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) @@ -142,31 +142,31 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } - logger.Infof("Cluster '%s' has been created", clusterName) + logger.Infof("Cluster %q has been created", clusterName) case spec.EventUpdate: - logger.Infof("Update of the '%s' cluster started", clusterName) + logger.Infof("Update of the %q cluster started", clusterName) if !clusterFound { - logger.Warnf("Cluster '%s' does not exist", clusterName) + logger.Warnf("Cluster %q does not exist", clusterName) return nil } if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) + cl.Error = fmt.Errorf("could not update cluster: %v", err) logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been updated", clusterName) + logger.Infof("Cluster %q has been updated", clusterName) case spec.EventDelete: - logger.Infof("Deletion of the '%s' cluster started", clusterName) + logger.Infof("Deletion of the %q cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", 
clusterName) + logger.Errorf("Unknown cluster: %q", clusterName) return nil } if err := cl.Delete(); err != nil { - logger.Errorf("could not delete cluster '%s': %s", clusterName, err) + logger.Errorf("could not delete cluster %q: %v", clusterName, err) return nil } close(c.stopChs[clusterName]) @@ -176,9 +176,9 @@ func (c *Controller) processEvent(obj interface{}) error { delete(c.stopChs, clusterName) c.clustersMu.Unlock() - logger.Infof("Cluster '%s' has been deleted", clusterName) + logger.Infof("Cluster %q has been deleted", clusterName) case spec.EventSync: - logger.Infof("Syncing of the '%s' cluster started", clusterName) + logger.Infof("Syncing of the %q cluster started", clusterName) // no race condition because a cluster is always processed by single worker if !clusterFound { @@ -193,13 +193,13 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) + cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been synced", clusterName) + logger.Infof("Cluster %q has been synced", clusterName) } return nil @@ -236,7 +236,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec } if clusterError != nil && eventType != spec.EventDelete { - c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %v)", eventType, clusterName, clusterError) + c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError) return } @@ -253,7 +253,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) } - c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster 
has been queued", eventType, clusterName) + c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName) } func (c *Controller) postgresqlAdd(obj interface{}) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 4346f8dcb..1ad23f7c6 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -49,17 +49,16 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { } func (c *Controller) createTPR() error { - TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) - tpr := thirdPartyResource(TPRName) + tpr := thirdPartyResource(constants.TPRName) _, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err } - c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) + c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName) } else { - c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) + c.logger.Infof("ThirdPartyResource %q has been registered", constants.TPRName) } return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) @@ -75,7 +74,7 @@ func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, er Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace).
Get(c.opConfig.InfrastructureRolesSecretName.Name, meta_v1.GetOptions{}) if err != nil { - c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) + c.logger.Debugf("Infrastructure roles secret name: %q", c.opConfig.InfrastructureRolesSecretName) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) } @@ -103,7 +102,7 @@ Users: case "inrole": t.MemberOf = append(t.MemberOf, s) default: - c.logger.Warnf("Unknown key %s", p) + c.logger.Warnf("Unknown key %q", p) } } } diff --git a/pkg/spec/postgresql_test.go b/pkg/spec/postgresql_test.go index b4ba10670..cbdc75e0a 100644 --- a/pkg/spec/postgresql_test.go +++ b/pkg/spec/postgresql_test.go @@ -362,7 +362,7 @@ func TestClusterName(t *testing.T) { continue } if name != tt.clusterName { - t.Errorf("Expected cluserName: %s, got: %s", tt.clusterName, name) + t.Errorf("Expected clusterName: %q, got: %q", tt.clusterName, name) } } } @@ -399,7 +399,7 @@ func TestMarshalMaintenanceWindow(t *testing.T) { } if !bytes.Equal(s, tt.in) { - t.Errorf("Expected Marshal: %s, got: %s", string(tt.in), string(s)) + t.Errorf("Expected Marshal: %q, got: %q", string(tt.in), string(s)) } } } @@ -434,7 +434,7 @@ func TestMarshal(t *testing.T) { continue } if !bytes.Equal(m, tt.marshal) { - t.Errorf("Marshal Postgresql expected: %s, got: %s", string(tt.marshal), string(m)) + t.Errorf("Marshal Postgresql expected: %q, got: %q", string(tt.marshal), string(m)) } } } diff --git a/pkg/spec/types_test.go b/pkg/spec/types_test.go index b690586e9..d0368268a 100644 --- a/pkg/spec/types_test.go +++ b/pkg/spec/types_test.go @@ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) { var actual NamespacedName err := actual.Decode(tt) if err == nil { - t.Errorf("Error expected for '%s', got: %#v", tt, actual) + t.Errorf("Error expected for %q, got: %#v", tt, actual) } } } diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 3a56aa35a..3fa134349 100644 ---
a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -4,8 +4,8 @@ import "time" // General kubernetes-related constants const ( - ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace + ListClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace + WatchClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace K8sVersion = "v1" K8sAPIPath = "/api" StatefulsetDeletionInterval = 1 * time.Second diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 85fb42b1b..9f584c370 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ -2,7 +2,7 @@ package constants const ( PasswordLength = 64 - UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName + UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName SuperuserKeyName = "superuser" ReplicationUserKeyName = "replication" RoleFlagSuperuser = "SUPERUSER" diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index 7207b4583..a815adc55 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -2,9 +2,10 @@ package constants // Different properties of the PostgreSQL Third Party Resources const ( - TPRName = "postgresql" - TPRVendor = "acid.zalan.do" + TPRKind = "postgresql" + TPRGroup = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" - ResourceName = TPRName + "s" + TPRName = TPRKind + "." 
+ TPRKind + ResourceName = TPRKind + "s" ) diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go index ceea73984..fc2943d46 100644 --- a/pkg/util/filesystems/ext234.go +++ b/pkg/util/filesystems/ext234.go @@ -37,5 +37,5 @@ func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func( (strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { return nil } - return fmt.Errorf("unrecognized output: %s, assuming error", out) + return fmt.Errorf("unrecognized output: %q, assuming error", out) } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 75181d24c..c3f665138 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -47,7 +47,7 @@ func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { func(scheme *runtime.Scheme) error { scheme.AddKnownTypes( schema.GroupVersion{ - Group: constants.TPRVendor, + Group: constants.TPRGroup, Version: constants.TPRApiVersion, }, &spec.Postgresql{}, diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 8ff32d305..1b5f5966f 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -66,11 +66,11 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque switch r.Kind { case spec.PGSyncUserAdd: if err := s.createPgUser(r.User, db); err != nil { - return fmt.Errorf("could not create user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not create user %q: %v", r.User.Name, err) } case spec.PGsyncUserAlter: if err := s.alterPgUser(r.User, db); err != nil { - return fmt.Errorf("could not alter user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) } default: return fmt.Errorf("unrecognized operation: %v", r.Kind) @@ -100,7 +100,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB 
error: %s, query: %v", err, query) + err = fmt.Errorf("dB error: %v, query: %q", err, query) return } @@ -122,7 +122,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB error: %s query %v", err, query) + err = fmt.Errorf("dB error: %v query %q", err, query) return } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 946b573bd..53b5dfc46 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -73,7 +73,7 @@ func TestPGUserPassword(t *testing.T) { for _, tt := range pgUsers { pwd := PGUserPassword(tt.in) if pwd != tt.out { - t.Errorf("PgUserPassword expected: %s, got: %s", tt.out, pwd) + t.Errorf("PgUserPassword expected: %q, got: %q", tt.out, pwd) } } } @@ -81,7 +81,7 @@ func TestPGUserPassword(t *testing.T) { func TestPrettyDiff(t *testing.T) { for _, tt := range prettyDiffTest { if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { - t.Errorf("PrettyDiff expected: %s, got: %s", tt.out, actual) + t.Errorf("PrettyDiff expected: %q, got: %q", tt.out, actual) } } } diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go index 8d6ec12b7..c213a1126 100644 --- a/pkg/util/volumes/ebs.go +++ b/pkg/util/volumes/ebs.go @@ -42,11 +42,11 @@ func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { volumeID := pv.Spec.AWSElasticBlockStore.VolumeID if volumeID == "" { - return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) + return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) } idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 if idx == 0 { - return "", fmt.Errorf("malfored EBS volume id %s", volumeID) + return "", fmt.Errorf("malformed EBS volume id %q", volumeID) } return volumeID[idx:], nil } @@ -60,7 +60,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId
string, newSize int64) error { } vol := volumeOutput.Volumes[0] if *vol.VolumeId != volumeId { - return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) + return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId) } if *vol.Size == newSize { // nothing to do @@ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { state := *output.VolumeModification.ModificationState if state == constants.EBSVolumeStateFailed { - return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) + return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId) } if state == "" { return fmt.Errorf("received empty modification status") @@ -91,10 +91,10 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { return false, fmt.Errorf("could not describe volume modification: %v", err) } if len(out.VolumesModifications) != 1 { - return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) + return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeId) } if *out.VolumesModifications[0].VolumeId != volumeId { - return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", + return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q", *out.VolumesModifications[0].VolumeId, volumeId) } return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil From 4455f1b639ecda689cf9777e8345a9db350a8376 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 24 Jul 2017 16:56:46 +0200 Subject: [PATCH 06/15] Feature/unit tests (#53) - Avoid relying on Clientset structure to call Kubernetes API functions. 
While Clientset is a convenient "catch-all" abstraction for calling REST API related to different Kubernetes objects, it's impossible to mock. Replacing it with the kubernetes.Interface would be quite straightforward, but would require an extra level of mocked interfaces, because of the versioning. Instead, a new interface is defined, which contains only the objects we need of the pre-defined versions. - Move KubernetesClient to k8sutil package. - Add more tests. --- cmd/main.go | 6 +- pkg/cluster/cluster.go | 3 +- pkg/controller/controller.go | 9 +- pkg/controller/pod.go | 4 +- pkg/controller/util.go | 12 +-- pkg/controller/util_test.go | 154 +++++++++++++++++++++++++++++++++++ pkg/spec/types.go | 1 - pkg/util/k8sutil/k8sutil.go | 31 ++++++- 8 files changed, 201 insertions(+), 19 deletions(-) create mode 100644 pkg/controller/util_test.go diff --git a/cmd/main.go b/cmd/main.go index 2d53f4b82..c38eb2e91 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -48,7 +48,7 @@ func ControllerConfig() *controller.Config { log.Fatalf("Can't get REST config: %s", err) } - client, err := k8sutil.KubernetesClient(restConfig) + client, err := k8sutil.ClientSet(restConfig) if err != nil { log.Fatalf("Can't create client: %s", err) } @@ -60,7 +60,7 @@ func ControllerConfig() *controller.Config { return &controller.Config{ RestConfig: restConfig, - KubeClient: client, + KubeClient: k8sutil.NewFromKubernetesInterface(client), RestClient: restClient, } } @@ -101,7 +101,7 @@ func main() { log.Printf("Config: %s", cfg.MustMarshal()) - c := controller.New(controllerConfig, cfg) + c := controller.NewController(controllerConfig, cfg) c.Run(stop, wg) sig := <-sigs diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e62f0b16e..8683803b9 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,7 +11,6 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1" @@ -36,7 +35,7 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient *kubernetes.Clientset //TODO: move clients to the better place? + KubeClient k8sutil.KubernetesClient RestClient *rest.RESTClient RestConfig *rest.Config TeamsAPIClient *teams.API diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962bd..dc62504d3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -14,12 +13,13 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient *kubernetes.Clientset + KubeClient k8sutil.KubernetesClient RestClient *rest.RESTClient TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser @@ -27,6 +27,7 @@ type Config struct { type Controller struct { Config + opConfig *config.Config logger *logrus.Entry @@ -43,7 +44,7 @@ type Controller struct { lastClusterSyncTime int64 } -func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func NewController(controllerConfig *Config, operatorConfig *config.Config) *Controller { logger := logrus.New() if operatorConfig.DebugLogging { @@ -82,7 +83,7 @@ func (c *Controller) initController() { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) } - if infraRoles, err := c.getInfrastructureRoles(); err != nil { + if infraRoles, err := 
c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { c.logger.Warningf("could not get infrastructure roles: %v", err) } else { c.InfrastructureRoles = infraRoles diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 3b31d439f..c3fb2a5e8 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -29,7 +29,7 @@ func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.CoreV1().Pods(c.opConfig.Namespace).List(opts) + return c.KubeClient.Pods(c.opConfig.Namespace).List(opts) } func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) { @@ -52,7 +52,7 @@ func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, err TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.CoreV1Client.Pods(c.opConfig.Namespace).Watch(opts) + return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts) } func (c *Controller) podAdd(obj interface{}) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 76b33168c..d7ea55e3a 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -51,7 +51,7 @@ func (c *Controller) createTPR() error { TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) tpr := thirdPartyResource(TPRName) - _, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) + _, err := c.KubeClient.ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err @@ -64,17 +64,17 @@ func (c *Controller) createTPR() error { return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) } -func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { - if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) { +func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result 
map[string]spec.PgUser, err error) { + if *rolesSecret == (spec.NamespacedName{}) { // we don't have infrastructure roles defined, bail out return nil, nil } infraRolesSecret, err := c.KubeClient. - Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace). - Get(c.opConfig.InfrastructureRolesSecretName.Name) + Secrets(rolesSecret.Namespace). + Get(rolesSecret.Name) if err != nil { - c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) + c.logger.Debugf("Infrastructure roles secret name: %s", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) } diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go new file mode 100644 index 000000000..bd7d7d049 --- /dev/null +++ b/pkg/controller/util_test.go @@ -0,0 +1,154 @@ +package controller + +import ( + "fmt" + "reflect" + "testing" + + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/pkg/api/v1" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util/config" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" +) + +const ( + testInfrastructureRolesSecretName = "infrastructureroles-test" +) + +type mockSecret struct { + v1core.SecretInterface +} + +func (c *mockSecret) Get(name string) (*v1.Secret, error) { + if name != testInfrastructureRolesSecretName { + return nil, fmt.Errorf("NotFound") + } + secret := &v1.Secret{} + secret.Name = mockController.opConfig.ClusterNameLabel + secret.Data = map[string][]byte{ + "user1": []byte("testrole"), + "password1": []byte("testpassword"), + "inrole1": []byte("testinrole"), + } + return secret, nil + +} + +type MockSecretGetter struct { +} + +func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface { + return &mockSecret{} +} + +func newMockKubernetesClient() k8sutil.KubernetesClient { + return k8sutil.KubernetesClient{SecretsGetter: &MockSecretGetter{}} +} + +func 
newMockController() *Controller { + controller := NewController(&Config{}, &config.Config{}) + controller.opConfig.ClusterNameLabel = "cluster-name" + controller.opConfig.InfrastructureRolesSecretName = + spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} + controller.opConfig.Workers = 4 + controller.KubeClient = newMockKubernetesClient() + return controller +} + +var mockController = newMockController() + +func TestPodClusterName(t *testing.T) { + var testTable = []struct { + in *v1.Pod + expected spec.NamespacedName + }{ + { + &v1.Pod{}, + spec.NamespacedName{}, + }, + { + &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: v1.NamespaceDefault, + Labels: map[string]string{ + mockController.opConfig.ClusterNameLabel: "testcluster", + }, + }, + }, + spec.NamespacedName{v1.NamespaceDefault, "testcluster"}, + }, + } + for _, test := range testTable { + resp := mockController.podClusterName(test.in) + if resp != test.expected { + t.Errorf("expected response %v does not match the actual %v", test.expected, resp) + } + } +} + +func TestClusterWorkerID(t *testing.T) { + var testTable = []struct { + in spec.NamespacedName + expected uint32 + }{ + { + in: spec.NamespacedName{"foo", "bar"}, + expected: 2, + }, + { + in: spec.NamespacedName{"default", "testcluster"}, + expected: 3, + }, + } + for _, test := range testTable { + resp := mockController.clusterWorkerID(test.in) + if resp != test.expected { + t.Errorf("expected response %v does not match the actual %v", test.expected, resp) + } + } +} + +func TestGetInfrastructureRoles(t *testing.T) { + var testTable = []struct { + secretName spec.NamespacedName + expectedRoles map[string]spec.PgUser + expectedError error + }{ + { + spec.NamespacedName{}, + nil, + nil, + }, + { + spec.NamespacedName{v1.NamespaceDefault, "null"}, + nil, + fmt.Errorf(`could not get infrastructure roles secret: NotFound`), + }, + { + spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}, + 
map[string]spec.PgUser{ + "testrole": { + "testrole", + "testpassword", + nil, + []string{"testinrole"}, + }, + }, + nil, + }, + } + for _, test := range testTable { + roles, err := mockController.getInfrastructureRoles(&test.secretName) + if err != test.expectedError { + if err != nil && test.expectedError != nil && err.Error() == test.expectedError.Error() { + continue + } + t.Errorf("expected error '%v' does not match the actual error '%v'", test.expectedError, err) + } + if !reflect.DeepEqual(roles, test.expectedRoles) { + t.Errorf("expected roles output %v does not match the actual %v", test.expectedRoles, roles) + } + } +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 398003841..822395ce9 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -3,7 +3,6 @@ package spec import ( "fmt" "strings" - "database/sql" "k8s.io/client-go/pkg/api/v1" diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index f0ea50dd4..981083cb9 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -5,6 +5,9 @@ import ( "time" "k8s.io/client-go/kubernetes" + v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" "k8s.io/client-go/pkg/api" apierrors "k8s.io/client-go/pkg/api/errors" "k8s.io/client-go/pkg/api/unversioned" @@ -18,6 +21,32 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) +type KubernetesClient struct { + v1core.SecretsGetter + v1core.ServicesGetter + v1core.EndpointsGetter + v1core.PodsGetter + v1core.PersistentVolumesGetter + v1core.PersistentVolumeClaimsGetter + v1core.ConfigMapsGetter + v1beta1.StatefulSetsGetter + extensions.ThirdPartyResourcesGetter +} + +func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { + c = KubernetesClient{} + c.PodsGetter = src.CoreV1() + c.ServicesGetter = src.CoreV1() + c.EndpointsGetter = src.CoreV1() + 
c.SecretsGetter = src.CoreV1() + c.ConfigMapsGetter = src.CoreV1() + c.PersistentVolumeClaimsGetter = src.CoreV1() + c.PersistentVolumesGetter = src.CoreV1() + c.StatefulSetsGetter = src.AppsV1beta1() + c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() + return +} + func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) @@ -25,7 +54,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { return rest.InClusterConfig() } -func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) { +func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) { return kubernetes.NewForConfig(config) } From 143eb968ea89713ce7e1ddabc5d9e8a99036b754 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 19:38:08 +0200 Subject: [PATCH 07/15] refactor --- cmd/main.go | 52 ++--------- glide.lock | 13 +-- pkg/cluster/cluster.go | 13 +-- pkg/cluster/util.go | 2 +- pkg/controller/controller.go | 79 +++++++++++++---- pkg/controller/postgresql.go | 108 ++++++++++------------- pkg/controller/util.go | 3 - pkg/util/constants/kubernetes.go | 5 +- pkg/util/constants/thirdpartyresource.go | 2 +- pkg/util/k8sutil/k8sutil.go | 42 +++------ 10 files changed, 149 insertions(+), 170 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index d8e189b23..9785c051f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,11 +8,8 @@ import ( "sync" "syscall" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/zalando-incubator/postgres-operator/pkg/controller" "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) @@ -47,28 +44,19 @@ func init() { func ControllerConfig() *controller.Config { restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) if err != nil { - log.Fatalf("Can't 
get REST config: %v", err) - } - - client, err := k8sutil.KubernetesClient(restConfig) - if err != nil { - log.Fatalf("Can't create client: %v", err) - } - - restClient, err := k8sutil.KubernetesRestClient(restConfig) - if err != nil { - log.Fatalf("Can't create rest client: %v", err) + log.Fatalf("couldn't get REST config: %v", err) } return &controller.Config{ - RestConfig: restConfig, - KubeClient: client, - RestClient: restClient, + RestConfig: restConfig, + NoDatabaseAccess: noDatabaseAccess, + NoTeamsAPI: noTeamsAPI, + ConfigMapName: configMapName, + Namespace: podNamespace, } } func main() { - configMapData := make(map[string]string) log.SetOutput(os.Stdout) log.Printf("Spilo operator %s\n", version) @@ -78,33 +66,7 @@ func main() { wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - controllerConfig := ControllerConfig() - - if configMapName != (spec.NamespacedName{}) { - configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name, meta_v1.GetOptions{}) - if err != nil { - panic(err) - } - - configMapData = configMap.Data - } else { - log.Printf("No ConfigMap specified. 
Loading default values") - } - - if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var - configMapData["namespace"] = podNamespace - } - if noDatabaseAccess { - configMapData["enable_database_access"] = "false" - } - if noTeamsAPI { - configMapData["enable_teams_api"] = "false" - } - cfg := config.NewFromMap(configMapData) - - log.Printf("Config: %s", cfg.MustMarshal()) - - c := controller.New(controllerConfig, cfg) + c := controller.New(ControllerConfig()) c.Run(stop, wg) sig := <-sigs diff --git a/glide.lock b/glide.lock index eda1edf36..f9a3aa4b1 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 -updated: 2017-07-12T12:52:55.896264+02:00 +updated: 2017-07-24T19:24:17.604824235+02:00 imports: - name: github.com/aws/aws-sdk-go - version: b1a7b51924b90a6ecdbaeb17e96418740ff07a1e + version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd subpackages: - aws - aws/awserr @@ -30,7 +30,7 @@ imports: - service/ec2 - service/sts - name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + version: 782f4967f2dc4564575ca782fe2d04090b5faca8 subpackages: - spew - name: github.com/docker/distribution @@ -104,7 +104,7 @@ imports: - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/Sirupsen/logrus - version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1 + version: a3f95b5c423586578a4e099b11a46c2479628cac - name: github.com/spf13/pflag version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 - name: github.com/ugorji/go @@ -118,12 +118,15 @@ imports: - name: golang.org/x/net version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: + - html + - html/atom - http2 - http2/hpack - idna - lex/httplex + - websocket - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f subpackages: - unix - name: golang.org/x/text diff --git 
a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index bff10235f..068e0b64e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -36,11 +36,8 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient *kubernetes.Clientset //TODO: move clients to the better place? - RestClient rest.Interface - RestConfig *rest.Config - TeamsAPIClient *teams.API OpConfig config.Config + RestConfig *rest.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller } @@ -68,6 +65,9 @@ type Cluster struct { userSyncStrategy spec.UserSyncer deleteOptions *meta_v1.DeleteOptions podEventsQueue *cache.FIFO + + teamsAPIClient *teams.API + KubeClient *kubernetes.Clientset //TODO: move clients to the better place? } type compareStatefulsetResult struct { @@ -78,7 +78,7 @@ type compareStatefulsetResult struct { } // New creates a new cluster. This function should be called from a controller. -func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { +func New(cfg Config, kubeClient *kubernetes.Clientset, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true @@ -104,6 +104,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, podEventsQueue: podEventsQueue, + KubeClient: kubeClient, } return cluster @@ -126,7 +127,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods - _, err = c.RestClient.Patch(types.MergePatchType). 
+ _, err = c.KubeClient.CoreV1().RESTClient().Patch(types.MergePatchType). RequestURI(c.Metadata.GetSelfLink()). Body(request). DoRaw() diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 1f34fbfc2..f7a7a16b1 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -153,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return []string{}, fmt.Errorf("could not get oauth token: %v", err) } - teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) + teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token) if err != nil { return nil, fmt.Errorf("could not get team info: %v", err) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 6112ec6b9..a4199d771 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" @@ -14,15 +15,18 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient *kubernetes.Clientset - RestClient rest.Interface - TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser + + NoDatabaseAccess bool + NoTeamsAPI bool + ConfigMapName spec.NamespacedName + Namespace string } type Controller struct { @@ -30,6 +34,10 @@ type Controller struct { opConfig *config.Config logger *logrus.Entry + KubeClient *kubernetes.Clientset + RestClient rest.Interface + TeamsAPIClient *teams.API + clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster stopChs map[spec.NamespacedName]chan struct{} @@ -43,22 +51,58 @@ type 
Controller struct { lastClusterSyncTime int64 } -func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func New(controllerConfig *Config) *Controller { + configMapData := make(map[string]string) logger := logrus.New() + client, err := k8sutil.KubernetesClient(controllerConfig.RestConfig) + if err != nil { + logger.Fatalf("couldn't create client: %v", err) + } + + restClient, err := k8sutil.KubernetesRestClient(*controllerConfig.RestConfig) + if err != nil { + logger.Fatalf("couldn't create rest client: %v", err) + } + + if controllerConfig.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := client.ConfigMaps(controllerConfig.ConfigMapName.Namespace).Get(controllerConfig.ConfigMapName.Name, meta_v1.GetOptions{}) + if err != nil { + panic(err) + } + + configMapData = configMap.Data + } else { + logger.Infoln("No ConfigMap specified. Loading default values") + } + + if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var + configMapData["namespace"] = controllerConfig.Namespace + } + if controllerConfig.NoDatabaseAccess { + configMapData["enable_database_access"] = "false" + } + if controllerConfig.NoTeamsAPI { + configMapData["enable_teams_api"] = "false" + } + operatorConfig := config.NewFromMap(configMapData) + + logger.Infof("Config: %s", operatorConfig.MustMarshal()) + if operatorConfig.DebugLogging { logger.Level = logrus.DebugLevel } - controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - stopChs: make(map[spec.NamespacedName]chan struct{}), - podCh: make(chan spec.PodEvent), + Config: *controllerConfig, + opConfig: operatorConfig, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + stopChs: 
make(map[spec.NamespacedName]chan struct{}), + podCh: make(chan spec.PodEvent), + TeamsAPIClient: teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger), + KubeClient: client, + RestClient: restClient, } } @@ -89,15 +133,14 @@ func (c *Controller) initController() { } // Postgresqls - clusterLw := &cache.ListWatch{ - ListFunc: c.clusterListFunc, - WatchFunc: c.clusterWatchFunc, - } c.postgresqlInformer = cache.NewSharedIndexInformer( - clusterLw, + &cache.ListWatch{ + ListFunc: c.clusterListFunc, + WatchFunc: c.clusterWatchFunc, + }, &spec.Postgresql{}, constants.QueueResyncPeriodTPR, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + cache.Indexers{}) c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.postgresqlAdd, diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index c458ba78e..d9fc8ff1c 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -3,18 +3,15 @@ package controller import ( "fmt" "reflect" - "sync/atomic" "time" - "k8s.io/apimachinery/pkg/api/meta" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "encoding/json" "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -35,67 +32,60 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) { } func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { - c.logger.Info("Getting list of currently running clusters") - - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, scheme.ParameterCodec). 
- FieldsSelectorParam(fields.Everything()) - - object, err := req.Do().Get() + req := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, meta_v1.ParameterCodec) + b, err := req.DoRaw() if err != nil { - return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) + return nil, err + } + var list spec.PostgresqlList + + return &list, json.Unmarshal(b, &list) +} + +type tprDecoder struct { + dec *json.Decoder + close func() error +} + +func (d *tprDecoder) Close() { + d.close() +} + +func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { + var e struct { + Type watch.EventType + Object spec.Postgresql + } + if err := d.dec.Decode(&e); err != nil { + return watch.Error, nil, err } - objList, err := meta.ExtractList(object) - if err != nil { - return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) - } - - if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { - c.logger.Debugln("skipping resync of clusters") - return object, err - } - - var activeClustersCnt, failedClustersCnt int - for _, obj := range objList { - pg, ok := obj.(*spec.Postgresql) - if !ok { - return nil, fmt.Errorf("could not cast object to postgresql") - } - - if pg.Error != nil { - failedClustersCnt++ - continue - } - c.queueClusterEvent(nil, pg, spec.EventSync) - activeClustersCnt++ - } - if len(objList) > 0 { - if failedClustersCnt > 0 && activeClustersCnt == 0 { - c.logger.Infof("There are no clusters running. 
%d are in the failed state", failedClustersCnt) - } else if failedClustersCnt == 0 && activeClustersCnt > 0 { - c.logger.Infof("There are %d clusters running", activeClustersCnt) - } else { - c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) - } - } else { - c.logger.Infof("No clusters running") - } - - atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) - - return object, err + return e.Type, &e.Object, nil } func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { options.Watch = true - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, scheme.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - return req.Watch() + r, err := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, meta_v1.ParameterCodec). + FieldsSelectorParam(nil). 
+ Stream() + + if err != nil { + return nil, err + } + + return watch.NewStreamWatcher(&tprDecoder{ + dec: json.NewDecoder(r), + close: r.Close, + }), nil } func (c *Controller) processEvent(obj interface{}) error { @@ -127,7 +117,7 @@ func (c *Controller) processEvent(obj interface{}) error { logger.Infof("Creation of the %q cluster started", clusterName) stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -183,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error { // no race condition because a cluster is always processed by single worker if !clusterFound { stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 1ad23f7c6..b3f10c4bd 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -22,10 +22,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { } return cluster.Config{ - KubeClient: c.KubeClient, - RestClient: c.RestClient, RestConfig: c.RestConfig, - TeamsAPIClient: c.TeamsAPIClient, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, } diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 3fa134349..79c60cad2 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -4,10 +4,7 @@ import "time" // General kubernetes-related constants const ( - ListClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace - K8sVersion = "v1" - K8sAPIPath = "/api" + K8sAPIPath = "/apis" 
StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index a815adc55..a0a00d259 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -6,6 +6,6 @@ const ( TPRGroup = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" - TPRName = TPRKind + "." + TPRKind + TPRName = TPRKind + "." + TPRGroup ResourceName = TPRKind + "s" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c3f665138..c5780743b 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,20 +1,16 @@ package k8sutil import ( - "fmt" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) @@ -23,6 +19,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) } + return rest.InClusterConfig() } @@ -38,35 +35,24 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } -func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { - c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} - c.APIPath = constants.K8sAPIPath - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} - - schemeBuilder := runtime.NewSchemeBuilder( - func(scheme *runtime.Scheme) error { - 
scheme.AddKnownTypes( - schema.GroupVersion{ - Group: constants.TPRGroup, - Version: constants.TPRApiVersion, - }, - &spec.Postgresql{}, - &spec.PostgresqlList{}, - &meta_v1.ListOptions{}, - &meta_v1.DeleteOptions{}, - ) - return nil - }) - if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil { - return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) +func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { + cfg.GroupVersion = &schema.GroupVersion{ + Group: constants.TPRGroup, + Version: constants.TPRApiVersion, } + cfg.APIPath = constants.K8sAPIPath + cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - return rest.RESTClientFor(c) + return rest.RESTClientFor(&cfg) } func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { return retryutil.Retry(interval, timeout, func() (bool, error) { - _, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() + _, err := restclient. + Get(). + Namespace(ns). + Resource(constants.ResourceName). + DoRaw() if err != nil { if ResourceNotFound(err) { // not set up yet. wait more. 
return false, nil From fc8f0916a582aa39a274950eb105ca4d8e889aa5 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 19:45:17 +0200 Subject: [PATCH 08/15] fix exec --- pkg/cluster/exec.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index c4874d5ee..50eba794f 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -6,7 +6,8 @@ import ( meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/client-go/pkg/api" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/remotecommand" "github.com/zalando-incubator/postgres-operator/pkg/spec" @@ -27,17 +28,17 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( return "", fmt.Errorf("could not determine which container to use") } - req := c.RestClient.Post(). + req := c.KubeClient.CoreV1().RESTClient().Post(). Resource("pods"). Name(podName.Name). Namespace(podName.Namespace). 
SubResource("exec") - req.VersionedParams(&api.PodExecOptions{ + req.VersionedParams(&v1.PodExecOptions{ Container: pod.Spec.Containers[0].Name, Command: command, Stdout: true, Stderr: true, - }, api.ParameterCodec) + }, scheme.ParameterCodec) exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL()) if err != nil { From 72a8fcc9aa0d5b8aa6af136c440b2762095388ef Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 21:13:13 +0200 Subject: [PATCH 09/15] remove unnecessary vars and funcs --- cmd/main.go | 53 +++++++++++++++++++++++------------------------------ 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 9785c051f..c39628355 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,54 +9,41 @@ import ( "syscall" "github.com/zalando-incubator/postgres-operator/pkg/controller" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) var ( - KubeConfigFile string - podNamespace string - configMapName spec.NamespacedName - OutOfCluster bool - noTeamsAPI bool - noDatabaseAccess bool - version string + KubeConfigFile string + OutOfCluster bool + version string + + config controller.Config ) func init() { flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.") - flag.BoolVar(&noDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") - flag.BoolVar(&noTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") + flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") + flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") flag.Parse() - podNamespace = 
os.Getenv("MY_POD_NAMESPACE") - if podNamespace == "" { - podNamespace = "default" + config.Namespace = os.Getenv("MY_POD_NAMESPACE") + if config.Namespace == "" { + config.Namespace = "default" } configMap := os.Getenv("CONFIG_MAP_NAME") if configMap != "" { - configMapName.Decode(configMap) - } -} - -func ControllerConfig() *controller.Config { - restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) - if err != nil { - log.Fatalf("couldn't get REST config: %v", err) - } - - return &controller.Config{ - RestConfig: restConfig, - NoDatabaseAccess: noDatabaseAccess, - NoTeamsAPI: noTeamsAPI, - ConfigMapName: configMapName, - Namespace: podNamespace, + err := config.ConfigMapName.Decode(configMap) + if err != nil { + log.Fatalf("incorrect config map name") + } } } func main() { + var err error + log.SetOutput(os.Stdout) log.Printf("Spilo operator %s\n", version) @@ -66,7 +53,13 @@ func main() { wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - c := controller.New(ControllerConfig()) + config.RestConfig, err = k8sutil.RestConfig(KubeConfigFile, OutOfCluster) + if err != nil { + log.Fatalf("couldn't get REST config: %v", err) + } + + c := controller.New(&config) + c.Run(stop, wg) sig := <-sigs From b2a882f9fd353c5c63401c154e5d59dee6734d14 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 21:42:57 +0200 Subject: [PATCH 10/15] merge with master --- cmd/main.go | 2 +- pkg/cluster/cluster.go | 3 +-- pkg/cluster/exec.go | 2 +- pkg/controller/controller.go | 7 +++---- pkg/controller/util.go | 4 ++-- pkg/spec/types.go | 2 +- pkg/util/k8sutil/k8sutil.go | 2 ++ 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index c39628355..3093cedb8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -58,7 +58,7 @@ func main() { log.Fatalf("couldn't get REST config: %v", err) } - c := controller.New(&config) + c := controller.NewController(&config) c.Run(stop, wg) diff --git 
a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 6ea208c5f..c52de6980 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -13,7 +13,6 @@ import ( "github.com/Sirupsen/logrus" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/rest" @@ -127,7 +126,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods - _, err = c.KubeClient.CoreV1().RESTClient().Patch(types.MergePatchType). + _, err = c.KubeClient.RESTClient.Patch(types.MergePatchType). RequestURI(c.Metadata.GetSelfLink()). Body(request). DoRaw() diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index 50eba794f..1f6cb76e2 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -28,7 +28,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( return "", fmt.Errorf("could not determine which container to use") } - req := c.KubeClient.CoreV1().RESTClient().Post(). + req := c.KubeClient.RESTClient.Post(). Resource("pods"). Name(podName.Name). Namespace(podName.Namespace). 
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ba8a98d16..dc3504d86 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,7 +6,6 @@ import ( "github.com/Sirupsen/logrus" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -35,7 +34,7 @@ type Controller struct { opConfig *config.Config logger *logrus.Entry - KubeClient *kubernetes.Clientset + KubeClient k8sutil.KubernetesClient RestClient rest.Interface TeamsAPIClient *teams.API @@ -56,7 +55,7 @@ func NewController(controllerConfig *Config) *Controller { configMapData := make(map[string]string) logger := logrus.New() - client, err := k8sutil.KubernetesClient(controllerConfig.RestConfig) + client, err := k8sutil.ClientSet(controllerConfig.RestConfig) if err != nil { logger.Fatalf("couldn't create client: %v", err) } @@ -102,7 +101,7 @@ func NewController(controllerConfig *Config) *Controller { stopChs: make(map[spec.NamespacedName]chan struct{}), podCh: make(chan spec.PodEvent), TeamsAPIClient: teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger), - KubeClient: client, + KubeClient: k8sutil.NewFromKubernetesInterface(client), RestClient: restClient, } } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 546d6c342..e6abeefae 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -48,7 +48,7 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { func (c *Controller) createTPR() error { tpr := thirdPartyResource(constants.TPRName) - _, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) + _, err := c.KubeClient.ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err @@ -69,7 +69,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r infraRolesSecret, err := c.KubeClient. 
Secrets(rolesSecret.Namespace). - Get(rolesSecret.Name) + Get(rolesSecret.Name, meta_v1.GetOptions{}) if err != nil { c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 687421bdf..1a43cde05 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,9 +1,9 @@ package spec import ( + "database/sql" "fmt" "strings" - "database/sql" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 367a23858..09ad38444 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -28,6 +28,7 @@ type KubernetesClient struct { v1core.ConfigMapsGetter v1beta1.StatefulSetsGetter extensions.ThirdPartyResourcesGetter + RESTClient rest.Interface } func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { @@ -41,6 +42,7 @@ func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { c.PersistentVolumesGetter = src.CoreV1() c.StatefulSetsGetter = src.AppsV1beta1() c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() + c.RESTClient = src.CoreV1().RESTClient() return } From a30329e04ae7b3e38f74e85e381b66379b94f76f Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 24 Jul 2017 21:55:04 +0200 Subject: [PATCH 11/15] fix list cluster --- pkg/controller/postgresql.go | 37 +++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index d9fc8ff1c..d01f69162 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -1,8 +1,10 @@ package controller import ( + "encoding/json" "fmt" "reflect" + "sync/atomic" "time" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,7 +13,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" - "encoding/json" 
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -32,6 +33,9 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) { } func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { + var list spec.PostgresqlList + var activeClustersCnt, failedClustersCnt int + req := c.RestClient. Get(). Namespace(c.opConfig.Namespace). @@ -42,9 +46,36 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec if err != nil { return nil, err } - var list spec.PostgresqlList + err = json.Unmarshal(b, &list) - return &list, json.Unmarshal(b, &list) + if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { + c.logger.Debugln("skipping resync of clusters") + return &list, err + } + + for _, pg := range list.Items { + if pg.Error != nil { + failedClustersCnt++ + continue + } + c.queueClusterEvent(nil, &pg, spec.EventSync) + activeClustersCnt++ + } + if len(list.Items) > 0 { + if failedClustersCnt > 0 && activeClustersCnt == 0 { + c.logger.Infof("There are no clusters running. 
%d are in the failed state", failedClustersCnt) + } else if failedClustersCnt == 0 && activeClustersCnt > 0 { + c.logger.Infof("There are %d clusters running", activeClustersCnt) + } else { + c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) + } + } else { + c.logger.Infof("No clusters running") + } + + atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) + + return &list, err } type tprDecoder struct { From 2bbcb5f6271f2a4a398a31362e4abaa93dab250a Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 25 Jul 2017 00:35:52 +0200 Subject: [PATCH 12/15] split init controller; make controller config private --- pkg/controller/controller.go | 110 +++++++++++++++++++---------------- pkg/controller/util.go | 4 +- 2 files changed, 61 insertions(+), 53 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index dc3504d86..ff6e29c26 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -29,11 +29,10 @@ type Config struct { } type Controller struct { - Config - + config Config opConfig *config.Config - logger *logrus.Entry + logger *logrus.Entry KubeClient k8sutil.KubernetesClient RestClient rest.Interface TeamsAPIClient *teams.API @@ -46,82 +45,76 @@ type Controller struct { podInformer cache.SharedIndexInformer podCh chan spec.PodEvent - clusterEventQueues []*cache.FIFO - + clusterEventQueues []*cache.FIFO lastClusterSyncTime int64 } func NewController(controllerConfig *Config) *Controller { - configMapData := make(map[string]string) logger := logrus.New() - client, err := k8sutil.ClientSet(controllerConfig.RestConfig) - if err != nil { - logger.Fatalf("couldn't create client: %v", err) + return &Controller{ + config: *controllerConfig, + opConfig: &config.Config{}, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + stopChs: make(map[spec.NamespacedName]chan struct{}), + podCh: 
make(chan spec.PodEvent), } +} - restClient, err := k8sutil.KubernetesRestClient(*controllerConfig.RestConfig) +func (c *Controller) initClients() { + client, err := k8sutil.ClientSet(c.config.RestConfig) if err != nil { - logger.Fatalf("couldn't create rest client: %v", err) + c.logger.Fatalf("couldn't create client: %v", err) } + c.KubeClient = k8sutil.NewFromKubernetesInterface(client) - if controllerConfig.ConfigMapName != (spec.NamespacedName{}) { - configMap, err := client.ConfigMaps(controllerConfig.ConfigMapName.Namespace).Get(controllerConfig.ConfigMapName.Name, meta_v1.GetOptions{}) + c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create rest client: %v", err) + } +} + +func (c *Controller) initOperatorConfig() { + configMapData := make(map[string]string) + + if c.config.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). + Get(c.config.ConfigMapName.Name, meta_v1.GetOptions{}) if err != nil { panic(err) } configMapData = configMap.Data } else { - logger.Infoln("No ConfigMap specified. Loading default values") + c.logger.Infoln("No ConfigMap specified. 
Loading default values") } if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var - configMapData["namespace"] = controllerConfig.Namespace + configMapData["namespace"] = c.config.Namespace } - if controllerConfig.NoDatabaseAccess { + if c.config.NoDatabaseAccess { configMapData["enable_database_access"] = "false" } - if controllerConfig.NoTeamsAPI { + if c.config.NoTeamsAPI { configMapData["enable_teams_api"] = "false" } - operatorConfig := config.NewFromMap(configMapData) - logger.Infof("Config: %s", operatorConfig.MustMarshal()) - - if operatorConfig.DebugLogging { - logger.Level = logrus.DebugLevel - } - - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - stopChs: make(map[spec.NamespacedName]chan struct{}), - podCh: make(chan spec.PodEvent), - TeamsAPIClient: teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger), - KubeClient: k8sutil.NewFromKubernetesInterface(client), - RestClient: restClient, - } -} - -func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - wg.Add(1) - - c.initController() - - go c.runInformers(stopCh) - - for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(i) - } - - c.logger.Info("Started working in background") + c.opConfig = config.NewFromMap(configMapData) } func (c *Controller) initController() { + c.initClients() + c.initOperatorConfig() + + c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) + + if c.opConfig.DebugLogging { + c.logger.Level = logrus.DebugLevel + } + + c.TeamsAPIClient = teams.NewTeamsAPI(c.opConfig.TeamsAPIUrl, c.logger.Logger) + if err := c.createTPR(); err != nil { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) } @@ -129,7 +122,7 @@ func (c *Controller) initController() { if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != 
nil { c.logger.Warningf("could not get infrastructure roles: %v", err) } else { - c.InfrastructureRoles = infraRoles + c.config.InfrastructureRoles = infraRoles } // Postgresqls @@ -179,6 +172,21 @@ func (c *Controller) initController() { } } +func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + c.initController() + + go c.runInformers(stopCh) + + for i := range c.clusterEventQueues { + go c.processClusterEventsQueue(i) + } + + c.logger.Info("Started working in background") +} + func (c *Controller) runInformers(stopCh <-chan struct{}) { go c.postgresqlInformer.Run(stopCh) go c.podInformer.Run(stopCh) diff --git a/pkg/controller/util.go b/pkg/controller/util.go index e6abeefae..2ea4f65b9 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -17,12 +17,12 @@ import ( func (c *Controller) makeClusterConfig() cluster.Config { infrastructureRoles := make(map[string]spec.PgUser) - for k, v := range c.InfrastructureRoles { + for k, v := range c.config.InfrastructureRoles { infrastructureRoles[k] = v } return cluster.Config{ - RestConfig: c.RestConfig, + RestConfig: c.config.RestConfig, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, } From 0ed47c7aeacec990b493c3c803432a6ec6342dc3 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 25 Jul 2017 00:36:10 +0200 Subject: [PATCH 13/15] fix controller util test --- pkg/controller/util_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index bd7d7d049..6bea382d6 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -5,11 +5,11 @@ import ( "reflect" "testing" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" 
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) @@ -21,7 +21,7 @@ type mockSecret struct { v1core.SecretInterface } -func (c *mockSecret) Get(name string) (*v1.Secret, error) { +func (c *mockSecret) Get(name string, options meta_v1.GetOptions) (*v1.Secret, error) { if name != testInfrastructureRolesSecretName { return nil, fmt.Errorf("NotFound") } @@ -48,7 +48,7 @@ func newMockKubernetesClient() k8sutil.KubernetesClient { } func newMockController() *Controller { - controller := NewController(&Config{}, &config.Config{}) + controller := NewController(&Config{}) controller.opConfig.ClusterNameLabel = "cluster-name" controller.opConfig.InfrastructureRolesSecretName = spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} @@ -70,7 +70,7 @@ func TestPodClusterName(t *testing.T) { }, { &v1.Pod{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: meta_v1.ObjectMeta{ Namespace: v1.NamespaceDefault, Labels: map[string]string{ mockController.opConfig.ClusterNameLabel: "testcluster", From 9c19a22a7fb5ed54c0e127294a349897272008a9 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 25 Jul 2017 00:41:28 +0200 Subject: [PATCH 14/15] fix typos --- pkg/cluster/k8sres.go | 2 +- pkg/spec/postgresql.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 2ee3aa521..48d5bcaa6 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -451,7 +451,7 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { - // safe default value: lock load balancer to only local address unless overriden explicitely. + // safe default value: lock load balancer to only local address unless overridden explicitly. 
sourceRanges := []string{localHost} allowedSourceRanges := newSpec.AllowedSourceRanges if len(allowedSourceRanges) >= 0 { diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 51d914d36..469d841cc 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -87,7 +87,7 @@ type PostgresSpec struct { TeamID string `json:"teamId"` AllowedSourceRanges []string `json:"allowedSourceRanges"` - // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest + // EnableLoadBalancer is a pointer, since it is important to know if that parameter is omitted from the manifest UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` From 941bfb3cc51bea4dbf7fd9f7c9238c1d575749c8 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 25 Jul 2017 01:01:49 +0200 Subject: [PATCH 15/15] remove teams api client from controller and make it per cluster --- pkg/cluster/cluster.go | 1 + pkg/controller/controller.go | 10 +++------- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c52de6980..7c0e26579 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -104,6 +104,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, podEventsQueue: podEventsQueue, KubeClient: kubeClient, + teamsAPIClient: teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger), } return cluster diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ff6e29c26..36db103c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -15,7 +15,6 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" - "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { @@ -32,10 +31,9 @@ type Controller struct { config Config opConfig *config.Config - logger *logrus.Entry - KubeClient k8sutil.KubernetesClient - RestClient rest.Interface - TeamsAPIClient *teams.API + logger *logrus.Entry + KubeClient k8sutil.KubernetesClient + RestClient rest.Interface clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster @@ -113,8 +111,6 @@ func (c *Controller) initController() { c.logger.Level = logrus.DebugLevel } - c.TeamsAPIClient = teams.NewTeamsAPI(c.opConfig.TeamsAPIUrl, c.logger.Logger) - if err := c.createTPR(); err != nil { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) }