Emit K8S events to the postgresql CR as feedback to the requestor / user (#896)

* Add EventsGetter to KubeClient to enable sending K8S events

* Add an eventRecorder to the controller, initialize it, and hand it down to the cluster via its constructor so the cluster can emit events this way (see the wiring sketch after this list)

* Add a first set of events that go to the postgresql custom resource the user interacts with, to provide feedback

* Add the right to "create" events to the operator cluster role

* Adapt cluster tests to the new function signature with eventRecorder (via NewFakeRecorder)

* Get a proper reference before sending events to a resource
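
For orientation, the wiring these changes add looks roughly like the following self-contained sketch against client-go's record and reference packages. The helper names (newRecorder, emitCreateEvent) and the example package are illustrative, not code from this commit:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"
)

// newRecorder builds an EventRecorder whose events are persisted through the
// API server, so they show up under "Events" in kubectl describe output.
func newRecorder(client kubernetes.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	// Events("") writes each event into the involved object's own namespace.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "postgres-operator"})
}

// emitCreateEvent resolves an ObjectReference for the object and attaches an
// event to it; the object's type must be registered in the scheme used here.
func emitCreateEvent(rec record.EventRecorder, obj runtime.Object) {
	if ref, err := reference.GetReference(scheme.Scheme, obj); err == nil {
		rec.Eventf(ref, v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
	}
}

Once wired up, the events surface next to the custom resource, e.g. in the Events section of kubectl describe postgresql <cluster name>.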

Co-authored-by: Christian Rohmann <christian.rohmann@inovex.de>
Christian Rohmann authored on 2020-04-27 08:22:07 +02:00; committed by GitHub
parent 3c91bdeffa
commit 21b9b6fcbe
11 changed files with 107 additions and 29 deletions

View File

@@ -42,6 +42,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""

View File

@@ -43,6 +43,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""

View File

@@ -21,8 +21,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"

 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
@@ -81,6 +84,7 @@ type Cluster struct {
 	acidv1.Postgresql
 	Config
 	logger           *logrus.Entry
+	eventRecorder    record.EventRecorder
 	patroni          patroni.Interface
 	pgUsers          map[string]spec.PgUser
 	systemUsers      map[string]spec.PgUser
@@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
 }

 // New creates a new cluster. This function should be called from a controller.
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
+func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
 	deletePropagationPolicy := metav1.DeletePropagationOrphan

 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@@ -140,7 +144,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
 	cluster.patroni = patroni.New(cluster.logger)
+	cluster.eventRecorder = eventRecorder

 	return cluster
 }
@@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
 	}
 }

+// GetReference of Postgres CR object
+// i.e. required to emit events to this resource
+func (c *Cluster) GetReference() *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
+	}
+	return ref
+}
+
 // SetStatus of Postgres cluster
 // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
 func (c *Cluster) setStatus(status string) {
@@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
 	}()

 	c.setStatus(acidv1.ClusterStatusCreating)
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")

 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
 		return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s endpoint: %v", role, err)
 		}
 		c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
 	}

 	if c.Services[role] != nil {
@@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s service: %v", role, err)
 		}
 		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
 	}

 	if err = c.initUsers(); err != nil {
@@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create secrets: %v", err)
 	}
 	c.logger.Infof("secrets have been successfully created")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")

 	if c.PodDisruptionBudget != nil {
 		return fmt.Errorf("pod disruption budget already exists in the cluster")
@@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))

 	c.logger.Info("waiting for the cluster being ready")
@@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
 		return err
 	}
 	c.logger.Infof("pods are ready")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

 	// create database objects unless we are running without pods or disabled
 	// that feature explicitly
@@ -555,6 +575,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
 			spec.Resources.ResourceLimits.CPU = minCPULimit
 		}
 	}
@@ -567,6 +588,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
 			spec.Resources.ResourceLimits.Memory = minMemoryLimit
 		}
 	}
@@ -598,6 +620,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
 		c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
 			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
+			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
 		//we need that hack to generate statefulset with the old version
 		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
 	}
@@ -757,6 +781,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 func (c *Cluster) Delete() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")

 	// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
 	// deleting the cron job also removes pods and batch jobs it created
@@ -1095,6 +1120,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	var err error

 	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)

 	var wg sync.WaitGroup
@@ -1121,6 +1147,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
 		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
 		if err = <-podLabelErr; err != nil {
 			err = fmt.Errorf("could not get master pod label: %v", err)
 		}
@@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	// close the label waiting channel no sooner than the waiting goroutine terminates.
 	close(podLabelErr)

+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
 	return err
 }

View File

@@ -13,6 +13,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/teams"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 )

 const (
@@ -21,6 +22,8 @@ const (
 )

 var logger = logrus.New().WithField("test", "cluster")
+var eventRecorder = record.NewFakeRecorder(1)
+
 var cl = New(
 	Config{
 		OpConfig: config.Config{
@@ -34,6 +37,7 @@ var cl = New(
 	k8sutil.NewMockKubernetesClient(),
 	acidv1.Postgresql{},
 	logger,
+	eventRecorder,
 )

 func TestInitRobotUsers(t *testing.T) {
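
The fake recorder buffers formatted event strings in a channel (sized 1 above), which is also what a test would read from to assert on emitted events. A sketch of such an assertion — this test function is hypothetical, not part of the commit:

package cluster

import (
	"strings"
	"testing"

	"k8s.io/client-go/tools/record"
)

func TestEmitsCreateEvent(t *testing.T) {
	rec := record.NewFakeRecorder(10) // buffer up to 10 events

	// ... exercise the code under test with rec injected via New(...) ...

	select {
	case ev := <-rec.Events: // events arrive as "<type> <reason> <message>"
		if !strings.Contains(ev, "Create") {
			t.Errorf("unexpected event: %s", ev)
		}
	default:
		t.Error("expected an event to be recorded")
	}
}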

View File

@@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	testName := "TestGenerateSpiloConfig"
 	tests := []struct {
@@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	testName := "TestCreateLoadBalancerLogic"
 	tests := []struct {
@@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-databass-budget",
@@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	for _, tt := range tests {
 		envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	for _, tt := range tests {
 		pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 			ConnectionPoolerDefaultMemoryLimit: "100Mi",
 		},
 	},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	var clusterNoDefaultRes = New(
 		Config{
@@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 		},
 		ConnectionPooler: config.ConnectionPooler{},
 	},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
@@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
 			ConnectionPoolerDefaultMemoryLimit: "100Mi",
 		},
 	},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
 			ConnectionPoolerDefaultMemoryLimit: "100Mi",
 		},
 	},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -990,7 +994,7 @@ func TestTLS(t *testing.T) {
 			SpiloFSGroup: &spiloFSGroup,
 		},
 	},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
 	s, err := cluster.generateStatefulSet(&spec)
 	if err != nil {
@@ -1112,7 +1116,7 @@ func TestAdditionalVolume(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	for _, tt := range tests {
 		// Test with additional volume mounted in all containers

View File

@@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
 			ConnectionPoolerDefaultMemoryLimit: "100Mi",
 		},
 	},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)

 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
@@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 			ConnectionPoolerDefaultMemoryLimit: "100Mi",
 		},
 	},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)

 	cluster.Spec = acidv1.PostgresSpec{
 		ConnectionPooler: &acidv1.ConnectionPooler{},

View File

@@ -343,10 +343,12 @@ func (c *Cluster) syncStatefulSet() error {
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if podsRollingUpdateRequired {
 		c.logger.Debugln("performing rolling update")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
 		c.logger.Infof("pods have been recreated")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
 			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
 		}

View File

@@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 				NumberOfInstances: int32ToPointer(1),
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{

View File

@@ -7,24 +7,24 @@ import (
 	"sync"

 	"github.com/sirupsen/logrus"
-	v1 "k8s.io/api/core/v1"
-	rbacv1 "k8s.io/api/rbac/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"

 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/apiserver"
 	"github.com/zalando/postgres-operator/pkg/cluster"
+	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/ringlog"
-	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
+	v1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/scheme"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
 )

 // Controller represents operator controller
@@ -36,6 +36,9 @@ type Controller struct {
 	KubeClient k8sutil.KubernetesClient
 	apiserver  *apiserver.Server

+	eventRecorder    record.EventRecorder
+	eventBroadcaster record.EventBroadcaster
+
 	stopCh chan struct{}

 	controllerID string
@@ -67,10 +70,21 @@ type Controller struct {
 func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
 	logger := logrus.New()

+	var myComponentName = "postgres-operator"
+	if controllerId != "" {
+		myComponentName += "/" + controllerId
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(logger.Debugf)
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
+
 	c := &Controller{
 		config:           *controllerConfig,
 		opConfig:         &config.Config{},
 		logger:           logger.WithField("pkg", "controller"),
+		eventRecorder:    recorder,
+		eventBroadcaster: eventBroadcaster,
 		controllerID:     controllerId,
 		curWorkerCluster: sync.Map{},
 		clusterWorkers:   make(map[spec.NamespacedName]uint32),
@@ -93,6 +107,11 @@ func (c *Controller) initClients() {
 	if err != nil {
 		c.logger.Fatalf("could not create kubernetes clients: %v", err)
 	}
+	c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
+	if err != nil {
+		c.logger.Fatalf("could not setup kubernetes event sink: %v", err)
+	}
+
 }

 func (c *Controller) initOperatorConfig() {

View File

@@ -11,6 +11,7 @@ import (

 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
@@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
 }

 func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster {
-	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
+	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
 	cl.Run(c.stopCh)

 	teamName := strings.ToLower(cl.Spec.TeamID)
@@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		if err := cl.Create(); err != nil {
 			cl.Error = fmt.Sprintf("could not create cluster: %v", err)
 			lg.Error(cl.Error)
+			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)

 			return
 		}
@@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		c.curWorkerCluster.Store(event.WorkerID, cl)
 		cl.Delete()
+		// Fixme - no error handling for delete ?
+		// c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)

 		func() {
 			defer c.clustersMu.Unlock()
@@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		c.curWorkerCluster.Store(event.WorkerID, cl)
 		if err := cl.Sync(event.NewSpec); err != nil {
 			cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
+			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
 			lg.Error(cl.Error)

 			return
 		}

View File

@@ -45,6 +45,7 @@ type KubernetesClient struct {
 	corev1.NodesGetter
 	corev1.NamespacesGetter
 	corev1.ServiceAccountsGetter
+	corev1.EventsGetter
 	appsv1.StatefulSetsGetter
 	appsv1.DeploymentsGetter
 	rbacv1.RoleBindingsGetter
@@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 	kubeClient.RoleBindingsGetter = client.RbacV1()
 	kubeClient.CronJobsGetter = client.BatchV1beta1()
+	kubeClient.EventsGetter = client.CoreV1()

 	apiextClient, err := apiextclient.NewForConfig(cfg)
 	if err != nil {
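
The embedding above is what keeps the controller's sink setup terse: because KubernetesClient embeds corev1.EventsGetter, the promoted Events(namespace) method is available directly on the aggregate client. A reduced sketch of the pattern, trimmed to the one field (NewFromClientset is illustrative):

package k8sutil

import (
	"k8s.io/client-go/kubernetes"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// KubernetesClient here is cut down to a single embedded getter; the real
// struct embeds many more typed getter interfaces.
type KubernetesClient struct {
	corev1.EventsGetter // promotes Events(namespace string) EventInterface
}

func NewFromClientset(client kubernetes.Interface) KubernetesClient {
	// client.CoreV1() satisfies EventsGetter (among other getter interfaces).
	return KubernetesClient{EventsGetter: client.CoreV1()}
}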