Merge pull request #225 from zalando-incubator/support-many-namespaces
Support many namespaces
Commit b0549c3c9c
@@ -74,7 +74,7 @@ All subsequent `kubectl` commands will work with the `test` namespace. The opera

Watching a namespace for an operator means tracking requests to change Postgresql clusters in that namespace, such as "increase the number of Postgresql replicas to 5", and reacting to those requests, in this example by actually scaling up.

By default, the operator watches the namespace it is deployed to. You can change this by altering the `WATCHED_NAMESPACE` env var in the operator deployment manifest or the `watched_namespace` field in the operator configmap. If both are set, the env var takes precedence.
By default, the operator watches the namespace it is deployed to. You can change this by altering the `WATCHED_NAMESPACE` env var in the operator deployment manifest or the `watched_namespace` field in the operator configmap. If both are set, the env var takes precedence. To make the operator listen to all namespaces, explicitly set the field/env var to "`*`".

Note that for an operator to manage pods in the watched namespace, the operator's service account (as specified in the operator deployment manifest) must have appropriate privileges to access the watched namespace. The watched namespace also needs to have a (possibly different) service account in case the database pods need to talk to the Kubernetes API (e.g. when using Kubernetes-native configuration of Patroni).

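To make the resolution order above concrete, here is a minimal, self-contained Go sketch (illustration only, not code from this change; all names are made up): the env var wins over the configmap field, an empty result falls back to the operator's own namespace, and `*` is translated to the empty string that Kubernetes clients read as "all namespaces".

```go
package main

import (
	"fmt"
	"os"
)

// coalesce returns the first non-empty argument.
func coalesce(val, defaultVal string) string {
	if val == "" {
		return defaultVal
	}
	return val
}

// effectiveNamespace resolves the namespace(s) to watch:
// env var > configmap field > operator's own namespace.
// "*" selects all namespaces, which Kubernetes clients represent as "".
func effectiveNamespace(fromConfigMap, operatorNamespace string) string {
	ns := coalesce(os.Getenv("WATCHED_NAMESPACE"), coalesce(fromConfigMap, operatorNamespace))
	if ns == "*" {
		return "" // equivalent of v1.NamespaceAll
	}
	return ns
}

func main() {
	fmt.Println(effectiveNamespace("development", "default")) // "development" unless WATCHED_NAMESPACE is set
}
```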

@@ -3,8 +3,9 @@ kind: ConfigMap
metadata:
  name: postgres-operator
data:
  # assumes the ns exists before the operator starts
  # the env var with the same name may overwrite it in the operator pod
  # the env var with the same name in the operator pod may overwrite this value
  # if neither is set or evaluates to the empty string, listen to the operator's own namespace
  # if set to "*", listen to all namespaces
  # watched_namespace: development
  service_account_name: operator
  cluster_labels: application:spilo

@@ -16,6 +16,8 @@ spec:
        imagePullPolicy: IfNotPresent
        env:
        # uncomment to overwrite a similar setting from operator configmap
        # if set to the empty string, watch the operator's own namespace
        # if set to "*", listen to all namespaces
        # - name: WATCHED_NAMESPACE
        #  valueFrom:
        #    fieldRef:

@@ -30,9 +30,9 @@ type controllerInformer interface {
	GetOperatorConfig() *config.Config
	GetStatus() *spec.ControllerStatus
	TeamClusterList() map[string][]spec.NamespacedName
	ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
	ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
	ClusterHistory(team, cluster string) ([]*spec.Diff, error)
	ClusterStatus(team, namespace, cluster string) (*spec.ClusterStatus, error)
	ClusterLogs(team, namespace, cluster string) ([]*spec.LogEntry, error)
	ClusterHistory(team, namespace, cluster string) ([]*spec.Diff, error)
	ClusterDatabasesMap() map[string][]string
	WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
	ListQueue(workerID uint32) (*spec.QueueDump, error)
@@ -48,9 +48,9 @@ type Server struct {
}

var (
	clusterStatusURL     = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/?$`)
	clusterLogsURL       = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/logs/?$`)
	clusterHistoryURL    = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/history/?$`)
	clusterStatusURL     = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<namespace>[a-z0-9]([-a-z0-9]*[a-z0-9])?)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/?$`)
	clusterLogsURL       = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<namespace>[a-z0-9]([-a-z0-9]*[a-z0-9])?)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/logs/?$`)
	clusterHistoryURL    = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<namespace>[a-z0-9]([-a-z0-9]*[a-z0-9])?)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/history/?$`)
	teamURL              = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
	workerLogsURL        = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
	workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
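The cluster routes now expect a namespace segment between the team and the cluster name. A standalone sketch (the path below is invented, not taken from this PR) of how the updated pattern splits such a URL into named groups, similar to what `util.FindNamedStringSubmatch` supplies to the handlers in the next hunk:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the updated clusterStatusURL above.
var clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<namespace>[a-z0-9]([-a-z0-9]*[a-z0-9])?)/(?P<cluster>[a-zA-Z][a-zA-Z0-9-]*)/?$`)

func main() {
	match := clusterStatusURL.FindStringSubmatch("/clusters/acid/staging/testcluster")
	if match == nil {
		fmt.Println("no match")
		return
	}
	// Collect the named capture groups into a map, skipping unnamed groups.
	groups := map[string]string{}
	for i, name := range clusterStatusURL.SubexpNames() {
		if name != "" {
			groups[name] = match[i]
		}
	}
	fmt.Println(groups) // map[cluster:testcluster namespace:staging team:acid]
}
```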
@@ -149,7 +149,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
	)

	if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil {
		resp, err = s.controller.ClusterStatus(matches["team"], matches["cluster"])
		namespace, _ := matches["namespace"]
		resp, err = s.controller.ClusterStatus(matches["team"], namespace, matches["cluster"])
	} else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil {
		teamClusters := s.controller.TeamClusterList()
		clusters, found := teamClusters[matches["team"]]
@@ -166,9 +167,11 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
		s.respond(clusterNames, nil, w)
		return
	} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
		resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
		namespace, _ := matches["namespace"]
		resp, err = s.controller.ClusterLogs(matches["team"], namespace, matches["cluster"])
	} else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil {
		resp, err = s.controller.ClusterHistory(matches["team"], matches["cluster"])
		namespace, _ := matches["namespace"]
		resp, err = s.controller.ClusterHistory(matches["team"], namespace, matches["cluster"])
	} else if req.URL.Path == clustersURL {
		res := make(map[string][]string)
		for team, clusters := range s.controller.TeamClusterList() {

@@ -127,6 +127,10 @@ func (c *Cluster) clusterName() spec.NamespacedName {
	return util.NameFromMeta(c.ObjectMeta)
}

func (c *Cluster) clusterNamespace() string {
	return c.ObjectMeta.Namespace
}

func (c *Cluster) teamName() string {
	// TODO: check Teams API for the actual name (in case the user passes an integer Id).
	return c.Spec.TeamID

@@ -110,7 +110,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
	}

	podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
	masterCandidatePod, err := c.KubeClient.Pods(c.OpConfig.WatchedNamespace).Get(podName, metav1.GetOptions{})
	masterCandidatePod, err := c.KubeClient.Pods(c.clusterNamespace()).Get(podName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("could not get master candidate pod: %v", err)
	}

@@ -24,6 +24,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {

	defer func() {
		if err != nil {
			c.logger.Warningf("error while syncing cluster state: %v", err)
			c.setStatus(spec.ClusterStatusSyncFailed)
		} else if c.Status != spec.ClusterStatusRunning {
			c.setStatus(spec.ClusterStatusRunning)
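The added warning lives in a deferred closure that inspects the function's named return value `err`, so logging the failure and flipping the cluster status happen in one place. A tiny standalone sketch of that Go pattern (names here are illustrative, not operator code):

```go
package main

import (
	"errors"
	"log"
)

// syncOnce mimics the deferred-error pattern used by Cluster.Sync: the deferred
// closure observes the final value of the named return err and reacts once, centrally.
func syncOnce() (err error) {
	defer func() {
		if err != nil {
			log.Printf("error while syncing cluster state: %v", err)
			// the real code additionally sets the cluster status to SyncFailed here
		}
	}()

	return errors.New("simulated sync failure")
}

func main() {
	_ = syncOnce()
}
```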

@@ -14,6 +14,7 @@ import (
	"github.com/zalando-incubator/postgres-operator/pkg/apiserver"
	"github.com/zalando-incubator/postgres-operator/pkg/cluster"
	"github.com/zalando-incubator/postgres-operator/pkg/spec"
	"github.com/zalando-incubator/postgres-operator/pkg/util"
	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
@@ -97,17 +98,7 @@ func (c *Controller) initOperatorConfig() {
		c.logger.Infoln("no ConfigMap specified. Loading default values")
	}

	// env var takes priority over the same param from the operator ConfigMap
	watchedNamespace := os.Getenv("WATCHED_NAMESPACE")
	if watchedNamespace != "" {
		c.logger.Infof("Watch the %q namespace specified in the env variable WATCHED_NAMESPACE\n", watchedNamespace)
		configMapData["watched_namespace"] = watchedNamespace
	}

	if configMapData["watched_namespace"] == "" {
		c.logger.Infof("No namespace to watch specified. By convention, the operator falls back to watching the namespace it is deployed to: '%v' \n", spec.GetOperatorNamespace())
		configMapData["watched_namespace"] = spec.GetOperatorNamespace()
	}
	configMapData["watched_namespace"] = c.getEffectiveNamespace(os.Getenv("WATCHED_NAMESPACE"), configMapData["watched_namespace"])

	if c.config.NoDatabaseAccess {
		configMapData["enable_database_access"] = "false"
@@ -127,6 +118,7 @@ func (c *Controller) initOperatorConfig() {
func (c *Controller) initController() {
	c.initClients()
	c.initOperatorConfig()

	c.initSharedInformers()

	c.logger.Infof("config: %s", c.opConfig.MustMarshal())
@@ -256,3 +248,25 @@ func (c *Controller) kubeNodesInformer(stopCh <-chan struct{}, wg *sync.WaitGrou

	c.nodesInformer.Run(stopCh)
}

func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFromConfigMap string) string {

	namespace := util.Coalesce(namespaceFromEnvironment, util.Coalesce(namespaceFromConfigMap, spec.GetOperatorNamespace()))

	if namespace == "*" {

		namespace = v1.NamespaceAll
		c.logger.Infof("Listening to all namespaces")

	} else {

		if _, err := c.KubeClient.Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
			c.logger.Fatalf("Could not find the watched namespace %q", namespace)
		} else {
			c.logger.Infof("Listening to the specific namespace %q", namespace)
		}

	}

	return namespace
}

@@ -14,9 +14,10 @@ import (
)

// ClusterStatus provides status of the cluster
func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) {
func (c *Controller) ClusterStatus(team, namespace, cluster string) (*spec.ClusterStatus, error) {

	clusterName := spec.NamespacedName{
		Namespace: c.opConfig.WatchedNamespace,
		Namespace: namespace,
		Name:      team + "-" + cluster,
	}

@@ -90,9 +91,10 @@ func (c *Controller) GetStatus() *spec.ControllerStatus {
}

// ClusterLogs dumps cluster ring logs
func (c *Controller) ClusterLogs(team, name string) ([]*spec.LogEntry, error) {
func (c *Controller) ClusterLogs(team, namespace, name string) ([]*spec.LogEntry, error) {

	clusterName := spec.NamespacedName{
		Namespace: c.opConfig.WatchedNamespace,
		Namespace: namespace,
		Name:      team + "-" + name,
	}

@@ -212,9 +214,10 @@ func (c *Controller) WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) {
}

// ClusterHistory dumps history of cluster changes
func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) {
func (c *Controller) ClusterHistory(team, namespace, name string) ([]*spec.Diff, error) {

	clusterName := spec.NamespacedName{
		Namespace: c.opConfig.WatchedNamespace,
		Namespace: namespace,
		Name:      team + "-" + name,
	}


@@ -67,7 +67,7 @@ type Config struct {
	Resources
	Auth
	Scalyr
	WatchedNamespace         string            `name:"watched_namespace"`
	WatchedNamespace         string            `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch the namespace the operator is deployed to'
	EtcdHost                 string            `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"`
	DockerImage              string            `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"`
	ServiceAccountName       string            `name:"service_account_name" default:"operator"`

@@ -32,6 +32,7 @@ type KubernetesClient struct {
	v1core.PersistentVolumeClaimsGetter
	v1core.ConfigMapsGetter
	v1core.NodesGetter
	v1core.NamespacesGetter
	v1core.ServiceAccountsGetter
	v1beta1.StatefulSetsGetter
	policyv1beta1.PodDisruptionBudgetsGetter
@@ -78,6 +79,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
	kubeClient.PersistentVolumeClaimsGetter = client.CoreV1()
	kubeClient.PersistentVolumesGetter = client.CoreV1()
	kubeClient.NodesGetter = client.CoreV1()
	kubeClient.NamespacesGetter = client.CoreV1()
	kubeClient.StatefulSetsGetter = client.AppsV1beta1()
	kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
	kubeClient.RESTClient = client.CoreV1().RESTClient()

@@ -119,3 +119,10 @@ func MapContains(haystack, needle map[string]string) bool {

	return true
}

func Coalesce(val, defaultVal string) string {
	if val == "" {
		return defaultVal
	}
	return val
}