merge with master
commit 0225f181b9
@@ -42,6 +42,13 @@ rules:
  - configmaps
  verbs:
  - get
# to send events to the CRs
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
# to manage endpoints which are also used by Patroni
- apiGroups:
  - ""

@@ -43,6 +43,13 @@ rules:
  - configmaps
  verbs:
  - get
# to send events to the CRs
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
# to manage endpoints which are also used by Patroni
- apiGroups:
  - ""

@@ -22,8 +22,11 @@ import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
	"github.com/zalando/postgres-operator/pkg/spec"
	"github.com/zalando/postgres-operator/pkg/util"
	"github.com/zalando/postgres-operator/pkg/util/config"
@@ -82,6 +85,7 @@ type Cluster struct {
	acidv1.Postgresql
	Config
	logger           *logrus.Entry
	eventRecorder    record.EventRecorder
	patroni          patroni.Interface
	pgUsers          map[string]spec.PgUser
	systemUsers      map[string]spec.PgUser
@@ -110,7 +114,7 @@ type compareStatefulsetResult struct {
}

// New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
	deletePropagationPolicy := metav1.DeletePropagationOrphan

	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@@ -141,7 +145,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
	cluster.patroni = patroni.New(cluster.logger)

	cluster.eventRecorder = eventRecorder
	return cluster
}

@@ -167,6 +171,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
	}
}

// GetReference of Postgres CR object
// i.e. required to emit events to this resource
func (c *Cluster) GetReference() *v1.ObjectReference {
	ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
	if err != nil {
		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
	}
	return ref
}

// SetStatus of Postgres cluster
// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
func (c *Cluster) setStatus(status string) {
@@ -250,6 +264,7 @@ func (c *Cluster) Create() error {
	}()

	c.setStatus(acidv1.ClusterStatusCreating)
	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")

	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
		return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@@ -268,6 +283,7 @@ func (c *Cluster) Create() error {
				return fmt.Errorf("could not create %s endpoint: %v", role, err)
			}
			c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
		}

		if c.Services[role] != nil {
@@ -278,6 +294,7 @@ func (c *Cluster) Create() error {
			return fmt.Errorf("could not create %s service: %v", role, err)
		}
		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
	}

	if err = c.initUsers(); err != nil {
@@ -289,6 +306,7 @@ func (c *Cluster) Create() error {
		return fmt.Errorf("could not create secrets: %v", err)
	}
	c.logger.Infof("secrets have been successfully created")
	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")

	if c.PodDisruptionBudget != nil {
		return fmt.Errorf("pod disruption budget already exists in the cluster")
@@ -307,6 +325,7 @@ func (c *Cluster) Create() error {
		return fmt.Errorf("could not create statefulset: %v", err)
	}
	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))

	c.logger.Info("waiting for the cluster being ready")

@@ -315,6 +334,7 @@ func (c *Cluster) Create() error {
		return err
	}
	c.logger.Infof("pods are ready")
	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

	// create database objects unless we are running without pods or disabled
	// that feature explicitly
@@ -563,6 +583,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
		}
		if isSmaller {
			c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
			spec.Resources.ResourceLimits.CPU = minCPULimit
		}
	}
@@ -575,6 +596,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
		}
		if isSmaller {
			c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
			spec.Resources.ResourceLimits.Memory = minMemoryLimit
		}
	}
@@ -606,6 +628,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
	if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
		c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
		//we need that hack to generate statefulset with the old version
		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
	}
@@ -773,6 +797,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
func (c *Cluster) Delete() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")

	// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
	// deleting the cron job also removes pods and batch jobs it created
@@ -1207,6 +1232,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e

	var err error
	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)

	var wg sync.WaitGroup

@@ -1233,6 +1259,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e

	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
		if err = <-podLabelErr; err != nil {
			err = fmt.Errorf("could not get master pod label: %v", err)
		}
@@ -1248,6 +1275,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
	// close the label waiting channel no sooner than the waiting goroutine terminates.
	close(podLabelErr)

	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
	return err

}

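GetReference, added above, resolves the Postgresql object's group/version/kind from the generated clientset scheme and returns the ObjectReference that every eventRecorder.Event/Eventf call attaches its event to. Below is a minimal, self-contained sketch of the same mechanism; the core/v1 Pod and the client-go scheme are stand-ins for the Postgresql CR and the operator's generated scheme so the snippet runs on its own.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/reference"
)

func main() {
	// reference.GetReference looks the object's group/version/kind up in the
	// scheme and builds the ObjectReference (kind, namespace, name, UID) that
	// recorder.Event/Eventf attach events to.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "acid-test-0", Namespace: "test"}}
	ref, err := reference.GetReference(scheme.Scheme, pod)
	if err != nil {
		panic(err)
	}
	fmt.Printf("events will be attached to %s %s/%s (apiVersion %s)\n",
		ref.Kind, ref.Namespace, ref.Name, ref.APIVersion)
}
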
@@ -14,6 +14,7 @@ import (
	"github.com/zalando/postgres-operator/pkg/util/teams"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
)

const (
@@ -22,6 +23,8 @@ const (
)

var logger = logrus.New().WithField("test", "cluster")
var eventRecorder = record.NewFakeRecorder(1)

var cl = New(
	Config{
		OpConfig: config.Config{
@@ -35,6 +38,7 @@ var cl = New(
	k8sutil.NewMockKubernetesClient(),
	acidv1.Postgresql{ObjectMeta: metav1.ObjectMeta{Name: "acid-test", Namespace: "test"}},
	logger,
	eventRecorder,
)

func TestInitRobotUsers(t *testing.T) {

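The tests wire a record.FakeRecorder into New. FakeRecorder buffers the formatted events on its Events channel, so a test can also assert that a code path emitted an event. A sketch of that pattern follows; the test name and the direct Event call are illustrative only and not part of this commit.

package cluster

import (
	"strings"
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func TestEventIsRecorded(t *testing.T) {
	rec := record.NewFakeRecorder(5)

	// In a real test the recorder would be passed to New(...) and the cluster
	// method under test would emit the event; calling Event directly keeps the
	// sketch self-contained. FakeRecorder ignores the object reference.
	rec.Event(&v1.ConfigMap{}, v1.EventTypeNormal, "Create", "Started creation of new cluster resources")

	select {
	case msg := <-rec.Events:
		if !strings.Contains(msg, "Create") {
			t.Errorf("unexpected event: %q", msg)
		}
	default:
		t.Error("expected an event to be recorded")
	}
}
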
@@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
					ReplicationUsername: replicationUserName,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	testName := "TestGenerateSpiloConfig"
	tests := []struct {
@@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
					ReplicationUsername: replicationUserName,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	testName := "TestCreateLoadBalancerLogic"
	tests := []struct {
@@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
				acidv1.Postgresql{
					ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
					Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
				logger),
				logger,
				eventRecorder),
			policyv1beta1.PodDisruptionBudget{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "postgres-myapp-database-pdb",
@@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
				acidv1.Postgresql{
					ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
					Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
				logger),
				logger,
				eventRecorder),
			policyv1beta1.PodDisruptionBudget{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "postgres-myapp-database-pdb",
@@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
				acidv1.Postgresql{
					ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
					Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
				logger),
				logger,
				eventRecorder),
			policyv1beta1.PodDisruptionBudget{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "postgres-myapp-database-pdb",
@@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
				acidv1.Postgresql{
					ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
					Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
				logger),
				logger,
				eventRecorder),
			policyv1beta1.PodDisruptionBudget{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "postgres-myapp-database-databass-budget",
@@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) {
					ReplicationUsername: replicationUserName,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	for _, tt := range tests {
		envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) {
					ReplicationUsername: replicationUserName,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	for _, tt := range tests {
		pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	var clusterNoDefaultRes = New(
		Config{
@@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
				},
				ConnectionPooler: config.ConnectionPooler{},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }

@@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
	cluster.Statefulset = &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-sts",
@@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
	cluster.Statefulset = &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-sts",
@@ -990,7 +994,7 @@ func TestTLS(t *testing.T) {
					SpiloFSGroup: &spiloFSGroup,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
	spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
	s, err := cluster.generateStatefulSet(&spec)
	if err != nil {
@@ -1112,7 +1116,7 @@ func TestAdditionalVolume(t *testing.T) {
					ReplicationUsername: replicationUserName,
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	for _, tt := range tests {
		// Test with additional volume mounted in all containers

@@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
				},
			},
		}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
		}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)

	cluster.Statefulset = &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
@@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
				},
			},
		}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
		}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)

	cluster.Spec = acidv1.PostgresSpec{
		ConnectionPooler: &acidv1.ConnectionPooler{},

@@ -349,10 +349,12 @@ func (c *Cluster) syncStatefulSet() error {
	// statefulset or those that got their configuration from the outdated statefulset)
	if podsRollingUpdateRequired {
		c.logger.Debugln("performing rolling update")
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
		if err := c.recreatePods(); err != nil {
			return fmt.Errorf("could not recreate pods: %v", err)
		}
		c.logger.Infof("pods have been recreated")
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
		}

@@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
					NumberOfInstances:                    int32ToPointer(1),
				},
			},
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

	cluster.Statefulset = &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{

@@ -7,24 +7,24 @@ import (
	"sync"

	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/cache"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
	"github.com/zalando/postgres-operator/pkg/apiserver"
	"github.com/zalando/postgres-operator/pkg/cluster"
	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
	"github.com/zalando/postgres-operator/pkg/spec"
	"github.com/zalando/postgres-operator/pkg/util"
	"github.com/zalando/postgres-operator/pkg/util/config"
	"github.com/zalando/postgres-operator/pkg/util/constants"
	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
	"github.com/zalando/postgres-operator/pkg/util/ringlog"

	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
	v1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
)

// Controller represents operator controller
@@ -36,6 +36,9 @@ type Controller struct {
	KubeClient k8sutil.KubernetesClient
	apiserver  *apiserver.Server

	eventRecorder    record.EventRecorder
	eventBroadcaster record.EventBroadcaster

	stopCh chan struct{}

	controllerID     string
@@ -67,10 +70,21 @@ type Controller struct {
func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
	logger := logrus.New()

	var myComponentName = "postgres-operator"
	if controllerId != "" {
		myComponentName += "/" + controllerId
	}

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logger.Debugf)
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})

	c := &Controller{
		config:           *controllerConfig,
		opConfig:         &config.Config{},
		logger:           logger.WithField("pkg", "controller"),
		eventRecorder:    recorder,
		eventBroadcaster: eventBroadcaster,
		controllerID:     controllerId,
		curWorkerCluster: sync.Map{},
		clusterWorkers:   make(map[spec.NamespacedName]uint32),
@@ -93,6 +107,11 @@ func (c *Controller) initClients() {
	if err != nil {
		c.logger.Fatalf("could not create kubernetes clients: %v", err)
	}
	// the broadcaster's sink delivers recorded events to the Kubernetes events API asynchronously
	c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})

}

func (c *Controller) initOperatorConfig() {

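NewController now builds the event plumbing the cluster code consumes: a broadcaster that logs recorded events at debug level and, once the Kubernetes clients exist, also writes them to the events API through an EventSinkImpl; that write path is what needs the new "create" verb on "events" in the RBAC manifests above. Below is a runnable sketch of the same wiring; the fake clientset, the ConfigMap target, and the sleep are stand-ins added here to keep it self-contained.

package main

import (
	"log"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	// A fake clientset stands in for c.KubeClient; against a real cluster the
	// sink below performs the Event create calls that the RBAC rule permits.
	client := fake.NewSimpleClientset()

	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(log.Printf) // mirrors eventBroadcaster.StartLogging(logger.Debugf)
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "postgres-operator"})

	// The operator records against the Postgresql CR via GetReference(); a plain
	// ConfigMap keeps this sketch independent of the generated scheme.
	target := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "acid-test", Namespace: "test"}}
	recorder.Event(target, v1.EventTypeNormal, "Create", "Started creation of new cluster resources")

	// Delivery is asynchronous; give the broadcaster a moment before exiting.
	time.Sleep(200 * time.Millisecond)
}

Once deployed with the extended ClusterRole, the recorded events should show up under kubectl describe postgresql <cluster name>.
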
@@ -11,6 +11,7 @@ import (

	"github.com/sirupsen/logrus"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/cache"
@@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
}

func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster {
	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
	cl.Run(c.stopCh)
	teamName := strings.ToLower(cl.Spec.TeamID)

@@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
		if err := cl.Create(); err != nil {
			cl.Error = fmt.Sprintf("could not create cluster: %v", err)
			lg.Error(cl.Error)
			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)

			return
		}
@@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) {

		c.curWorkerCluster.Store(event.WorkerID, cl)
		cl.Delete()
		// Fixme - no error handling for delete ?
		// c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)

		func() {
			defer c.clustersMu.Unlock()
@@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
		c.curWorkerCluster.Store(event.WorkerID, cl)
		if err := cl.Sync(event.NewSpec); err != nil {
			cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
			lg.Error(cl.Error)
			return
		}

@@ -45,6 +45,7 @@ type KubernetesClient struct {
	corev1.NodesGetter
	corev1.NamespacesGetter
	corev1.ServiceAccountsGetter
	corev1.EventsGetter
	appsv1.StatefulSetsGetter
	appsv1.DeploymentsGetter
	rbacv1.RoleBindingsGetter
@@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
	kubeClient.RESTClient = client.CoreV1().RESTClient()
	kubeClient.RoleBindingsGetter = client.RbacV1()
	kubeClient.CronJobsGetter = client.BatchV1beta1()
	kubeClient.EventsGetter = client.CoreV1()

	apiextClient, err := apiextclient.NewForConfig(cfg)
	if err != nil {