diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml
index 38ce85e7a..0defcab41 100644
--- a/charts/postgres-operator/templates/clusterrole.yaml
+++ b/charts/postgres-operator/templates/clusterrole.yaml
@@ -42,6 +42,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""
diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml
index 83cd721e7..667941a24 100644
--- a/manifests/operator-service-account-rbac.yaml
+++ b/manifests/operator-service-account-rbac.yaml
@@ -43,6 +43,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 74cf6e61d..244916074 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -21,8 +21,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
 
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
@@ -81,6 +84,7 @@ type Cluster struct {
 	acidv1.Postgresql
 	Config
 	logger           *logrus.Entry
+	eventRecorder    record.EventRecorder
 	patroni          patroni.Interface
 	pgUsers          map[string]spec.PgUser
 	systemUsers      map[string]spec.PgUser
@@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
 }
 
 // New creates a new cluster. This function should be called from a controller.
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
+func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
 	deletePropagationPolicy := metav1.DeletePropagationOrphan
 
 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
 	cluster.patroni = patroni.New(cluster.logger)
-
+	cluster.eventRecorder = eventRecorder
 	return cluster
 }
 
@@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
 	}
 }
 
+// GetReference returns a reference to the Postgresql CR object,
+// which is required to emit events to this resource
+func (c *Cluster) GetReference() *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
+	}
+	return ref
+}
+
 // SetStatus of Postgres cluster
 // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
 func (c *Cluster) setStatus(status string) {
@@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
 	}()
 
 	c.setStatus(acidv1.ClusterStatusCreating)
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
 		return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s endpoint: %v", role, err)
 		}
 		c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
 	}
 
 	if c.Services[role] != nil {
@@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s service: %v", role, err)
 		}
 		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
 	}
 
 	if err = c.initUsers(); err != nil {
@@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create secrets: %v", err)
 	}
 	c.logger.Infof("secrets have been successfully created")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
 
 	if c.PodDisruptionBudget != nil {
 		return fmt.Errorf("pod disruption budget already exists in the cluster")
@@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
 
 	c.logger.Info("waiting for the cluster being ready")
@@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
 		return err
 	}
 	c.logger.Infof("pods are ready")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
 
 	// create database objects unless we are running without pods or disabled
 	// that feature explicitly
@@ -555,6 +575,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
 			spec.Resources.ResourceLimits.CPU = minCPULimit
 		}
 	}
@@ -567,6 +588,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
 			spec.Resources.ResourceLimits.Memory = minMemoryLimit
 		}
 	}
@@ -598,6 +620,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
 		c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
+			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
 		//we need that hack to generate statefulset with the old version
 		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
 	}
@@ -757,6 +781,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 func (c *Cluster) Delete() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
 
 	// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
 	// deleting the cron job also removes pods and batch jobs it created
@@ -1095,6 +1120,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	var err error
 
 	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
 
 	var wg sync.WaitGroup
@@ -1121,6 +1147,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
 		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
 		if err = <-podLabelErr; err != nil {
 			err = fmt.Errorf("could not get master pod label: %v", err)
 		}
@@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	// close the label waiting channel no sooner than the waiting goroutine terminates.
 	close(podLabelErr)
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
 
 	return err
 }
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 432f53132..84ec04e3e 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/teams"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 )
 
 const (
@@ -21,6 +22,8 @@ const (
 )
 
 var logger = logrus.New().WithField("test", "cluster")
+var eventRecorder = record.NewFakeRecorder(1)
+
 var cl = New(
 	Config{
 		OpConfig: config.Config{
@@ -34,6 +37,7 @@ var cl = New(
 	k8sutil.NewMockKubernetesClient(),
 	acidv1.Postgresql{},
 	logger,
+	eventRecorder,
 )
 
 func TestInitRobotUsers(t *testing.T) {
diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go
index 6e4587627..1291d4f47 100644
--- a/pkg/cluster/k8sres_test.go
+++ b/pkg/cluster/k8sres_test.go
@@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestGenerateSpiloConfig"
 	tests := []struct {
@@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestCreateLoadBalancerLogic"
 	tests := []struct {
@@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-databass-budget",
@@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	var clusterNoDefaultRes = New(
 		Config{
@@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 			},
 			ConnectionPooler: config.ConnectionPooler{},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
 
@@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -990,7 +994,7 @@ func TestTLS(t *testing.T) {
 				SpiloFSGroup: &spiloFSGroup,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
 	s, err := cluster.generateStatefulSet(&spec)
 	if err != nil {
@@ -1112,7 +1116,7 @@ func TestAdditionalVolume(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		// Test with additional volume mounted in all containers
diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go
index 2db807b38..9739cc354 100644
--- a/pkg/cluster/resources_test.go
+++ b/pkg/cluster/resources_test.go
@@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
@@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Spec = acidv1.PostgresSpec{
 		ConnectionPooler: &acidv1.ConnectionPooler{},
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 43173102b..a9a2b5177 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -343,10 +343,12 @@ func (c *Cluster) syncStatefulSet() error {
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if podsRollingUpdateRequired {
 		c.logger.Debugln("performing rolling update")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
 		c.logger.Infof("pods have been recreated")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
 			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
 		}
diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go
index 50b5cfaa8..3a7317938 100644
--- a/pkg/cluster/sync_test.go
+++ b/pkg/cluster/sync_test.go
@@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 					NumberOfInstances: int32ToPointer(1),
 				},
 			},
-		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 9c48b7ef2..4e4685379 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -7,24 +7,24 @@ import (
 	"sync"
 
 	"github.com/sirupsen/logrus"
-	v1 "k8s.io/api/core/v1"
-	rbacv1 "k8s.io/api/rbac/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"
-
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/apiserver"
 	"github.com/zalando/postgres-operator/pkg/cluster"
+	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/ringlog"
-
-	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
+	v1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/scheme"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
 )
 
 // Controller represents operator controller
@@ -36,6 +36,9 @@ type Controller struct {
 	KubeClient k8sutil.KubernetesClient
 	apiserver  *apiserver.Server
 
+	eventRecorder    record.EventRecorder
+	eventBroadcaster record.EventBroadcaster
+
 	stopCh chan struct{}
 
 	controllerID string
@@ -67,10 +70,21 @@ type Controller struct {
 func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
 	logger := logrus.New()
 
+	var myComponentName = "postgres-operator"
+	if controllerId != "" {
+		myComponentName += "/" + controllerId
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(logger.Debugf)
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
+
 	c := &Controller{
 		config:           *controllerConfig,
 		opConfig:         &config.Config{},
 		logger:           logger.WithField("pkg", "controller"),
+		eventRecorder:    recorder,
+		eventBroadcaster: eventBroadcaster,
 		controllerID:     controllerId,
 		curWorkerCluster: sync.Map{},
 		clusterWorkers:   make(map[spec.NamespacedName]uint32),
@@ -93,6 +107,8 @@ func (c *Controller) initClients() {
 	if err != nil {
 		c.logger.Fatalf("could not create kubernetes clients: %v", err)
 	}
+	c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
+
 }
 
 func (c *Controller) initOperatorConfig() {
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index e81671c7d..2a9e1b650 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/sirupsen/logrus"
 
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
@@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
 }
 
 func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster {
-	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
+	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
 	cl.Run(c.stopCh)
 	teamName := strings.ToLower(cl.Spec.TeamID)
 
@@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		if err := cl.Create(); err != nil {
 			cl.Error = fmt.Sprintf("could not create cluster: %v", err)
 			lg.Error(cl.Error)
+			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
 			return
 		}
 
@@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		c.curWorkerCluster.Store(event.WorkerID, cl)
 		cl.Delete()
+		// FIXME: Delete() returns no error, so no failure event can be emitted here
+		// c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
 
 		func() {
 			defer c.clustersMu.Unlock()
@@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		c.curWorkerCluster.Store(event.WorkerID, cl)
 		if err := cl.Sync(event.NewSpec); err != nil {
 			cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
+			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
 			lg.Error(cl.Error)
 			return
 		}
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index 3a397af7d..d7be2f48a 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -45,6 +45,7 @@ type KubernetesClient struct {
 	corev1.NodesGetter
 	corev1.NamespacesGetter
 	corev1.ServiceAccountsGetter
+	corev1.EventsGetter
 	appsv1.StatefulSetsGetter
 	appsv1.DeploymentsGetter
 	rbacv1.RoleBindingsGetter
@@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 	kubeClient.RoleBindingsGetter = client.RbacV1()
 	kubeClient.CronJobsGetter = client.BatchV1beta1()
+	kubeClient.EventsGetter = client.CoreV1()
 
 	apiextClient, err := apiextclient.NewForConfig(cfg)
 	if err != nil {
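
Taken together, the patch wires a standard client-go event pipeline into the operator: NewController creates a broadcaster and a recorder stamped with the operator's component name, initClients connects the broadcaster to the core/v1 Events sink (which is why the RBAC manifests gain the "create" verb on events), and each Cluster resolves an ObjectReference to its own Postgresql CR so the events surface under the CR. The sketch below reproduces that wiring as a self-contained program; the helper names (newRecorder, emitExample) and the use of a Pod in place of the Postgresql CR are illustrative assumptions, not code from this patch.

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"
)

// newRecorder mirrors the wiring added to NewController/initClients:
// broadcaster -> core/v1 Events sink -> recorder tagged with a component name.
func newRecorder(client kubernetes.Interface, component string) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	// Writing to the sink issues "create" calls against the events resource,
	// which is exactly what the new RBAC rule permits.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(""),
	})
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
}

// emitExample mirrors Cluster.GetReference followed by an eventRecorder.Event
// call. A Pod stands in for the Postgresql CR; the operator registers its CR
// types in the generated scheme so GetReference can resolve them the same way.
func emitExample(recorder record.EventRecorder, obj *v1.Pod) {
	ref, err := reference.GetReference(scheme.Scheme, obj)
	if err != nil {
		return // the operator logs this instead of failing silently
	}
	recorder.Event(ref, v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
}

func main() {
	// A fake clientset keeps the sketch runnable without a live cluster.
	client := fake.NewSimpleClientset()
	recorder := newRecorder(client, "postgres-operator")
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "acid-minimal-cluster-0", Namespace: "default"}}
	emitExample(recorder, pod)
}
```

Note that delivery to the sink is asynchronous, so a real program keeps the broadcaster alive for its whole lifetime - the reason the patch stores it on the Controller struct rather than as a local in NewController.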