log cluster and controller events in the ringlog via logrus hook
This commit is contained in:
parent
82f58b57d8
commit
51fdfb90f7
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
|
||||
)
|
||||
|
||||
// Controller represents operator controller
|
||||
|
|
@ -26,9 +27,12 @@ type Controller struct {
|
|||
KubeClient k8sutil.KubernetesClient
|
||||
RestClient rest.Interface // kubernetes API group REST client
|
||||
|
||||
clustersMu sync.RWMutex
|
||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||
stopChs map[spec.NamespacedName]chan struct{}
|
||||
stopCh chan struct{}
|
||||
|
||||
clustersMu sync.RWMutex
|
||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
|
||||
teamClusters map[string][]spec.NamespacedName
|
||||
|
||||
postgresqlInformer cache.SharedIndexInformer
|
||||
podInformer cache.SharedIndexInformer
|
||||
|
|
@ -36,6 +40,8 @@ type Controller struct {
|
|||
|
||||
clusterEventQueues []*cache.FIFO // [workerID]Queue
|
||||
lastClusterSyncTime int64
|
||||
|
||||
workerLogs map[uint32]ringlog.RingLogger
|
||||
}
|
||||
|
||||
// NewController creates a new controller
|
||||
|
|
@ -43,12 +49,16 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
|
|||
logger := logrus.New()
|
||||
|
||||
c := &Controller{
|
||||
config: *controllerConfig,
|
||||
opConfig: &config.Config{},
|
||||
logger: logger.WithField("pkg", "controller"),
|
||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
||||
podCh: make(chan spec.PodEvent),
|
||||
config: *controllerConfig,
|
||||
opConfig: &config.Config{},
|
||||
logger: logger.WithField("pkg", "controller"),
|
||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
||||
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||
teamClusters: make(map[string][]spec.NamespacedName),
|
||||
stopCh: make(chan struct{}),
|
||||
podCh: make(chan spec.PodEvent),
|
||||
}
|
||||
logger.Hooks.Add(c)
|
||||
|
||||
return c
|
||||
}
|
||||
|
|
@ -149,6 +159,7 @@ func (c *Controller) initController() {
|
|||
})
|
||||
|
||||
c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
|
||||
c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
|
||||
for i := range c.clusterEventQueues {
|
||||
c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
|
||||
e, ok := obj.(spec.ClusterEvent)
|
||||
|
|
@ -172,6 +183,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
|||
|
||||
for i := range c.clusterEventQueues {
|
||||
wg.Add(1)
|
||||
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
|
||||
go c.processClusterEventsQueue(i, stopCh, wg)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,10 +4,12 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
|
@ -18,6 +20,7 @@ import (
|
|||
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
|
||||
)
|
||||
|
||||
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||
|
|
@ -124,6 +127,21 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa
|
|||
}), nil
|
||||
}
|
||||
|
||||
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *spec.Postgresql) *cluster.Cluster {
|
||||
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
|
||||
cl.Run(c.stopCh)
|
||||
teamName := strings.ToLower(cl.Spec.TeamID)
|
||||
|
||||
defer c.clustersMu.Unlock()
|
||||
c.clustersMu.Lock()
|
||||
|
||||
c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
|
||||
c.clusters[clusterName] = cl
|
||||
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
|
||||
|
||||
return cl
|
||||
}
|
||||
|
||||
func (c *Controller) processEvent(obj interface{}) error {
|
||||
var clusterName spec.NamespacedName
|
||||
|
||||
|
|
@ -153,14 +171,7 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
|
||||
lg.Infof("creation of the cluster started")
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
|
||||
cl.Run(stopCh)
|
||||
|
||||
c.clustersMu.Lock()
|
||||
c.clusters[clusterName] = cl
|
||||
c.stopChs[clusterName] = stopCh
|
||||
c.clustersMu.Unlock()
|
||||
cl = c.addCluster(lg, clusterName, event.NewSpec)
|
||||
|
||||
if err := cl.Create(); err != nil {
|
||||
cl.Error = fmt.Errorf("could not create cluster: %v", err)
|
||||
|
|
@ -186,7 +197,9 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
cl.Error = nil
|
||||
lg.Infoln("cluster has been updated")
|
||||
case spec.EventDelete:
|
||||
lg.Infof("deletion of the %q cluster started", clusterName)
|
||||
teamName := strings.ToLower(cl.Spec.TeamID)
|
||||
|
||||
lg.Infoln("Deletion of the cluster started")
|
||||
if !clusterFound {
|
||||
lg.Errorf("unknown cluster: %q", clusterName)
|
||||
return nil
|
||||
|
|
@ -196,12 +209,22 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
lg.Errorf("could not delete cluster: %v", err)
|
||||
return nil
|
||||
}
|
||||
close(c.stopChs[clusterName])
|
||||
|
||||
c.clustersMu.Lock()
|
||||
delete(c.clusters, clusterName)
|
||||
delete(c.stopChs, clusterName)
|
||||
c.clustersMu.Unlock()
|
||||
func() {
|
||||
defer c.clustersMu.Unlock()
|
||||
c.clustersMu.Lock()
|
||||
|
||||
delete(c.clusters, clusterName)
|
||||
delete(c.clusterLogs, clusterName)
|
||||
for i, val := range c.teamClusters[teamName] { // on relativel
|
||||
if val == clusterName {
|
||||
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
|
||||
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
|
||||
c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
lg.Infof("cluster has been deleted")
|
||||
case spec.EventSync:
|
||||
|
|
@ -209,18 +232,11 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
|
||||
// no race condition because a cluster is always processed by single worker
|
||||
if !clusterFound {
|
||||
stopCh := make(chan struct{})
|
||||
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
|
||||
cl.Run(stopCh)
|
||||
|
||||
c.clustersMu.Lock()
|
||||
c.clusters[clusterName] = cl
|
||||
c.stopChs[clusterName] = stopCh
|
||||
c.clustersMu.Unlock()
|
||||
cl = c.addCluster(lg, clusterName, event.NewSpec)
|
||||
}
|
||||
|
||||
if err := cl.Sync(); err != nil {
|
||||
cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err)
|
||||
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
|
||||
lg.Error(cl.Error)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ type Config struct {
|
|||
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
|
||||
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
|
||||
Workers uint32 `name:"workers" default:"4"`
|
||||
RingLogLines int `name:"ring_log_lines" default:"100"`
|
||||
}
|
||||
|
||||
// MustMarshal marshals the config or panics
|
||||
|
|
|
|||
Loading…
Reference in New Issue