From 6f7399b36f83de1ed20ec1602c1913234a4a46ca Mon Sep 17 00:00:00 2001
From: Murat Kabilov
Date: Mon, 13 Mar 2017 11:33:52 +0100
Subject: [PATCH] Sync cluster state

* move StatefulSet creation out of the cluster spec into a separate function
* sync the actual cluster state with the desired state
* store cluster resources in dedicated fields instead of arrays
* recreate Pods instead of deleting them when the StatefulSet changes
* check for a master Pod while creating the cluster and updating Pods
* simplify retryutil
* list PVCs while listing resources
* capitalize Kubernetes resource names in messages
* do a rolling update when environment variables change
---
 pkg/cluster/cluster.go                      |  26 ++-
 pkg/cluster/pod.go                          |  61 +++----
 pkg/cluster/resources.go                    |  33 ++--
 pkg/cluster/sync.go                         | 159 ++++++++++++++++++
 pkg/cluster/util.go                         | 102 +++++++++--
 pkg/controller/pod.go                       |   6 +-
 pkg/controller/postgresql.go                |   2 +
 pkg/controller/util.go                      |   2 +-
 pkg/util/constants/constants.go             |   3 +-
 pkg/util/k8sutil/tpr_util.go                |   2 +-
 .../resources/{factory.go => resources.go}  |   0
 pkg/util/retryutil/retry_util.go            |  22 +--
 pkg/util/util.go                            |   4 +
 13 files changed, 334 insertions(+), 88 deletions(-)
 create mode 100644 pkg/cluster/sync.go
 rename pkg/util/resources/{factory.go => resources.go} (100%)

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index a758f015e..f52556a9f 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -109,6 +109,11 @@ func (c *Cluster) needsRollingUpdate(otherSpec *spec.Postgresql) bool {
 		return true
 	}
 
+	newSs := genStatefulSet(c.ClusterName(), otherSpec.Spec, c.etcdHost, c.dockerImage)
+	equal, needsRollUpdate := statefulsetsEqual(c.Statefulset, newSs)
+	if !equal && needsRollUpdate {
+		return true
+	}
 	return false
 }
 
@@ -159,7 +164,7 @@ func (c *Cluster) Create() error {
 	}
 
 	if err := c.applySecrets(); err != nil {
-		return fmt.Errorf("Can't create secrets: %s", err)
+		return fmt.Errorf("Can't create Secrets: %s", err)
 	} else {
 		c.logger.Infof("Secrets have been successfully created")
 	}
@@ -172,8 +177,8 @@ func (c *Cluster) Create() error {
 	}
 
 	c.logger.Info("Waiting for cluster being ready")
-	err = c.waitStatefulsetPodsReady()
-	if err != nil {
+
+	if err := c.waitStatefulsetPodsReady(); err != nil {
 		c.logger.Errorf("Failed to create cluster: %s", err)
 		return err
 	}
@@ -202,7 +207,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 		c.logger.Infof("Pods need to be recreated")
 	}
 
-	newStatefulSet := getStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage)
+	newStatefulSet := genStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage)
 	newService := resources.Service(c.ClusterName(), c.TeamName(), newSpec.Spec.AllowedSourceRanges)
 
 	if !servicesEqual(newService, c.Service) {
@@ -234,22 +239,25 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 }
 
 func (c *Cluster) Delete() error {
+	epName := util.NameFromMeta(c.Endpoint.ObjectMeta)
 	if err := c.deleteEndpoint(); err != nil {
 		c.logger.Errorf("Can't delete Endpoint: %s", err)
 	} else {
-		c.logger.Infof("Endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
+		c.logger.Infof("Endpoint '%s' has been deleted", epName)
 	}
 
+	svcName := util.NameFromMeta(c.Service.ObjectMeta)
 	if err := c.deleteService(); err != nil {
 		c.logger.Errorf("Can't delete Service: %s", err)
 	} else {
-		c.logger.Infof("Service '%s' has been deleted", util.NameFromMeta(c.Service.ObjectMeta))
+		c.logger.Infof("Service '%s' has been deleted", svcName)
 	}
 
+	ssName := util.NameFromMeta(c.Statefulset.ObjectMeta)
 	if err := c.deleteStatefulSet(); err != nil {
 		c.logger.Errorf("Can't delete StatefulSet: %s", err)
 	} else {
-		c.logger.Infof("StatefulSet '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
+		c.logger.Infof("StatefulSet '%s' has been deleted", ssName)
 	}
 
 	for _, obj := range c.Secrets {
@@ -265,8 +273,8 @@ func (c *Cluster) Delete() error {
 	} else {
 		c.logger.Infof("Pods have been deleted")
 	}
-	err := c.deletePersistenVolumeClaims()
-	if err != nil {
+
+	if err := c.deletePersistenVolumeClaims(); err != nil {
 		return fmt.Errorf("Can't delete PersistentVolumeClaims: %s", err)
 	}

diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 7640b6214..701b699b5 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -9,7 +9,7 @@ import (
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
 )
 
-func (c *Cluster) clusterPods() ([]v1.Pod, error) {
+func (c *Cluster) listPods() ([]v1.Pod, error) {
 	ns := c.Metadata.Namespace
 	listOptions := v1.ListOptions{
 		LabelSelector: c.labelsSet().String(),
@@ -17,29 +17,12 @@
 	pods, err := c.config.KubeClient.Pods(ns).List(listOptions)
 	if err != nil {
-		return nil, fmt.Errorf("Can't get list of pods: %s", err)
+		return nil, fmt.Errorf("Can't get list of Pods: %s", err)
 	}
 
 	return pods.Items, nil
 }
 
-func (c *Cluster) deletePods() error {
-	pods, err := c.clusterPods()
-	if err != nil {
-		return err
-	}
-
-	for _, obj := range pods {
-		if err := c.deletePod(&obj); err != nil {
-			c.logger.Errorf("Can't delete pod: %s", err)
-		} else {
-			c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
-		}
-	}
-
-	return nil
-}
-
 func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) {
 	ns := c.Metadata.Namespace
 	listOptions := v1.ListOptions{
@@ -53,6 +36,23 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro
 	return pvcs.Items, nil
 }
 
+func (c *Cluster) deletePods() error {
+	pods, err := c.listPods()
+	if err != nil {
+		return err
+	}
+
+	for _, obj := range pods {
+		if err := c.deletePod(&obj); err != nil {
+			c.logger.Errorf("Can't delete Pod: %s", err)
+		} else {
+			c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
+		}
+	}
+
+	return nil
+}
+
 func (c *Cluster) deletePersistenVolumeClaims() error {
 	ns := c.Metadata.Namespace
 	pvcs, err := c.listPersistentVolumeClaims()
@@ -116,7 +116,7 @@ func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error {
 	}()
 
 	if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
-		return fmt.Errorf("Can't delete pod: %s", err)
+		return fmt.Errorf("Can't delete Pod: %s", err)
 	}
 
 	if err := c.waitForPodDeletion(ch); err != nil {
@@ -136,10 +136,10 @@ func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) {
 		select {
 		case event := <-c.podEvents:
 			if subscriber, ok := c.podSubscribers[event.PodName]; ok {
-				c.logger.Debugf("New event for '%s' pod", event.PodName)
+				c.logger.Debugf("New event for '%s' Pod", event.PodName)
 				go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
 			} else {
-				c.logger.Debugf("Skipping event for an unwatched pod '%s'", event.PodName)
+				c.logger.Debugf("Skipping event for an unwatched Pod '%s'", event.PodName)
 			}
 		case <-stopCh:
 			return
@@ -156,15 +156,15 @@ func (c *Cluster) recreatePods() error {
 	}
 	pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
 	if err != nil {
-		return fmt.Errorf("Can't get the list of the pods: %s", err)
+		return fmt.Errorf("Can't get the list of Pods: %s", err)
 	} else {
-		c.logger.Infof("There are %d pods in the cluster to recreate", len(pods.Items))
+		c.logger.Infof("There are %d Pods in the cluster to recreate", len(pods.Items))
 	}
 
 	var masterPod v1.Pod
 	for _, pod := range pods.Items {
-		role, ok := pod.Labels["spilo-role"]
-		if !ok {
+		role := util.PodSpiloRole(&pod)
+		if role == "" {
 			continue
 		}
 
 		err = c.recreatePod(pod, "replica")
 		if err != nil {
-			return fmt.Errorf("Can't recreate replica pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
+			return fmt.Errorf("Can't recreate replica Pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
 		}
 	}
 
+	if masterPod.Name == "" {
+		return fmt.Errorf("Can't find master Pod in the cluster")
+	}
 	//TODO: do manual failover
 	//TODO: specify master, leave new master empty
-	c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta))
+	c.logger.Infof("Recreating master Pod '%s'", util.NameFromMeta(masterPod.ObjectMeta))
 
 	if err := c.recreatePod(masterPod, "replica"); err != nil {
-		return fmt.Errorf("Can't recreate master pod '%s': %s", util.NameFromMeta(masterPod.ObjectMeta), err)
+		return fmt.Errorf("Can't recreate master Pod '%s': %s", util.NameFromMeta(masterPod.ObjectMeta), err)
 	}
 
 	return nil
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 2baa53e13..6f6d35673 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -18,7 +18,7 @@ var (
 	orphanDependents = false
 )
 
-func getStatefulSet(clusterName spec.ClusterName, cSpec spec.PostgresSpec, etcdHost, dockerImage string) *v1beta1.StatefulSet {
+func genStatefulSet(clusterName spec.ClusterName, cSpec spec.PostgresSpec, etcdHost, dockerImage string) *v1beta1.StatefulSet {
 	volumeSize := cSpec.Volume.Size
 	volumeStorageClass := cSpec.Volume.StorageClass
 	resourceList := resources.ResourceList(cSpec.Resources)
@@ -36,7 +36,7 @@ func (c *Cluster) LoadResources() error {
 
 	services, err := c.config.KubeClient.Services(ns).List(listOptions)
 	if err != nil {
-		return fmt.Errorf("Can't get list of services: %s", err)
+		return fmt.Errorf("Can't get list of Services: %s", err)
 	}
 	if len(services.Items) > 1 {
 		return fmt.Errorf("Too many(%d) Services for a cluster", len(services.Items))
@@ -45,7 +45,7 @@
 	endpoints, err := c.config.KubeClient.Endpoints(ns).List(listOptions)
 	if err != nil {
-		return fmt.Errorf("Can't get list of endpoints: %s", err)
+		return fmt.Errorf("Can't get list of Endpoints: %s", err)
 	}
 	if len(endpoints.Items) > 1 {
 		return fmt.Errorf("Too many(%d) Endpoints for a cluster", len(endpoints.Items))
@@ -54,7 +54,7 @@
 	secrets, err := c.config.KubeClient.Secrets(ns).List(listOptions)
 	if err != nil {
-		return fmt.Errorf("Can't get list of secrets: %s", err)
+		return fmt.Errorf("Can't get list of Secrets: %s", err)
 	}
 	for i, secret := range secrets.Items {
 		if _, ok := c.Secrets[secret.UID]; ok {
@@ -66,7 +66,7 @@
 	statefulSets, err := c.config.KubeClient.StatefulSets(ns).List(listOptions)
 	if err != nil {
-		return fmt.Errorf("Can't get list of stateful sets: %s", err)
+		return fmt.Errorf("Can't get list of StatefulSets: %s", err)
 	}
 	if len(statefulSets.Items) > 1 {
 		return fmt.Errorf("Too many(%d) StatefulSets for a cluster", len(statefulSets.Items))
@@ -86,15 +86,24 @@ func (c *Cluster) ListResources() error {
 	c.logger.Infof("Endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
 	c.logger.Infof("Service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID)
 
-	pods, err := c.clusterPods()
+	pods, err := c.listPods()
 	if err != nil {
-		return fmt.Errorf("Can't get pods: %s", err)
+		return fmt.Errorf("Can't get the list of Pods: %s", err)
 	}
 
 	for _, obj := range pods {
 		c.logger.Infof("Pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
 	}
 
+	pvcs, err := c.listPersistentVolumeClaims()
+	if err != nil {
+		return fmt.Errorf("Can't get the list of PVCs: %s", err)
+	}
+
+	for _, obj := range pvcs {
+		c.logger.Infof("PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
+	}
+
 	return nil
 }
@@ -102,7 +111,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
 	if c.Statefulset != nil {
 		return nil, fmt.Errorf("StatefulSet already exists in the cluster")
 	}
-	statefulSetSpec := getStatefulSet(c.ClusterName(), c.Spec, c.etcdHost, c.dockerImage)
+	statefulSetSpec := genStatefulSet(c.ClusterName(), c.Spec, c.etcdHost, c.dockerImage)
 	statefulSet, err := c.config.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec)
 	if k8sutil.ResourceAlreadyExists(err) {
 		return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSetSpec.ObjectMeta))
@@ -232,7 +241,7 @@ func (c *Cluster) applySecrets() error {
 	secrets, err := resources.UserSecrets(c.ClusterName(), c.pgUsers)
 	if err != nil {
-		return fmt.Errorf("Can't get user secrets")
+		return fmt.Errorf("Can't get user Secrets: %s", err)
 	}
 
 	for secretUsername, secretSpec := range secrets {
@@ -240,7 +249,7 @@
 		if k8sutil.ResourceAlreadyExists(err) {
 			curSecrets, err := c.config.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name)
 			if err != nil {
-				return fmt.Errorf("Can't get current secret: %s", err)
+				return fmt.Errorf("Can't get current Secret: %s", err)
 			}
 			pwdUser := c.pgUsers[secretUsername]
 			pwdUser.Password = string(curSecrets.Data["password"])
@@ -249,10 +258,10 @@
 			continue
 		} else {
 			if err != nil {
-				return fmt.Errorf("Can't create secret for user '%s': %s", secretUsername, err)
+				return fmt.Errorf("Can't create Secret for user '%s': %s", secretUsername, err)
 			}
 			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("Created new secret, uid: %s", secret.UID)
+			c.logger.Debugf("Created new Secret, uid: %s", secret.UID)
 		}
 	}
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
new file mode 100644
index 000000000..daba17aef
--- /dev/null
+++ b/pkg/cluster/sync.go
@@ -0,0 +1,159 @@
+package cluster
+
+import (
+	"fmt"
+
+	"k8s.io/client-go/pkg/api/v1"
+
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
+)
+
+func (c *Cluster) SyncCluster() {
+	if err := c.syncSecrets(); err != nil {
+		c.logger.Errorf("Can't sync Secrets: %s", err)
+	}
+
+	if err := c.syncEndpoint(); err != nil {
+		c.logger.Errorf("Can't sync Endpoints: %s", err)
+	}
+
+	if err := c.syncService(); err != nil {
+		c.logger.Errorf("Can't sync Services: %s", err)
+	}
+
+	if err := c.syncStatefulSet(); err != nil {
+		c.logger.Errorf("Can't sync StatefulSets: %s", err)
+	}
+
+	if err := c.syncPods(); err != nil {
+		c.logger.Errorf("Can't sync Pods: %s", err)
+	}
+}
+
+func (c *Cluster) syncSecrets() error {
+	//TODO: mind the secrets of the deleted/new users
+	err := c.applySecrets()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (c *Cluster) syncService() error {
+	cSpec := c.Spec
+	if c.Service == nil {
+		c.logger.Infof("Can't find the cluster's Service")
+		svc, err := c.createService()
+		if err != nil {
+			return fmt.Errorf("Can't create missing Service: %s", err)
+		}
+		c.logger.Infof("Created missing Service '%s'", util.NameFromMeta(svc.ObjectMeta))
+
+		return nil
+	}
+
+	desiredSvc := resources.Service(c.ClusterName(), c.Spec.TeamId, cSpec.AllowedSourceRanges)
+	if servicesEqual(c.Service, desiredSvc) {
+		return nil
+	}
+	c.logger.Infof("Service '%s' needs to be updated", util.NameFromMeta(desiredSvc.ObjectMeta))
+
+	if err := c.updateService(desiredSvc); err != nil {
+		return fmt.Errorf("Can't update Service to match desired state: %s", err)
+	}
+	c.logger.Infof("Service '%s' is in the desired state now", util.NameFromMeta(desiredSvc.ObjectMeta))
+
+	return nil
+}
+
+func (c *Cluster) syncEndpoint() error {
+	if c.Endpoint == nil {
+		c.logger.Infof("Can't find the cluster's Endpoint")
+		ep, err := c.createEndpoint()
+		if err != nil {
+			return fmt.Errorf("Can't create missing Endpoint: %s", err)
+		}
+		c.logger.Infof("Created missing Endpoint '%s'", util.NameFromMeta(ep.ObjectMeta))
+		return nil
+	}
+
+	return nil
+}
+
+func (c *Cluster) syncStatefulSet() error {
+	cSpec := c.Spec
+	if c.Statefulset == nil {
+		c.logger.Infof("Can't find the cluster's StatefulSet")
+		ss, err := c.createStatefulSet()
+		if err != nil {
+			return fmt.Errorf("Can't create missing StatefulSet: %s", err)
+		}
+		err = c.waitStatefulsetPodsReady()
+		if err != nil {
+			return fmt.Errorf("Cluster is not ready: %s", err)
+		}
+		c.logger.Infof("Created missing StatefulSet '%s'", util.NameFromMeta(ss.ObjectMeta))
+		return nil
+	}
+
+	desiredSS := genStatefulSet(c.ClusterName(), cSpec, c.etcdHost, c.dockerImage)
+	equalSS, rollUpdate := statefulsetsEqual(c.Statefulset, desiredSS)
+	if equalSS {
+		return nil
+	}
+	c.logger.Infof("StatefulSet '%s' is not in the desired state", util.NameFromMeta(c.Statefulset.ObjectMeta))
+
+	if err := c.updateStatefulSet(desiredSS); err != nil {
+		return fmt.Errorf("Can't update StatefulSet: %s", err)
+	}
+
+	if !rollUpdate {
+		c.logger.Debugln("No rolling update is needed")
+		return nil
+	}
+	c.logger.Debugln("Performing rolling update")
+	if err := c.recreatePods(); err != nil {
+		return fmt.Errorf("Can't recreate Pods: %s", err)
+	}
+	c.logger.Infof("Pods have been recreated")
+
+	return nil
+}
+
+func (c *Cluster) syncPods() error {
+	curSs := c.Statefulset
+
+	ls := c.labelsSet()
+	namespace := c.Metadata.Namespace
+
+	listOptions := v1.ListOptions{
+		LabelSelector: ls.String(),
+	}
+	pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+	if err != nil {
+		return fmt.Errorf("Can't get list of Pods: %s", err)
+	}
+	if int32(len(pods.Items)) != *curSs.Spec.Replicas {
+		return fmt.Errorf("Number of existing Pods does not match number of replicas of the StatefulSet")
+	}
+
+	for _, pod := range pods.Items {
+		if podMatchesTemplate(&pod, curSs) {
+			c.logger.Infof("Pod '%s' matches StatefulSet pod template", util.NameFromMeta(pod.ObjectMeta))
+			continue
+		}
+
+		c.logger.Infof("Pod '%s' does not match StatefulSet pod template and needs to be recreated", util.NameFromMeta(pod.ObjectMeta))
+
+		if util.PodSpiloRole(&pod) == "master" {
+			//TODO: do manual failover first
+		}
+		if err := c.recreatePod(pod, "replica"); err != nil { return err } // newly created Pods are always replicas
+
+		c.logger.Infof("Pod '%s' has been successfully recreated", util.NameFromMeta(pod.ObjectMeta))
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 7372428be..f5fb0d716 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -9,9 +9,11 @@ import (
 	etcdclient "github.com/coreos/etcd/client"
 	"k8s.io/client-go/pkg/api/v1"
+	"k8s.io/client-go/pkg/apis/apps/v1beta1"
 	"k8s.io/client-go/pkg/labels"
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
 )
@@ -43,6 +45,83 @@ func normalizeUserFlags(userFlags []string) (flags []string, err error) {
 	return
 }
 
+func statefulsetsEqual(ss1, ss2 *v1beta1.StatefulSet) (equal bool, needsRollUpdate bool) {
+	equal = true
+	needsRollUpdate = false
+	//TODO: improve me
+	if *ss1.Spec.Replicas != *ss2.Spec.Replicas {
+		equal = false
+	}
+	if len(ss1.Spec.Template.Spec.Containers) != len(ss2.Spec.Template.Spec.Containers) {
+		equal = false
+		needsRollUpdate = true
+		return
+	}
+	if len(ss1.Spec.Template.Spec.Containers) == 0 {
+		return
+	}
+
+	container1 := ss1.Spec.Template.Spec.Containers[0]
+	container2 := ss2.Spec.Template.Spec.Containers[0]
+	if container1.Image != container2.Image {
+		equal = false
+		needsRollUpdate = true
+		return
+	}
+
+	if !reflect.DeepEqual(container1.Ports, container2.Ports) {
+		equal = false
+		needsRollUpdate = true
+		return
+	}
+
+	if !reflect.DeepEqual(container1.Resources, container2.Resources) {
+		equal = false
+		needsRollUpdate = true
+		return
+	}
+	if !reflect.DeepEqual(container1.Env, container2.Env) {
+		equal = false
+		needsRollUpdate = true
+	}
+
+	return
+}
+
+func servicesEqual(svc1, svc2 *v1.Service) bool {
+	//TODO: check Ports as well
+	//TODO: improve me
+	if reflect.DeepEqual(svc1.Spec.LoadBalancerSourceRanges, svc2.Spec.LoadBalancerSourceRanges) {
+		return true
+	}
+
+	return false
+}
+
+func podMatchesTemplate(pod *v1.Pod, ss *v1beta1.StatefulSet) bool {
+	//TODO: improve me
+	if len(pod.Spec.Containers) != 1 {
+		return false
+	}
+	container := pod.Spec.Containers[0]
+	ssContainer := ss.Spec.Template.Spec.Containers[0]
+
+	if container.Image != ssContainer.Image {
+		return false
+	}
+	if !reflect.DeepEqual(container.Env, ssContainer.Env) {
+		return false
+	}
+	if !reflect.DeepEqual(container.Ports, ssContainer.Ports) {
+		return false
+	}
+	if !reflect.DeepEqual(container.Resources, ssContainer.Resources) {
+		return false
+	}
+
+	return true
+}
+
 func (c *Cluster) getTeamMembers() ([]string, error) {
 	teamInfo, err := c.config.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
 	if err != nil {
@@ -56,10 +135,8 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string
 	for {
 		select {
 		case podEvent := <-podEvents:
-			podLabels := podEvent.CurPod.Labels
-			c.logger.Debugf("Pod has following labels: %+v", podLabels)
-			val, ok := podLabels["spilo-role"]
-			if ok && val == spiloRole {
+			role := util.PodSpiloRole(podEvent.CurPod)
+			if role == spiloRole {
 				return nil
 			}
 		case <-time.After(constants.PodLabelWaitTimeout):
@@ -82,7 +159,7 @@
 }
 
 func (c *Cluster) waitStatefulsetReady() error {
-	return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
+	return retryutil.Retry(constants.ResourceCheckInterval, constants.ResourceCheckTimeout,
 		func() (bool, error) {
 			listOptions := v1.ListOptions{
 				LabelSelector: c.labelsSet().String(),
@@ -119,8 +196,7 @@ func (c *Cluster) waitPodLabelsReady() error {
 	}
 	podsNumber := len(pods.Items)
 
-	return retryutil.Retry(
-		constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
+	return retryutil.Retry(constants.ResourceCheckInterval, constants.ResourceCheckTimeout,
 		func() (bool, error) {
 			masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
 			if err != nil {
@@ -133,6 +209,9 @@
 			if len(masterPods.Items) > 1 {
 				return false, fmt.Errorf("Too many masters")
 			}
+			if len(replicaPods.Items) == podsNumber {
+				return false, fmt.Errorf("Cluster has no master")
+			}
 
 			return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
 		})
@@ -185,12 +264,3 @@ func (c *Cluster) deleteEtcdKey() error {
 
 	return nil
 }
-
-func servicesEqual(svc1, svc2 *v1.Service) bool {
-	//TODO: improve me
-	if reflect.DeepEqual(svc1.Spec.LoadBalancerSourceRanges, svc2.Spec.LoadBalancerSourceRanges) {
-		return true
-	}
-
-	return false
-}
diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go
index d4a8d2313..315e15c33 100644
--- a/pkg/controller/pod.go
+++ b/pkg/controller/pod.go
@@ -117,15 +117,15 @@ func (c *Controller) podDelete(obj interface{}) {
 }
 
 func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
-	c.logger.Infof("Watching all pod events")
+	c.logger.Infof("Watching all Pod events")
 	for {
 		select {
 		case event := <-c.podCh:
 			if subscriber, ok := c.clusters[event.ClusterName]; ok {
-				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName)
+				c.logger.Debugf("Sending %s event of Pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName)
 				go subscriber.ReceivePodEvent(event)
 			} else {
-				c.logger.Debugf("Skipping pods unrelated to clusters: %s", event.PodName)
+				c.logger.Debugf("Skipping Pods unrelated to clusters: %s", event.PodName)
 			}
 		case <-stopCh:
 			return
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index e743462d5..47bc2849a 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -53,7 +53,9 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
 		c.clusters[clusterName] = cl
 		cl.LoadResources()
 		go cl.Run(stopCh)
+
 		cl.ListResources()
+		cl.SyncCluster()
 	}
 	if len(c.clusters) > 0 {
 		c.logger.Infof("There are %d clusters currently running", len(c.clusters))
diff --git a/pkg/controller/util.go b/pkg/controller/util.go
index f26840da9..bb680959a 100644
--- a/pkg/controller/util.go
+++ b/pkg/controller/util.go
@@ -24,7 +24,7 @@ func (c *Controller) getOAuthToken() (string, error) {
 	credentialsSecret, err := c.config.KubeClient.Secrets(api.NamespaceDefault).Get(constants.OAuthTokenSecretName)
 
 	if err != nil {
-		return "", fmt.Errorf("Can't get credentials secret: %s", err)
+		return "", fmt.Errorf("Can't get credentials Secret: %s", err)
 	}
 	data := credentialsSecret.Data
diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go
index 348daaa4b..a80b6f0d6 100644
--- a/pkg/util/constants/constants.go
+++ b/pkg/util/constants/constants.go
@@ -29,12 +29,13 @@ const (
 	PamConfiguration         = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"
 	PasswordLength           = 64
 	TeamsAPIUrl              = "https://teams.example.com/api/"
-	UserSecretTemplate       = "%s.%s.credentials.%s.%s"
+	UserSecretTemplate       = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor
 	OAuthTokenSecretName     = "postgresql-operator"
 	ServiceAccountName       = "operator"
 	DataVolumeName           = "pgdata"
 
 	ZalandoDnsNameAnnotation = "zalando.org/dnsname"
+	// TODO: move DbHostedZone to operator configuration
 	DbHostedZone             = "db.example.com"
 )
diff --git a/pkg/util/k8sutil/tpr_util.go b/pkg/util/k8sutil/tpr_util.go
index fa40360d2..48521e753 100644
--- a/pkg/util/k8sutil/tpr_util.go
+++ b/pkg/util/k8sutil/tpr_util.go
@@ -15,7 +15,7 @@ func listClustersURI(ns string) string {
 }
 
 func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
-	return retryutil.Retry(interval, int(timeout/interval), func() (bool, error) {
+	return retryutil.Retry(interval, timeout, func() (bool, error) {
 		_, err := restclient.Get().RequestURI(listClustersURI(ns)).DoRaw()
 		if err != nil {
 			if ResourceNotFound(err) { // not set up yet. wait more.
diff --git a/pkg/util/resources/factory.go b/pkg/util/resources/resources.go
similarity index 100%
rename from pkg/util/resources/factory.go
rename to pkg/util/resources/resources.go
diff --git a/pkg/util/retryutil/retry_util.go b/pkg/util/retryutil/retry_util.go
index 6e81f6599..0d38017dd 100644
--- a/pkg/util/retryutil/retry_util.go
+++ b/pkg/util/retryutil/retry_util.go
@@ -5,24 +5,14 @@ import (
 	"time"
 )
 
-type RetryError struct {
-	n int
-}
-
-func (e *RetryError) Error() string {
-	return fmt.Sprintf("still failing after %d retries", e.n)
-}
-
 type ConditionFunc func() (bool, error)
 
-// Retry retries f every interval until after maxRetries.
-// The interval won't be affected by how long f takes.
-// For example, if interval is 3s, f takes 1s, another f will be called 2s later.
-// However, if f takes longer than interval, it will be delayed.
-func Retry(interval time.Duration, maxRetries int, f ConditionFunc) error {
-	if maxRetries <= 0 {
-		return fmt.Errorf("maxRetries (%d) should be > 0", maxRetries)
+func Retry(interval time.Duration, timeout time.Duration, f ConditionFunc) error {
+	//TODO: make the retry exponential
+	if timeout < interval {
+		return fmt.Errorf("timeout (%s) should be greater than interval (%s)", timeout, interval)
 	}
+	maxRetries := int(timeout / interval)
 
 	tick := time.NewTicker(interval)
 	defer tick.Stop()
@@ -39,5 +29,5 @@ func Retry(interval time.Duration, maxRetries int, f ConditionFunc) error {
 		}
 		<-tick.C
 	}
-	return &RetryError{maxRetries}
+	return fmt.Errorf("Still failing after %d retries", maxRetries)
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 7b68234af..f085e8da6 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -45,6 +45,10 @@ func PodClusterName(pod *v1.Pod) spec.ClusterName {
 	return spec.ClusterName{}
 }
 
+func PodSpiloRole(pod *v1.Pod) string {
+	return pod.Labels["spilo-role"]
+}
+
 func ClusterDNSName(clusterName, teamName, hostedZone string) string {
 	return fmt.Sprintf("%s.%s.%s", clusterName, teamName, hostedZone)
 }