introduce Cluster interface
This commit is contained in:
		
							parent
							
								
									4ec4c71d27
								
							
						
					
					
						commit
						333dfdd640
					
				|  | @ -178,7 +178,8 @@ func (c *Cluster) Create() error { | ||||||
| 	//TODO: service will create endpoint implicitly
 | 	//TODO: service will create endpoint implicitly
 | ||||||
| 	ep, err := c.createEndpoint() | 	ep, err := c.createEndpoint() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not create endpoint: %v", err) | 		c.Error = fmt.Errorf("could not create endpoint: %v", err) | ||||||
|  | 		return c.Error | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) | 	c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
|  | @ -188,41 +189,48 @@ func (c *Cluster) Create() error { | ||||||
| 		} | 		} | ||||||
| 		service, err := c.createService(role) | 		service, err := c.createService(role) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create %s service: %v", role, err) | 			c.Error = fmt.Errorf("could not create %s service: %v", role, err) | ||||||
|  | 			return c.Error | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) | 		c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err = c.initUsers(); err != nil { | 	if err = c.initUsers(); err != nil { | ||||||
| 		return err | 		c.Error = err | ||||||
|  | 		return c.Error | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("User secrets have been initialized") | 	c.logger.Infof("User secrets have been initialized") | ||||||
| 
 | 
 | ||||||
| 	if err = c.applySecrets(); err != nil { | 	if err = c.applySecrets(); err != nil { | ||||||
| 		return fmt.Errorf("could not create secrets: %v", err) | 		c.Error = fmt.Errorf("could not create secrets: %v", err) | ||||||
|  | 		return c.Error | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("secrets have been successfully created") | 	c.logger.Infof("secrets have been successfully created") | ||||||
| 
 | 
 | ||||||
| 	ss, err := c.createStatefulSet() | 	ss, err := c.createStatefulSet() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not create statefulset: %v", err) | 		c.Error = fmt.Errorf("could not create statefulset: %v", err) | ||||||
|  | 		return c.Error | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | 	c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Waiting for cluster being ready") | 	c.logger.Info("Waiting for cluster being ready") | ||||||
| 
 | 
 | ||||||
| 	if err = c.waitStatefulsetPodsReady(); err != nil { | 	if err = c.waitStatefulsetPodsReady(); err != nil { | ||||||
|  | 		c.Error = err | ||||||
| 		c.logger.Errorf("Failed to create cluster: %s", err) | 		c.logger.Errorf("Failed to create cluster: %s", err) | ||||||
| 		return err | 		return c.Error | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("pods are ready") | 	c.logger.Infof("pods are ready") | ||||||
| 
 | 
 | ||||||
| 	if !(c.masterLess || c.databaseAccessDisabled()) { | 	if !(c.masterLess || c.databaseAccessDisabled()) { | ||||||
| 		if err := c.initDbConn(); err != nil { | 		if err := c.initDbConn(); err != nil { | ||||||
| 			return fmt.Errorf("could not init db connection: %v", err) | 			c.Error = fmt.Errorf("could not init db connection: %v", err) | ||||||
|  | 			return c.Error | ||||||
| 		} | 		} | ||||||
| 		if err = c.createUsers(); err != nil { | 		if err = c.createUsers(); err != nil { | ||||||
| 			return fmt.Errorf("could not create users: %v", err) | 			c.Error = fmt.Errorf("could not create users: %v", err) | ||||||
|  | 			return c.Error | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Users have been successfully created") | 		c.logger.Infof("Users have been successfully created") | ||||||
| 	} else { | 	} else { | ||||||
|  | @ -495,6 +503,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.setStatus(spec.ClusterStatusRunning) | 	c.setStatus(spec.ClusterStatusRunning) | ||||||
|  | 	c.Error = nil | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -73,6 +73,8 @@ func (c *Cluster) Sync() error { | ||||||
| 		return fmt.Errorf("could not sync persistent volumes: %v", err) | 		return fmt.Errorf("could not sync persistent volumes: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	c.Error = nil | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -10,7 +10,6 @@ import ( | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
| 	"k8s.io/client-go/tools/cache" | 	"k8s.io/client-go/tools/cache" | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" |  | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
|  | @ -31,7 +30,7 @@ type Controller struct { | ||||||
| 	logger   *logrus.Entry | 	logger   *logrus.Entry | ||||||
| 
 | 
 | ||||||
| 	clustersMu sync.RWMutex | 	clustersMu sync.RWMutex | ||||||
| 	clusters   map[spec.NamespacedName]*cluster.Cluster | 	clusters   map[spec.NamespacedName]spec.Cluster | ||||||
| 	stopChs    map[spec.NamespacedName]chan struct{} | 	stopChs    map[spec.NamespacedName]chan struct{} | ||||||
| 
 | 
 | ||||||
| 	postgresqlInformer cache.SharedIndexInformer | 	postgresqlInformer cache.SharedIndexInformer | ||||||
|  | @ -56,7 +55,7 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { | ||||||
| 		Config:   *controllerConfig, | 		Config:   *controllerConfig, | ||||||
| 		opConfig: operatorConfig, | 		opConfig: operatorConfig, | ||||||
| 		logger:   logger.WithField("pkg", "controller"), | 		logger:   logger.WithField("pkg", "controller"), | ||||||
| 		clusters: make(map[spec.NamespacedName]*cluster.Cluster), | 		clusters: make(map[spec.NamespacedName]spec.Cluster), | ||||||
| 		stopChs:  make(map[spec.NamespacedName]chan struct{}), | 		stopChs:  make(map[spec.NamespacedName]chan struct{}), | ||||||
| 		podCh:    make(chan spec.PodEvent), | 		podCh:    make(chan spec.PodEvent), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -134,9 +134,7 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		c.clustersMu.Unlock() | 		c.clustersMu.Unlock() | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Create(); err != nil { | 		if err := cl.Create(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | 			logger.Errorf("could not create cluster '%s': %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl.Error) |  | ||||||
| 
 |  | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -149,17 +147,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if err := cl.Update(event.NewSpec); err != nil { | 		if err := cl.Update(event.NewSpec); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not update cluster: %s", err) | 			logger.Errorf("could not update cluster '%s': %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl.Error) |  | ||||||
| 
 |  | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil |  | ||||||
| 		logger.Infof("Cluster '%s' has been updated", clusterName) | 		logger.Infof("Cluster '%s' has been updated", clusterName) | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		logger.Infof("Deletion of the '%s' cluster started", clusterName) | 		logger.Infof("Deletion of the '%s' cluster started", clusterName) | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Errorf("Unknown cluster: %s", clusterName) | 			logger.Errorf("Cluster '%s' is not found", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -191,11 +186,9 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Sync(); err != nil { | 		if err := cl.Sync(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) | 			logger.Errorf("%v", "could not sync cluster '%s': %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl.Error) |  | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil |  | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster '%s' has been synced", clusterName) | 		logger.Infof("Cluster '%s' has been synced", clusterName) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -1,9 +1,9 @@ | ||||||
| package spec | package spec | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"database/sql" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"database/sql" |  | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/pkg/types" | 	"k8s.io/client-go/pkg/types" | ||||||
|  | @ -73,6 +73,28 @@ type UserSyncer interface { | ||||||
| 	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error | 	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type ClusterEventHandler interface { | ||||||
|  | 	Create() error | ||||||
|  | 	Update(*Postgresql) error | ||||||
|  | 	Delete() error | ||||||
|  | 	Sync() error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type ClusterCommandExecutor interface { | ||||||
|  | 	ExecCommand(*NamespacedName, ...string) (string, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type ClusterController interface { | ||||||
|  | 	Run(<-chan struct{}) | ||||||
|  | 	ReceivePodEvent(PodEvent) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Cluster interface { | ||||||
|  | 	ClusterEventHandler | ||||||
|  | 	ClusterCommandExecutor | ||||||
|  | 	ClusterController | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (n NamespacedName) String() string { | func (n NamespacedName) String() string { | ||||||
| 	return types.NamespacedName(n).String() | 	return types.NamespacedName(n).String() | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue