diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 6fdee23b1..c9bff1cca 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -32,4 +32,5 @@ data: enable_load_balancer: "true" api_port: "8080" ring_log_lines: "100" + cluster_history_entries: "1000" pod_terminate_grace_period: 5m diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index b7332eba4..bd867a5ce 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -32,6 +32,7 @@ type controllerInformer interface { TeamClusterList() map[string][]spec.NamespacedName ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) ClusterLogs(team, cluster string) ([]*spec.LogEntry, error) + ClusterHistory(team, cluster string) ([]*spec.Diff, error) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 @@ -47,6 +48,7 @@ type Server struct { var ( clusterStatusURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/?$`) clusterLogsURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/logs/?$`) + clusterHistoryURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/history/?$`) teamURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/?$`) workerLogsURL = regexp.MustCompile(`^/workers/(?P\d+)/logs/?$`) workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P\d+)/queue/?$`) @@ -160,6 +162,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { return } else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil { resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"]) + } else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil { + resp, err = s.controller.ClusterHistory(matches["team"], matches["cluster"]) } else if req.URL.Path == clustersURL { res := make(map[string][]string) for team, clusters := range s.controller.TeamClusterList() { @@ -171,8 +175,7 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { s.respond(res, nil, w) return } else { - s.respond(nil, fmt.Errorf("page not found"), w) - return + err = fmt.Errorf("page not found") } s.respond(resp, err, w) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 14ed17daf..041cd2f42 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -535,9 +535,11 @@ func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpo func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) []v1.EnvVar { result := make([]v1.EnvVar, 0) + if description.ClusterName == "" { return result } + cluster := description.ClusterName result = append(result, v1.EnvVar{Name: "CLONE_SCOPE", Value: cluster}) if description.EndTimestamp == "" { @@ -563,11 +565,11 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [ }) } else { // cloning with S3, find out the bucket to clone - clone_wal_s3_bucket := c.OpConfig.WALES3Bucket result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"}) - result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: clone_wal_s3_bucket}) + result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket}) result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp}) } + return result } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index eaf0a6a8c..8a41fa091 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -32,10 +32,11 @@ type Controller struct { stopCh chan struct{} - clustersMu sync.RWMutex - clusters map[spec.NamespacedName]*cluster.Cluster - clusterLogs map[spec.NamespacedName]ringlog.RingLogger - teamClusters map[string][]spec.NamespacedName + clustersMu sync.RWMutex + clusters map[spec.NamespacedName]*cluster.Cluster + clusterLogs map[spec.NamespacedName]ringlog.RingLogger + clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes + teamClusters map[string][]spec.NamespacedName postgresqlInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer @@ -52,14 +53,15 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller { logger := logrus.New() c := &Controller{ - config: *controllerConfig, - opConfig: &config.Config{}, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), - teamClusters: make(map[string][]spec.NamespacedName), - stopCh: make(chan struct{}), - podCh: make(chan spec.PodEvent), + config: *controllerConfig, + opConfig: &config.Config{}, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), + clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger), + teamClusters: make(map[string][]spec.NamespacedName), + stopCh: make(chan struct{}), + podCh: make(chan spec.PodEvent), } logger.Hooks.Add(c) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 876271d0e..646f10db4 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -138,12 +138,14 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName) c.clusters[clusterName] = cl c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines) + c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries) return cl } func (c *Controller) processEvent(event spec.ClusterEvent) { var clusterName spec.NamespacedName + var clHistory ringlog.RingLogger lg := c.logger.WithField("worker", event.WorkerID) @@ -156,6 +158,9 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { c.clustersMu.RLock() cl, clusterFound := c.clusters[clusterName] + if clusterFound { + clHistory = c.clusterHistory[clusterName] + } c.clustersMu.RUnlock() switch event.EventType { @@ -192,6 +197,12 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } cl.Error = nil lg.Infoln("cluster has been updated") + + clHistory.Insert(&spec.Diff{ + EventTime: event.EventTime, + ProcessTime: time.Now(), + Diff: util.Diff(event.OldSpec, event.NewSpec), + }) case spec.EventDelete: if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) @@ -211,6 +222,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { delete(c.clusters, clusterName) delete(c.clusterLogs, clusterName) + delete(c.clusterHistory, clusterName) for i, val := range c.teamClusters[teamName] { if val == clusterName { copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) @@ -298,6 +310,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec workerID := c.clusterWorkerID(clusterName) clusterEvent := spec.ClusterEvent{ + EventTime: time.Now(), EventType: eventType, UID: uid, OldSpec: old, diff --git a/pkg/controller/status.go b/pkg/controller/status.go index d5630d799..b198804f5 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -166,3 +166,25 @@ func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) { func (c *Controller) GetWorkersCnt() uint32 { return c.opConfig.Workers } + +// ClusterHistory dumps history of cluster changes +func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + name, + } + + c.clustersMu.RLock() + cl, ok := c.clusterHistory[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil, fmt.Errorf("could not find cluster") + } + + res := make([]*spec.Diff, 0) + for _, e := range cl.Walk() { + res = append(res, e.(*spec.Diff)) + } + + return res, nil +} diff --git a/pkg/controller/util.go b/pkg/controller/util.go index e9d0c5617..1a8d5f87e 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -48,8 +48,7 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { func (c *Controller) createTPR() error { tpr := thirdPartyResource(constants.TPRName) - _, err := c.KubeClient.ThirdPartyResources().Create(tpr) - if err != nil { + if _, err := c.KubeClient.ThirdPartyResources().Create(tpr); err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index cbb819953..809717a23 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -29,6 +29,7 @@ const ( // ClusterEvent carries the payload of the Cluster TPR events. type ClusterEvent struct { + EventTime time.Time UID types.UID EventType EventType OldSpec *Postgresql @@ -100,6 +101,13 @@ type ClusterStatus struct { Error error } +// Diff describes diff +type Diff struct { + EventTime time.Time + ProcessTime time.Time + Diff []string +} + // ControllerStatus describes status of the controller type ControllerStatus struct { LastSyncTime int64 diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index b370edcf6..54b35b1c0 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -47,23 +47,24 @@ type Config struct { TPR Resources Auth - Namespace string `name:"namespace"` - EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` - DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` - ServiceAccountName string `name:"service_account_name" default:"operator"` - DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"` - EtcdScope string `name:"etcd_scope" default:"service"` - WALES3Bucket string `name:"wal_s3_bucket"` - KubeIAMRole string `name:"kube_iam_role"` - DebugLogging bool `name:"debug_logging" default:"true"` - EnableDBAccess bool `name:"enable_database_access" default:"true"` - EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` - EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"` - MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` - ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` - Workers uint32 `name:"workers" default:"4"` - APIPort int `name:"api_port" default:"8080"` - RingLogLines int `name:"ring_log_lines" default:"100"` + Namespace string `name:"namespace"` + EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` + DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` + ServiceAccountName string `name:"service_account_name" default:"operator"` + DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"` + EtcdScope string `name:"etcd_scope" default:"service"` + WALES3Bucket string `name:"wal_s3_bucket"` + KubeIAMRole string `name:"kube_iam_role"` + DebugLogging bool `name:"debug_logging" default:"true"` + EnableDBAccess bool `name:"enable_database_access" default:"true"` + EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` + EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"` + MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` + ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` + Workers uint32 `name:"workers" default:"4"` + APIPort int `name:"api_port" default:"8080"` + RingLogLines int `name:"ring_log_lines" default:"100"` + ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"` PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"` } diff --git a/pkg/util/ringlog/ringlog.go b/pkg/util/ringlog/ringlog.go index 1c26ce3ba..14cc19714 100644 --- a/pkg/util/ringlog/ringlog.go +++ b/pkg/util/ringlog/ringlog.go @@ -28,7 +28,7 @@ func New(size int) *RingLog { return &r } -// Insert inserts new LogEntry into the ring logger +// Insert inserts new entry into the ring logger func (r *RingLog) Insert(obj interface{}) { r.Lock() defer r.Unlock() @@ -39,7 +39,7 @@ func (r *RingLog) Insert(obj interface{}) { } } -// Walk dumps all the LogEntries from the Ring logger +// Walk dumps all the entries from the Ring logger func (r *RingLog) Walk() []interface{} { res := make([]interface{}, 0) diff --git a/pkg/util/util.go b/pkg/util/util.go index bee6607a8..518603b57 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -52,10 +52,14 @@ func PGUserPassword(user spec.PgUser) string { return md5prefix + hex.EncodeToString(s[:]) } +// Diff returns diffs between 2 objects +func Diff(a, b interface{}) []string { + return pretty.Diff(a, b) +} + // PrettyDiff shows the diff between 2 objects in an easy to understand format. It is mainly used for debugging output. -func PrettyDiff(a, b interface{}) (result string) { - diff := pretty.Diff(a, b) - return strings.Join(diff, "\n") +func PrettyDiff(a, b interface{}) string { + return strings.Join(Diff(a, b), "\n") } // SubstractStringSlices finds elements in a that are not in b and return them as a result slice. @@ -73,6 +77,7 @@ OUTER: return result, len(result) == 0 } +// FindNamedStringSubmatch returns a map of strings holding the text of the matches of the r regular expression func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string { matches := r.FindStringSubmatch(s) grNames := r.SubexpNames()