Adjust actions usage over the sync code
This commit is contained in:
parent
28f8bfc8ce
commit
63bca66187
|
|
@ -239,9 +239,9 @@ func (c *Cluster) Create() error {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
|
||||||
//service *v1.Service
|
ep *v1.Endpoints
|
||||||
ep *v1.Endpoints
|
ss *v1beta1.StatefulSet
|
||||||
ss *v1beta1.StatefulSet
|
actions []Action
|
||||||
)
|
)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -272,12 +272,13 @@ func (c *Cluster) Create() error {
|
||||||
if c.Services[role] != nil {
|
if c.Services[role] != nil {
|
||||||
return fmt.Errorf("service already exists in the cluster")
|
return fmt.Errorf("service already exists in the cluster")
|
||||||
}
|
}
|
||||||
_, err = c.createService(role)
|
actions, err = c.createService(role)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not calculate actions to create %s service: %v",
|
msg := "could not prepare actions to create %s service: %v"
|
||||||
role, err)
|
return fmt.Errorf(msg, role, err)
|
||||||
}
|
}
|
||||||
//c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
|
|
||||||
|
return c.applyActions(actions)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.initUsers(); err != nil {
|
if err = c.initUsers(); err != nil {
|
||||||
|
|
@ -546,9 +547,13 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||||
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
|
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
|
||||||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
|
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
|
||||||
c.logger.Debugf("syncing services")
|
c.logger.Debugf("syncing services")
|
||||||
if _, err := c.syncServices(); err != nil {
|
if actions, err := c.syncServices(); err != nil {
|
||||||
c.logger.Errorf("could not sync services: %v", err)
|
c.logger.Errorf("could prepare actions to sync services: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
|
} else {
|
||||||
|
if err := c.applyActions(actions); err != nil {
|
||||||
|
updateFailed = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -657,8 +662,10 @@ func (c *Cluster) Delete() {
|
||||||
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
|
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.deleteService(role); err != nil {
|
if actions, err := c.deleteService(role); err != nil {
|
||||||
c.logger.Warningf("could not delete %s service: %v", role, err)
|
c.logger.Warningf("could prepare actions to delete %s service: %v", role, err)
|
||||||
|
} else {
|
||||||
|
c.applyActions(actions)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -356,11 +356,38 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
|
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
|
||||||
|
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
|
||||||
|
|
||||||
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
|
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
|
||||||
if newService.Spec.Type != c.Services[role].Spec.Type {
|
if newService.Spec.Type != c.Services[role].Spec.Type {
|
||||||
// service type has changed, need to replace the service completely.
|
// service type has changed, need to replace the service completely.
|
||||||
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
|
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
|
||||||
|
var (
|
||||||
|
currentEndpoint *v1.Endpoints
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if role == Master {
|
||||||
|
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
|
||||||
|
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
|
||||||
|
currentEndpoint, err = c.KubeClient.
|
||||||
|
Endpoints(c.Namespace).
|
||||||
|
Get(c.endpointName(role), metav1.GetOptions{})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return NoActions, fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the new endpoint using the addresses obtained from the previous one
|
||||||
|
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
|
||||||
|
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
|
||||||
|
if err != nil {
|
||||||
|
return NoActions, fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Endpoints[role] = ep
|
||||||
|
}
|
||||||
|
|
||||||
return []Action{
|
return []Action{
|
||||||
DeleteService{
|
DeleteService{
|
||||||
ActionData{
|
ActionData{
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,8 @@ func (c *Cluster) syncServices() (actions []Action, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if actions, err = c.syncService(role); err != nil {
|
if actions, err = c.syncService(role); err != nil {
|
||||||
return NoActions, fmt.Errorf("could not sync %s service: %v", role, err)
|
msg := "could prepare actions to sync %s service: %v"
|
||||||
|
return NoActions, fmt.Errorf(msg, role, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -142,10 +143,11 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.Services[role] = svc
|
c.Services[role] = svc
|
||||||
desiredSvc := c.generateService(role, &c.Spec)
|
desiredSvc := c.generateService(role, &c.Spec)
|
||||||
match, _ := k8sutil.SameService(svc, desiredSvc)
|
match, reason := k8sutil.SameService(svc, desiredSvc)
|
||||||
if match {
|
if match {
|
||||||
return NoActions, nil
|
return NoActions, nil
|
||||||
}
|
}
|
||||||
|
c.logServiceChanges(role, svc, desiredSvc, false, reason)
|
||||||
|
|
||||||
actions, err := c.updateService(role, desiredSvc)
|
actions, err := c.updateService(role, desiredSvc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -156,6 +158,8 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
|
||||||
} else if !k8sutil.ResourceNotFound(err) {
|
} else if !k8sutil.ResourceNotFound(err) {
|
||||||
return NoActions, fmt.Errorf("could not get %s service: %v", role, err)
|
return NoActions, fmt.Errorf("could not get %s service: %v", role, err)
|
||||||
}
|
}
|
||||||
|
c.Services[role] = nil
|
||||||
|
|
||||||
c.logger.Infof("could not find the cluster's %s service", role)
|
c.logger.Infof("could not find the cluster's %s service", role)
|
||||||
|
|
||||||
actions, err := c.createService(role)
|
actions, err := c.createService(role)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue