Do not create roles if cluster is masterless
fix pod deletion
commit a7c57874d5
parent da438aab3a
@@ -56,12 +56,14 @@ type Cluster struct {
 	kubeResources
 	spec.Postgresql
 	Config
-	logger         *logrus.Entry
-	pgUsers        map[string]spec.PgUser
-	podEvents      chan spec.PodEvent
-	podSubscribers map[spec.NamespacedName]chan spec.PodEvent
-	pgDb           *sql.DB
-	mu             sync.Mutex
+	logger               *logrus.Entry
+	pgUsers              map[string]spec.PgUser
+	podEvents            chan spec.PodEvent
+	podSubscribers       map[spec.NamespacedName]chan spec.PodEvent
+	pgDb                 *sql.DB
+	mu                   sync.Mutex
+	masterLess           bool
+	podDispatcherRunning bool
 }
 
 func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster {
@@ -69,13 +71,15 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster {
 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)}
 
 	cluster := &Cluster{
-		Config:         cfg,
-		Postgresql:     pgSpec,
-		logger:         lg,
-		pgUsers:        make(map[string]spec.PgUser),
-		podEvents:      make(chan spec.PodEvent),
-		podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
-		kubeResources:  kubeResources,
+		Config:               cfg,
+		Postgresql:           pgSpec,
+		logger:               lg,
+		pgUsers:              make(map[string]spec.PgUser),
+		podEvents:            make(chan spec.PodEvent),
+		podSubscribers:       make(map[spec.NamespacedName]chan spec.PodEvent),
+		kubeResources:        kubeResources,
+		masterLess:           false,
+		podDispatcherRunning: false,
 	}
 
 	return cluster
@@ -90,12 +94,6 @@ func (c *Cluster) TeamName() string {
 	return c.Spec.TeamId
 }
 
-func (c *Cluster) Run(stopCh <-chan struct{}) {
-	go c.podEventsDispatcher(stopCh)
-
-	<-stopCh
-}
-
 func (c *Cluster) SetStatus(status spec.PostgresStatus) {
 	c.Status = status
 	b, err := json.Marshal(status)
@@ -155,7 +153,12 @@ func (c *Cluster) etcdKeyExists(keyName string) (bool, error) {
 	return resp != nil, err
 }
 
-func (c *Cluster) Create() error {
+func (c *Cluster) Create(stopCh <-chan struct{}) error {
+	if !c.podDispatcherRunning {
+		go c.podEventsDispatcher(stopCh)
+		c.podDispatcherRunning = true
+	}
+
 	keyExist, err := c.etcdKeyExists(fmt.Sprintf("/%s/%s", c.OpConfig.EtcdScope, c.Metadata.Name))
 	if err != nil {
 		c.logger.Warnf("Can't check etcd key: %s", err)
@@ -203,14 +206,18 @@ func (c *Cluster) Create() error {
 		return err
 	}
 
-	if err := c.initDbConn(); err != nil {
-		return fmt.Errorf("Can't init db connection: %s", err)
-	}
+	if !c.masterLess {
+		if err := c.initDbConn(); err != nil {
+			return fmt.Errorf("Can't init db connection: %s", err)
+		}
 
-	if err := c.createUsers(); err != nil {
-		return fmt.Errorf("Can't create users: %s", err)
+		if err := c.createUsers(); err != nil {
+			return fmt.Errorf("Can't create users: %s", err)
+		} else {
+			c.logger.Infof("Users have been successfully created")
+		}
 	} else {
-		c.logger.Infof("Users have been successfully created")
+		c.logger.Warnln("Cluster is masterless")
 	}
 
 	c.ListResources()

@@ -45,11 +45,13 @@ func (c *Cluster) deletePods() error {
 	}
 
 	for _, obj := range pods {
-		c.logger.Debugf("Deleting Pod '%s'", util.NameFromMeta(obj.ObjectMeta))
-		if err := c.deletePod(&obj); err != nil {
-			c.logger.Errorf("Can't delete Pod: %s", err)
+		podName := util.NameFromMeta(obj.ObjectMeta)
+
+		c.logger.Debugf("Deleting Pod '%s'", podName)
+		if err := c.deletePod(podName); err != nil {
+			c.logger.Errorf("Can't delete Pod '%s': %s", podName, err)
 		} else {
-			c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
+			c.logger.Infof("Pod '%s' has been deleted", podName)
 		}
 	}
 	if len(pods) > 0 {
@@ -83,9 +85,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
 	return nil
 }
 
-func (c *Cluster) deletePod(pod *v1.Pod) error {
-	podName := util.NameFromMeta(pod.ObjectMeta)
-
+func (c *Cluster) deletePod(podName spec.NamespacedName) error {
 	ch := make(chan spec.PodEvent)
 	if _, ok := c.podSubscribers[podName]; ok {
 		panic("Pod '" + podName.String() + "' is already subscribed")
@@ -96,7 +96,7 @@ func (c *Cluster) deletePod(pod *v1.Pod) error {
 		delete(c.podSubscribers, podName)
 	}()
 
-	if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
+	if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, deleteOptions); err != nil {
 		return err
 	}
 

@@ -6,7 +6,11 @@ import (
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
 )
 
-func (c *Cluster) SyncCluster() {
+func (c *Cluster) SyncCluster(stopCh <-chan struct{}) {
+	if !c.podDispatcherRunning {
+		go c.podEventsDispatcher(stopCh)
+	}
+
 	c.logger.Debugf("Syncing Secrets")
 	if err := c.syncSecrets(); err != nil {
 		c.logger.Infof("Can't sync Secrets: %s", err)

@@ -171,7 +171,7 @@ func (c *Cluster) waitPodLabelsReady() error {
 	}
 	podsNumber := len(pods.Items)
 
-	return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
+	err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
 			masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption)
 			if err != nil {
@@ -185,11 +185,20 @@ func (c *Cluster) waitPodLabelsReady() error {
 				return false, fmt.Errorf("Too many masters")
 			}
 			if len(replicaPods.Items) == podsNumber {
-				return false, fmt.Errorf("Cluster has no master")
+				c.masterLess = true
+				return true, nil
 			}
 
 			return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
 		})
+
+	//TODO: wait for master for a while and then set masterLess flag
+
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 func (c *Cluster) waitStatefulsetPodsReady() error {

@@ -48,10 +48,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
 		c.stopChMap[clusterName] = stopCh
 		c.clusters[clusterName] = cl
 		cl.LoadResources()
-		go cl.Run(stopCh)
-
 		cl.ListResources()
-		cl.SyncCluster()
+		cl.SyncCluster(stopCh)
 	}
 	if len(c.clusters) > 0 {
 		c.logger.Infof("There are %d clusters currently running", len(c.clusters))
@@ -95,13 +93,12 @@ func (c *Controller) postgresqlAdd(obj interface{}) {
 	c.stopChMap[clusterName] = stopCh
 
 	cl.SetStatus(spec.ClusterStatusCreating)
-	if err := cl.Create(); err != nil {
+	if err := cl.Create(stopCh); err != nil {
 		c.logger.Errorf("Can't create cluster: %s", err)
 		cl.SetStatus(spec.ClusterStatusAddFailed)
 		return
 	}
 	cl.SetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
-	go cl.Run(stopCh)
 
 	c.logger.Infof("Postgresql cluster '%s' has been created", clusterName)
 }
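Taken together, the hunks above remove the per-cluster Run goroutine, start the pod-events dispatcher lazily from Create and SyncCluster, and skip role creation when waitPodLabelsReady only ever observes replica pods. Below is a condensed, self-contained sketch of that flow; the Cluster and PodEvent types and the fake dispatcher here are simplified stand-ins for the operator's real types, not its actual API.

// Sketch of the create flow after this commit: the dispatcher is started
// exactly once, and role creation is gated on the masterLess flag.
package main

import "fmt"

// PodEvent and Cluster are simplified stand-ins for spec.PodEvent and
// cluster.Cluster; only the fields this commit touches are modelled.
type PodEvent struct{ PodName string }

type Cluster struct {
	podEvents            chan PodEvent
	masterLess           bool
	podDispatcherRunning bool
}

// podEventsDispatcher drains pod events until the stop channel is closed.
func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) {
	for {
		select {
		case ev := <-c.podEvents:
			fmt.Printf("pod event: %s\n", ev.PodName)
		case <-stopCh:
			return
		}
	}
}

// Create starts the dispatcher once and skips role creation when the cluster
// came up masterless (the flag waitPodLabelsReady would have set).
func (c *Cluster) Create(stopCh <-chan struct{}) error {
	if !c.podDispatcherRunning {
		go c.podEventsDispatcher(stopCh)
		c.podDispatcherRunning = true
	}

	// ... etcd key check, Secrets, StatefulSet and pod readiness would go here ...

	if !c.masterLess {
		// initDbConn and createUsers run only when a master exists.
		fmt.Println("creating roles on the master")
	} else {
		fmt.Println("cluster is masterless, skipping role creation")
	}
	return nil
}

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)

	c := &Cluster{podEvents: make(chan PodEvent)}
	c.masterLess = true // pretend waitPodLabelsReady only saw replicas
	if err := c.Create(stopCh); err != nil {
		fmt.Println("create failed:", err)
	}
}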
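On the pod-deletion side, deletePod now takes the pod's spec.NamespacedName instead of a *v1.Pod, so the subscription key and the Kubernetes delete call are derived from the same value. A minimal sketch of the subscribe-then-delete pattern follows; waitForPodDeletion and the simplified types are illustrative assumptions, not the operator's actual helpers.

// Sketch of deletePod's subscribe-then-delete pattern with simplified types.
package main

import "fmt"

// NamespacedName and PodEvent stand in for spec.NamespacedName and spec.PodEvent.
type NamespacedName struct{ Namespace, Name string }

func (n NamespacedName) String() string { return n.Namespace + "/" + n.Name }

type PodEvent struct{ PodName NamespacedName }

type Cluster struct {
	podSubscribers map[NamespacedName]chan PodEvent
}

// waitForPodDeletion blocks until an event for the pod arrives on the channel.
// (Assumed helper; in this sketch the event is injected by the caller.)
func waitForPodDeletion(ch <-chan PodEvent) {
	<-ch
}

func (c *Cluster) deletePod(podName NamespacedName) error {
	// Subscribe before deleting so the dispatcher can route this pod's events here.
	ch := make(chan PodEvent, 1)
	if _, ok := c.podSubscribers[podName]; ok {
		panic("Pod '" + podName.String() + "' is already subscribed")
	}
	c.podSubscribers[podName] = ch
	defer delete(c.podSubscribers, podName)

	// The real code issues KubeClient.Pods(podName.Namespace).Delete(podName.Name, ...).
	fmt.Printf("deleting pod %s\n", podName)

	// Pretend the dispatcher saw the deletion event for this pod.
	ch <- PodEvent{PodName: podName}
	waitForPodDeletion(ch)

	fmt.Printf("pod %s has been deleted\n", podName)
	return nil
}

func main() {
	c := &Cluster{podSubscribers: make(map[NamespacedName]chan PodEvent)}
	if err := c.deletePod(NamespacedName{Namespace: "default", Name: "acid-testcluster-0"}); err != nil {
		fmt.Println(err)
	}
}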