diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 41e2918c8..f36ff79a8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -56,12 +56,14 @@ type Cluster struct { kubeResources spec.Postgresql Config - logger *logrus.Entry - pgUsers map[string]spec.PgUser - podEvents chan spec.PodEvent - podSubscribers map[spec.NamespacedName]chan spec.PodEvent - pgDb *sql.DB - mu sync.Mutex + logger *logrus.Entry + pgUsers map[string]spec.PgUser + podEvents chan spec.PodEvent + podSubscribers map[spec.NamespacedName]chan spec.PodEvent + pgDb *sql.DB + mu sync.Mutex + masterLess bool + podDispatcherRunning bool } func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster { @@ -69,13 +71,15 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster { kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} cluster := &Cluster{ - Config: cfg, - Postgresql: pgSpec, - logger: lg, - pgUsers: make(map[string]spec.PgUser), - podEvents: make(chan spec.PodEvent), - podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), - kubeResources: kubeResources, + Config: cfg, + Postgresql: pgSpec, + logger: lg, + pgUsers: make(map[string]spec.PgUser), + podEvents: make(chan spec.PodEvent), + podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), + kubeResources: kubeResources, + masterLess: false, + podDispatcherRunning: false, } return cluster @@ -90,12 +94,6 @@ func (c *Cluster) TeamName() string { return c.Spec.TeamId } -func (c *Cluster) Run(stopCh <-chan struct{}) { - go c.podEventsDispatcher(stopCh) - - <-stopCh -} - func (c *Cluster) SetStatus(status spec.PostgresStatus) { c.Status = status b, err := json.Marshal(status) @@ -155,7 +153,12 @@ func (c *Cluster) etcdKeyExists(keyName string) (bool, error) { return resp != nil, err } -func (c *Cluster) Create() error { +func (c *Cluster) Create(stopCh <-chan struct{}) error { + if !c.podDispatcherRunning { + go c.podEventsDispatcher(stopCh) + c.podDispatcherRunning = true + } + keyExist, err := c.etcdKeyExists(fmt.Sprintf("/%s/%s", c.OpConfig.EtcdScope, c.Metadata.Name)) if err != nil { c.logger.Warnf("Can't check etcd key: %s", err) @@ -203,14 +206,18 @@ func (c *Cluster) Create() error { return err } - if err := c.initDbConn(); err != nil { - return fmt.Errorf("Can't init db connection: %s", err) - } + if !c.masterLess { + if err := c.initDbConn(); err != nil { + return fmt.Errorf("Can't init db connection: %s", err) + } - if err := c.createUsers(); err != nil { - return fmt.Errorf("Can't create users: %s", err) + if err := c.createUsers(); err != nil { + return fmt.Errorf("Can't create users: %s", err) + } else { + c.logger.Infof("Users have been successfully created") + } } else { - c.logger.Infof("Users have been successfully created") + c.logger.Warnln("Cluster is masterless") } c.ListResources() diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 40cfd37f6..3b35ccbf5 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -45,11 +45,13 @@ func (c *Cluster) deletePods() error { } for _, obj := range pods { - c.logger.Debugf("Deleting Pod '%s'", util.NameFromMeta(obj.ObjectMeta)) - if err := c.deletePod(&obj); err != nil { - c.logger.Errorf("Can't delete Pod: %s", err) + podName := util.NameFromMeta(obj.ObjectMeta) + + c.logger.Debugf("Deleting Pod '%s'", podName) + if err := c.deletePod(podName); err != nil { + c.logger.Errorf("Can't delete Pod '%s': %s", podName, err) } else { - c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta)) + c.logger.Infof("Pod '%s' has been deleted", podName) } } if len(pods) > 0 { @@ -83,9 +85,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { return nil } -func (c *Cluster) deletePod(pod *v1.Pod) error { - podName := util.NameFromMeta(pod.ObjectMeta) - +func (c *Cluster) deletePod(podName spec.NamespacedName) error { ch := make(chan spec.PodEvent) if _, ok := c.podSubscribers[podName]; ok { panic("Pod '" + podName.String() + "' is already subscribed") @@ -96,7 +96,7 @@ func (c *Cluster) deletePod(pod *v1.Pod) error { delete(c.podSubscribers, podName) }() - if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { + if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, deleteOptions); err != nil { return err } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 505ac5f01..1b5e62fec 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -6,7 +6,11 @@ import ( "github.bus.zalan.do/acid/postgres-operator/pkg/util" ) -func (c *Cluster) SyncCluster() { +func (c *Cluster) SyncCluster(stopCh <-chan struct{}) { + if !c.podDispatcherRunning { + go c.podEventsDispatcher(stopCh) + } + c.logger.Debugf("Syncing Secrets") if err := c.syncSecrets(); err != nil { c.logger.Infof("Can't sync Secrets: %s", err) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 2b5e70ce0..21e5e0f11 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -171,7 +171,7 @@ func (c *Cluster) waitPodLabelsReady() error { } podsNumber := len(pods.Items) - return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption) if err != nil { @@ -185,11 +185,20 @@ func (c *Cluster) waitPodLabelsReady() error { return false, fmt.Errorf("Too many masters") } if len(replicaPods.Items) == podsNumber { - return false, fmt.Errorf("Cluster has no master") + c.masterLess = true + return true, nil } return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil }) + + //TODO: wait for master for a while and then set masterLess flag + + if err != nil { + return err + } + + return nil } func (c *Cluster) waitStatefulsetPodsReady() error { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 47b471143..e9b80a4d2 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -48,10 +48,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e c.stopChMap[clusterName] = stopCh c.clusters[clusterName] = cl cl.LoadResources() - go cl.Run(stopCh) - cl.ListResources() - cl.SyncCluster() + cl.SyncCluster(stopCh) } if len(c.clusters) > 0 { c.logger.Infof("There are %d clusters currently running", len(c.clusters)) @@ -95,13 +93,12 @@ func (c *Controller) postgresqlAdd(obj interface{}) { c.stopChMap[clusterName] = stopCh cl.SetStatus(spec.ClusterStatusCreating) - if err := cl.Create(); err != nil { + if err := cl.Create(stopCh); err != nil { c.logger.Errorf("Can't create cluster: %s", err) cl.SetStatus(spec.ClusterStatusAddFailed) return } cl.SetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running? - go cl.Run(stopCh) c.logger.Infof("Postgresql cluster '%s' has been created", clusterName) }