Use unified type for the namespaced object names

This commit is contained in:
Murat Kabilov 2017-04-18 11:54:45 +02:00
parent 79fdba4ac7
commit 08c0e3b6dd
10 changed files with 69 additions and 85 deletions

View File

@ -59,7 +59,7 @@ type Cluster struct {
logger *logrus.Entry logger *logrus.Entry
pgUsers map[string]spec.PgUser pgUsers map[string]spec.PgUser
podEvents chan spec.PodEvent podEvents chan spec.PodEvent
podSubscribers map[spec.PodName]chan spec.PodEvent podSubscribers map[spec.NamespacedName]chan spec.PodEvent
pgDb *sql.DB pgDb *sql.DB
mu sync.Mutex mu sync.Mutex
} }
@ -74,18 +74,15 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster {
logger: lg, logger: lg,
pgUsers: make(map[string]spec.PgUser), pgUsers: make(map[string]spec.PgUser),
podEvents: make(chan spec.PodEvent), podEvents: make(chan spec.PodEvent),
podSubscribers: make(map[spec.PodName]chan spec.PodEvent), podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
kubeResources: kubeResources, kubeResources: kubeResources,
} }
return cluster return cluster
} }
func (c *Cluster) ClusterName() spec.ClusterName { func (c *Cluster) ClusterName() spec.NamespacedName {
return spec.ClusterName{ return util.NameFromMeta(c.Metadata)
Name: c.Metadata.Name,
Namespace: c.Metadata.Namespace,
}
} }
func (c *Cluster) TeamName() string { func (c *Cluster) TeamName() string {

View File

@ -84,10 +84,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
} }
func (c *Cluster) deletePod(pod *v1.Pod) error { func (c *Cluster) deletePod(pod *v1.Pod) error {
podName := spec.PodName{ podName := util.NameFromMeta(pod.ObjectMeta)
Namespace: pod.Namespace,
Name: pod.Name,
}
ch := make(chan spec.PodEvent) ch := make(chan spec.PodEvent)
if _, ok := c.podSubscribers[podName]; ok { if _, ok := c.podSubscribers[podName]; ok {
@ -110,7 +107,7 @@ func (c *Cluster) deletePod(pod *v1.Pod) error {
return nil return nil
} }
func (c *Cluster) unregisterPodSubscriber(podName spec.PodName) { func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
if _, ok := c.podSubscribers[podName]; !ok { if _, ok := c.podSubscribers[podName]; !ok {
panic("Subscriber for Pod '" + podName.String() + "' is not found") panic("Subscriber for Pod '" + podName.String() + "' is not found")
} }
@ -119,7 +116,7 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.PodName) {
delete(c.podSubscribers, podName) delete(c.podSubscribers, podName)
} }
func (c *Cluster) registerPodSubscriber(podName spec.PodName) chan spec.PodEvent { func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.PodEvent {
ch := make(chan spec.PodEvent) ch := make(chan spec.PodEvent)
if _, ok := c.podSubscribers[podName]; ok { if _, ok := c.podSubscribers[podName]; ok {
panic("Pod '" + podName.String() + "' is already subscribed") panic("Pod '" + podName.String() + "' is already subscribed")
@ -129,10 +126,7 @@ func (c *Cluster) registerPodSubscriber(podName spec.PodName) chan spec.PodEvent
} }
func (c *Cluster) recreatePod(pod v1.Pod) error { func (c *Cluster) recreatePod(pod v1.Pod) error {
podName := spec.PodName{ podName := util.NameFromMeta(pod.ObjectMeta)
Namespace: pod.Namespace,
Name: pod.Name,
}
orphanDependents := false orphanDependents := false
deleteOptions := &v1.DeleteOptions{ deleteOptions := &v1.DeleteOptions{

View File

@ -29,8 +29,8 @@ type Controller struct {
Config Config
opConfig *config.Config opConfig *config.Config
logger *logrus.Entry logger *logrus.Entry
clusters map[spec.ClusterName]*cluster.Cluster clusters map[spec.NamespacedName]*cluster.Cluster
stopChMap map[spec.ClusterName]chan struct{} stopChMap map[spec.NamespacedName]chan struct{}
waitCluster sync.WaitGroup waitCluster sync.WaitGroup
postgresqlInformer cache.SharedIndexInformer postgresqlInformer cache.SharedIndexInformer
@ -51,8 +51,8 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
Config: *controllerConfig, Config: *controllerConfig,
opConfig: operatorConfig, opConfig: operatorConfig,
logger: logger.WithField("pkg", "controller"), logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.ClusterName]*cluster.Cluster), clusters: make(map[spec.NamespacedName]*cluster.Cluster),
stopChMap: make(map[spec.ClusterName]chan struct{}), stopChMap: make(map[spec.NamespacedName]chan struct{}),
podCh: make(chan spec.PodEvent), podCh: make(chan spec.PodEvent),
} }
} }

View File

@ -55,13 +55,6 @@ func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, err
return c.KubeClient.CoreV1Client.Pods(c.PodNamespace).Watch(opts) return c.KubeClient.CoreV1Client.Pods(c.PodNamespace).Watch(opts)
} }
func PodNameFromMeta(meta v1.ObjectMeta) spec.PodName {
return spec.PodName{
Namespace: meta.Namespace,
Name: meta.Name,
}
}
func (c *Controller) podAdd(obj interface{}) { func (c *Controller) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
if !ok { if !ok {
@ -70,7 +63,7 @@ func (c *Controller) podAdd(obj interface{}) {
podEvent := spec.PodEvent{ podEvent := spec.PodEvent{
ClusterName: util.PodClusterName(pod), ClusterName: util.PodClusterName(pod),
PodName: PodNameFromMeta(pod.ObjectMeta), PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod, CurPod: pod,
EventType: spec.PodEventAdd, EventType: spec.PodEventAdd,
} }
@ -91,7 +84,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) {
podEvent := spec.PodEvent{ podEvent := spec.PodEvent{
ClusterName: util.PodClusterName(curPod), ClusterName: util.PodClusterName(curPod),
PodName: PodNameFromMeta(curPod.ObjectMeta), PodName: util.NameFromMeta(curPod.ObjectMeta),
PrevPod: prevPod, PrevPod: prevPod,
CurPod: curPod, CurPod: curPod,
EventType: spec.PodEventUpdate, EventType: spec.PodEventUpdate,
@ -108,7 +101,7 @@ func (c *Controller) podDelete(obj interface{}) {
podEvent := spec.PodEvent{ podEvent := spec.PodEvent{
ClusterName: util.PodClusterName(pod), ClusterName: util.PodClusterName(pod),
PodName: PodNameFromMeta(pod.ObjectMeta), PodName: util.NameFromMeta(pod.ObjectMeta),
CurPod: pod, CurPod: pod,
EventType: spec.PodEventDelete, EventType: spec.PodEventDelete,
} }

View File

@ -41,11 +41,7 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
if !ok { if !ok {
return nil, fmt.Errorf("Can't cast object to postgresql") return nil, fmt.Errorf("Can't cast object to postgresql")
} }
clusterName := spec.ClusterName{ clusterName := util.NameFromMeta(pg.Metadata)
Namespace: pg.Metadata.Namespace,
Name: pg.Metadata.Name,
}
cl := cluster.New(clusterConfig, *pg, c.logger.Logger) cl := cluster.New(clusterConfig, *pg, c.logger.Logger)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -83,10 +79,7 @@ func (c *Controller) postgresqlAdd(obj interface{}) {
return return
} }
clusterName := spec.ClusterName{ clusterName := util.NameFromMeta(pg.Metadata)
Namespace: pg.Metadata.Namespace,
Name: pg.Metadata.Name,
}
_, ok = c.clusters[clusterName] _, ok = c.clusters[clusterName]
if ok { if ok {
@ -123,10 +116,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
c.logger.Errorf("Can't cast to postgresql spec") c.logger.Errorf("Can't cast to postgresql spec")
} }
clusterName := spec.ClusterName{ clusterName := util.NameFromMeta(pgNew.Metadata)
Namespace: pgNew.Metadata.Namespace,
Name: pgNew.Metadata.Name,
}
//TODO: Do not update cluster which is currently creating //TODO: Do not update cluster which is currently creating
if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
@ -155,10 +145,7 @@ func (c *Controller) postgresqlDelete(obj interface{}) {
c.logger.Errorf("Can't cast to postgresql spec") c.logger.Errorf("Can't cast to postgresql spec")
return return
} }
clusterName := spec.ClusterName{ clusterName := util.NameFromMeta(pgCur.Metadata)
Namespace: pgCur.Metadata.Namespace,
Name: pgCur.Metadata.Name,
}
pgCluster, ok := c.clusters[clusterName] pgCluster, ok := c.clusters[clusterName]
if !ok { if !ok {
c.logger.Errorf("Unknown cluster: %s", clusterName) c.logger.Errorf("Unknown cluster: %s", clusterName)

View File

@ -3,7 +3,6 @@ package controller
import ( import (
"fmt" "fmt"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
@ -26,7 +25,9 @@ func (c *Controller) makeClusterConfig() cluster.Config {
func (c *Controller) getOAuthToken() (string, error) { func (c *Controller) getOAuthToken() (string, error) {
// Temporary getting postgresql-operator secret from the NamespaceDefault // Temporary getting postgresql-operator secret from the NamespaceDefault
credentialsSecret, err := c.KubeClient.Secrets(api.NamespaceDefault).Get(c.opConfig.OAuthTokenSecretName) credentialsSecret, err := c.KubeClient.
Secrets(c.opConfig.OAuthTokenSecretName.Namespace).
Get(c.opConfig.OAuthTokenSecretName.Name)
if err != nil { if err != nil {
c.logger.Debugf("Oauth token secret name: %s", c.opConfig.OAuthTokenSecretName) c.logger.Debugf("Oauth token secret name: %s", c.opConfig.OAuthTokenSecretName)
@ -75,15 +76,19 @@ func (c *Controller) createTPR() error {
} }
func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) {
if c.opConfig.InfrastructureRolesSecretName == "" { if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) {
// we don't have infrastructure roles defined, bail out // we don't have infrastructure roles defined, bail out
return nil, nil return nil, nil
} }
infraRolesSecret, err := c.KubeClient.Secrets(api.NamespaceDefault).Get(c.opConfig.InfrastructureRolesSecretName)
infraRolesSecret, err := c.KubeClient.
Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace).
Get(c.opConfig.InfrastructureRolesSecretName.Name)
if err != nil { if err != nil {
c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName)
return nil, fmt.Errorf("Can't get infrastructure roles Secret: %s", err) return nil, fmt.Errorf("Can't get infrastructure roles Secret: %s", err)
} }
data := infraRolesSecret.Data data := infraRolesSecret.Data
result = make(map[string]spec.PgUser) result = make(map[string]spec.PgUser)
Users: Users:
@ -112,9 +117,11 @@ Users:
} }
} }
} }
if t.Name != "" { if t.Name != "" {
result[t.Name] = t result[t.Name] = t
} }
} }
return result, nil return result, nil
} }

View File

@ -52,11 +52,11 @@ type PostgresStatus string
const ( const (
ClusterStatusUnknown PostgresStatus = "" ClusterStatusUnknown PostgresStatus = ""
ClusterStatusCreating = "Creating" ClusterStatusCreating PostgresStatus = "Creating"
ClusterStatusUpdating = "Updating" ClusterStatusUpdating PostgresStatus = "Updating"
ClusterStatusUpdateFailed = "UpdateFailed" ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed"
ClusterStatusAddFailed = "CreateFailed" ClusterStatusAddFailed PostgresStatus = "CreateFailed"
ClusterStatusRunning = "Running" ClusterStatusRunning PostgresStatus = "Running"
) )
// PostgreSQL Third Party (resource) Object // PostgreSQL Third Party (resource) Object

View File

@ -7,7 +7,7 @@ import (
type PodEventType string type PodEventType string
type PodName types.NamespacedName type NamespacedName types.NamespacedName
const ( const (
PodEventAdd PodEventType = "ADD" PodEventAdd PodEventType = "ADD"
@ -16,26 +16,32 @@ const (
) )
type PodEvent struct { type PodEvent struct {
ClusterName ClusterName ClusterName NamespacedName
PodName PodName PodName NamespacedName
PrevPod *v1.Pod PrevPod *v1.Pod
CurPod *v1.Pod CurPod *v1.Pod
EventType PodEventType EventType PodEventType
} }
func (p PodName) String() string {
return types.NamespacedName(p).String()
}
type ClusterName types.NamespacedName
func (c ClusterName) String() string {
return types.NamespacedName(c).String()
}
type PgUser struct { type PgUser struct {
Name string Name string
Password string Password string
Flags []string Flags []string
MemberOf string MemberOf string
} }
func (p NamespacedName) String() string {
return types.NamespacedName(p).String()
}
func (n *NamespacedName) Decode(value string) error {
name := types.NewNamespacedNameFromString(value)
if value != "" && name == (types.NamespacedName{}) {
name.Name = value
name.Namespace = v1.NamespaceDefault
}
*n = NamespacedName(name)
return nil
}

View File

@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
"github.com/kelseyhightower/envconfig" "github.com/kelseyhightower/envconfig"
) )
@ -27,8 +28,8 @@ type Auth struct {
PamRoleName string `split_words:"true" default:"zalandos"` PamRoleName string `split_words:"true" default:"zalandos"`
PamConfiguration string `split_words:"true" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"` PamConfiguration string `split_words:"true" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"`
TeamsAPIUrl string `envconfig:"teams_api_url" default:"https://teams.example.com/api/"` TeamsAPIUrl string `envconfig:"teams_api_url" default:"https://teams.example.com/api/"`
OAuthTokenSecretName string `envconfig:"oauth_token_secret_name" default:"postgresql-operator"` OAuthTokenSecretName spec.NamespacedName `envconfig:"oauth_token_secret_name" default:"postgresql-operator"`
InfrastructureRolesSecretName string `split_words:"true"` InfrastructureRolesSecretName spec.NamespacedName `split_words:"true"`
SuperUsername string `split_words:"true" default:"postgres"` SuperUsername string `split_words:"true" default:"postgres"`
ReplicationUsername string `split_words:"true" default:"replication"` ReplicationUsername string `split_words:"true" default:"replication"`
} }
@ -49,6 +50,8 @@ type Config struct {
} }
func LoadFromEnv() *Config { func LoadFromEnv() *Config {
//TODO: maybe we should use ConfigMaps( https://kubernetes.io/docs/tasks/configure-pod-container/configmap/ ) instead?
var cfg Config var cfg Config
err := envconfig.Process("PGOP", &cfg) err := envconfig.Process("PGOP", &cfg)
if err != nil { if err != nil {

View File

@ -11,7 +11,6 @@ import (
"github.bus.zalan.do/acid/postgres-operator/pkg/spec" "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/types"
"strings" "strings"
) )
@ -30,24 +29,22 @@ func RandomPassword(n int) string {
return string(b) return string(b)
} }
func NameFromMeta(meta v1.ObjectMeta) types.NamespacedName { func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName {
obj := types.NamespacedName{ return spec.NamespacedName{
Namespace: meta.Namespace, Namespace: meta.Namespace,
Name: meta.Name, Name: meta.Name,
} }
return obj
} }
func PodClusterName(pod *v1.Pod) spec.ClusterName { func PodClusterName(pod *v1.Pod) spec.NamespacedName {
if name, ok := pod.Labels["spilo-cluster"]; ok { if name, ok := pod.Labels["spilo-cluster"]; ok {
return spec.ClusterName{ return spec.NamespacedName{
Namespace: pod.Namespace, Namespace: pod.Namespace,
Name: name, Name: name,
} }
} }
return spec.ClusterName{} return spec.NamespacedName{}
} }
func PodSpiloRole(pod *v1.Pod) string { func PodSpiloRole(pod *v1.Pod) string {