make postgresql roles public
This commit is contained in:
parent
8b85935a7a
commit
90b49a24ba
|
|
@ -42,7 +42,7 @@ type Config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type kubeResources struct {
|
type kubeResources struct {
|
||||||
Services map[postgresRole]*v1.Service
|
Services map[PostgresRole]*v1.Service
|
||||||
Endpoint *v1.Endpoints
|
Endpoint *v1.Endpoints
|
||||||
Secrets map[types.UID]*v1.Secret
|
Secrets map[types.UID]*v1.Secret
|
||||||
Statefulset *v1beta1.StatefulSet
|
Statefulset *v1beta1.StatefulSet
|
||||||
|
|
@ -98,7 +98,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
|
||||||
pgUsers: make(map[string]spec.PgUser),
|
pgUsers: make(map[string]spec.PgUser),
|
||||||
systemUsers: make(map[string]spec.PgUser),
|
systemUsers: make(map[string]spec.PgUser),
|
||||||
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
|
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
|
||||||
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[postgresRole]*v1.Service)},
|
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service)},
|
||||||
masterLess: false,
|
masterLess: false,
|
||||||
userSyncStrategy: users.DefaultUserSyncStrategy{},
|
userSyncStrategy: users.DefaultUserSyncStrategy{},
|
||||||
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
|
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
|
||||||
|
|
@ -192,8 +192,8 @@ func (c *Cluster) Create() error {
|
||||||
}
|
}
|
||||||
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
|
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
|
||||||
|
|
||||||
for _, role := range []postgresRole{master, replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
if role == replica && !c.Spec.ReplicaLoadBalancer {
|
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
service, err = c.createService(role)
|
service, err = c.createService(role)
|
||||||
|
|
@ -247,7 +247,7 @@ func (c *Cluster) Create() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) {
|
func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) {
|
||||||
//TODO: improve comparison
|
//TODO: improve comparison
|
||||||
if c.Services[role].Spec.Type != service.Spec.Type {
|
if c.Services[role].Spec.Type != service.Spec.Type {
|
||||||
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
|
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
|
||||||
|
|
@ -420,8 +420,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
||||||
c.Postgresql = *newSpec
|
c.Postgresql = *newSpec
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, role := range []postgresRole{master, replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
if role == replica {
|
if role == Replica {
|
||||||
if !newSpec.Spec.ReplicaLoadBalancer {
|
if !newSpec.Spec.ReplicaLoadBalancer {
|
||||||
// old spec had a load balancer, but the new one doesn't
|
// old spec had a load balancer, but the new one doesn't
|
||||||
if c.Spec.ReplicaLoadBalancer {
|
if c.Spec.ReplicaLoadBalancer {
|
||||||
|
|
@ -519,8 +519,8 @@ func (c *Cluster) Delete() error {
|
||||||
return fmt.Errorf("could not delete endpoint: %v", err)
|
return fmt.Errorf("could not delete endpoint: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, role := range []postgresRole{master, replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
if role == replica && !c.Spec.ReplicaLoadBalancer {
|
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := c.deleteService(role); err != nil {
|
if err := c.deleteService(role); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -58,9 +58,9 @@ func (c *Cluster) endpointName() string {
|
||||||
return c.Name
|
return c.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) serviceName(role postgresRole) string {
|
func (c *Cluster) serviceName(role PostgresRole) string {
|
||||||
name := c.Name
|
name := c.Name
|
||||||
if role == replica {
|
if role == Replica {
|
||||||
name = name + "-repl"
|
name = name + "-repl"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -383,7 +383,7 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful
|
||||||
},
|
},
|
||||||
Spec: v1beta1.StatefulSetSpec{
|
Spec: v1beta1.StatefulSetSpec{
|
||||||
Replicas: &spec.NumberOfInstances,
|
Replicas: &spec.NumberOfInstances,
|
||||||
ServiceName: c.serviceName(master),
|
ServiceName: c.serviceName(Master),
|
||||||
Template: *podTemplate,
|
Template: *podTemplate,
|
||||||
VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
|
VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
|
||||||
},
|
},
|
||||||
|
|
@ -465,10 +465,10 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
|
||||||
return &secret
|
return &secret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) generateService(role postgresRole, newSpec *spec.PostgresSpec) *v1.Service {
|
func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service {
|
||||||
var dnsName string
|
var dnsName string
|
||||||
|
|
||||||
if role == master {
|
if role == Master {
|
||||||
dnsName = c.masterDNSName()
|
dnsName = c.masterDNSName()
|
||||||
} else {
|
} else {
|
||||||
dnsName = c.replicaDNSName()
|
dnsName = c.replicaDNSName()
|
||||||
|
|
@ -479,7 +479,7 @@ func (c *Cluster) generateService(role postgresRole, newSpec *spec.PostgresSpec)
|
||||||
Type: v1.ServiceTypeClusterIP,
|
Type: v1.ServiceTypeClusterIP,
|
||||||
}
|
}
|
||||||
|
|
||||||
if role == replica {
|
if role == Replica {
|
||||||
serviceSpec.Selector = c.roleLabelsSet(role)
|
serviceSpec.Selector = c.roleLabelsSet(role)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -523,7 +523,7 @@ func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpo
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: c.endpointName(),
|
Name: c.endpointName(),
|
||||||
Namespace: c.Namespace,
|
Namespace: c.Namespace,
|
||||||
Labels: c.roleLabelsSet(master),
|
Labels: c.roleLabelsSet(Master),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if len(subsets) > 0 {
|
if len(subsets) > 0 {
|
||||||
|
|
|
||||||
|
|
@ -19,16 +19,16 @@ func (c *Cluster) loadResources() error {
|
||||||
var err error
|
var err error
|
||||||
ns := c.Namespace
|
ns := c.Namespace
|
||||||
|
|
||||||
masterService, err := c.KubeClient.Services(ns).Get(c.serviceName(master), metav1.GetOptions{})
|
masterService, err := c.KubeClient.Services(ns).Get(c.serviceName(Master), metav1.GetOptions{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.Services[master] = masterService
|
c.Services[Master] = masterService
|
||||||
} else if !k8sutil.ResourceNotFound(err) {
|
} else if !k8sutil.ResourceNotFound(err) {
|
||||||
c.logger.Errorf("could not get master service: %v", err)
|
c.logger.Errorf("could not get master service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
replicaService, err := c.KubeClient.Services(ns).Get(c.serviceName(replica), metav1.GetOptions{})
|
replicaService, err := c.KubeClient.Services(ns).Get(c.serviceName(Replica), metav1.GetOptions{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.Services[replica] = replicaService
|
c.Services[Replica] = replicaService
|
||||||
} else if !k8sutil.ResourceNotFound(err) {
|
} else if !k8sutil.ResourceNotFound(err) {
|
||||||
c.logger.Errorf("could not get replica service: %v", err)
|
c.logger.Errorf("could not get replica service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -214,7 +214,7 @@ func (c *Cluster) deleteStatefulSet() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
|
func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
|
||||||
if c.Services[role] != nil {
|
if c.Services[role] != nil {
|
||||||
return nil, fmt.Errorf("service already exists in the cluster")
|
return nil, fmt.Errorf("service already exists in the cluster")
|
||||||
}
|
}
|
||||||
|
|
@ -229,7 +229,7 @@ func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
|
||||||
return service, nil
|
return service, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error {
|
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
|
||||||
if c.Services[role] == nil {
|
if c.Services[role] == nil {
|
||||||
return fmt.Errorf("there is no service in the cluster")
|
return fmt.Errorf("there is no service in the cluster")
|
||||||
}
|
}
|
||||||
|
|
@ -244,7 +244,7 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
if role == master {
|
if role == Master {
|
||||||
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
|
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
|
||||||
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
|
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
|
||||||
currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{})
|
currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{})
|
||||||
|
|
@ -262,7 +262,7 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
|
||||||
return fmt.Errorf("could not create service %q: %v", serviceName, err)
|
return fmt.Errorf("could not create service %q: %v", serviceName, err)
|
||||||
}
|
}
|
||||||
c.Services[role] = svc
|
c.Services[role] = svc
|
||||||
if role == master {
|
if role == Master {
|
||||||
// create the new endpoint using the addresses obtained from the previous one
|
// create the new endpoint using the addresses obtained from the previous one
|
||||||
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
|
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
|
||||||
ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec)
|
ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec)
|
||||||
|
|
@ -304,7 +304,7 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) deleteService(role postgresRole) error {
|
func (c *Cluster) deleteService(role PostgresRole) error {
|
||||||
c.logger.Debugf("deleting service %s", role)
|
c.logger.Debugf("deleting service %s", role)
|
||||||
if c.Services[role] == nil {
|
if c.Services[role] == nil {
|
||||||
return fmt.Errorf("there is no %s service in the cluster", role)
|
return fmt.Errorf("there is no %s service in the cluster", role)
|
||||||
|
|
@ -406,12 +406,12 @@ func (c *Cluster) createRoles() (err error) {
|
||||||
|
|
||||||
// GetServiceMaster returns cluster's kubernetes master Service
|
// GetServiceMaster returns cluster's kubernetes master Service
|
||||||
func (c *Cluster) GetServiceMaster() *v1.Service {
|
func (c *Cluster) GetServiceMaster() *v1.Service {
|
||||||
return c.Services[master]
|
return c.Services[Master]
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetServiceReplica returns cluster's kubernetes replica Service
|
// GetServiceReplica returns cluster's kubernetes replica Service
|
||||||
func (c *Cluster) GetServiceReplica() *v1.Service {
|
func (c *Cluster) GetServiceReplica() *v1.Service {
|
||||||
return c.Services[replica]
|
return c.Services[Replica]
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEndpoint returns cluster's kubernetes Endpoint
|
// GetEndpoint returns cluster's kubernetes Endpoint
|
||||||
|
|
|
||||||
|
|
@ -41,8 +41,8 @@ func (c *Cluster) Sync() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debugf("syncing services")
|
c.logger.Debugf("syncing services")
|
||||||
for _, role := range []postgresRole{master, replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
if role == replica && !c.Spec.ReplicaLoadBalancer {
|
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||||
if c.Services[role] != nil {
|
if c.Services[role] != nil {
|
||||||
// delete the left over replica service
|
// delete the left over replica service
|
||||||
if err := c.deleteService(role); err != nil {
|
if err := c.deleteService(role); err != nil {
|
||||||
|
|
@ -80,7 +80,7 @@ func (c *Cluster) Sync() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) syncService(role postgresRole) error {
|
func (c *Cluster) syncService(role PostgresRole) error {
|
||||||
cSpec := c.Spec
|
cSpec := c.Spec
|
||||||
if c.Services[role] == nil {
|
if c.Services[role] == nil {
|
||||||
c.logger.Infof("could not find the cluster's %s service", role)
|
c.logger.Infof("could not find the cluster's %s service", role)
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
type postgresRole string
|
type PostgresRole string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
master postgresRole = "master"
|
Master PostgresRole = "master"
|
||||||
replica postgresRole = "replica"
|
Replica PostgresRole = "replica"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -94,7 +94,7 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) logServiceChanges(role postgresRole, old, new *v1.Service, isUpdate bool, reason string) {
|
func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) {
|
||||||
if isUpdate {
|
if isUpdate {
|
||||||
c.logger.Infof("%s service %q has been changed",
|
c.logger.Infof("%s service %q has been changed",
|
||||||
role, util.NameFromMeta(old.ObjectMeta),
|
role, util.NameFromMeta(old.ObjectMeta),
|
||||||
|
|
@ -283,7 +283,7 @@ func (c *Cluster) labelsSet() labels.Set {
|
||||||
return labels.Set(lbls)
|
return labels.Set(lbls)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) roleLabelsSet(role postgresRole) labels.Set {
|
func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set {
|
||||||
lbls := c.labelsSet()
|
lbls := c.labelsSet()
|
||||||
lbls[c.OpConfig.PodRoleLabel] = string(role)
|
lbls[c.OpConfig.PodRoleLabel] = string(role)
|
||||||
return lbls
|
return lbls
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue