From 51fdfb90f70cc0becdb24257c72dba1a7a0efae4 Mon Sep 17 00:00:00 2001
From: Murat Kabilov
Date: Tue, 15 Aug 2017 12:16:09 +0200
Subject: [PATCH] log cluster and controller events in the ringlog via logrus hook

---
 pkg/controller/controller.go | 28 +++++++++++-----
 pkg/controller/postgresql.go | 65 ++++++++++++++++++++++++-------------
 pkg/util/config/config.go    |  1 +
 3 files changed, 63 insertions(+), 31 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 7c4dc6328..f7947c402 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -15,6 +15,7 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
+	"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
 )
 
 // Controller represents operator controller
@@ -26,9 +27,12 @@ type Controller struct {
 	KubeClient k8sutil.KubernetesClient
 	RestClient rest.Interface // kubernetes API group REST client
 
-	clustersMu sync.RWMutex
-	clusters   map[spec.NamespacedName]*cluster.Cluster
-	stopChs    map[spec.NamespacedName]chan struct{}
+	stopCh chan struct{}
+
+	clustersMu   sync.RWMutex
+	clusters     map[spec.NamespacedName]*cluster.Cluster
+	clusterLogs  map[spec.NamespacedName]ringlog.RingLogger
+	teamClusters map[string][]spec.NamespacedName
 
 	postgresqlInformer cache.SharedIndexInformer
 	podInformer        cache.SharedIndexInformer
@@ -36,6 +40,8 @@ type Controller struct {
 
 	clusterEventQueues  []*cache.FIFO // [workerID]Queue
 	lastClusterSyncTime int64
+
+	workerLogs map[uint32]ringlog.RingLogger
 }
 
 // NewController creates a new controller
@@ -43,12 +49,16 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
 	logger := logrus.New()
 
 	c := &Controller{
-		config:   *controllerConfig,
-		opConfig: &config.Config{},
-		logger:   logger.WithField("pkg", "controller"),
-		clusters: make(map[spec.NamespacedName]*cluster.Cluster),
-		podCh:    make(chan spec.PodEvent),
+		config:       *controllerConfig,
+		opConfig:     &config.Config{},
+		logger:       logger.WithField("pkg", "controller"),
+		clusters:     make(map[spec.NamespacedName]*cluster.Cluster),
+		clusterLogs:  make(map[spec.NamespacedName]ringlog.RingLogger),
+		teamClusters: make(map[string][]spec.NamespacedName),
+		stopCh:       make(chan struct{}),
+		podCh:        make(chan spec.PodEvent),
 	}
+	logger.Hooks.Add(c)
 
 	return c
 }
@@ -149,6 +159,7 @@ func (c *Controller) initController() {
 	})
 
 	c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
+	c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
 	for i := range c.clusterEventQueues {
 		c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
 			e, ok := obj.(spec.ClusterEvent)
@@ -172,6 +183,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 
 	for i := range c.clusterEventQueues {
 		wg.Add(1)
+		c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
 		go c.processClusterEventsQueue(i, stopCh, wg)
 	}
 
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 3407164a4..64c2a32d4 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -4,10 +4,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/Sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -18,6 +20,7 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
+	"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
 )
 
 func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
@@ -124,6 +127,23 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa
 	}), nil
 }
 
+// addCluster builds a cluster from pgSpec, runs it with the controller-wide stop channel and,
+// under clustersMu, records it in the clusters, clusterLogs and teamClusters maps.
+func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *spec.Postgresql) *cluster.Cluster {
+	cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
+	cl.Run(c.stopCh)
+	teamName := strings.ToLower(cl.Spec.TeamID)
+
+	c.clustersMu.Lock()
+	defer c.clustersMu.Unlock()
+
+	c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
+	c.clusters[clusterName] = cl
+	c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
+
+	return cl
+}
+
 func (c *Controller) processEvent(obj interface{}) error {
 	var clusterName spec.NamespacedName
 
@@ -153,14 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
 
 		lg.Infof("creation of the cluster started")
 
-		stopCh := make(chan struct{})
-		cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
-		cl.Run(stopCh)
-
-		c.clustersMu.Lock()
-		c.clusters[clusterName] = cl
-		c.stopChs[clusterName] = stopCh
-		c.clustersMu.Unlock()
+		cl = c.addCluster(lg, clusterName, event.NewSpec)
 
 		if err := cl.Create(); err != nil {
 			cl.Error = fmt.Errorf("could not create cluster: %v", err)
@@ -186,7 +199,7 @@ func (c *Controller) processEvent(obj interface{}) error {
 		cl.Error = nil
 		lg.Infoln("cluster has been updated")
 	case spec.EventDelete:
-		lg.Infof("deletion of the %q cluster started", clusterName)
+		lg.Infoln("Deletion of the cluster started")
 		if !clusterFound {
 			lg.Errorf("unknown cluster: %q", clusterName)
 			return nil
@@ -196,12 +209,25 @@ func (c *Controller) processEvent(obj interface{}) error {
 			lg.Errorf("could not delete cluster: %v", err)
 			return nil
 		}
-		close(c.stopChs[clusterName])
 
-		c.clustersMu.Lock()
-		delete(c.clusters, clusterName)
-		delete(c.stopChs, clusterName)
-		c.clustersMu.Unlock()
+		// resolve the team only once the !clusterFound guard above has passed; before it cl may be unset
+		teamName := strings.ToLower(cl.Spec.TeamID)
+
+		func() {
+			c.clustersMu.Lock()
+			defer c.clustersMu.Unlock()
+
+			delete(c.clusters, clusterName)
+			delete(c.clusterLogs, clusterName)
+			for i, val := range c.teamClusters[teamName] { // linear search; assumes the per-team list is relatively small
+				if val == clusterName {
+					copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
+					c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
+					c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
+					break
+				}
+			}
+		}()
 
 		lg.Infof("cluster has been deleted")
 	case spec.EventSync:
@@ -209,18 +235,11 @@ func (c *Controller) processEvent(obj interface{}) error {
 
 		// no race condition because a cluster is always processed by single worker
 		if !clusterFound {
-			stopCh := make(chan struct{})
-			cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
-			cl.Run(stopCh)
-
-			c.clustersMu.Lock()
-			c.clusters[clusterName] = cl
-			c.stopChs[clusterName] = stopCh
-			c.clustersMu.Unlock()
+			cl = c.addCluster(lg, clusterName, event.NewSpec)
 		}
 
 		if err := cl.Sync(); err != nil {
-			cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err)
+			cl.Error = fmt.Errorf("could not sync cluster: %v", err)
 			lg.Error(cl.Error)
 			return nil
 		}
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index 18a902cc1..dac649281 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -61,6 +61,7 @@ type Config struct {
 	MasterDNSNameFormat  stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
 	ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
 	Workers              uint32         `name:"workers" default:"4"`
+	RingLogLines         int            `name:"ring_log_lines" default:"100"`
 }
 
 // MustMarshal marshals the config or panics