diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 42109f53c..ecd52650f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -40,11 +40,12 @@ type Config struct { } type KubeResources struct { - Services map[types.UID]*v1.Service - Endpoints map[types.UID]*v1.Endpoints - Secrets map[types.UID]*v1.Secret - Statefulsets map[types.UID]*v1beta1.StatefulSet + Service *v1.Service + Endpoint *v1.Endpoints + Secrets map[types.UID]*v1.Secret + Statefulset *v1beta1.StatefulSet //Pods are treated separately + //PVCs are treated separately } type Cluster struct { @@ -63,12 +64,7 @@ type Cluster struct { func New(cfg Config, pgSpec spec.Postgresql) *Cluster { lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) - kubeResources := KubeResources{ - Services: make(map[types.UID]*v1.Service), - Endpoints: make(map[types.UID]*v1.Endpoints), - Secrets: make(map[types.UID]*v1.Secret), - Statefulsets: make(map[types.UID]*v1beta1.StatefulSet), - } + kubeResources := KubeResources{Secrets: make(map[types.UID]*v1.Secret)} cluster := &Cluster{ config: cfg, @@ -149,18 +145,15 @@ func (c *Cluster) Create() error { } c.initSystemUsers() - err = c.initRobotUsers() - if err != nil { + if err := c.initRobotUsers(); err != nil { return fmt.Errorf("Can't init robot users: %s", err) } - err = c.initHumanUsers() - if err != nil { + if err := c.initHumanUsers(); err != nil { return fmt.Errorf("Can't init human users: %s", err) } - err = c.applySecrets() - if err != nil { + if err := c.applySecrets(); err != nil { return fmt.Errorf("Can't create secrets: %s", err) } else { c.logger.Infof("Secrets have been successfully created") @@ -174,19 +167,17 @@ func (c *Cluster) Create() error { } c.logger.Info("Waiting for cluster being ready") - err = c.waitClusterReady() + err = c.waitStatefulsetPodsReady() if err != nil { c.logger.Errorf("Failed to create cluster: %s", err) return err } - err = c.initDbConn() - if err != nil { + if err := c.initDbConn(); err != nil { return fmt.Errorf("Can't init db connection: %s", err) } - err = c.createUsers() - if err != nil { + if err := c.createUsers(); err != nil { return fmt.Errorf("Can't create users: %s", err) } else { c.logger.Infof("Users have been successfully created") @@ -198,18 +189,20 @@ func (c *Cluster) Create() error { } func (c *Cluster) Update(newSpec *spec.Postgresql, rollingUpdate bool) error { - statefulSet := getStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage) + newStatefulSet := getStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage) + + if !reflect.DeepEqual(newSpec.Spec.Volume, c.Spec.Volume) { + //TODO: update PVC + } //TODO: mind the case of updating allowedSourceRanges - err := c.updateStatefulSet(statefulSet) - if err != nil { + if err := c.updateStatefulSet(newStatefulSet); err != nil { return fmt.Errorf("Can't upate cluster: %s", err) } if rollingUpdate { - err = c.recreatePods() // TODO: wait for actual streaming to the replica - if err != nil { + if err := c.recreatePods(); err != nil { return fmt.Errorf("Can't recreate pods: %s", err) } } @@ -218,45 +211,36 @@ func (c *Cluster) Update(newSpec *spec.Postgresql, rollingUpdate bool) error { } func (c *Cluster) Delete() error { - for _, obj := range c.Statefulsets { - err := c.deleteStatefulSet(obj) - if err != nil { - c.logger.Errorf("Can't delete StatefulSet: %s", err) - } else { - c.logger.Infof("StatefulSet '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) - } + if err := c.deleteEndpoint(); err != nil { + c.logger.Errorf("Can't delete endpoint: %s", err) + } else { + c.logger.Infof("Endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) + } + + if err := c.deleteService(); err != nil { + c.logger.Errorf("Can't delete service: %s", err) + } else { + c.logger.Infof("Service '%s' has been deleted", util.NameFromMeta(c.Service.ObjectMeta)) + } + + if err := c.deleteStatefulSet(); err != nil { + c.logger.Errorf("Can't delete StatefulSet: %s", err) + } else { + c.logger.Infof("StatefulSet '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) } for _, obj := range c.Secrets { - err := c.deleteSecret(obj) - if err != nil { + if err := c.deleteSecret(obj); err != nil { c.logger.Errorf("Can't delete secret: %s", err) } else { c.logger.Infof("Secret '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) } } - for _, obj := range c.Endpoints { - err := c.deleteEndpoint(obj) - if err != nil { - c.logger.Errorf("Can't delete endpoint: %s", err) - } else { - c.logger.Infof("Endpoint '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) - } - } - - for _, obj := range c.Services { - err := c.deleteService(obj) - if err != nil { - c.logger.Errorf("Can't delete service: %s", err) - } else { - c.logger.Infof("Service '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) - } - } - - err := c.deletePods() - if err != nil { - return fmt.Errorf("Can't delete pods: %s", err) + if err := c.deletePods(); err != nil { + c.logger.Errorf("Can't delete pods: %s", err) + } else { + c.logger.Infof("Pods have been deleted") } err = c.deletePersistenVolumeClaims() if err != nil { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index b4d792305..7640b6214 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -30,8 +30,7 @@ func (c *Cluster) deletePods() error { } for _, obj := range pods { - err := c.deletePod(&obj) - if err != nil { + if err := c.deletePod(&obj); err != nil { c.logger.Errorf("Can't delete pod: %s", err) } else { c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) @@ -84,13 +83,11 @@ func (c *Cluster) deletePod(pod *v1.Pod) error { delete(c.podSubscribers, podName) }() - err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions) - if err != nil { + if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { return err } - err = c.waitForPodDeletion(ch) - if err != nil { + if err := c.waitForPodDeletion(ch); err != nil { return err } @@ -118,16 +115,14 @@ func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { delete(c.podSubscribers, podName) }() - err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions) - if err != nil { + if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { return fmt.Errorf("Can't delete pod: %s", err) } - err = c.waitForPodDeletion(ch) - if err != nil { + + if err := c.waitForPodDeletion(ch); err != nil { return err } - err = c.waitForPodLabel(ch, spiloRole) - if err != nil { + if err := c.waitForPodLabel(ch, spiloRole); err != nil { return err } c.logger.Infof("Pod '%s' is ready", podName) @@ -161,7 +156,7 @@ func (c *Cluster) recreatePods() error { } pods, err := c.config.KubeClient.Pods(namespace).List(listOptions) if err != nil { - return fmt.Errorf("Can't get list of pods: %s", err) + return fmt.Errorf("Can't get the list of the pods: %s", err) } else { c.logger.Infof("There are %d pods in the cluster to recreate", len(pods.Items)) } @@ -187,8 +182,8 @@ func (c *Cluster) recreatePods() error { //TODO: do manual failover //TODO: specify master, leave new master empty c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) - err = c.recreatePod(masterPod, "replica") - if err != nil { + + if err := c.recreatePod(masterPod, "replica"); err != nil { return fmt.Errorf("Can't recreate master pod '%s': %s", util.NameFromMeta(masterPod.ObjectMeta), err) } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 0235d0ab2..687564c1e 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -38,24 +38,19 @@ func (c *Cluster) LoadResources() error { if err != nil { return fmt.Errorf("Can't get list of services: %s", err) } - for i, service := range services.Items { - if _, ok := c.Services[service.UID]; ok { - continue - } - c.Services[service.UID] = &services.Items[i] + if len(services.Items) > 1 { + return fmt.Errorf("Too many(%d) Services for a cluster", len(services.Items)) } + c.Service = &services.Items[0] endpoints, err := c.config.KubeClient.Endpoints(ns).List(listOptions) if err != nil { return fmt.Errorf("Can't get list of endpoints: %s", err) } - for i, endpoint := range endpoints.Items { - if _, ok := c.Endpoints[endpoint.UID]; ok { - continue - } - c.Endpoints[endpoint.UID] = &endpoints.Items[i] - c.logger.Debugf("Endpoint loaded, uid: %s", endpoint.UID) + if len(endpoints.Items) > 1 { + return fmt.Errorf("Too many(%d) Endpoints for a cluster", len(endpoints.Items)) } + c.Endpoint = &endpoints.Items[0] secrets, err := c.config.KubeClient.Secrets(ns).List(listOptions) if err != nil { @@ -73,33 +68,23 @@ func (c *Cluster) LoadResources() error { if err != nil { return fmt.Errorf("Can't get list of stateful sets: %s", err) } - for i, statefulSet := range statefulSets.Items { - if _, ok := c.Statefulsets[statefulSet.UID]; ok { - continue - } - c.Statefulsets[statefulSet.UID] = &statefulSets.Items[i] - c.logger.Debugf("StatefulSet loaded, uid: %s", statefulSet.UID) + if len(statefulSets.Items) > 1 { + return fmt.Errorf("Too many(%d) StatefulSets for a cluster", len(statefulSets.Items)) } + c.Statefulset = &statefulSets.Items[0] return nil } func (c *Cluster) ListResources() error { - for _, obj := range c.Statefulsets { - c.logger.Infof("StatefulSet: %s", util.NameFromMeta(obj.ObjectMeta)) - } + c.logger.Infof("StatefulSet: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) for _, obj := range c.Secrets { - c.logger.Infof("Secret: %s", util.NameFromMeta(obj.ObjectMeta)) + c.logger.Infof("Secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } - for _, obj := range c.Endpoints { - c.logger.Infof("Endpoint: %s", util.NameFromMeta(obj.ObjectMeta)) - } - - for _, obj := range c.Services { - c.logger.Infof("Service: %s", util.NameFromMeta(obj.ObjectMeta)) - } + c.logger.Infof("Endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) + c.logger.Infof("Service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID) pods, err := c.clusterPods() if err != nil { @@ -107,13 +92,16 @@ func (c *Cluster) ListResources() error { } for _, obj := range pods { - c.logger.Infof("Pod: %s", util.NameFromMeta(obj.ObjectMeta)) + c.logger.Infof("Pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } return nil } func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { + if c.Statefulset != nil { + return nil, fmt.Errorf("StatefulSet already exists in the cluster") + } statefulSetSpec := getStatefulSet(c.ClusterName(), c.Spec, c.etcdHost, c.dockerImage) statefulSet, err := c.config.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec) if k8sutil.ResourceAlreadyExists(err) { @@ -122,61 +110,44 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { if err != nil { return nil, err } - c.Statefulsets[statefulSet.UID] = statefulSet + c.Statefulset = statefulSet c.logger.Debugf("Created new StatefulSet, uid: %s", statefulSet.UID) return statefulSet, nil } -func (c *Cluster) updateStatefulSet(statefulSet *v1beta1.StatefulSet) error { - statefulSet, err := c.config.KubeClient.StatefulSets(statefulSet.Namespace).Update(statefulSet) - if err != nil { - c.Statefulsets[statefulSet.UID] = statefulSet +func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { + if c.Statefulset == nil { + return fmt.Errorf("There is no StatefulSet in the cluster") } - - return err -} - -func (c *Cluster) deleteStatefulSet(statefulSet *v1beta1.StatefulSet) error { - err := c.config.KubeClient. - StatefulSets(statefulSet.Namespace). - Delete(statefulSet.Name, deleteOptions) - + statefulSet, err := c.config.KubeClient.StatefulSets(newStatefulSet.Namespace).Update(newStatefulSet) if err != nil { return err } - delete(c.Statefulsets, statefulSet.UID) + + c.Statefulset = statefulSet return nil } -func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { - endpointSpec := resources.Endpoint(c.ClusterName()) - - endpoint, err := c.config.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) - if k8sutil.ResourceAlreadyExists(err) { - return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpointSpec.ObjectMeta)) +func (c *Cluster) deleteStatefulSet() error { + if c.Statefulset == nil { + return fmt.Errorf("There is no StatefulSet in the cluster") } - if err != nil { - return nil, err - } - c.Endpoints[endpoint.UID] = endpoint - c.logger.Debugf("Created new endpoint, uid: %s", endpoint.UID) - return endpoint, nil -} - -func (c *Cluster) deleteEndpoint(endpoint *v1.Endpoints) error { - err := c.config.KubeClient.Endpoints(endpoint.Namespace).Delete(endpoint.Name, deleteOptions) + err := c.config.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, deleteOptions) if err != nil { return err } - delete(c.Endpoints, endpoint.UID) + c.Statefulset = nil return nil } func (c *Cluster) createService() (*v1.Service, error) { + if c.Service != nil { + return nil, fmt.Errorf("Service already exists in the cluster") + } serviceSpec := resources.Service(c.ClusterName(), c.Spec.AllowedSourceRanges) service, err := c.config.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) @@ -186,39 +157,73 @@ func (c *Cluster) createService() (*v1.Service, error) { if err != nil { return nil, err } - c.Services[service.UID] = service - c.logger.Debugf("Created new service, uid: %s", service.UID) + c.Service = service return service, nil } -func (c *Cluster) deleteService(service *v1.Service) error { - err := c.config.KubeClient.Services(service.Namespace).Delete(service.Name, deleteOptions) +func (c *Cluster) updateService(newService *v1.Service) error { + if c.Service == nil { + return fmt.Errorf("There is no Service in the cluster") + } + newService.ObjectMeta = c.Service.ObjectMeta + newService.Spec.ClusterIP = c.Service.Spec.ClusterIP + + svc, err := c.config.KubeClient.Services(newService.Namespace).Update(newService) if err != nil { return err } - delete(c.Services, service.UID) + c.Service = svc return nil } -func (c *Cluster) createUsers() error { - for username, user := range c.pgUsers { - if username == constants.SuperuserName || username == constants.ReplicationUsername { - continue - } - - isHuman, err := c.createPgUser(user) - var userType string - if isHuman { - userType = "human" - } else { - userType = "robot" - } - if err != nil { - return fmt.Errorf("Can't create %s user '%s': %s", userType, username, err) - } +func (c *Cluster) deleteService() error { + if c.Service == nil { + return fmt.Errorf("There is no Service in the cluster") } + err := c.config.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, deleteOptions) + if err != nil { + return err + } + c.Service = nil + + return nil +} + +func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { + if c.Endpoint != nil { + return nil, fmt.Errorf("Endpoint already exists in the cluster") + } + endpointSpec := resources.Endpoint(c.ClusterName()) + + endpoint, err := c.config.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) + if k8sutil.ResourceAlreadyExists(err) { + return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpointSpec.ObjectMeta)) + } + if err != nil { + return nil, err + } + c.Endpoint = endpoint + + return endpoint, nil +} + +func (c *Cluster) updateEndpoint(newEndpoint *v1.Endpoints) error { + //TODO: to be implemented + + return nil +} + +func (c *Cluster) deleteEndpoint() error { + if c.Endpoint == nil { + return fmt.Errorf("There is no Endpoint in the cluster") + } + err := c.config.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, deleteOptions) + if err != nil { + return err + } + c.Endpoint = nil return nil } @@ -263,3 +268,24 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error { return err } + +func (c *Cluster) createUsers() error { + for username, user := range c.pgUsers { + if username == constants.SuperuserName || username == constants.ReplicationUsername { + continue + } + + isHuman, err := c.createPgUser(user) + var userType string + if isHuman { + userType = "human" + } else { + userType = "robot" + } + if err != nil { + return fmt.Errorf("Can't create %s user '%s': %s", userType, username, err) + } + } + + return nil +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 4ca66c684..458e3d92b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -137,16 +137,14 @@ func (c *Cluster) waitPodLabelsReady() error { }) } -func (c *Cluster) waitClusterReady() error { +func (c *Cluster) waitStatefulsetPodsReady() error { // TODO: wait for the first Pod only - err := c.waitStatefulsetReady() - if err != nil { + if err := c.waitStatefulsetReady(); err != nil { return fmt.Errorf("Statuful set error: %s", err) } // TODO: wait only for master - err = c.waitPodLabelsReady() - if err != nil { + if err := c.waitPodLabelsReady(); err != nil { return fmt.Errorf("Pod labels error: %s", err) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index eb7942929..b883b4be3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -52,8 +52,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { wg.Add(1) c.initController() - err := c.initEtcdClient() - if err != nil { + if err := c.initEtcdClient(); err != nil { c.logger.Errorf("Can't get etcd client: %s", err) return } @@ -65,8 +64,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { } func (c *Controller) initController() { - err := c.createTPR() - if err != nil { + if err := c.createTPR(); err != nil { c.logger.Fatalf("Can't register ThirdPartyResource: %s", err) } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index d3f66de91..7a86a2886 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -52,9 +52,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e c.stopChMap[clusterName] = stopCh c.clusters[clusterName] = cl cl.LoadResources() - cl.ListResources() - go cl.Run(stopCh) + cl.ListResources() } if len(c.clusters) > 0 { c.logger.Infof("There are %d clusters currently running", len(c.clusters)) @@ -96,8 +95,7 @@ func (c *Controller) postgresqlAdd(obj interface{}) { c.logger.Infof("Creation of a new Postgresql cluster '%s' started", clusterName) cl := cluster.New(c.makeClusterConfig(), *pg) cl.MustSetStatus(spec.ClusterStatusCreating) - err := cl.Create() - if err != nil { + if err := cl.Create(); err != nil { c.logger.Errorf("Can't create cluster: %s", err) cl.MustSetStatus(spec.ClusterStatusAddFailed) return @@ -150,8 +148,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { } pgCluster.MustSetStatus(spec.ClusterStatusUpdating) - err := pgCluster.Update(pgNew, rollingUpdate) - if err != nil { + if err := pgCluster.Update(pgNew, rollingUpdate); err != nil { pgCluster.MustSetStatus(spec.ClusterStatusUpdateFailed) c.logger.Errorf("Can't update cluster: %s", err) } else { @@ -176,8 +173,7 @@ func (c *Controller) postgresqlDelete(obj interface{}) { } c.logger.Infof("Cluster delete: %s", util.NameFromMeta(pgCur.Metadata)) - err := pgCluster.Delete() - if err != nil { + if err := pgCluster.Delete(); err != nil { c.logger.Errorf("Can't delete cluster '%s': %s", clusterName, err) return }