fix errors when changing service type (#716)
* fix errors when changing service type * nullify service and endpoint before recreation * improve wait for delete logic and reuse config parameters
This commit is contained in:
		
							parent
							
								
									f9487e41c1
								
							
						
					
					
						commit
						2ce602fcd7
					
				| 
						 | 
					@ -13,7 +13,6 @@ import (
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/types"
 | 
						"k8s.io/apimachinery/pkg/types"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util"
 | 
						"github.com/zalando/postgres-operator/pkg/util"
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util/constants"
 | 
					 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 | 
						"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util/retryutil"
 | 
						"github.com/zalando/postgres-operator/pkg/util/retryutil"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -278,7 +277,8 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 | 
				
			||||||
	oldStatefulset := c.Statefulset
 | 
						oldStatefulset := c.Statefulset
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
 | 
						options := metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
 | 
				
			||||||
	if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil {
 | 
						err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err)
 | 
							return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// make sure we clear the stored statefulset status if the subsequent create fails.
 | 
						// make sure we clear the stored statefulset status if the subsequent create fails.
 | 
				
			||||||
| 
						 | 
					@ -286,11 +286,16 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 | 
				
			||||||
	// wait until the statefulset is truly deleted
 | 
						// wait until the statefulset is truly deleted
 | 
				
			||||||
	c.logger.Debugf("waiting for the statefulset to be deleted")
 | 
						c.logger.Debugf("waiting for the statefulset to be deleted")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout,
 | 
						err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 | 
				
			||||||
		func() (bool, error) {
 | 
							func() (bool, error) {
 | 
				
			||||||
			_, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{})
 | 
								_, err2 := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{})
 | 
				
			||||||
 | 
								if err2 == nil {
 | 
				
			||||||
			return err != nil, nil
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if k8sutil.ResourceNotFound(err2) {
 | 
				
			||||||
 | 
									return true, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return false, err2
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("could not delete statefulset: %v", err)
 | 
							return fmt.Errorf("could not delete statefulset: %v", err)
 | 
				
			||||||
| 
						 | 
					@ -380,13 +385,27 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 | 
				
			||||||
			return fmt.Errorf("could not delete service %q: %v", serviceName, err)
 | 
								return fmt.Errorf("could not delete service %q: %v", serviceName, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		c.Endpoints[role] = nil
 | 
							// wait until the service is truly deleted
 | 
				
			||||||
		svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
 | 
							c.logger.Debugf("waiting for service to be deleted")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 | 
				
			||||||
 | 
								func() (bool, error) {
 | 
				
			||||||
 | 
									_, err2 := c.KubeClient.Services(serviceName.Namespace).Get(serviceName.Name, metav1.GetOptions{})
 | 
				
			||||||
 | 
									if err2 == nil {
 | 
				
			||||||
 | 
										return false, nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									if k8sutil.ResourceNotFound(err2) {
 | 
				
			||||||
 | 
										return true, nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return false, err2
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not create service %q: %v", serviceName, err)
 | 
								return fmt.Errorf("could not delete service %q: %v", serviceName, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		c.Services[role] = svc
 | 
							// make sure we clear the stored service and endpoint status if the subsequent create fails.
 | 
				
			||||||
 | 
							c.Services[role] = nil
 | 
				
			||||||
 | 
							c.Endpoints[role] = nil
 | 
				
			||||||
		if role == Master {
 | 
							if role == Master {
 | 
				
			||||||
			// create the new endpoint using the addresses obtained from the previous one
 | 
								// create the new endpoint using the addresses obtained from the previous one
 | 
				
			||||||
			endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
 | 
								endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
 | 
				
			||||||
| 
						 | 
					@ -398,6 +417,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 | 
				
			||||||
			c.Endpoints[role] = ep
 | 
								c.Endpoints[role] = ep
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("could not create service %q: %v", serviceName, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							c.Services[role] = svc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,11 +4,9 @@ import "time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// General kubernetes-related constants
 | 
					// General kubernetes-related constants
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	PostgresContainerName       = "postgres"
 | 
						PostgresContainerName = "postgres"
 | 
				
			||||||
	PostgresContainerIdx        = 0
 | 
						PostgresContainerIdx  = 0
 | 
				
			||||||
	K8sAPIPath                  = "/apis"
 | 
						K8sAPIPath            = "/apis"
 | 
				
			||||||
	StatefulsetDeletionInterval = 1 * time.Second
 | 
					 | 
				
			||||||
	StatefulsetDeletionTimeout  = 30 * time.Second
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	QueueResyncPeriodPod  = 5 * time.Minute
 | 
						QueueResyncPeriodPod  = 5 * time.Minute
 | 
				
			||||||
	QueueResyncPeriodTPR  = 5 * time.Minute
 | 
						QueueResyncPeriodTPR  = 5 * time.Minute
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue