use sync methods while updating the cluster
This commit is contained in:
parent
47dd766fa7
commit
86803406db
|
|
@ -28,7 +28,6 @@ import (
|
|||
"github.com/zalando-incubator/postgres-operator/pkg/util/patroni"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/users"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -46,7 +45,7 @@ type Config struct {
|
|||
|
||||
type kubeResources struct {
|
||||
Services map[PostgresRole]*v1.Service
|
||||
Endpoint *v1.Endpoints
|
||||
Endpoints map[PostgresRole]*v1.Endpoints
|
||||
Secrets map[types.UID]*v1.Secret
|
||||
Statefulset *v1beta1.StatefulSet
|
||||
PodDisruptionBudget *policybeta1.PodDisruptionBudget
|
||||
|
|
@ -99,12 +98,15 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
|
|||
})
|
||||
|
||||
cluster := &Cluster{
|
||||
Config: cfg,
|
||||
Postgresql: pgSpec,
|
||||
pgUsers: make(map[string]spec.PgUser),
|
||||
systemUsers: make(map[string]spec.PgUser),
|
||||
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
|
||||
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service)},
|
||||
Config: cfg,
|
||||
Postgresql: pgSpec,
|
||||
pgUsers: make(map[string]spec.PgUser),
|
||||
systemUsers: make(map[string]spec.PgUser),
|
||||
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
|
||||
kubeResources: kubeResources{
|
||||
Secrets: make(map[types.UID]*v1.Secret),
|
||||
Services: make(map[PostgresRole]*v1.Service),
|
||||
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
|
||||
masterLess: false,
|
||||
userSyncStrategy: users.DefaultUserSyncStrategy{},
|
||||
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
|
||||
|
|
@ -203,17 +205,16 @@ func (c *Cluster) Create() error {
|
|||
|
||||
c.setStatus(spec.ClusterStatusCreating)
|
||||
|
||||
//service will create endpoint implicitly
|
||||
ep, err = c.createEndpoint()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create endpoint: %v", err)
|
||||
}
|
||||
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
|
||||
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
continue
|
||||
}
|
||||
ep, err = c.createEndpoint(role)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create %s endpoint: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
|
||||
|
||||
service, err = c.createService(role)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create %s service: %v", role, err)
|
||||
|
|
@ -226,7 +227,7 @@ func (c *Cluster) Create() error {
|
|||
}
|
||||
c.logger.Infof("users have been initialized")
|
||||
|
||||
if err = c.applySecrets(); err != nil {
|
||||
if err = c.syncSecrets(); err != nil {
|
||||
return fmt.Errorf("could not create secrets: %v", err)
|
||||
}
|
||||
c.logger.Infof("secrets have been successfully created")
|
||||
|
|
@ -257,8 +258,8 @@ func (c *Cluster) Create() error {
|
|||
}
|
||||
c.logger.Infof("users have been successfully created")
|
||||
|
||||
if err = c.createDatabases(); err != nil {
|
||||
return fmt.Errorf("could not create databases: %v", err)
|
||||
if err = c.syncDatabases(); err != nil {
|
||||
return fmt.Errorf("could not sync databases: %v", err)
|
||||
}
|
||||
c.logger.Infof("databases have been successfully created")
|
||||
} else {
|
||||
|
|
@ -267,48 +268,13 @@ func (c *Cluster) Create() error {
|
|||
}
|
||||
}
|
||||
|
||||
err = c.listResources()
|
||||
if err != nil {
|
||||
if err := c.listResources(); err != nil {
|
||||
c.logger.Errorf("could not list resources: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) {
|
||||
//TODO: improve comparison
|
||||
if c.Services[role].Spec.Type != service.Spec.Type {
|
||||
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
|
||||
role, service.Spec.Type, c.Services[role].Spec.Type)
|
||||
}
|
||||
oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges
|
||||
newSourceRanges := service.Spec.LoadBalancerSourceRanges
|
||||
/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
|
||||
if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) {
|
||||
return true, ""
|
||||
}
|
||||
if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) {
|
||||
return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role)
|
||||
}
|
||||
|
||||
oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation]
|
||||
newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation]
|
||||
if oldDNSAnnotation != newDNSAnnotation {
|
||||
return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation)
|
||||
}
|
||||
|
||||
return true, ""
|
||||
}
|
||||
|
||||
func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) {
|
||||
if !reflect.DeepEqual(c.Spec.Volume, volume) {
|
||||
reason = "new volume's specification doesn't match the current one"
|
||||
} else {
|
||||
match = true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult {
|
||||
reasons := make([]string, 0)
|
||||
var match, needsRollUpdate, needsReplace bool
|
||||
|
|
@ -406,6 +372,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
|
|||
if needsRollUpdate || needsReplace {
|
||||
match = false
|
||||
}
|
||||
|
||||
return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace}
|
||||
}
|
||||
|
||||
|
|
@ -417,12 +384,13 @@ func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (e
|
|||
if equal && (b != nil) {
|
||||
equal = compareResoucesAssumeFirstNotNil(b, a)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool {
|
||||
if b == nil || (len(b.Requests) == 0) {
|
||||
return (len(a.Requests) == 0)
|
||||
return len(a.Requests) == 0
|
||||
}
|
||||
for k, v := range a.Requests {
|
||||
if (&v).Cmp(b.Requests[k]) != 0 {
|
||||
|
|
@ -440,108 +408,108 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource
|
|||
|
||||
// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object.
|
||||
// (i.e. service) is treated as an error.
|
||||
func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
||||
func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||
updateFailed := false
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.setStatus(spec.ClusterStatusUpdating)
|
||||
c.Postgresql = *newSpec
|
||||
|
||||
/* Make sure we update when this function exits */
|
||||
defer func() {
|
||||
c.Postgresql = *newSpec
|
||||
if updateFailed {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
} else if c.Status != spec.ClusterStatusRunning {
|
||||
c.setStatus(spec.ClusterStatusRunning)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
if role == Replica {
|
||||
if !newSpec.Spec.ReplicaLoadBalancer {
|
||||
// old spec had a load balancer, but the new one doesn't
|
||||
if c.Spec.ReplicaLoadBalancer {
|
||||
err := c.deleteService(role)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete obsolete %s service: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("deleted obsolete %s service", role)
|
||||
}
|
||||
} else {
|
||||
if !c.Spec.ReplicaLoadBalancer {
|
||||
// old spec didn't have a load balancer, but the one does
|
||||
service, err := c.createService(role)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create new %s service: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta))
|
||||
}
|
||||
}
|
||||
// only proceed further if both old and new load balancer were present
|
||||
if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
newService := c.generateService(role, &newSpec.Spec)
|
||||
if match, reason := c.sameServiceWith(role, newService); !match {
|
||||
c.logServiceChanges(role, c.Services[role], newService, true, reason)
|
||||
if err := c.updateService(role, newService); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("could not update %s service: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta))
|
||||
if oldSpec.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
|
||||
c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PgVersion, newSpec.Spec.PgVersion)
|
||||
//we need that hack to generate statefulset with the old version
|
||||
newSpec.Spec.PgVersion = oldSpec.Spec.PgVersion
|
||||
}
|
||||
|
||||
// Service
|
||||
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
|
||||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) ||
|
||||
oldSpec.Spec.ReplicaLoadBalancer != newSpec.Spec.ReplicaLoadBalancer {
|
||||
c.logger.Debugf("syncing services")
|
||||
if err := c.syncServices(); err != nil {
|
||||
c.logger.Errorf("could not sync services: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
|
||||
newStatefulSet, err := c.generateStatefulSet(newSpec.Spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not generate statefulset: %v", err)
|
||||
}
|
||||
cmp := c.compareStatefulSetWith(newStatefulSet)
|
||||
if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) {
|
||||
c.logger.Debugf("syncing secrets")
|
||||
if err := c.initUsers(); err != nil {
|
||||
c.logger.Errorf("could not init users: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
|
||||
if !cmp.match {
|
||||
c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, cmp.reasons)
|
||||
//TODO: mind the case of updating allowedSourceRanges
|
||||
if !cmp.replace {
|
||||
if err := c.updateStatefulSet(newStatefulSet); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("could not upate statefulset: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err := c.replaceStatefulSet(newStatefulSet); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("could not replace statefulset: %v", err)
|
||||
c.logger.Debugf("syncing secrets")
|
||||
|
||||
//TODO: mind the secrets of the deleted/new users
|
||||
if err := c.syncSecrets(); err != nil {
|
||||
c.logger.Errorf("could not sync secrets: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
|
||||
if !c.databaseAccessDisabled() {
|
||||
c.logger.Debugf("syncing roles")
|
||||
if err := c.syncRoles(true); err != nil {
|
||||
c.logger.Errorf("could not sync roles: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
//TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
|
||||
c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
|
||||
}
|
||||
|
||||
if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
|
||||
c.logger.Warningf("postgresql version change(%q -> %q) is not allowed",
|
||||
c.Spec.PgVersion, newSpec.Spec.PgVersion)
|
||||
//TODO: rewrite pg version in tpr spec
|
||||
}
|
||||
// Volume
|
||||
if oldSpec.Spec.Size != newSpec.Spec.Size {
|
||||
c.logger.Debugf("syncing persistent volumes")
|
||||
c.logVolumeChanges(oldSpec.Spec.Volume, newSpec.Spec.Volume)
|
||||
|
||||
if cmp.rollingUpdate {
|
||||
c.logger.Infof("rolling update is needed")
|
||||
// TODO: wait for actual streaming to the replica
|
||||
if err := c.recreatePods(); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("could not recreate pods: %v", err)
|
||||
if err := c.syncVolumes(); err != nil {
|
||||
c.logger.Errorf("could not sync persistent volumes: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
c.logger.Infof("rolling update has been finished")
|
||||
}
|
||||
|
||||
if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match {
|
||||
c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason)
|
||||
if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil {
|
||||
return fmt.Errorf("could not update volumes: %v", err)
|
||||
// Statefulset
|
||||
func() {
|
||||
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
|
||||
if err != nil {
|
||||
c.logger.Errorf("could not generate old statefulset spec")
|
||||
updateFailed = true
|
||||
return
|
||||
}
|
||||
c.logger.Infof("volumes have been updated successfully")
|
||||
}
|
||||
|
||||
if err := c.syncPodDisruptionBudget(true); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("could not update pod disruption budget: %v", err)
|
||||
}
|
||||
newSs, err := c.generateStatefulSet(&newSpec.Spec)
|
||||
if err != nil {
|
||||
c.logger.Errorf("could not generate new statefulset spec")
|
||||
updateFailed = true
|
||||
return
|
||||
}
|
||||
|
||||
c.setStatus(spec.ClusterStatusRunning)
|
||||
if !reflect.DeepEqual(oldSs, newSs) {
|
||||
c.logger.Debugf("syncing statefulsets")
|
||||
if err := c.syncStatefulSet(); err != nil {
|
||||
c.logger.Errorf("could not sync statefulsets: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Databases
|
||||
if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) {
|
||||
c.logger.Infof("syncing databases")
|
||||
if err := c.syncDatabases(); err != nil {
|
||||
c.logger.Errorf("could not sync databases: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -551,14 +519,15 @@ func (c *Cluster) Delete() error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if err := c.deleteEndpoint(); err != nil {
|
||||
return fmt.Errorf("could not delete endpoint: %v", err)
|
||||
}
|
||||
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.deleteEndpoint(role); err != nil {
|
||||
return fmt.Errorf("could not delete %s endpoint: %v", role, err)
|
||||
}
|
||||
|
||||
if err := c.deleteService(role); err != nil {
|
||||
return fmt.Errorf("could not delete %s service: %v", role, err)
|
||||
}
|
||||
|
|
@ -715,7 +684,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
|
|||
|
||||
MasterService: c.GetServiceMaster(),
|
||||
ReplicaService: c.GetServiceReplica(),
|
||||
Endpoint: c.GetEndpoint(),
|
||||
MasterEndpoint: c.GetEndpointMaster(),
|
||||
ReplicaEndpoint: c.GetEndpointReplica(),
|
||||
StatefulSet: c.GetStatefulSet(),
|
||||
PodDisruptionBudget: c.GetPodDisruptionBudget(),
|
||||
CurrentProcess: c.GetCurrentProcess(),
|
||||
|
|
|
|||
|
|
@ -55,8 +55,13 @@ func (c *Cluster) statefulSetName() string {
|
|||
return c.Name
|
||||
}
|
||||
|
||||
func (c *Cluster) endpointName() string {
|
||||
return c.Name
|
||||
func (c *Cluster) endpointName(role PostgresRole) string {
|
||||
name := c.Name
|
||||
if role == Replica {
|
||||
name = name + "-repl"
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
func (c *Cluster) serviceName(role PostgresRole) string {
|
||||
|
|
@ -149,7 +154,7 @@ func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patro
|
|||
// maps and normal string items in the array of initdb options. We need
|
||||
// both to convert the initial key-value to strings when necessary, and
|
||||
// to de-duplicate the options supplied.
|
||||
PATRONI_INITDB_PARAMS:
|
||||
PatroniInitDBParams:
|
||||
for _, k := range initdbOptionNames {
|
||||
v := patroni.InitDB[k]
|
||||
for i, defaultParam := range config.Bootstrap.Initdb {
|
||||
|
|
@ -159,7 +164,7 @@ PATRONI_INITDB_PARAMS:
|
|||
for k1 := range defaultParam.(map[string]string) {
|
||||
if k1 == k {
|
||||
(config.Bootstrap.Initdb[i]).(map[string]string)[k] = v
|
||||
continue PATRONI_INITDB_PARAMS
|
||||
continue PatroniInitDBParams
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -167,12 +172,12 @@ PATRONI_INITDB_PARAMS:
|
|||
{
|
||||
/* if the option already occurs in the list */
|
||||
if defaultParam.(string) == v {
|
||||
continue PATRONI_INITDB_PARAMS
|
||||
continue PatroniInitDBParams
|
||||
}
|
||||
}
|
||||
default:
|
||||
c.logger.Warningf("unsupported type for initdb configuration item %s: %T", defaultParam, defaultParam)
|
||||
continue PATRONI_INITDB_PARAMS
|
||||
continue PatroniInitDBParams
|
||||
}
|
||||
}
|
||||
// The following options are known to have no parameters
|
||||
|
|
@ -356,7 +361,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
|
|||
if cloneDescription.ClusterName != "" {
|
||||
envVars = append(envVars, c.generateCloneEnvironment(cloneDescription)...)
|
||||
}
|
||||
privilegedMode := bool(true)
|
||||
privilegedMode := true
|
||||
container := v1.Container{
|
||||
Name: c.containerName(),
|
||||
Image: c.OpConfig.DockerImage,
|
||||
|
|
@ -411,7 +416,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
|
|||
return &template
|
||||
}
|
||||
|
||||
func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
|
||||
func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
|
||||
resourceRequirements, err := c.resourceRequirements(spec.Resources)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
|
||||
|
|
@ -513,7 +518,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
|
|||
return &secret
|
||||
}
|
||||
|
||||
func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service {
|
||||
func (c *Cluster) generateService(role PostgresRole, spec *spec.PostgresSpec) *v1.Service {
|
||||
var dnsName string
|
||||
|
||||
if role == Master {
|
||||
|
|
@ -534,12 +539,12 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
|
|||
var annotations map[string]string
|
||||
|
||||
// Examine the per-cluster load balancer setting, if it is not defined - check the operator configuration.
|
||||
if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) ||
|
||||
(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
|
||||
if (spec.UseLoadBalancer != nil && *spec.UseLoadBalancer) ||
|
||||
(spec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
|
||||
|
||||
// safe default value: lock load balancer to only local address unless overridden explicitly.
|
||||
sourceRanges := []string{localHost}
|
||||
allowedSourceRanges := newSpec.AllowedSourceRanges
|
||||
allowedSourceRanges := spec.AllowedSourceRanges
|
||||
if len(allowedSourceRanges) >= 0 {
|
||||
sourceRanges = allowedSourceRanges
|
||||
}
|
||||
|
|
@ -566,12 +571,12 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
|
|||
return service
|
||||
}
|
||||
|
||||
func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
|
||||
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
|
||||
endpoints := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.endpointName(),
|
||||
Name: c.endpointName(role),
|
||||
Namespace: c.Namespace,
|
||||
Labels: c.roleLabelsSet(Master),
|
||||
Labels: c.roleLabelsSet(role),
|
||||
},
|
||||
}
|
||||
if len(subsets) > 0 {
|
||||
|
|
|
|||
|
|
@ -11,67 +11,12 @@ import (
|
|||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||
policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
|
||||
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
||||
)
|
||||
|
||||
func (c *Cluster) loadResources() error {
|
||||
var err error
|
||||
ns := c.Namespace
|
||||
|
||||
masterService, err := c.KubeClient.Services(ns).Get(c.serviceName(Master), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Services[Master] = masterService
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Errorf("could not get master service: %v", err)
|
||||
}
|
||||
|
||||
replicaService, err := c.KubeClient.Services(ns).Get(c.serviceName(Replica), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Services[Replica] = replicaService
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Errorf("could not get replica service: %v", err)
|
||||
}
|
||||
|
||||
ep, err := c.KubeClient.Endpoints(ns).Get(c.endpointName(), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Endpoint = ep
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Errorf("could not get endpoint: %v", err)
|
||||
}
|
||||
|
||||
secrets, err := c.KubeClient.Secrets(ns).List(metav1.ListOptions{LabelSelector: c.labelsSet().String()})
|
||||
if err != nil {
|
||||
c.logger.Errorf("could not get list of secrets: %v", err)
|
||||
}
|
||||
for i, secret := range secrets.Items {
|
||||
if _, ok := c.Secrets[secret.UID]; ok {
|
||||
continue
|
||||
}
|
||||
c.Secrets[secret.UID] = &secrets.Items[i]
|
||||
c.logger.Debugf("secret loaded, uid: %q", secret.UID)
|
||||
}
|
||||
|
||||
ss, err := c.KubeClient.StatefulSets(ns).Get(c.statefulSetName(), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Statefulset = ss
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Errorf("could not get statefulset: %v", err)
|
||||
}
|
||||
|
||||
pdb, err := c.KubeClient.PodDisruptionBudgets(ns).Get(c.podDisruptionBudgetName(), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.PodDisruptionBudget = pdb
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Errorf("could not get pod disruption budget: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) listResources() error {
|
||||
if c.PodDisruptionBudget != nil {
|
||||
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
|
||||
|
|
@ -85,8 +30,8 @@ func (c *Cluster) listResources() error {
|
|||
c.logger.Infof("found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
|
||||
}
|
||||
|
||||
if c.Endpoint != nil {
|
||||
c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
|
||||
for role, endpoint := range c.Endpoints {
|
||||
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
|
||||
}
|
||||
|
||||
for role, service := range c.Services {
|
||||
|
|
@ -119,7 +64,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
|
|||
if c.Statefulset != nil {
|
||||
return nil, fmt.Errorf("statefulset already exists in the cluster")
|
||||
}
|
||||
statefulSetSpec, err := c.generateStatefulSet(c.Spec)
|
||||
statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate statefulset: %v", err)
|
||||
}
|
||||
|
|
@ -307,11 +252,13 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
|
|||
|
||||
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
|
||||
c.setProcessName("updating %v service", role)
|
||||
|
||||
if c.Services[role] == nil {
|
||||
return fmt.Errorf("there is no service in the cluster")
|
||||
}
|
||||
|
||||
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
|
||||
endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
|
||||
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
|
||||
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
|
||||
if newService.Spec.Type != c.Services[role].Spec.Type {
|
||||
// service type has changed, need to replace the service completely.
|
||||
|
|
@ -324,38 +271,42 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
|
|||
if role == Master {
|
||||
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
|
||||
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
|
||||
currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{})
|
||||
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get current cluster endpoints: %v", err)
|
||||
return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
|
||||
}
|
||||
}
|
||||
err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions)
|
||||
err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
|
||||
}
|
||||
c.Endpoint = nil
|
||||
svc, err := c.KubeClient.Services(newService.Namespace).Create(newService)
|
||||
|
||||
c.Endpoints[role] = nil
|
||||
svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create service %q: %v", serviceName, err)
|
||||
}
|
||||
|
||||
c.Services[role] = svc
|
||||
if role == Master {
|
||||
// create the new endpoint using the addresses obtained from the previous one
|
||||
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
|
||||
ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec)
|
||||
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
|
||||
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
|
||||
}
|
||||
c.Endpoint = ep
|
||||
|
||||
c.Endpoints[role] = ep
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(newService.ObjectMeta.Annotations) > 0 {
|
||||
annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)
|
||||
|
||||
_, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
|
||||
c.Services[role].Name,
|
||||
_, err := c.KubeClient.Services(serviceName.Namespace).Patch(
|
||||
serviceName.Name,
|
||||
types.StrategicMergePatchType,
|
||||
[]byte(annotationsPatchData), "")
|
||||
|
||||
|
|
@ -369,8 +320,8 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
|
|||
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
|
||||
}
|
||||
|
||||
svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
|
||||
c.Services[role].Name,
|
||||
svc, err := c.KubeClient.Services(serviceName.Namespace).Patch(
|
||||
serviceName.Name,
|
||||
types.MergePatchType,
|
||||
patchData, "")
|
||||
if err != nil {
|
||||
|
|
@ -383,31 +334,36 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
|
|||
|
||||
func (c *Cluster) deleteService(role PostgresRole) error {
|
||||
c.logger.Debugf("deleting service %s", role)
|
||||
if c.Services[role] == nil {
|
||||
return fmt.Errorf("there is no %s service in the cluster", role)
|
||||
}
|
||||
|
||||
service := c.Services[role]
|
||||
err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions)
|
||||
if err != nil {
|
||||
|
||||
if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
|
||||
c.Services[role] = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
|
||||
func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) {
|
||||
c.setProcessName("creating endpoint")
|
||||
if c.Endpoint != nil {
|
||||
return nil, fmt.Errorf("endpoint already exists in the cluster")
|
||||
if c.Endpoints[role] != nil {
|
||||
return nil, fmt.Errorf("%s endpoint already exists in the cluster", role)
|
||||
}
|
||||
endpointsSpec := c.generateMasterEndpoints(nil)
|
||||
subsets := make([]v1.EndpointSubset, 0)
|
||||
if role == Master {
|
||||
//TODO: set subsets to the master
|
||||
}
|
||||
endpointsSpec := c.generateEndpoint(role, subsets)
|
||||
|
||||
endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("could not create %s endpoint: %v", role, err)
|
||||
}
|
||||
c.Endpoint = endpoints
|
||||
|
||||
c.Endpoints[role] = endpoints
|
||||
|
||||
return endpoints, nil
|
||||
}
|
||||
|
|
@ -430,13 +386,19 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget,
|
|||
}
|
||||
|
||||
func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget) error {
|
||||
if c.podEventsQueue == nil {
|
||||
if c.PodDisruptionBudget == nil {
|
||||
return fmt.Errorf("there is no pod disruption budget in the cluster")
|
||||
}
|
||||
|
||||
newPdb, err := c.KubeClient.PodDisruptionBudgets(pdb.Namespace).Update(pdb)
|
||||
if err := c.deletePodDisruptionBudget(); err != nil {
|
||||
return fmt.Errorf("could not delete pod disruption budget: %v", err)
|
||||
}
|
||||
|
||||
newPdb, err := c.KubeClient.
|
||||
PodDisruptionBudgets(pdb.Namespace).
|
||||
Create(pdb)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update pod disruption budget: %v", err)
|
||||
return fmt.Errorf("could not create pod disruption budget: %v", err)
|
||||
}
|
||||
c.PodDisruptionBudget = newPdb
|
||||
|
||||
|
|
@ -448,69 +410,50 @@ func (c *Cluster) deletePodDisruptionBudget() error {
|
|||
if c.PodDisruptionBudget == nil {
|
||||
return fmt.Errorf("there is no pod disruption budget in the cluster")
|
||||
}
|
||||
|
||||
pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)
|
||||
err := c.KubeClient.
|
||||
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
|
||||
Delete(c.PodDisruptionBudget.Namespace, c.deleteOptions)
|
||||
Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete pod disruption budget: %v", err)
|
||||
}
|
||||
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
|
||||
c.PodDisruptionBudget = nil
|
||||
|
||||
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
|
||||
func() (bool, error) {
|
||||
_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(pdbName.Name, metav1.GetOptions{})
|
||||
if err2 == nil {
|
||||
return false, nil
|
||||
}
|
||||
if k8sutil.ResourceNotFound(err2) {
|
||||
return true, nil
|
||||
} else {
|
||||
return false, err2
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete pod disruption budget: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) deleteEndpoint() error {
|
||||
func (c *Cluster) deleteEndpoint(role PostgresRole) error {
|
||||
c.setProcessName("deleting endpoint")
|
||||
c.logger.Debugln("deleting endpoint")
|
||||
if c.Endpoint == nil {
|
||||
return fmt.Errorf("there is no endpoint in the cluster")
|
||||
if c.Endpoints[role] == nil {
|
||||
return fmt.Errorf("there is no %s endpoint in the cluster", role)
|
||||
}
|
||||
err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions)
|
||||
if err != nil {
|
||||
|
||||
if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(c.Endpoints[role].Name, c.deleteOptions); err != nil {
|
||||
return fmt.Errorf("could not delete endpoint: %v", err)
|
||||
}
|
||||
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
|
||||
c.Endpoint = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta))
|
||||
|
||||
func (c *Cluster) applySecrets() error {
|
||||
c.setProcessName("applying secrets")
|
||||
secrets := c.generateUserSecrets()
|
||||
|
||||
for secretUsername, secretSpec := range secrets {
|
||||
secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
var userMap map[string]spec.PgUser
|
||||
curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("could not get current secret: %v", err2)
|
||||
}
|
||||
c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
|
||||
if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
|
||||
secretUsername = constants.SuperuserKeyName
|
||||
userMap = c.systemUsers
|
||||
} else if secretUsername == c.systemUsers[constants.ReplicationUserKeyName].Name {
|
||||
secretUsername = constants.ReplicationUserKeyName
|
||||
userMap = c.systemUsers
|
||||
} else {
|
||||
userMap = c.pgUsers
|
||||
}
|
||||
pwdUser := userMap[secretUsername]
|
||||
pwdUser.Password = string(curSecret.Data["password"])
|
||||
userMap[secretUsername] = pwdUser
|
||||
|
||||
continue
|
||||
} else {
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
|
||||
}
|
||||
c.Secrets[secret.UID] = secret
|
||||
c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
|
||||
}
|
||||
}
|
||||
c.Endpoints[role] = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -543,9 +486,14 @@ func (c *Cluster) GetServiceReplica() *v1.Service {
|
|||
return c.Services[Replica]
|
||||
}
|
||||
|
||||
// GetEndpoint returns cluster's kubernetes Endpoint
|
||||
func (c *Cluster) GetEndpoint() *v1.Endpoints {
|
||||
return c.Endpoint
|
||||
// GetEndpointMaster returns cluster's kubernetes master Endpoint
|
||||
func (c *Cluster) GetEndpointMaster() *v1.Endpoints {
|
||||
return c.Endpoints[Master]
|
||||
}
|
||||
|
||||
// GetEndpointReplica returns cluster's kubernetes master Endpoint
|
||||
func (c *Cluster) GetEndpointReplica() *v1.Endpoints {
|
||||
return c.Endpoints[Replica]
|
||||
}
|
||||
|
||||
// GetStatefulSet returns cluster's kubernetes StatefulSet
|
||||
|
|
|
|||
|
|
@ -4,10 +4,12 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
|
||||
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
|
||||
)
|
||||
|
|
@ -20,11 +22,6 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
|
|||
|
||||
c.Postgresql = *newSpec
|
||||
|
||||
err = c.loadResources()
|
||||
if err != nil {
|
||||
c.logger.Errorf("could not load resources: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.setStatus(spec.ClusterStatusSyncFailed)
|
||||
|
|
@ -41,39 +38,15 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
|
|||
c.logger.Debugf("syncing secrets")
|
||||
|
||||
//TODO: mind the secrets of the deleted/new users
|
||||
if err = c.applySecrets(); err != nil {
|
||||
if !k8sutil.ResourceAlreadyExists(err) {
|
||||
err = fmt.Errorf("could not sync secrets: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c.logger.Debugf("syncing endpoints")
|
||||
if err = c.syncEndpoint(); err != nil {
|
||||
if !k8sutil.ResourceAlreadyExists(err) {
|
||||
err = fmt.Errorf("could not sync endpoints: %v", err)
|
||||
return
|
||||
}
|
||||
if err = c.syncSecrets(); err != nil {
|
||||
err = fmt.Errorf("could not sync secrets: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Debugf("syncing services")
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
if c.Services[role] != nil {
|
||||
// delete the left over replica service
|
||||
if err = c.deleteService(role); err != nil {
|
||||
err = fmt.Errorf("could not delete obsolete %s service: %v", role, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err = c.syncService(role); err != nil {
|
||||
if !k8sutil.ResourceAlreadyExists(err) {
|
||||
err = fmt.Errorf("coud not sync %s service: %v", role, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = c.syncServices(); err != nil {
|
||||
err = fmt.Errorf("could not sync services: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Debugf("syncing statefulsets")
|
||||
|
|
@ -112,73 +85,170 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Cluster) syncService(role PostgresRole) error {
|
||||
cSpec := c.Spec
|
||||
if c.Services[role] == nil {
|
||||
c.logger.Infof("could not find the cluster's %s service", role)
|
||||
svc, err := c.createService(role)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create missing %s service: %v", role, err)
|
||||
func (c *Cluster) syncServices() error {
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
c.logger.Debugf("syncing %s service", role)
|
||||
|
||||
if err := c.syncEndpoint(role); err != nil {
|
||||
return fmt.Errorf("could not sync %s endpont: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
|
||||
|
||||
return nil
|
||||
if err := c.syncService(role); err != nil {
|
||||
return fmt.Errorf("could not sync %s service: %v", role, err)
|
||||
}
|
||||
}
|
||||
|
||||
desiredSvc := c.generateService(role, &cSpec)
|
||||
match, reason := c.sameServiceWith(role, desiredSvc)
|
||||
if match {
|
||||
return nil
|
||||
}
|
||||
c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason)
|
||||
|
||||
if err := c.updateService(role, desiredSvc); err != nil {
|
||||
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncEndpoint() error {
|
||||
if c.Endpoint == nil {
|
||||
c.logger.Infof("could not find the cluster's endpoint")
|
||||
ep, err := c.createEndpoint()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create missing endpoint: %v", err)
|
||||
func (c *Cluster) syncService(role PostgresRole) error {
|
||||
c.setProcessName("syncing %s service", role)
|
||||
|
||||
svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
if err := c.deleteService(role); err != nil {
|
||||
return fmt.Errorf("could not delete %s service", role)
|
||||
}
|
||||
}
|
||||
c.logger.Infof("created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta))
|
||||
|
||||
desiredSvc := c.generateService(role, &c.Spec)
|
||||
match, reason := k8sutil.SameService(svc, desiredSvc)
|
||||
if match {
|
||||
c.Services[role] = svc
|
||||
return nil
|
||||
}
|
||||
c.logServiceChanges(role, svc, desiredSvc, false, reason)
|
||||
|
||||
if err := c.updateService(role, desiredSvc); err != nil {
|
||||
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
|
||||
}
|
||||
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
|
||||
|
||||
return nil
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
return fmt.Errorf("could not get %s service: %v", role, err)
|
||||
}
|
||||
c.Services[role] = nil
|
||||
|
||||
// Service does not exist
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Infof("could not find the cluster's %s service", role)
|
||||
|
||||
if svc, err := c.createService(role); err != nil {
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta))
|
||||
svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Services[role] = svc
|
||||
} else {
|
||||
c.logger.Infof("could not fetch existing %s service: %v", role, err)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("could not create missing %s service: %v", role, err)
|
||||
}
|
||||
} else {
|
||||
c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
|
||||
c.Services[role] = svc
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncEndpoint(role PostgresRole) error {
|
||||
c.setProcessName("syncing %s endpoint", role)
|
||||
|
||||
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
if err := c.deleteEndpoint(role); err != nil {
|
||||
return fmt.Errorf("could not delete %s endpoint", role)
|
||||
}
|
||||
}
|
||||
|
||||
c.Endpoints[role] = ep
|
||||
return nil
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
return fmt.Errorf("could not get %s endpoint: %v", role, err)
|
||||
}
|
||||
c.Endpoints[role] = nil
|
||||
|
||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Infof("could not find the cluster's %s endpoint", role)
|
||||
|
||||
if ep, err := c.createEndpoint(role); err != nil {
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta))
|
||||
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.Endpoints[role] = ep
|
||||
} else {
|
||||
c.logger.Infof("could not fetch existing %s endpoint: %v", role, err)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("could not create missing %s endpoint: %v", role, err)
|
||||
}
|
||||
} else {
|
||||
c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta))
|
||||
c.Endpoints[role] = ep
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
|
||||
if c.PodDisruptionBudget == nil {
|
||||
c.logger.Infof("could not find the cluster's pod disruption budget")
|
||||
pdb, err := c.createPodDisruptionBudget()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create pod disruption budget: %v", err)
|
||||
}
|
||||
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
|
||||
return nil
|
||||
} else {
|
||||
pdb, err := c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
c.PodDisruptionBudget = pdb
|
||||
newPDB := c.generatePodDisruptionBudget()
|
||||
if match, reason := c.samePDBWith(newPDB); !match {
|
||||
c.logPDBChanges(c.PodDisruptionBudget, newPDB, isUpdate, reason)
|
||||
if match, reason := k8sutil.SamePDB(pdb, newPDB); !match {
|
||||
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
|
||||
if err := c.updatePodDisruptionBudget(newPDB); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
c.PodDisruptionBudget = pdb
|
||||
}
|
||||
|
||||
return nil
|
||||
} else if !k8sutil.ResourceNotFound(err) {
|
||||
return fmt.Errorf("could not get pod disruption budget: %v", err)
|
||||
}
|
||||
c.PodDisruptionBudget = nil
|
||||
|
||||
c.logger.Infof("could not find the cluster's pod disruption budget")
|
||||
if pdb, err = c.createPodDisruptionBudget(); err != nil {
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
|
||||
} else {
|
||||
return fmt.Errorf("could not create pod disruption budget: %v", err)
|
||||
}
|
||||
} else {
|
||||
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
|
||||
c.PodDisruptionBudget = pdb
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncStatefulSet() error {
|
||||
cSpec := c.Spec
|
||||
var rollUpdate bool
|
||||
if c.Statefulset == nil {
|
||||
var (
|
||||
err error
|
||||
rollUpdate bool
|
||||
)
|
||||
c.Statefulset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
|
||||
|
||||
if err != nil && !k8sutil.ResourceNotFound(err) {
|
||||
return fmt.Errorf("could not get statefulset: %v", err)
|
||||
}
|
||||
|
||||
if err != nil && k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Infof("could not find the cluster's statefulset")
|
||||
pods, err := c.listPods()
|
||||
if err != nil {
|
||||
|
|
@ -189,22 +259,25 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
c.logger.Infof("found pods without the statefulset: trigger rolling update")
|
||||
rollUpdate = true
|
||||
}
|
||||
|
||||
ss, err := c.createStatefulSet()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create missing statefulset: %v", err)
|
||||
}
|
||||
err = c.waitStatefulsetPodsReady()
|
||||
if err != nil {
|
||||
|
||||
if err = c.waitStatefulsetPodsReady(); err != nil {
|
||||
return fmt.Errorf("cluster is not ready: %v", err)
|
||||
}
|
||||
|
||||
c.logger.Infof("created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta))
|
||||
if !rollUpdate {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO: should check that we need to replace the statefulset */
|
||||
if !rollUpdate {
|
||||
desiredSS, err := c.generateStatefulSet(cSpec)
|
||||
desiredSS, err := c.generateStatefulSet(&c.Spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not generate statefulset: %v", err)
|
||||
}
|
||||
|
|
@ -239,6 +312,45 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncSecrets() error {
|
||||
c.setProcessName("syncing secrets")
|
||||
secrets := c.generateUserSecrets()
|
||||
|
||||
for secretUsername, secretSpec := range secrets {
|
||||
secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
var userMap map[string]spec.PgUser
|
||||
curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("could not get current secret: %v", err2)
|
||||
}
|
||||
c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
|
||||
if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
|
||||
secretUsername = constants.SuperuserKeyName
|
||||
userMap = c.systemUsers
|
||||
} else if secretUsername == c.systemUsers[constants.ReplicationUserKeyName].Name {
|
||||
secretUsername = constants.ReplicationUserKeyName
|
||||
userMap = c.systemUsers
|
||||
} else {
|
||||
userMap = c.pgUsers
|
||||
}
|
||||
pwdUser := userMap[secretUsername]
|
||||
pwdUser.Password = string(curSecret.Data["password"])
|
||||
userMap[secretUsername] = pwdUser
|
||||
|
||||
continue
|
||||
} else {
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
|
||||
}
|
||||
c.Secrets[secret.UID] = secret
|
||||
c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncRoles(readFromDatabase bool) error {
|
||||
c.setProcessName("syncing roles")
|
||||
|
||||
|
|
@ -268,11 +380,14 @@ func (c *Cluster) syncRoles(readFromDatabase bool) error {
|
|||
if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
|
||||
return fmt.Errorf("error executing sync statements: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncVolumes reads all persistent volumes and checks that their size matches the one declared in the statefulset.
|
||||
func (c *Cluster) syncVolumes() error {
|
||||
c.setProcessName("syncing volumes")
|
||||
|
||||
act, err := c.volumesNeedResizing(c.Spec.Volume)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not compare size of the volumes: %v", err)
|
||||
|
|
|
|||
|
|
@ -144,12 +144,9 @@ func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isU
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) {
|
||||
func (c *Cluster) logVolumeChanges(old, new spec.Volume) {
|
||||
c.logger.Infof("volume specification has been changed")
|
||||
c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old, new))
|
||||
if reason != "" {
|
||||
c.logger.Infof("reason: %s", reason)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) getOAuthToken() (string, error) {
|
||||
|
|
|
|||
|
|
@ -146,11 +146,11 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume
|
|||
}
|
||||
|
||||
func (c *Cluster) volumesNeedResizing(newVolume spec.Volume) (bool, error) {
|
||||
volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume)
|
||||
vols, manifestSize, err := c.listVolumesWithManifestSize(newVolume)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, pv := range volumes {
|
||||
for _, pv := range vols {
|
||||
currentSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage])
|
||||
if currentSize != manifestSize {
|
||||
return true, nil
|
||||
|
|
@ -165,11 +165,11 @@ func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.Pers
|
|||
return nil, 0, fmt.Errorf("could not parse volume size from the manifest: %v", err)
|
||||
}
|
||||
manifestSize := quantityToGigabyte(newSize)
|
||||
volumes, err := c.listPersistentVolumes()
|
||||
vols, err := c.listPersistentVolumes()
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("could not list persistent volumes: %v", err)
|
||||
}
|
||||
return volumes, manifestSize, nil
|
||||
return vols, manifestSize, nil
|
||||
}
|
||||
|
||||
// getPodNameFromPersistentVolume returns a pod name that it extracts from the volume claim ref.
|
||||
|
|
|
|||
|
|
@ -193,7 +193,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
|||
return
|
||||
}
|
||||
c.curWorkerCluster.Store(event.WorkerID, cl)
|
||||
if err := cl.Update(event.NewSpec); err != nil {
|
||||
if err := cl.Update(event.OldSpec, event.NewSpec); err != nil {
|
||||
cl.Error = fmt.Errorf("could not update cluster: %v", err)
|
||||
lg.Error(cl.Error)
|
||||
|
||||
|
|
@ -374,9 +374,6 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
|
|||
if !ok {
|
||||
c.logger.Errorf("could not cast to postgresql spec")
|
||||
}
|
||||
if pgOld.ResourceVersion == pgNew.ResourceVersion {
|
||||
return
|
||||
}
|
||||
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,7 +99,8 @@ type ClusterStatus struct {
|
|||
Cluster string
|
||||
MasterService *v1.Service
|
||||
ReplicaService *v1.Service
|
||||
Endpoint *v1.Endpoints
|
||||
MasterEndpoint *v1.Endpoints
|
||||
ReplicaEndpoint *v1.Endpoints
|
||||
StatefulSet *v1beta1.StatefulSet
|
||||
PodDisruptionBudget *policyv1beta1.PodDisruptionBudget
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package k8sutil
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
|
||||
|
|
@ -9,10 +10,12 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
|
||||
"k8s.io/client-go/kubernetes/typed/apps/v1beta1"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
|
||||
|
|
@ -99,5 +102,43 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
|||
kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1beta1()
|
||||
|
||||
return kubeClient, nil
|
||||
|
||||
}
|
||||
|
||||
// SameService compares the Services
|
||||
func SameService(cur, new *v1.Service) (match bool, reason string) {
|
||||
//TODO: improve comparison
|
||||
if cur.Spec.Type != new.Spec.Type {
|
||||
return false, fmt.Sprintf("new service's type %q doesn't match the current one %q",
|
||||
new.Spec.Type, cur.Spec.Type)
|
||||
}
|
||||
|
||||
oldSourceRanges := cur.Spec.LoadBalancerSourceRanges
|
||||
newSourceRanges := new.Spec.LoadBalancerSourceRanges
|
||||
|
||||
/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
|
||||
if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) {
|
||||
return true, ""
|
||||
}
|
||||
if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) {
|
||||
return false, "new service's LoadBalancerSourceRange doesn't match the current one"
|
||||
}
|
||||
|
||||
oldDNSAnnotation := cur.Annotations[constants.ZalandoDNSNameAnnotation]
|
||||
newDNSAnnotation := new.Annotations[constants.ZalandoDNSNameAnnotation]
|
||||
if oldDNSAnnotation != newDNSAnnotation {
|
||||
return false, fmt.Sprintf("new service's %q annotation doesn't match the current one", constants.ZalandoDNSNameAnnotation)
|
||||
}
|
||||
|
||||
return true, ""
|
||||
}
|
||||
|
||||
// SamePDB compares the PodDisruptionBudgets
|
||||
func SamePDB(cur, new *policybeta1.PodDisruptionBudget) (match bool, reason string) {
|
||||
//TODO: improve comparison
|
||||
match = reflect.DeepEqual(new.Spec, cur.Spec)
|
||||
if !match {
|
||||
reason = "new service spec doesn't match the current one"
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue