add code skeleton for CronJob Sync
This commit is contained in:
		
							parent
							
								
									6f855d5190
								
							
						
					
					
						commit
						0f1e19625e
					
				| 
						 | 
					@ -6,6 +6,7 @@ import (
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/api/apps/v1beta1"
 | 
						"k8s.io/api/apps/v1beta1"
 | 
				
			||||||
 | 
						batchv1beta1 "k8s.io/api/batch/v1beta1"
 | 
				
			||||||
	"k8s.io/api/core/v1"
 | 
						"k8s.io/api/core/v1"
 | 
				
			||||||
	policybeta1 "k8s.io/api/policy/v1beta1"
 | 
						policybeta1 "k8s.io/api/policy/v1beta1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
| 
						 | 
					@ -628,6 +629,31 @@ func (c *Cluster) createBackupCronJob() (err error) {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) updateCronJob(newJob *batchv1beta1.CronJob) error {
 | 
				
			||||||
 | 
						c.setProcessName("updating logical backup job")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if c.logicalBackupJob == nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("there is no logical backup job in the cluster")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						patchData, err := specPatch(newJob.Spec)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not form patch for the logical backup job: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// update the backup job spec
 | 
				
			||||||
 | 
						job, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch(
 | 
				
			||||||
 | 
							c.logicalBackupJob.Name,
 | 
				
			||||||
 | 
							types.MergePatchType,
 | 
				
			||||||
 | 
							patchData, "")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not patch logical backup job: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						c.logicalBackupJob = job
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetServiceMaster returns cluster's kubernetes master Service
 | 
					// GetServiceMaster returns cluster's kubernetes master Service
 | 
				
			||||||
func (c *Cluster) GetServiceMaster() *v1.Service {
 | 
					func (c *Cluster) GetServiceMaster() *v1.Service {
 | 
				
			||||||
	return c.Services[Master]
 | 
						return c.Services[Master]
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3,6 +3,7 @@ package cluster
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						batchv1beta1 "k8s.io/api/batch/v1beta1"
 | 
				
			||||||
	"k8s.io/api/core/v1"
 | 
						"k8s.io/api/core/v1"
 | 
				
			||||||
	policybeta1 "k8s.io/api/policy/v1beta1"
 | 
						policybeta1 "k8s.io/api/policy/v1beta1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
| 
						 | 
					@ -92,6 +93,16 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create a logical backup job unless we are running without pods or disable that feature explicitly
 | 
				
			||||||
 | 
						if c.Spec.EnableLogicalBackup && c.getNumberOfInstances(&newSpec.Spec) > 0 {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							c.logger.Debug("syncing logical backup jobs")
 | 
				
			||||||
 | 
							if err = c.syncLogicalBackupJob(); err != nil {
 | 
				
			||||||
 | 
								err = fmt.Errorf("could not sync logical backup jobs: %v", err)
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -519,3 +530,47 @@ func (c *Cluster) syncDatabases() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) syncLogicalBackupJob() error {
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							job        *batchv1beta1.CronJob
 | 
				
			||||||
 | 
							desiredJob *batchv1beta1.CronJob
 | 
				
			||||||
 | 
							err        error
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						c.setProcessName("syncing the logical backup job")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(c.logicalBackupJob.Name, metav1.GetOptions{}); err == nil {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							desiredJob, err = c.generateLogicalBackupJob()
 | 
				
			||||||
 | 
							if err = c.updateCronJob(desiredJob); err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("could not generate the desired job state: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if match, reason := k8sutil.SameCronJob(job, desiredJob); !match {
 | 
				
			||||||
 | 
								c.logCronJobChanges(job, desiredJob, false, reason)
 | 
				
			||||||
 | 
								if err = c.updateCronJob(desiredJob); err != nil {
 | 
				
			||||||
 | 
									return fmt.Errorf("could not update job to match desired state: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								c.logger.Info("the logical backup job is in the desired state now")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if !k8sutil.ResourceNotFound(err) {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not get logical backp job: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// no existing logical backup job, create new one
 | 
				
			||||||
 | 
						c.logger.Info("could not find the cluster's logical backup job")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err = c.createBackupCronJob(); err == nil {
 | 
				
			||||||
 | 
							c.logger.Infof("created missing logical backup job %q", util.NameFromMeta(job.ObjectMeta))
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							if !k8sutil.ResourceAlreadyExists(err) {
 | 
				
			||||||
 | 
								return fmt.Errorf("could not create missing logical backup job: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							c.logger.Infof("logical backup job %q already exists", util.NameFromMeta(job.ObjectMeta))
 | 
				
			||||||
 | 
							if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(c.logicalBackupJob.Name, metav1.GetOptions{}); err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("could not fetch existing logical backup job: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -12,6 +12,7 @@ import (
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/api/apps/v1beta1"
 | 
						"k8s.io/api/apps/v1beta1"
 | 
				
			||||||
 | 
						batchv1beta1 "k8s.io/api/batch/v1beta1"
 | 
				
			||||||
	"k8s.io/api/core/v1"
 | 
						"k8s.io/api/core/v1"
 | 
				
			||||||
	policybeta1 "k8s.io/api/policy/v1beta1"
 | 
						policybeta1 "k8s.io/api/policy/v1beta1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
| 
						 | 
					@ -483,3 +484,20 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) {
 | 
				
			||||||
func (c *Cluster) patroniUsesKubernetes() bool {
 | 
					func (c *Cluster) patroniUsesKubernetes() bool {
 | 
				
			||||||
	return c.OpConfig.EtcdHost == ""
 | 
						return c.OpConfig.EtcdHost == ""
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) logCronJobChanges(old, new *batchv1beta1.CronJob, isUpdate bool, reason string) {
 | 
				
			||||||
 | 
						if isUpdate {
 | 
				
			||||||
 | 
							c.logger.Infof("logical job %q has been changed",
 | 
				
			||||||
 | 
								c.logicalBackupJob.Name,
 | 
				
			||||||
 | 
							)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							c.logger.Infof("logical job %q is not in the desired state and needs to be updated",
 | 
				
			||||||
 | 
								c.logicalBackupJob.Name,
 | 
				
			||||||
 | 
							)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if reason != "" {
 | 
				
			||||||
 | 
							c.logger.Infof("reason: %s", reason)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,7 +4,8 @@ import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	batchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
 | 
						batchv1beta1 "k8s.io/api/batch/v1beta1"
 | 
				
			||||||
 | 
						clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util/constants"
 | 
						"github.com/zalando/postgres-operator/pkg/util/constants"
 | 
				
			||||||
	"k8s.io/api/core/v1"
 | 
						"k8s.io/api/core/v1"
 | 
				
			||||||
| 
						 | 
					@ -39,7 +40,7 @@ type KubernetesClient struct {
 | 
				
			||||||
	rbacv1beta1.RoleBindingsGetter
 | 
						rbacv1beta1.RoleBindingsGetter
 | 
				
			||||||
	policyv1beta1.PodDisruptionBudgetsGetter
 | 
						policyv1beta1.PodDisruptionBudgetsGetter
 | 
				
			||||||
	apiextbeta1.CustomResourceDefinitionsGetter
 | 
						apiextbeta1.CustomResourceDefinitionsGetter
 | 
				
			||||||
	batchv1beta1.CronJobsGetter
 | 
						clientbatchv1beta1.CronJobsGetter
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	RESTClient      rest.Interface
 | 
						RESTClient      rest.Interface
 | 
				
			||||||
	AcidV1ClientSet *acidv1client.Clientset
 | 
						AcidV1ClientSet *acidv1client.Clientset
 | 
				
			||||||
| 
						 | 
					@ -145,3 +146,21 @@ func SamePDB(cur, new *policybeta1.PodDisruptionBudget) (match bool, reason stri
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SameCronJob compares Specs of logical backup cron jobs
 | 
				
			||||||
 | 
					func SameCronJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cur.Spec.Schedule != new.Spec.Schedule {
 | 
				
			||||||
 | 
							return false, fmt.Sprintf("new job's schedule %q doesn't match the current one %q",
 | 
				
			||||||
 | 
								new.Spec.Schedule, cur.Spec.Schedule)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
 | 
				
			||||||
 | 
						oldImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
 | 
				
			||||||
 | 
						if newImage != oldImage {
 | 
				
			||||||
 | 
							return false, fmt.Sprintf("new job's image %q doesn't match the current one %q",
 | 
				
			||||||
 | 
								newImage, oldImage)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return true, ""
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue