code cleanup
This commit is contained in:
parent
a73b986b45
commit
bd9d0fcd38
|
|
@ -582,7 +582,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
func() {
|
func() {
|
||||||
|
|
||||||
// create if it did not exist
|
// create if it did not exist
|
||||||
if newSpec.Spec.EnableLogicalBackup && !oldSpec.Spec.EnableLogicalBackup {
|
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
|
||||||
c.logger.Debugf("creating backup cron job")
|
c.logger.Debugf("creating backup cron job")
|
||||||
if err := c.createLogicalBackupJob(); err != nil {
|
if err := c.createLogicalBackupJob(); err != nil {
|
||||||
c.logger.Errorf("could not create a k8s cron job for logical backups: %v", err)
|
c.logger.Errorf("could not create a k8s cron job for logical backups: %v", err)
|
||||||
|
|
@ -603,9 +603,10 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply schedule changes
|
// apply schedule changes
|
||||||
if (c.logicalBackupJob != nil) &&
|
// this is the only parameter of logical backups a user can overwrite in the cluster manifest
|
||||||
|
if (newSpec.Spec.EnableLogicalBackup) &&
|
||||||
(newSpec.Spec.LogicalBackupSchedule != oldSpec.Spec.LogicalBackupSchedule) {
|
(newSpec.Spec.LogicalBackupSchedule != oldSpec.Spec.LogicalBackupSchedule) {
|
||||||
c.logger.Debugf("updating backup cron job")
|
c.logger.Debugf("updating schedule of the backup cron job")
|
||||||
if err := c.syncLogicalBackupJob(); err != nil {
|
if err := c.syncLogicalBackupJob(); err != nil {
|
||||||
c.logger.Errorf("could not sync logical backup jobs: %v", err)
|
c.logger.Errorf("could not sync logical backup jobs: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
|
|
@ -1067,7 +1068,7 @@ func (c *Cluster) deleteLogicalBackupJob() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debugf("removing the logical backup job")
|
c.logger.Debug("removing the logical backup job")
|
||||||
|
|
||||||
return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(c.logicalBackupJob.ObjectMeta.Name, c.deleteOptions)
|
return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(c.getLogicalBackupJobName(), c.deleteOptions)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1407,3 +1407,8 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
|
||||||
|
|
||||||
return envVars
|
return envVars
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getLogicalBackupJobName returns the name; the job itself may not exists
|
||||||
|
func (c *Cluster) getLogicalBackupJobName() (jobName string) {
|
||||||
|
return "logical-backup-" + c.clusterName().Namespace + "-" + c.clusterName().Name
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -629,12 +629,8 @@ func (c *Cluster) createLogicalBackupJob() (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) updateCronJob(newJob *batchv1beta1.CronJob) error {
|
func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error {
|
||||||
c.setProcessName("updating logical backup job")
|
c.setProcessName("patching logical backup job")
|
||||||
|
|
||||||
if c.logicalBackupJob == nil {
|
|
||||||
return fmt.Errorf("there is no logical backup job in the cluster")
|
|
||||||
}
|
|
||||||
|
|
||||||
patchData, err := specPatch(newJob.Spec)
|
patchData, err := specPatch(newJob.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -94,11 +94,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a logical backup job unless we are running without pods or disable that feature explicitly
|
// create a logical backup job unless we are running without pods or disable that feature explicitly
|
||||||
if c.Spec.EnableLogicalBackup && c.getNumberOfInstances(&newSpec.Spec) > 0 {
|
if c.Spec.EnableLogicalBackup && c.getNumberOfInstances(&c.Spec) > 0 {
|
||||||
|
|
||||||
c.logger.Debug("syncing logical backup jobs")
|
c.logger.Debug("syncing logical backup job")
|
||||||
if err = c.syncLogicalBackupJob(); err != nil {
|
if err = c.syncLogicalBackupJob(); err != nil {
|
||||||
err = fmt.Errorf("could not sync logical backup jobs: %v", err)
|
err = fmt.Errorf("could not sync the logical backup job: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -539,24 +539,20 @@ func (c *Cluster) syncLogicalBackupJob() error {
|
||||||
)
|
)
|
||||||
c.setProcessName("syncing the logical backup job")
|
c.setProcessName("syncing the logical backup job")
|
||||||
|
|
||||||
// operator pod at startup syncs all clusters, logicalBackupJob will be nil at this point
|
// sync the job if it exists
|
||||||
if c.Postgresql.Spec.EnableLogicalBackup && c.logicalBackupJob == nil {
|
// NB operator pod at startup syncs all clusters, c.logicalBackupJob will be nil during such Sync
|
||||||
c.logicalBackupJob, err = c.generateLogicalBackupJob()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not generate the desired cron job state, presumably during the first Sync on operator pod start-up: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(c.logicalBackupJob.Name, metav1.GetOptions{}); err == nil {
|
jobName := c.getLogicalBackupJobName()
|
||||||
|
if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err == nil {
|
||||||
|
|
||||||
desiredJob, err = c.generateLogicalBackupJob()
|
desiredJob, err = c.generateLogicalBackupJob()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not generate the desired cron job state: %v", err)
|
return fmt.Errorf("could not generate the desired logical backup job state: %v", err)
|
||||||
}
|
}
|
||||||
if match, reason := k8sutil.SameCronJob(job, desiredJob); !match {
|
if match, reason := k8sutil.SameLogicalBackupJob(job, desiredJob); !match {
|
||||||
c.logCronJobChanges(job, desiredJob, false, reason)
|
c.logLogicalBackupJobChanges(job, desiredJob, reason)
|
||||||
if err = c.updateCronJob(desiredJob); err != nil {
|
if err = c.patchLogicalBackupJob(desiredJob); err != nil {
|
||||||
return fmt.Errorf("could not update job to match desired state: %v", err)
|
return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
|
||||||
}
|
}
|
||||||
c.logger.Info("the logical backup job is in the desired state now")
|
c.logger.Info("the logical backup job is in the desired state now")
|
||||||
}
|
}
|
||||||
|
|
@ -565,17 +561,18 @@ func (c *Cluster) syncLogicalBackupJob() error {
|
||||||
if !k8sutil.ResourceNotFound(err) {
|
if !k8sutil.ResourceNotFound(err) {
|
||||||
return fmt.Errorf("could not get logical backp job: %v", err)
|
return fmt.Errorf("could not get logical backp job: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// no existing logical backup job, create new one
|
// no existing logical backup job, create new one
|
||||||
c.logger.Info("could not find the cluster's logical backup job")
|
c.logger.Info("could not find the cluster's logical backup job")
|
||||||
|
|
||||||
if err = c.createLogicalBackupJob(); err == nil {
|
if err = c.createLogicalBackupJob(); err == nil {
|
||||||
c.logger.Infof("created missing logical backup job %q", c.logicalBackupJob.Name)
|
c.logger.Infof("created missing logical backup job %q", jobName)
|
||||||
} else {
|
} else {
|
||||||
if !k8sutil.ResourceAlreadyExists(err) {
|
if !k8sutil.ResourceAlreadyExists(err) {
|
||||||
return fmt.Errorf("could not create missing logical backup job: %v", err)
|
return fmt.Errorf("could not create missing logical backup job: %v", err)
|
||||||
}
|
}
|
||||||
c.logger.Infof("logical backup job %q already exists", util.NameFromMeta(job.ObjectMeta))
|
c.logger.Infof("logical backup job %q already exists", jobName)
|
||||||
if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(c.logicalBackupJob.Name, metav1.GetOptions{}); err != nil {
|
if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err != nil {
|
||||||
return fmt.Errorf("could not fetch existing logical backup job: %v", err)
|
return fmt.Errorf("could not fetch existing logical backup job: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -485,17 +485,11 @@ func (c *Cluster) patroniUsesKubernetes() bool {
|
||||||
return c.OpConfig.EtcdHost == ""
|
return c.OpConfig.EtcdHost == ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) logCronJobChanges(old, new *batchv1beta1.CronJob, isUpdate bool, reason string) {
|
func (c *Cluster) logLogicalBackupJobChanges(old, new *batchv1beta1.CronJob, reason string) {
|
||||||
if isUpdate {
|
|
||||||
c.logger.Infof("logical job %q has been changed",
|
|
||||||
c.logicalBackupJob.Name,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
c.logger.Infof("logical job %q is not in the desired state and needs to be updated",
|
|
||||||
c.logicalBackupJob.Name,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
c.logger.Infof("logical job %q is not in the desired state and needs to be updated",
|
||||||
|
c.logicalBackupJob.Name,
|
||||||
|
)
|
||||||
if reason != "" {
|
if reason != "" {
|
||||||
c.logger.Infof("reason: %s", reason)
|
c.logger.Infof("reason: %s", reason)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ type LogicalBackup struct {
|
||||||
EnableLogicalBackup bool `name:"enable_logical_backup" default:"false"`
|
EnableLogicalBackup bool `name:"enable_logical_backup" default:"false"`
|
||||||
LogicalBackupSchedule string `name:"logical_backup_schedule" default:"30 00 * * *"`
|
LogicalBackupSchedule string `name:"logical_backup_schedule" default:"30 00 * * *"`
|
||||||
LogicalBackupDockerImage string `name:"logical_backup_docker_image" default:"registry.opensource.zalan.do/acid/logical-backup"`
|
LogicalBackupDockerImage string `name:"logical_backup_docker_image" default:"registry.opensource.zalan.do/acid/logical-backup"`
|
||||||
LogicalBackupS3Bucket string `name:"logical_backup_s3_bucket"`
|
LogicalBackupS3Bucket string `name:"logical_backup_s3_bucket" default:""`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config describes operator config
|
// Config describes operator config
|
||||||
|
|
|
||||||
|
|
@ -151,8 +151,8 @@ func getJobImage(cronJob *batchv1beta1.CronJob) string {
|
||||||
return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
|
return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
|
||||||
}
|
}
|
||||||
|
|
||||||
// SameCronJob compares Specs of logical backup cron jobs
|
// SameLogicalBackupJob compares Specs of logical backup cron jobs
|
||||||
func SameCronJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) {
|
func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) {
|
||||||
|
|
||||||
if cur.Spec.Schedule != new.Spec.Schedule {
|
if cur.Spec.Schedule != new.Spec.Schedule {
|
||||||
return false, fmt.Sprintf("new job's schedule %q doesn't match the current one %q",
|
return false, fmt.Sprintf("new job's schedule %q doesn't match the current one %q",
|
||||||
|
|
|
||||||
|
|
@ -256,7 +256,7 @@ function main(){
|
||||||
clean_up
|
clean_up
|
||||||
start_minikube
|
start_minikube
|
||||||
start_operator
|
start_operator
|
||||||
# submit_postgresql_manifest
|
submit_postgresql_manifest
|
||||||
forward_ports
|
forward_ports
|
||||||
check_health
|
check_health
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue