introduce cluster interface;
remove cluster types separate file
This commit is contained in:
parent
a6c1e8f64d
commit
685e0c533e
|
|
@ -28,6 +28,24 @@ import (
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Interface interface {
|
||||||
|
Create() error
|
||||||
|
Delete() error
|
||||||
|
ExecCommand(podName *spec.NamespacedName, command ...string) (string, error)
|
||||||
|
ReceivePodEvent(event spec.PodEvent)
|
||||||
|
Run(stopCh <-chan struct{})
|
||||||
|
Sync() error
|
||||||
|
Update(newSpec *spec.Postgresql) error
|
||||||
|
SetFailed(err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PostgresRole string
|
||||||
|
|
||||||
|
const (
|
||||||
|
Master PostgresRole = "master"
|
||||||
|
Replica PostgresRole = "replica"
|
||||||
|
)
|
||||||
|
|
||||||
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
|
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
|
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
|
||||||
|
|
@ -630,3 +648,7 @@ func (c *Cluster) initInfrastructureRoles() error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) SetFailed(err error) {
|
||||||
|
c.Error = err
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
package cluster
|
|
||||||
|
|
||||||
type PostgresRole string
|
|
||||||
|
|
||||||
const (
|
|
||||||
Master PostgresRole = "master"
|
|
||||||
Replica PostgresRole = "replica"
|
|
||||||
)
|
|
||||||
|
|
@ -31,7 +31,7 @@ type Controller struct {
|
||||||
logger *logrus.Entry
|
logger *logrus.Entry
|
||||||
|
|
||||||
clustersMu sync.RWMutex
|
clustersMu sync.RWMutex
|
||||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
clusters map[spec.NamespacedName]cluster.Interface
|
||||||
stopChs map[spec.NamespacedName]chan struct{}
|
stopChs map[spec.NamespacedName]chan struct{}
|
||||||
|
|
||||||
postgresqlInformer cache.SharedIndexInformer
|
postgresqlInformer cache.SharedIndexInformer
|
||||||
|
|
@ -56,7 +56,7 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
|
||||||
Config: *controllerConfig,
|
Config: *controllerConfig,
|
||||||
opConfig: operatorConfig,
|
opConfig: operatorConfig,
|
||||||
logger: logger.WithField("pkg", "controller"),
|
logger: logger.WithField("pkg", "controller"),
|
||||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
clusters: make(map[spec.NamespacedName]cluster.Interface),
|
||||||
stopChs: make(map[spec.NamespacedName]chan struct{}),
|
stopChs: make(map[spec.NamespacedName]chan struct{}),
|
||||||
podCh: make(chan spec.PodEvent),
|
podCh: make(chan spec.PodEvent),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -134,8 +134,9 @@ func (c *Controller) processEvent(obj interface{}) error {
|
||||||
c.clustersMu.Unlock()
|
c.clustersMu.Unlock()
|
||||||
|
|
||||||
if err := cl.Create(); err != nil {
|
if err := cl.Create(); err != nil {
|
||||||
cl.Error = fmt.Errorf("could not create cluster: %v", err)
|
err = fmt.Errorf("could not create cluster: %v", err)
|
||||||
logger.Errorf("%v", cl.Error)
|
logger.Errorf("%v", err)
|
||||||
|
cl.SetFailed(err)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -149,12 +150,13 @@ func (c *Controller) processEvent(obj interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := cl.Update(event.NewSpec); err != nil {
|
if err := cl.Update(event.NewSpec); err != nil {
|
||||||
cl.Error = fmt.Errorf("could not update cluster: %s", err)
|
err = fmt.Errorf("could not update cluster: %s", err)
|
||||||
logger.Errorf("%v", cl.Error)
|
logger.Errorf("%v", err)
|
||||||
|
cl.SetFailed(err)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
cl.Error = nil
|
cl.SetFailed(nil)
|
||||||
logger.Infof("Cluster '%s' has been updated", clusterName)
|
logger.Infof("Cluster '%s' has been updated", clusterName)
|
||||||
case spec.EventDelete:
|
case spec.EventDelete:
|
||||||
logger.Infof("Deletion of the '%s' cluster started", clusterName)
|
logger.Infof("Deletion of the '%s' cluster started", clusterName)
|
||||||
|
|
@ -191,11 +193,12 @@ func (c *Controller) processEvent(obj interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cl.Sync(); err != nil {
|
if err := cl.Sync(); err != nil {
|
||||||
cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err)
|
err = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err)
|
||||||
logger.Errorf("%v", cl.Error)
|
logger.Errorf("%v", err)
|
||||||
|
cl.SetFailed(err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
cl.Error = nil
|
cl.SetFailed(nil)
|
||||||
|
|
||||||
logger.Infof("Cluster '%s' has been synced", clusterName)
|
logger.Infof("Cluster '%s' has been synced", clusterName)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue