switch to batch API v1 for Jobs (#2066)
This commit is contained in:
parent
2aa52094db
commit
84fe38a069
|
|
@ -23,7 +23,7 @@ class K8sApi:
|
||||||
|
|
||||||
self.core_v1 = client.CoreV1Api()
|
self.core_v1 = client.CoreV1Api()
|
||||||
self.apps_v1 = client.AppsV1Api()
|
self.apps_v1 = client.AppsV1Api()
|
||||||
self.batch_v1_beta1 = client.BatchV1beta1Api()
|
self.batch_v1 = client.BatchV1Api()
|
||||||
self.custom_objects_api = client.CustomObjectsApi()
|
self.custom_objects_api = client.CustomObjectsApi()
|
||||||
self.policy_v1 = client.PolicyV1Api()
|
self.policy_v1 = client.PolicyV1Api()
|
||||||
self.storage_v1_api = client.StorageV1Api()
|
self.storage_v1_api = client.StorageV1Api()
|
||||||
|
|
@ -217,7 +217,7 @@ class K8s:
|
||||||
time.sleep(self.RETRY_TIMEOUT_SEC)
|
time.sleep(self.RETRY_TIMEOUT_SEC)
|
||||||
|
|
||||||
def get_logical_backup_job(self, namespace='default'):
|
def get_logical_backup_job(self, namespace='default'):
|
||||||
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
|
return self.api.batch_v1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
|
||||||
|
|
||||||
def wait_for_logical_backup_job(self, expected_num_of_jobs):
|
def wait_for_logical_backup_job(self, expected_num_of_jobs):
|
||||||
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
|
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
|
||||||
|
|
@ -499,7 +499,7 @@ class K8sBase:
|
||||||
time.sleep(self.RETRY_TIMEOUT_SEC)
|
time.sleep(self.RETRY_TIMEOUT_SEC)
|
||||||
|
|
||||||
def get_logical_backup_job(self, namespace='default'):
|
def get_logical_backup_job(self, namespace='default'):
|
||||||
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
|
return self.api.batch_v1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
|
||||||
|
|
||||||
def wait_for_logical_backup_job(self, expected_num_of_jobs):
|
def wait_for_logical_backup_job(self, expected_num_of_jobs):
|
||||||
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
|
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ import (
|
||||||
"github.com/zalando/postgres-operator/pkg/util/patroni"
|
"github.com/zalando/postgres-operator/pkg/util/patroni"
|
||||||
"github.com/zalando/postgres-operator/pkg/util/retryutil"
|
"github.com/zalando/postgres-operator/pkg/util/retryutil"
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -2017,7 +2016,7 @@ func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (hos
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
|
func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
|
@ -2108,7 +2107,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
|
||||||
|
|
||||||
// configure a cron job
|
// configure a cron job
|
||||||
|
|
||||||
jobTemplateSpec := batchv1beta1.JobTemplateSpec{
|
jobTemplateSpec := batchv1.JobTemplateSpec{
|
||||||
Spec: jobSpec,
|
Spec: jobSpec,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2117,17 +2116,17 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
|
||||||
schedule = c.OpConfig.LogicalBackupSchedule
|
schedule = c.OpConfig.LogicalBackupSchedule
|
||||||
}
|
}
|
||||||
|
|
||||||
cronJob := &batchv1beta1.CronJob{
|
cronJob := &batchv1.CronJob{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: c.getLogicalBackupJobName(),
|
Name: c.getLogicalBackupJobName(),
|
||||||
Namespace: c.Namespace,
|
Namespace: c.Namespace,
|
||||||
Labels: c.labelsSet(true),
|
Labels: c.labelsSet(true),
|
||||||
Annotations: c.annotationsSet(nil),
|
Annotations: c.annotationsSet(nil),
|
||||||
},
|
},
|
||||||
Spec: batchv1beta1.CronJobSpec{
|
Spec: batchv1.CronJobSpec{
|
||||||
Schedule: schedule,
|
Schedule: schedule,
|
||||||
JobTemplate: jobTemplateSpec,
|
JobTemplate: jobTemplateSpec,
|
||||||
ConcurrencyPolicy: batchv1beta1.ForbidConcurrent,
|
ConcurrencyPolicy: batchv1.ForbidConcurrent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
policyv1 "k8s.io/api/policy/v1"
|
policyv1 "k8s.io/api/policy/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
@ -546,7 +546,7 @@ func (c *Cluster) createLogicalBackupJob() (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error {
|
func (c *Cluster) patchLogicalBackupJob(newJob *batchv1.CronJob) error {
|
||||||
c.setProcessName("patching logical backup job")
|
c.setProcessName("patching logical backup job")
|
||||||
|
|
||||||
patchData, err := specPatch(newJob.Spec)
|
patchData, err := specPatch(newJob.Spec)
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import (
|
||||||
"github.com/zalando/postgres-operator/pkg/util"
|
"github.com/zalando/postgres-operator/pkg/util"
|
||||||
"github.com/zalando/postgres-operator/pkg/util/constants"
|
"github.com/zalando/postgres-operator/pkg/util/constants"
|
||||||
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
||||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
policyv1 "k8s.io/api/policy/v1"
|
policyv1 "k8s.io/api/policy/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
@ -1145,8 +1145,8 @@ func (c *Cluster) syncExtensions(extensions map[string]string) error {
|
||||||
|
|
||||||
func (c *Cluster) syncLogicalBackupJob() error {
|
func (c *Cluster) syncLogicalBackupJob() error {
|
||||||
var (
|
var (
|
||||||
job *batchv1beta1.CronJob
|
job *batchv1.CronJob
|
||||||
desiredJob *batchv1beta1.CronJob
|
desiredJob *batchv1.CronJob
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
c.setProcessName("syncing the logical backup job")
|
c.setProcessName("syncing the logical backup job")
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,8 @@ import (
|
||||||
b64 "encoding/base64"
|
b64 "encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
|
clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
|
||||||
|
|
||||||
apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||||
zalandoclient "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned"
|
zalandoclient "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned"
|
||||||
|
|
@ -63,7 +63,7 @@ type KubernetesClient struct {
|
||||||
rbacv1.RoleBindingsGetter
|
rbacv1.RoleBindingsGetter
|
||||||
policyv1.PodDisruptionBudgetsGetter
|
policyv1.PodDisruptionBudgetsGetter
|
||||||
apiextv1.CustomResourceDefinitionsGetter
|
apiextv1.CustomResourceDefinitionsGetter
|
||||||
clientbatchv1beta1.CronJobsGetter
|
clientbatchv1.CronJobsGetter
|
||||||
acidv1.OperatorConfigurationsGetter
|
acidv1.OperatorConfigurationsGetter
|
||||||
acidv1.PostgresTeamsGetter
|
acidv1.PostgresTeamsGetter
|
||||||
acidv1.PostgresqlsGetter
|
acidv1.PostgresqlsGetter
|
||||||
|
|
@ -159,7 +159,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
||||||
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1()
|
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1()
|
||||||
kubeClient.RESTClient = client.CoreV1().RESTClient()
|
kubeClient.RESTClient = client.CoreV1().RESTClient()
|
||||||
kubeClient.RoleBindingsGetter = client.RbacV1()
|
kubeClient.RoleBindingsGetter = client.RbacV1()
|
||||||
kubeClient.CronJobsGetter = client.BatchV1beta1()
|
kubeClient.CronJobsGetter = client.BatchV1()
|
||||||
kubeClient.EventsGetter = client.CoreV1()
|
kubeClient.EventsGetter = client.CoreV1()
|
||||||
|
|
||||||
apiextClient, err := apiextclient.NewForConfig(cfg)
|
apiextClient, err := apiextclient.NewForConfig(cfg)
|
||||||
|
|
@ -224,12 +224,12 @@ func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason stri
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func getJobImage(cronJob *batchv1beta1.CronJob) string {
|
func getJobImage(cronJob *batchv1.CronJob) string {
|
||||||
return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
|
return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
|
||||||
}
|
}
|
||||||
|
|
||||||
// SameLogicalBackupJob compares Specs of logical backup cron jobs
|
// SameLogicalBackupJob compares Specs of logical backup cron jobs
|
||||||
func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) {
|
func SameLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
|
||||||
|
|
||||||
if cur.Spec.Schedule != new.Spec.Schedule {
|
if cur.Spec.Schedule != new.Spec.Schedule {
|
||||||
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
|
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue