Extend diagnostic api with worker status info

This commit is contained in:
Murat Kabilov 2017-10-11 12:26:09 +02:00 committed by GitHub
parent 4bc2284b57
commit 83c8d6c419
15 changed files with 152 additions and 35 deletions

View File

@ -36,6 +36,7 @@ type controllerInformer interface {
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
ListQueue(workerID uint32) (*spec.QueueDump, error)
GetWorkersCnt() uint32
WorkerStatus(workerID uint32) (*spec.WorkerStatus, error)
}
// Server describes HTTP API server
@ -52,7 +53,9 @@ var (
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
workerStatusURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/status/?$`)
workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`)
workerAllStatus = regexp.MustCompile(`^/workers/all/status/?$`)
clustersURL = "/clusters/"
)
@ -198,6 +201,13 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
workerID, _ := strconv.Atoi(matches["id"])
resp, err = s.controller.ListQueue(uint32(workerID))
} else if matches := util.FindNamedStringSubmatch(workerStatusURL, req.URL.Path); matches != nil {
workerID, _ := strconv.Atoi(matches["id"])
resp, err = s.controller.WorkerStatus(uint32(workerID))
} else if workerAllStatus.MatchString(req.URL.Path) {
s.allWorkers(w, req)
return
} else {
s.respond(nil, fmt.Errorf("page not found"), w)
return
@ -221,3 +231,18 @@ func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
s.respond(resp, nil, w)
}
func (s *Server) allWorkers(w http.ResponseWriter, r *http.Request) {
workersCnt := s.controller.GetWorkersCnt()
resp := make(map[uint32]*spec.WorkerStatus, workersCnt)
for i := uint32(0); i < workersCnt; i++ {
status, err := s.controller.WorkerStatus(i)
if err != nil {
continue
}
resp[i] = status
}
s.respond(resp, nil, w)
}

View File

@ -9,6 +9,7 @@ import (
"reflect"
"regexp"
"sync"
"time"
"github.com/Sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -71,6 +72,8 @@ type Cluster struct {
teamsAPIClient *teams.API
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
currentProcess spec.Process
processMu sync.RWMutex
}
type compareStatefulsetResult struct {
@ -122,6 +125,15 @@ func (c *Cluster) teamName() string {
return c.Spec.TeamID
}
func (c *Cluster) setProcessName(procName string, args ...interface{}) {
c.processMu.Lock()
defer c.processMu.Unlock()
c.currentProcess = spec.Process{
Name: fmt.Sprintf(procName, args...),
StartTime: time.Now(),
}
}
func (c *Cluster) setStatus(status spec.PostgresStatus) {
c.Status = status
b, err := json.Marshal(status)
@ -149,6 +161,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
// initUsers populates c.systemUsers and c.pgUsers maps.
func (c *Cluster) initUsers() error {
c.setProcessName("initializing users")
c.initSystemUsers()
if err := c.initInfrastructureRoles(); err != nil {
@ -188,7 +201,7 @@ func (c *Cluster) Create() error {
c.setStatus(spec.ClusterStatusCreating)
//TODO: service will create endpoint implicitly
//service will create endpoint implicitly
ep, err = c.createEndpoint()
if err != nil {
return fmt.Errorf("could not create endpoint: %v", err)
@ -231,11 +244,11 @@ func (c *Cluster) Create() error {
c.logger.Infof("pods are ready")
if !(c.masterLess || c.databaseAccessDisabled()) {
if err := c.createRoles(); err != nil {
if err = c.createRoles(); err != nil {
return fmt.Errorf("could not create users: %v", err)
}
if err := c.createDatabases(); err != nil {
if err = c.createDatabases(); err != nil {
return fmt.Errorf("could not create databases: %v", err)
}
@ -655,6 +668,14 @@ func (c *Cluster) initInfrastructureRoles() error {
return nil
}
// GetCurrentProcess provides name of the last process of the cluster
func (c *Cluster) GetCurrentProcess() spec.Process {
c.processMu.RLock()
defer c.processMu.RUnlock()
return c.currentProcess
}
// GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *spec.ClusterStatus {
return &spec.ClusterStatus{
@ -667,6 +688,7 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
ReplicaService: c.GetServiceReplica(),
Endpoint: c.GetEndpoint(),
StatefulSet: c.GetStatefulSet(),
CurrentProcess: c.GetCurrentProcess(),
Error: c.Error,
}

View File

@ -2,6 +2,7 @@ package cluster
import (
"bytes"
"strings"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -15,6 +16,8 @@ import (
//ExecCommand executes arbitrary command inside the pod
func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) {
c.setProcessName("executing command %q", strings.Join(command, " "))
var (
execOut bytes.Buffer
execErr bytes.Buffer

View File

@ -46,6 +46,7 @@ func (c *Cluster) databaseAccessDisabled() bool {
}
func (c *Cluster) initDbConn() (err error) {
c.setProcessName("initializing db connection")
if c.pgDb == nil {
conn, err := sql.Open("postgres", c.pgConnectionString())
if err != nil {
@ -67,6 +68,7 @@ func (c *Cluster) initDbConn() (err error) {
}
func (c *Cluster) closeDbConn() (err error) {
c.setProcessName("closing db connection")
if c.pgDb != nil {
c.logger.Debug("closing database connection")
if err = c.pgDb.Close(); err != nil {
@ -81,6 +83,7 @@ func (c *Cluster) closeDbConn() (err error) {
}
func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) {
c.setProcessName("reading users from the db")
var rows *sql.Rows
users = make(spec.PgUserMap)
if rows, err = c.pgDb.Query(getUserSQL, pq.Array(userNames)); err != nil {
@ -116,13 +119,13 @@ func (c *Cluster) getDatabases() (map[string]string, error) {
rows *sql.Rows
err error
)
dbs := make(map[string]string, 0)
dbs := make(map[string]string)
if err := c.initDbConn(); err != nil {
if err = c.initDbConn(); err != nil {
return nil, fmt.Errorf("could not init db connection")
}
defer func() {
if err := c.closeDbConn(); err != nil {
if err = c.closeDbConn(); err != nil {
c.logger.Errorf("could not close db connection: %v", err)
}
}()
@ -151,6 +154,8 @@ func (c *Cluster) getDatabases() (map[string]string, error) {
}
func (c *Cluster) createDatabases() error {
c.setProcessName("creating databases")
newDbs := c.Spec.Databases
curDbs, err := c.getDatabases()
if err != nil {
@ -164,11 +169,11 @@ func (c *Cluster) createDatabases() error {
return nil
}
if err := c.initDbConn(); err != nil {
if err = c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if err := c.closeDbConn(); err != nil {
if err = c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()

View File

@ -52,6 +52,7 @@ func (c *Cluster) deletePods() error {
}
func (c *Cluster) deletePod(podName spec.NamespacedName) error {
c.setProcessName("deleting %q pod", podName)
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
@ -93,6 +94,7 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P
func (c *Cluster) recreatePod(pod v1.Pod) error {
podName := util.NameFromMeta(pod.ObjectMeta)
c.setProcessName("recreating %q pod", podName)
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
@ -113,6 +115,7 @@ func (c *Cluster) recreatePod(pod v1.Pod) error {
}
func (c *Cluster) recreatePods() error {
c.setProcessName("recreating pods")
ls := c.labelsSet()
namespace := c.Namespace

View File

@ -101,6 +101,7 @@ func (c *Cluster) listResources() error {
}
func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
c.setProcessName("creating statefulset")
if c.Statefulset != nil {
return nil, fmt.Errorf("statefulset already exists in the cluster")
}
@ -119,6 +120,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
}
func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
c.setProcessName("updating statefulset")
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")
}
@ -145,6 +147,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL CRD.
func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
c.setProcessName("replacing statefulset")
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")
}
@ -191,6 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error
}
func (c *Cluster) deleteStatefulSet() error {
c.setProcessName("deleting statefulset")
c.logger.Debugln("deleting statefulset")
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")
@ -215,6 +219,8 @@ func (c *Cluster) deleteStatefulSet() error {
}
func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
c.setProcessName("creating %v service", role)
if c.Services[role] != nil {
return nil, fmt.Errorf("service already exists in the cluster")
}
@ -230,6 +236,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
}
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
c.setProcessName("updating %v service", role)
if c.Services[role] == nil {
return fmt.Errorf("there is no service in the cluster")
}
@ -320,6 +327,7 @@ func (c *Cluster) deleteService(role PostgresRole) error {
}
func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
c.setProcessName("creating endpoint")
if c.Endpoint != nil {
return nil, fmt.Errorf("endpoint already exists in the cluster")
}
@ -335,6 +343,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
}
func (c *Cluster) deleteEndpoint() error {
c.setProcessName("deleting endpoint")
c.logger.Debugln("deleting endpoint")
if c.Endpoint == nil {
return fmt.Errorf("there is no endpoint in the cluster")
@ -350,6 +359,7 @@ func (c *Cluster) deleteEndpoint() error {
}
func (c *Cluster) applySecrets() error {
c.setProcessName("applying secrets")
secrets := c.generateUserSecrets()
for secretUsername, secretSpec := range secrets {
@ -388,6 +398,7 @@ func (c *Cluster) applySecrets() error {
}
func (c *Cluster) deleteSecret(secret *v1.Secret) error {
c.setProcessName("deleting secret %q", util.NameFromMeta(secret.ObjectMeta))
c.logger.Debugf("deleting secret %q", util.NameFromMeta(secret.ObjectMeta))
err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions)
if err != nil {

View File

@ -187,6 +187,8 @@ func (c *Cluster) syncStatefulSet() error {
}
func (c *Cluster) syncRoles(readFromDatabase bool) error {
c.setProcessName("syncing roles")
var (
err error
dbUsers spec.PgUserMap

View File

@ -284,6 +284,7 @@ func (c *Cluster) waitPodLabelsReady() error {
}
func (c *Cluster) waitStatefulsetPodsReady() error {
c.setProcessName("waiting for the pods of the statefulset")
// TODO: wait for the first Pod only
if err := c.waitStatefulsetReady(); err != nil {
return fmt.Errorf("statuful set error: %v", err)

View File

@ -82,6 +82,8 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
// resizeVolumes resize persistent volumes compatible with the given resizer interface
func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.VolumeResizer) error {
c.setProcessName("resizing volumes")
totalCompatible := 0
newQuantity, err := resource.ParseQuantity(newVolume.Size)
if err != nil {

View File

@ -30,13 +30,14 @@ type Controller struct {
stopCh chan struct{}
curWorkerID uint32 //initialized with 0
clusterWorkers map[spec.NamespacedName]uint32
clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
teamClusters map[string][]spec.NamespacedName
curWorkerID uint32 //initialized with 0
curWorkerCluster sync.Map
clusterWorkers map[spec.NamespacedName]uint32
clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
teamClusters map[string][]spec.NamespacedName
postgresqlInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
@ -53,16 +54,17 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
logger := logrus.New()
c := &Controller{
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
clusterWorkers: make(map[spec.NamespacedName]uint32),
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger),
teamClusters: make(map[string][]spec.NamespacedName),
stopCh: make(chan struct{}),
podCh: make(chan spec.PodEvent),
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
curWorkerCluster: sync.Map{},
clusterWorkers: make(map[spec.NamespacedName]uint32),
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger),
teamClusters: make(map[string][]spec.NamespacedName),
stopCh: make(chan struct{}),
podCh: make(chan spec.PodEvent),
}
logger.Hooks.Add(c)

View File

@ -86,16 +86,16 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
return &list, err
}
type tprDecoder struct {
type crdDecoder struct {
dec *json.Decoder
close func() error
}
func (d *tprDecoder) Close() {
func (d *crdDecoder) Close() {
d.close()
}
func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
func (d *crdDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Postgresql
@ -121,7 +121,7 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa
return nil, err
}
return watch.NewStreamWatcher(&tprDecoder{
return watch.NewStreamWatcher(&crdDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
@ -163,6 +163,8 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
}
c.clustersMu.RUnlock()
defer c.curWorkerCluster.Store(event.WorkerID, nil)
switch event.EventType {
case spec.EventAdd:
if clusterFound {
@ -174,6 +176,8 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
cl = c.addCluster(lg, clusterName, event.NewSpec)
c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Create(); err != nil {
cl.Error = fmt.Errorf("could not create cluster: %v", err)
lg.Error(cl.Error)
@ -189,6 +193,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
lg.Warnln("cluster does not exist")
return
}
c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Update(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not update cluster: %v", err)
lg.Error(cl.Error)
@ -212,6 +217,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
teamName := strings.ToLower(cl.Spec.TeamID)
c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Delete(); err != nil {
lg.Errorf("could not delete cluster: %v", err)
}
@ -242,6 +248,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
cl = c.addCluster(lg, clusterName, event.NewSpec)
}
c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error)

View File

@ -6,7 +6,9 @@ import (
"github.com/Sirupsen/logrus"
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
)
@ -167,6 +169,24 @@ func (c *Controller) GetWorkersCnt() uint32 {
return c.opConfig.Workers
}
//WorkerStatus provides status of the worker
func (c *Controller) WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) {
obj, ok := c.curWorkerCluster.Load(workerID)
if !ok || obj == nil {
return nil, fmt.Errorf("worker has no status")
}
cl, ok := obj.(*cluster.Cluster)
if !ok {
return nil, fmt.Errorf("could not cast to Cluster struct")
}
return &spec.WorkerStatus{
CurrentCluster: util.NameFromMeta(cl.ObjectMeta),
CurrentProcess: cl.GetCurrentProcess(),
}, nil
}
// ClusterHistory dumps history of cluster changes
func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) {
clusterName := spec.NamespacedName{

View File

@ -29,9 +29,9 @@ func (c *Controller) makeClusterConfig() cluster.Config {
}
func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
workerId, ok := c.clusterWorkers[clusterName]
workerID, ok := c.clusterWorkers[clusterName]
if ok {
return workerId
return workerID
}
c.clusterWorkers[clusterName] = c.curWorkerID

View File

@ -86,6 +86,12 @@ type LogEntry struct {
Message string
}
// Process describes process of the cluster
type Process struct {
Name string
StartTime time.Time
}
// ClusterStatus describes status of the cluster
type ClusterStatus struct {
Team string
@ -95,10 +101,17 @@ type ClusterStatus struct {
Endpoint *v1.Endpoints
StatefulSet *v1beta1.StatefulSet
Worker uint32
Status PostgresStatus
Spec PostgresSpec
Error error
CurrentProcess Process
Worker uint32
Status PostgresStatus
Spec PostgresSpec
Error error
}
// WorkerStatus describes status of the worker
type WorkerStatus struct {
CurrentCluster NamespacedName
CurrentProcess Process
}
// Diff describes diff

View File

@ -53,6 +53,7 @@ func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err)
}
// NewFromConfig create Kubernets Interface using REST config
func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient := KubernetesClient{}