From 83c8d6c419c6da6eb88e76637bbe9e5d6aeba8b3 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Wed, 11 Oct 2017 12:26:09 +0200 Subject: [PATCH] Extend diagnostic api with worker status info --- pkg/apiserver/apiserver.go | 25 +++++++++++++++++++++++++ pkg/cluster/cluster.go | 28 +++++++++++++++++++++++++--- pkg/cluster/exec.go | 3 +++ pkg/cluster/pg.go | 15 ++++++++++----- pkg/cluster/pod.go | 3 +++ pkg/cluster/resources.go | 11 +++++++++++ pkg/cluster/sync.go | 2 ++ pkg/cluster/util.go | 1 + pkg/cluster/volumes.go | 2 ++ pkg/controller/controller.go | 36 +++++++++++++++++++----------------- pkg/controller/postgresql.go | 15 +++++++++++---- pkg/controller/status.go | 20 ++++++++++++++++++++ pkg/controller/util.go | 4 ++-- pkg/spec/types.go | 21 +++++++++++++++++---- pkg/util/k8sutil/k8sutil.go | 1 + 15 files changed, 152 insertions(+), 35 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 4ab5b571f..59ccca5c8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -36,6 +36,7 @@ type controllerInformer interface { WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 + WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) } // Server describes HTTP API server @@ -52,7 +53,9 @@ var ( teamURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/?$`) workerLogsURL = regexp.MustCompile(`^/workers/(?P\d+)/logs/?$`) workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P\d+)/queue/?$`) + workerStatusURL = regexp.MustCompile(`^/workers/(?P\d+)/status/?$`) workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`) + workerAllStatus = regexp.MustCompile(`^/workers/all/status/?$`) clustersURL = "/clusters/" ) @@ -198,6 +201,13 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { workerID, _ := strconv.Atoi(matches["id"]) resp, err = s.controller.ListQueue(uint32(workerID)) + } else if matches := util.FindNamedStringSubmatch(workerStatusURL, req.URL.Path); matches != nil { + workerID, _ := strconv.Atoi(matches["id"]) + + resp, err = s.controller.WorkerStatus(uint32(workerID)) + } else if workerAllStatus.MatchString(req.URL.Path) { + s.allWorkers(w, req) + return } else { s.respond(nil, fmt.Errorf("page not found"), w) return @@ -221,3 +231,18 @@ func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) { s.respond(resp, nil, w) } + +func (s *Server) allWorkers(w http.ResponseWriter, r *http.Request) { + workersCnt := s.controller.GetWorkersCnt() + resp := make(map[uint32]*spec.WorkerStatus, workersCnt) + for i := uint32(0); i < workersCnt; i++ { + status, err := s.controller.WorkerStatus(i) + if err != nil { + continue + } + + resp[i] = status + } + + s.respond(resp, nil, w) +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c548351b1..b1175fe96 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -9,6 +9,7 @@ import ( "reflect" "regexp" "sync" + "time" "github.com/Sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -71,6 +72,8 @@ type Cluster struct { teamsAPIClient *teams.API KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? + currentProcess spec.Process + processMu sync.RWMutex } type compareStatefulsetResult struct { @@ -122,6 +125,15 @@ func (c *Cluster) teamName() string { return c.Spec.TeamID } +func (c *Cluster) setProcessName(procName string, args ...interface{}) { + c.processMu.Lock() + defer c.processMu.Unlock() + c.currentProcess = spec.Process{ + Name: fmt.Sprintf(procName, args...), + StartTime: time.Now(), + } +} + func (c *Cluster) setStatus(status spec.PostgresStatus) { c.Status = status b, err := json.Marshal(status) @@ -149,6 +161,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { // initUsers populates c.systemUsers and c.pgUsers maps. func (c *Cluster) initUsers() error { + c.setProcessName("initializing users") c.initSystemUsers() if err := c.initInfrastructureRoles(); err != nil { @@ -188,7 +201,7 @@ func (c *Cluster) Create() error { c.setStatus(spec.ClusterStatusCreating) - //TODO: service will create endpoint implicitly + //service will create endpoint implicitly ep, err = c.createEndpoint() if err != nil { return fmt.Errorf("could not create endpoint: %v", err) @@ -231,11 +244,11 @@ func (c *Cluster) Create() error { c.logger.Infof("pods are ready") if !(c.masterLess || c.databaseAccessDisabled()) { - if err := c.createRoles(); err != nil { + if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } - if err := c.createDatabases(); err != nil { + if err = c.createDatabases(); err != nil { return fmt.Errorf("could not create databases: %v", err) } @@ -655,6 +668,14 @@ func (c *Cluster) initInfrastructureRoles() error { return nil } +// GetCurrentProcess provides name of the last process of the cluster +func (c *Cluster) GetCurrentProcess() spec.Process { + c.processMu.RLock() + defer c.processMu.RUnlock() + + return c.currentProcess +} + // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *spec.ClusterStatus { return &spec.ClusterStatus{ @@ -667,6 +688,7 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { ReplicaService: c.GetServiceReplica(), Endpoint: c.GetEndpoint(), StatefulSet: c.GetStatefulSet(), + CurrentProcess: c.GetCurrentProcess(), Error: c.Error, } diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index 469a26a07..beb551222 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "strings" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,6 +16,8 @@ import ( //ExecCommand executes arbitrary command inside the pod func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) { + c.setProcessName("executing command %q", strings.Join(command, " ")) + var ( execOut bytes.Buffer execErr bytes.Buffer diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index 7f421573f..59eac0ef7 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -46,6 +46,7 @@ func (c *Cluster) databaseAccessDisabled() bool { } func (c *Cluster) initDbConn() (err error) { + c.setProcessName("initializing db connection") if c.pgDb == nil { conn, err := sql.Open("postgres", c.pgConnectionString()) if err != nil { @@ -67,6 +68,7 @@ func (c *Cluster) initDbConn() (err error) { } func (c *Cluster) closeDbConn() (err error) { + c.setProcessName("closing db connection") if c.pgDb != nil { c.logger.Debug("closing database connection") if err = c.pgDb.Close(); err != nil { @@ -81,6 +83,7 @@ func (c *Cluster) closeDbConn() (err error) { } func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) { + c.setProcessName("reading users from the db") var rows *sql.Rows users = make(spec.PgUserMap) if rows, err = c.pgDb.Query(getUserSQL, pq.Array(userNames)); err != nil { @@ -116,13 +119,13 @@ func (c *Cluster) getDatabases() (map[string]string, error) { rows *sql.Rows err error ) - dbs := make(map[string]string, 0) + dbs := make(map[string]string) - if err := c.initDbConn(); err != nil { + if err = c.initDbConn(); err != nil { return nil, fmt.Errorf("could not init db connection") } defer func() { - if err := c.closeDbConn(); err != nil { + if err = c.closeDbConn(); err != nil { c.logger.Errorf("could not close db connection: %v", err) } }() @@ -151,6 +154,8 @@ func (c *Cluster) getDatabases() (map[string]string, error) { } func (c *Cluster) createDatabases() error { + c.setProcessName("creating databases") + newDbs := c.Spec.Databases curDbs, err := c.getDatabases() if err != nil { @@ -164,11 +169,11 @@ func (c *Cluster) createDatabases() error { return nil } - if err := c.initDbConn(); err != nil { + if err = c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } defer func() { - if err := c.closeDbConn(); err != nil { + if err = c.closeDbConn(); err != nil { c.logger.Errorf("could not close database connection: %v", err) } }() diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 10c19fb28..0ad9fdb22 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -52,6 +52,7 @@ func (c *Cluster) deletePods() error { } func (c *Cluster) deletePod(podName spec.NamespacedName) error { + c.setProcessName("deleting %q pod", podName) ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) @@ -93,6 +94,7 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P func (c *Cluster) recreatePod(pod v1.Pod) error { podName := util.NameFromMeta(pod.ObjectMeta) + c.setProcessName("recreating %q pod", podName) ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) @@ -113,6 +115,7 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { } func (c *Cluster) recreatePods() error { + c.setProcessName("recreating pods") ls := c.labelsSet() namespace := c.Namespace diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 94c23656a..80a5d02b4 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -101,6 +101,7 @@ func (c *Cluster) listResources() error { } func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { + c.setProcessName("creating statefulset") if c.Statefulset != nil { return nil, fmt.Errorf("statefulset already exists in the cluster") } @@ -119,6 +120,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { } func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { + c.setProcessName("updating statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") } @@ -145,6 +147,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { // replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL CRD. func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { + c.setProcessName("replacing statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") } @@ -191,6 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error } func (c *Cluster) deleteStatefulSet() error { + c.setProcessName("deleting statefulset") c.logger.Debugln("deleting statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") @@ -215,6 +219,8 @@ func (c *Cluster) deleteStatefulSet() error { } func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { + c.setProcessName("creating %v service", role) + if c.Services[role] != nil { return nil, fmt.Errorf("service already exists in the cluster") } @@ -230,6 +236,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { } func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { + c.setProcessName("updating %v service", role) if c.Services[role] == nil { return fmt.Errorf("there is no service in the cluster") } @@ -320,6 +327,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { } func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { + c.setProcessName("creating endpoint") if c.Endpoint != nil { return nil, fmt.Errorf("endpoint already exists in the cluster") } @@ -335,6 +343,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { } func (c *Cluster) deleteEndpoint() error { + c.setProcessName("deleting endpoint") c.logger.Debugln("deleting endpoint") if c.Endpoint == nil { return fmt.Errorf("there is no endpoint in the cluster") @@ -350,6 +359,7 @@ func (c *Cluster) deleteEndpoint() error { } func (c *Cluster) applySecrets() error { + c.setProcessName("applying secrets") secrets := c.generateUserSecrets() for secretUsername, secretSpec := range secrets { @@ -388,6 +398,7 @@ func (c *Cluster) applySecrets() error { } func (c *Cluster) deleteSecret(secret *v1.Secret) error { + c.setProcessName("deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) c.logger.Debugf("deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) if err != nil { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ecccf7dde..5aa42399b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -187,6 +187,8 @@ func (c *Cluster) syncStatefulSet() error { } func (c *Cluster) syncRoles(readFromDatabase bool) error { + c.setProcessName("syncing roles") + var ( err error dbUsers spec.PgUserMap diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 36c8eb607..dde77dc0b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -284,6 +284,7 @@ func (c *Cluster) waitPodLabelsReady() error { } func (c *Cluster) waitStatefulsetPodsReady() error { + c.setProcessName("waiting for the pods of the statefulset") // TODO: wait for the first Pod only if err := c.waitStatefulsetReady(); err != nil { return fmt.Errorf("statuful set error: %v", err) diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index b73dee2da..1037fb3f7 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -82,6 +82,8 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { // resizeVolumes resize persistent volumes compatible with the given resizer interface func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.VolumeResizer) error { + c.setProcessName("resizing volumes") + totalCompatible := 0 newQuantity, err := resource.ParseQuantity(newVolume.Size) if err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7f5315e27..ad2077188 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -30,13 +30,14 @@ type Controller struct { stopCh chan struct{} - curWorkerID uint32 //initialized with 0 - clusterWorkers map[spec.NamespacedName]uint32 - clustersMu sync.RWMutex - clusters map[spec.NamespacedName]*cluster.Cluster - clusterLogs map[spec.NamespacedName]ringlog.RingLogger - clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes - teamClusters map[string][]spec.NamespacedName + curWorkerID uint32 //initialized with 0 + curWorkerCluster sync.Map + clusterWorkers map[spec.NamespacedName]uint32 + clustersMu sync.RWMutex + clusters map[spec.NamespacedName]*cluster.Cluster + clusterLogs map[spec.NamespacedName]ringlog.RingLogger + clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes + teamClusters map[string][]spec.NamespacedName postgresqlInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer @@ -53,16 +54,17 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller { logger := logrus.New() c := &Controller{ - config: *controllerConfig, - opConfig: &config.Config{}, - logger: logger.WithField("pkg", "controller"), - clusterWorkers: make(map[spec.NamespacedName]uint32), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), - clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger), - teamClusters: make(map[string][]spec.NamespacedName), - stopCh: make(chan struct{}), - podCh: make(chan spec.PodEvent), + config: *controllerConfig, + opConfig: &config.Config{}, + logger: logger.WithField("pkg", "controller"), + curWorkerCluster: sync.Map{}, + clusterWorkers: make(map[spec.NamespacedName]uint32), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), + clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger), + teamClusters: make(map[string][]spec.NamespacedName), + stopCh: make(chan struct{}), + podCh: make(chan spec.PodEvent), } logger.Hooks.Add(c) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 3404f82a6..fcce69b18 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -86,16 +86,16 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object return &list, err } -type tprDecoder struct { +type crdDecoder struct { dec *json.Decoder close func() error } -func (d *tprDecoder) Close() { +func (d *crdDecoder) Close() { d.close() } -func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { +func (d *crdDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { var e struct { Type watch.EventType Object spec.Postgresql @@ -121,7 +121,7 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa return nil, err } - return watch.NewStreamWatcher(&tprDecoder{ + return watch.NewStreamWatcher(&crdDecoder{ dec: json.NewDecoder(r), close: r.Close, }), nil @@ -163,6 +163,8 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } c.clustersMu.RUnlock() + defer c.curWorkerCluster.Store(event.WorkerID, nil) + switch event.EventType { case spec.EventAdd: if clusterFound { @@ -174,6 +176,8 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { cl = c.addCluster(lg, clusterName, event.NewSpec) + c.curWorkerCluster.Store(event.WorkerID, cl) + if err := cl.Create(); err != nil { cl.Error = fmt.Errorf("could not create cluster: %v", err) lg.Error(cl.Error) @@ -189,6 +193,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { lg.Warnln("cluster does not exist") return } + c.curWorkerCluster.Store(event.WorkerID, cl) if err := cl.Update(event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not update cluster: %v", err) lg.Error(cl.Error) @@ -212,6 +217,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { teamName := strings.ToLower(cl.Spec.TeamID) + c.curWorkerCluster.Store(event.WorkerID, cl) if err := cl.Delete(); err != nil { lg.Errorf("could not delete cluster: %v", err) } @@ -242,6 +248,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { cl = c.addCluster(lg, clusterName, event.NewSpec) } + c.curWorkerCluster.Store(event.WorkerID, cl) if err := cl.Sync(); err != nil { cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error) diff --git a/pkg/controller/status.go b/pkg/controller/status.go index b198804f5..af670e1a1 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -6,7 +6,9 @@ import ( "github.com/Sirupsen/logrus" + "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/config" ) @@ -167,6 +169,24 @@ func (c *Controller) GetWorkersCnt() uint32 { return c.opConfig.Workers } +//WorkerStatus provides status of the worker +func (c *Controller) WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) { + obj, ok := c.curWorkerCluster.Load(workerID) + if !ok || obj == nil { + return nil, fmt.Errorf("worker has no status") + } + + cl, ok := obj.(*cluster.Cluster) + if !ok { + return nil, fmt.Errorf("could not cast to Cluster struct") + } + + return &spec.WorkerStatus{ + CurrentCluster: util.NameFromMeta(cl.ObjectMeta), + CurrentProcess: cl.GetCurrentProcess(), + }, nil +} + // ClusterHistory dumps history of cluster changes func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) { clusterName := spec.NamespacedName{ diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 5f91a8995..74f4855c2 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -29,9 +29,9 @@ func (c *Controller) makeClusterConfig() cluster.Config { } func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { - workerId, ok := c.clusterWorkers[clusterName] + workerID, ok := c.clusterWorkers[clusterName] if ok { - return workerId + return workerID } c.clusterWorkers[clusterName] = c.curWorkerID diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 809717a23..2147f1824 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -86,6 +86,12 @@ type LogEntry struct { Message string } +// Process describes process of the cluster +type Process struct { + Name string + StartTime time.Time +} + // ClusterStatus describes status of the cluster type ClusterStatus struct { Team string @@ -95,10 +101,17 @@ type ClusterStatus struct { Endpoint *v1.Endpoints StatefulSet *v1beta1.StatefulSet - Worker uint32 - Status PostgresStatus - Spec PostgresSpec - Error error + CurrentProcess Process + Worker uint32 + Status PostgresStatus + Spec PostgresSpec + Error error +} + +// WorkerStatus describes status of the worker +type WorkerStatus struct { + CurrentCluster NamespacedName + CurrentProcess Process } // Diff describes diff diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index f36649450..47aa291cf 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -53,6 +53,7 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } +// NewFromConfig create Kubernets Interface using REST config func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient := KubernetesClient{}