Merge branch 'feature/refactor-tpr' into fix/graceful-shutdown
# Conflicts: # pkg/controller/controller.go
This commit is contained in:
		
						commit
						acebdf0122
					
				
							
								
								
									
										73
									
								
								cmd/main.go
								
								
								
								
							
							
						
						
									
										73
									
								
								cmd/main.go
								
								
								
								
							|  | @ -8,67 +8,42 @@ import ( | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"syscall" | 	"syscall" | ||||||
| 
 | 
 | ||||||
| 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" |  | ||||||
| 
 |  | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/controller" | 	"github.com/zalando-incubator/postgres-operator/pkg/controller" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" |  | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" |  | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( | var ( | ||||||
| 	KubeConfigFile string | 	KubeConfigFile string | ||||||
| 	podNamespace     string |  | ||||||
| 	configMapName    spec.NamespacedName |  | ||||||
| 	OutOfCluster   bool | 	OutOfCluster   bool | ||||||
| 	noTeamsAPI       bool |  | ||||||
| 	noDatabaseAccess bool |  | ||||||
| 	version        string | 	version        string | ||||||
|  | 
 | ||||||
|  | 	config controller.Config | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func init() { | func init() { | ||||||
| 	flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") | 	flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") | ||||||
| 	flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.") | 	flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.") | ||||||
| 	flag.BoolVar(&noDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") | 	flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") | ||||||
| 	flag.BoolVar(&noTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") | 	flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") | ||||||
| 	flag.Parse() | 	flag.Parse() | ||||||
| 
 | 
 | ||||||
| 	podNamespace = os.Getenv("MY_POD_NAMESPACE") | 	config.Namespace = os.Getenv("MY_POD_NAMESPACE") | ||||||
| 	if podNamespace == "" { | 	if config.Namespace == "" { | ||||||
| 		podNamespace = "default" | 		config.Namespace = "default" | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	configMap := os.Getenv("CONFIG_MAP_NAME") | 	configMap := os.Getenv("CONFIG_MAP_NAME") | ||||||
| 	if configMap != "" { | 	if configMap != "" { | ||||||
| 		configMapName.Decode(configMap) | 		err := config.ConfigMapName.Decode(configMap) | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func ControllerConfig() *controller.Config { |  | ||||||
| 	restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		log.Fatalf("Can't get REST config: %s", err) | 			log.Fatalf("incorrect config map name") | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 	client, err := k8sutil.KubernetesClient(restConfig) |  | ||||||
| 	if err != nil { |  | ||||||
| 		log.Fatalf("Can't create client: %s", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	restClient, err := k8sutil.KubernetesRestClient(restConfig) |  | ||||||
| 	if err != nil { |  | ||||||
| 		log.Fatalf("Can't create rest client: %s", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return &controller.Config{ |  | ||||||
| 		RestConfig: restConfig, |  | ||||||
| 		KubeClient: client, |  | ||||||
| 		RestClient: restClient, |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func main() { | func main() { | ||||||
| 	configMapData := make(map[string]string) | 	var err error | ||||||
|  | 
 | ||||||
| 	log.SetOutput(os.Stdout) | 	log.SetOutput(os.Stdout) | ||||||
| 	log.Printf("Spilo operator %s\n", version) | 	log.Printf("Spilo operator %s\n", version) | ||||||
| 
 | 
 | ||||||
|  | @ -78,33 +53,13 @@ func main() { | ||||||
| 
 | 
 | ||||||
| 	wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
 | 	wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
 | ||||||
| 
 | 
 | ||||||
| 	controllerConfig := ControllerConfig() | 	config.RestConfig, err = k8sutil.RestConfig(KubeConfigFile, OutOfCluster) | ||||||
| 
 |  | ||||||
| 	if configMapName != (spec.NamespacedName{}) { |  | ||||||
| 		configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name, meta_v1.GetOptions{}) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 			panic(err) | 		log.Fatalf("couldn't get REST config: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 		configMapData = configMap.Data | 	c := controller.NewController(&config) | ||||||
| 	} else { |  | ||||||
| 		log.Printf("No ConfigMap specified. Loading default values") |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
 |  | ||||||
| 		configMapData["namespace"] = podNamespace |  | ||||||
| 	} |  | ||||||
| 	if noDatabaseAccess { |  | ||||||
| 		configMapData["enable_database_access"] = "false" |  | ||||||
| 	} |  | ||||||
| 	if noTeamsAPI { |  | ||||||
| 		configMapData["enable_teams_api"] = "false" |  | ||||||
| 	} |  | ||||||
| 	cfg := config.NewFromMap(configMapData) |  | ||||||
| 
 |  | ||||||
| 	log.Printf("Config: %s", cfg.MustMarshal()) |  | ||||||
| 
 |  | ||||||
| 	c := controller.New(controllerConfig, cfg) |  | ||||||
| 	c.Run(stop, wg) | 	c.Run(stop, wg) | ||||||
| 
 | 
 | ||||||
| 	sig := <-sigs | 	sig := <-sigs | ||||||
|  |  | ||||||
|  | @ -1,8 +1,8 @@ | ||||||
| hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 | hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 | ||||||
| updated: 2017-07-12T12:52:55.896264+02:00 | updated: 2017-07-24T19:24:17.604824235+02:00 | ||||||
| imports: | imports: | ||||||
| - name: github.com/aws/aws-sdk-go | - name: github.com/aws/aws-sdk-go | ||||||
|   version: b1a7b51924b90a6ecdbaeb17e96418740ff07a1e |   version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd | ||||||
|   subpackages: |   subpackages: | ||||||
|   - aws |   - aws | ||||||
|   - aws/awserr |   - aws/awserr | ||||||
|  | @ -30,7 +30,7 @@ imports: | ||||||
|   - service/ec2 |   - service/ec2 | ||||||
|   - service/sts |   - service/sts | ||||||
| - name: github.com/davecgh/go-spew | - name: github.com/davecgh/go-spew | ||||||
|   version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d |   version: 782f4967f2dc4564575ca782fe2d04090b5faca8 | ||||||
|   subpackages: |   subpackages: | ||||||
|   - spew |   - spew | ||||||
| - name: github.com/docker/distribution | - name: github.com/docker/distribution | ||||||
|  | @ -104,7 +104,7 @@ imports: | ||||||
| - name: github.com/PuerkitoBio/urlesc | - name: github.com/PuerkitoBio/urlesc | ||||||
|   version: 5bd2802263f21d8788851d5305584c82a5c75d7e |   version: 5bd2802263f21d8788851d5305584c82a5c75d7e | ||||||
| - name: github.com/Sirupsen/logrus | - name: github.com/Sirupsen/logrus | ||||||
|   version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1 |   version: a3f95b5c423586578a4e099b11a46c2479628cac | ||||||
| - name: github.com/spf13/pflag | - name: github.com/spf13/pflag | ||||||
|   version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 |   version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 | ||||||
| - name: github.com/ugorji/go | - name: github.com/ugorji/go | ||||||
|  | @ -118,12 +118,15 @@ imports: | ||||||
| - name: golang.org/x/net | - name: golang.org/x/net | ||||||
|   version: f2499483f923065a842d38eb4c7f1927e6fc6e6d |   version: f2499483f923065a842d38eb4c7f1927e6fc6e6d | ||||||
|   subpackages: |   subpackages: | ||||||
|  |   - html | ||||||
|  |   - html/atom | ||||||
|   - http2 |   - http2 | ||||||
|   - http2/hpack |   - http2/hpack | ||||||
|   - idna |   - idna | ||||||
|   - lex/httplex |   - lex/httplex | ||||||
|  |   - websocket | ||||||
| - name: golang.org/x/sys | - name: golang.org/x/sys | ||||||
|   version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 |   version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f | ||||||
|   subpackages: |   subpackages: | ||||||
|   - unix |   - unix | ||||||
| - name: golang.org/x/text | - name: golang.org/x/text | ||||||
|  |  | ||||||
|  | @ -13,7 +13,6 @@ import ( | ||||||
| 	"github.com/Sirupsen/logrus" | 	"github.com/Sirupsen/logrus" | ||||||
| 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/client-go/kubernetes" |  | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
|  | @ -36,11 +35,8 @@ var ( | ||||||
| 
 | 
 | ||||||
| // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
 | // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
 | ||||||
| type Config struct { | type Config struct { | ||||||
| 	KubeClient          *kubernetes.Clientset //TODO: move clients to the better place?
 |  | ||||||
| 	RestClient          *rest.RESTClient |  | ||||||
| 	RestConfig          *rest.Config |  | ||||||
| 	TeamsAPIClient      *teams.API |  | ||||||
| 	OpConfig            config.Config | 	OpConfig            config.Config | ||||||
|  | 	RestConfig          *rest.Config | ||||||
| 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 | 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -68,6 +64,9 @@ type Cluster struct { | ||||||
| 	userSyncStrategy spec.UserSyncer | 	userSyncStrategy spec.UserSyncer | ||||||
| 	deleteOptions    *meta_v1.DeleteOptions | 	deleteOptions    *meta_v1.DeleteOptions | ||||||
| 	podEventsQueue   *cache.FIFO | 	podEventsQueue   *cache.FIFO | ||||||
|  | 
 | ||||||
|  | 	teamsAPIClient *teams.API | ||||||
|  | 	KubeClient     k8sutil.KubernetesClient //TODO: move clients to the better place?
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type compareStatefulsetResult struct { | type compareStatefulsetResult struct { | ||||||
|  | @ -78,7 +77,7 @@ type compareStatefulsetResult struct { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // New creates a new cluster. This function should be called from a controller.
 | // New creates a new cluster. This function should be called from a controller.
 | ||||||
| func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) | 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) | ||||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} | 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} | ||||||
| 	orphanDependents := true | 	orphanDependents := true | ||||||
|  | @ -104,6 +103,8 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | ||||||
| 		deleteOptions:    &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, | 		deleteOptions:    &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||||
| 		podEventsQueue:   podEventsQueue, | 		podEventsQueue:   podEventsQueue, | ||||||
|  | 		KubeClient:       kubeClient, | ||||||
|  | 		teamsAPIClient:   teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger), | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return cluster | 	return cluster | ||||||
|  | @ -126,7 +127,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { | ||||||
| 	} | 	} | ||||||
| 	request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
 | 	request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
 | ||||||
| 
 | 
 | ||||||
| 	_, err = c.RestClient.Patch(types.MergePatchType). | 	_, err = c.KubeClient.RESTClient.Patch(types.MergePatchType). | ||||||
| 		RequestURI(c.Metadata.GetSelfLink()). | 		RequestURI(c.Metadata.GetSelfLink()). | ||||||
| 		Body(request). | 		Body(request). | ||||||
| 		DoRaw() | 		DoRaw() | ||||||
|  | @ -137,7 +138,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) | 		c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -180,7 +181,7 @@ func (c *Cluster) Create() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not create endpoint: %v", err) | 		return fmt.Errorf("could not create endpoint: %v", err) | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) | 	c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 	for _, role := range []PostgresRole{Master, Replica} { | 	for _, role := range []PostgresRole{Master, Replica} { | ||||||
| 		if role == Replica && !c.Spec.ReplicaLoadBalancer { | 		if role == Replica && !c.Spec.ReplicaLoadBalancer { | ||||||
|  | @ -190,7 +191,7 @@ func (c *Cluster) Create() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create %s service: %v", role, err) | 			return fmt.Errorf("could not create %s service: %v", role, err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) | 		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err = c.initUsers(); err != nil { | 	if err = c.initUsers(); err != nil { | ||||||
|  | @ -207,12 +208,12 @@ func (c *Cluster) Create() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not create statefulset: %v", err) | 		return fmt.Errorf("could not create statefulset: %v", err) | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Waiting for cluster being ready") | 	c.logger.Info("Waiting for cluster being ready") | ||||||
| 
 | 
 | ||||||
| 	if err = c.waitStatefulsetPodsReady(); err != nil { | 	if err = c.waitStatefulsetPodsReady(); err != nil { | ||||||
| 		c.logger.Errorf("Failed to create cluster: %s", err) | 		c.logger.Errorf("Failed to create cluster: %v", err) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("pods are ready") | 	c.logger.Infof("pods are ready") | ||||||
|  | @ -233,7 +234,7 @@ func (c *Cluster) Create() error { | ||||||
| 
 | 
 | ||||||
| 	err = c.listResources() | 	err = c.listResources() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Errorf("could not list resources: %s", err) | 		c.logger.Errorf("could not list resources: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -243,7 +244,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match | ||||||
| 	//TODO: improve comparison
 | 	//TODO: improve comparison
 | ||||||
| 	match = true | 	match = true | ||||||
| 	if c.Service[role].Spec.Type != service.Spec.Type { | 	if c.Service[role].Spec.Type != service.Spec.Type { | ||||||
| 		return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", | 		return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", | ||||||
| 			role, service.Spec.Type, c.Service[role].Spec.Type) | 			role, service.Spec.Type, c.Service[role].Spec.Type) | ||||||
| 	} | 	} | ||||||
| 	oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges | 	oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges | ||||||
|  | @ -259,7 +260,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match | ||||||
| 	oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] | 	oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] | ||||||
| 	newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] | 	newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] | ||||||
| 	if oldDNSAnnotation != newDNSAnnotation { | 	if oldDNSAnnotation != newDNSAnnotation { | ||||||
| 		return false, fmt.Sprintf("new %s service's '%s' annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) | 		return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return true, "" | 	return true, "" | ||||||
|  | @ -290,7 +291,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp | ||||||
| 	} | 	} | ||||||
| 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { | 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { | ||||||
| 
 | 
 | ||||||
| 		c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) | 		c.logger.Warnf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) | ||||||
| 		return &compareStatefulsetResult{} | 		return &compareStatefulsetResult{} | ||||||
| 	} | 	} | ||||||
| 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
 | 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
 | ||||||
|  | @ -333,12 +334,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp | ||||||
| 		} | 		} | ||||||
| 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { | 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { | ||||||
| 			needsReplace = true | 			needsReplace = true | ||||||
| 			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) | 			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the current one", name)) | ||||||
| 		} | 		} | ||||||
| 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { | 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { | ||||||
| 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | ||||||
| 			needsReplace = true | 			needsReplace = true | ||||||
| 			reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) | 			reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -405,7 +406,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 	defer c.mu.Unlock() | 	defer c.mu.Unlock() | ||||||
| 
 | 
 | ||||||
| 	c.setStatus(spec.ClusterStatusUpdating) | 	c.setStatus(spec.ClusterStatusUpdating) | ||||||
| 	c.logger.Debugf("Cluster update from version %s to %s", | 	c.logger.Debugf("Cluster update from version %q to %q", | ||||||
| 		c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) | 		c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) | ||||||
| 
 | 
 | ||||||
| 	/* Make sure we update when this function exists */ | 	/* Make sure we update when this function exists */ | ||||||
|  | @ -431,7 +432,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 					if err != nil { | 					if err != nil { | ||||||
| 						return fmt.Errorf("could not create new %s service: %v", role, err) | 						return fmt.Errorf("could not create new %s service: %v", role, err) | ||||||
| 					} | 					} | ||||||
| 					c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) | 					c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			// only proceed further if both old and new load balancer were present
 | 			// only proceed further if both old and new load balancer were present
 | ||||||
|  | @ -446,7 +447,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 				c.setStatus(spec.ClusterStatusUpdateFailed) | 				c.setStatus(spec.ClusterStatusUpdateFailed) | ||||||
| 				return fmt.Errorf("could not update %s service: %v", role, err) | 				return fmt.Errorf("could not update %s service: %v", role, err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) | 			c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -471,11 +472,11 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		//TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
 | 		//TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
 | ||||||
| 		c.logger.Infof("statefulset '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) | 		c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
 | 	if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
 | ||||||
| 		c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed", | 		c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed", | ||||||
| 			c.Spec.PgVersion, newSpec.Spec.PgVersion) | 			c.Spec.PgVersion, newSpec.Spec.PgVersion) | ||||||
| 		//TODO: rewrite pg version in tpr spec
 | 		//TODO: rewrite pg version in tpr spec
 | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -6,7 +6,8 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" | 	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" | ||||||
| 	"k8s.io/client-go/pkg/api" | 	"k8s.io/client-go/kubernetes/scheme" | ||||||
|  | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/tools/remotecommand" | 	"k8s.io/client-go/tools/remotecommand" | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
|  | @ -27,17 +28,17 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( | ||||||
| 		return "", fmt.Errorf("could not determine which container to use") | 		return "", fmt.Errorf("could not determine which container to use") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	req := c.RestClient.Post(). | 	req := c.KubeClient.RESTClient.Post(). | ||||||
| 		Resource("pods"). | 		Resource("pods"). | ||||||
| 		Name(podName.Name). | 		Name(podName.Name). | ||||||
| 		Namespace(podName.Namespace). | 		Namespace(podName.Namespace). | ||||||
| 		SubResource("exec") | 		SubResource("exec") | ||||||
| 	req.VersionedParams(&api.PodExecOptions{ | 	req.VersionedParams(&v1.PodExecOptions{ | ||||||
| 		Container: pod.Spec.Containers[0].Name, | 		Container: pod.Spec.Containers[0].Name, | ||||||
| 		Command:   command, | 		Command:   command, | ||||||
| 		Stdout:    true, | 		Stdout:    true, | ||||||
| 		Stderr:    true, | 		Stderr:    true, | ||||||
| 	}, api.ParameterCodec) | 	}, scheme.ParameterCodec) | ||||||
| 
 | 
 | ||||||
| 	exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL()) | 	exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  |  | ||||||
|  | @ -39,5 +39,5 @@ func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizer | ||||||
| 
 | 
 | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) | 	return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %q", fsType) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -199,7 +199,7 @@ PATRONI_INITDB_PARAMS: | ||||||
| 	} | 	} | ||||||
| 	result, err := json.Marshal(config) | 	result, err := json.Marshal(config) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Errorf("Cannot convert spilo configuration into JSON: %s", err) | 		c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err) | ||||||
| 		return "" | 		return "" | ||||||
| 	} | 	} | ||||||
| 	return string(result) | 	return string(result) | ||||||
|  | @ -451,7 +451,7 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) | ||||||
| 	if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || | 	if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || | ||||||
| 		(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { | 		(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { | ||||||
| 
 | 
 | ||||||
| 		// safe default value: lock load balancer to only local address unless overriden explicitely.
 | 		// safe default value: lock load balancer to only local address unless overridden explicitly.
 | ||||||
| 		sourceRanges := []string{localHost} | 		sourceRanges := []string{localHost} | ||||||
| 		allowedSourceRanges := newSpec.AllowedSourceRanges | 		allowedSourceRanges := newSpec.AllowedSourceRanges | ||||||
| 		if len(allowedSourceRanges) >= 0 { | 		if len(allowedSourceRanges) >= 0 { | ||||||
|  |  | ||||||
|  | @ -35,11 +35,11 @@ func (c *Cluster) deletePods() error { | ||||||
| 	for _, obj := range pods { | 	for _, obj := range pods { | ||||||
| 		podName := util.NameFromMeta(obj.ObjectMeta) | 		podName := util.NameFromMeta(obj.ObjectMeta) | ||||||
| 
 | 
 | ||||||
| 		c.logger.Debugf("Deleting pod '%s'", podName) | 		c.logger.Debugf("Deleting pod %q", podName) | ||||||
| 		if err := c.deletePod(podName); err != nil { | 		if err := c.deletePod(podName); err != nil { | ||||||
| 			c.logger.Errorf("could not delete pod '%s': %s", podName, err) | 			c.logger.Errorf("could not delete pod %q: %v", podName, err) | ||||||
| 		} else { | 		} else { | ||||||
| 			c.logger.Infof("pod '%s' has been deleted", podName) | 			c.logger.Infof("pod %q has been deleted", podName) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if len(pods) > 0 { | 	if len(pods) > 0 { | ||||||
|  | @ -107,7 +107,7 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { | ||||||
| 	if err := c.waitForPodLabel(ch); err != nil { | 	if err := c.waitForPodLabel(ch); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("pod '%s' is ready", podName) | 	c.logger.Infof("pod %q is ready", podName) | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | @ -136,7 +136,7 @@ func (c *Cluster) recreatePods() error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := c.recreatePod(pod); err != nil { | 		if err := c.recreatePod(pod); err != nil { | ||||||
| 			return fmt.Errorf("could not recreate replica pod '%s': %v", util.NameFromMeta(pod.ObjectMeta), err) | 			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if masterPod.Name == "" { | 	if masterPod.Name == "" { | ||||||
|  | @ -144,10 +144,10 @@ func (c *Cluster) recreatePods() error { | ||||||
| 	} else { | 	} else { | ||||||
| 		//TODO: do manual failover
 | 		//TODO: do manual failover
 | ||||||
| 		//TODO: specify master, leave new master empty
 | 		//TODO: specify master, leave new master empty
 | ||||||
| 		c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) | 		c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 		if err := c.recreatePod(masterPod); err != nil { | 		if err := c.recreatePod(masterPod); err != nil { | ||||||
| 			return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) | 			return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -61,7 +61,7 @@ func (c *Cluster) loadResources() error { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		c.Secrets[secret.UID] = &secrets.Items[i] | 		c.Secrets[secret.UID] = &secrets.Items[i] | ||||||
| 		c.logger.Debugf("secret loaded, uid: %s", secret.UID) | 		c.logger.Debugf("secret loaded, uid: %q", secret.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions) | 	statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions) | ||||||
|  | @ -80,19 +80,19 @@ func (c *Cluster) loadResources() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) listResources() error { | func (c *Cluster) listResources() error { | ||||||
| 	if c.Statefulset != nil { | 	if c.Statefulset != nil { | ||||||
| 		c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | 		c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range c.Secrets { | 	for _, obj := range c.Secrets { | ||||||
| 		c.logger.Infof("Found secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Endpoint != nil { | 	if c.Endpoint != nil { | ||||||
| 		c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | 		c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for role, service := range c.Service { | 	for role, service := range c.Service { | ||||||
| 		c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | 		c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
|  | @ -101,7 +101,7 @@ func (c *Cluster) listResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pods { | 	for _, obj := range pods { | ||||||
| 		c.logger.Infof("Found pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
|  | @ -110,7 +110,7 @@ func (c *Cluster) listResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pvcs { | 	for _, obj := range pvcs { | ||||||
| 		c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -129,7 +129,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	c.Statefulset = statefulSet | 	c.Statefulset = statefulSet | ||||||
| 	c.logger.Debugf("Created new statefulset '%s', uid: %s", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) | 	c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) | ||||||
| 
 | 
 | ||||||
| 	return statefulSet, nil | 	return statefulSet, nil | ||||||
| } | } | ||||||
|  | @ -144,7 +144,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { | ||||||
| 
 | 
 | ||||||
| 	patchData, err := specPatch(newStatefulSet.Spec) | 	patchData, err := specPatch(newStatefulSet.Spec) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not form patch for the statefulset '%s': %v", statefulSetName, err) | 		return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( | 	statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( | ||||||
|  | @ -152,7 +152,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { | ||||||
| 		types.MergePatchType, | 		types.MergePatchType, | ||||||
| 		patchData, "") | 		patchData, "") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not patch statefulset '%s': %v", statefulSetName, err) | 		return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) | ||||||
| 	} | 	} | ||||||
| 	c.Statefulset = statefulSet | 	c.Statefulset = statefulSet | ||||||
| 
 | 
 | ||||||
|  | @ -174,7 +174,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| 
 | 
 | ||||||
| 	options := meta_v1.DeleteOptions{OrphanDependents: &orphanDepencies} | 	options := meta_v1.DeleteOptions{OrphanDependents: &orphanDepencies} | ||||||
| 	if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil { | 	if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil { | ||||||
| 		return fmt.Errorf("could not delete statefulset '%s': %v", statefulSetName, err) | 		return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err) | ||||||
| 	} | 	} | ||||||
| 	// make sure we clear the stored statefulset status if the subsequent create fails.
 | 	// make sure we clear the stored statefulset status if the subsequent create fails.
 | ||||||
| 	c.Statefulset = nil | 	c.Statefulset = nil | ||||||
|  | @ -194,7 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| 	// create the new statefulset with the desired spec. It would take over the remaining pods.
 | 	// create the new statefulset with the desired spec. It would take over the remaining pods.
 | ||||||
| 	createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) | 	createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not create statefulset '%s': %v", statefulSetName, err) | 		return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err) | ||||||
| 	} | 	} | ||||||
| 	// check that all the previous replicas were picked up.
 | 	// check that all the previous replicas were picked up.
 | ||||||
| 	if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && | 	if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && | ||||||
|  | @ -216,7 +216,7 @@ func (c *Cluster) deleteStatefulSet() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("statefulset '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) | 	c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) | ||||||
| 	c.Statefulset = nil | 	c.Statefulset = nil | ||||||
| 
 | 
 | ||||||
| 	if err := c.deletePods(); err != nil { | 	if err := c.deletePods(); err != nil { | ||||||
|  | @ -270,12 +270,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | ||||||
| 		} | 		} | ||||||
| 		err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) | 		err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) | 			return fmt.Errorf("could not delete service %q: %v", serviceName, err) | ||||||
| 		} | 		} | ||||||
| 		c.Endpoint = nil | 		c.Endpoint = nil | ||||||
| 		svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) | 		svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) | 			return fmt.Errorf("could not create service %q: %v", serviceName, err) | ||||||
| 		} | 		} | ||||||
| 		c.Service[role] = svc | 		c.Service[role] = svc | ||||||
| 		if role == Master { | 		if role == Master { | ||||||
|  | @ -283,7 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | ||||||
| 			endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) | 			endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) | ||||||
| 			ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) | 			ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) | 				return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) | ||||||
| 			} | 			} | ||||||
| 			c.Endpoint = ep | 			c.Endpoint = ep | ||||||
| 		} | 		} | ||||||
|  | @ -299,13 +299,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | ||||||
| 			[]byte(annotationsPatchData), "") | 			[]byte(annotationsPatchData), "") | ||||||
| 
 | 
 | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err) | 			return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	patchData, err := specPatch(newService.Spec) | 	patchData, err := specPatch(newService.Spec) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) | 		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( | 	svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( | ||||||
|  | @ -313,7 +313,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | ||||||
| 		types.MergePatchType, | 		types.MergePatchType, | ||||||
| 		patchData, "") | 		patchData, "") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not patch service '%s': %v", serviceName, err) | 		return fmt.Errorf("could not patch service %q: %v", serviceName, err) | ||||||
| 	} | 	} | ||||||
| 	c.Service[role] = svc | 	c.Service[role] = svc | ||||||
| 
 | 
 | ||||||
|  | @ -330,7 +330,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) | 	c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 	c.Service[role] = nil | 	c.Service[role] = nil | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | @ -359,7 +359,7 @@ func (c *Cluster) deleteEndpoint() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) | 	c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) | ||||||
| 	c.Endpoint = nil | 	c.Endpoint = nil | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -376,7 +376,7 @@ func (c *Cluster) applySecrets() error { | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not get current secret: %v", err) | 				return fmt.Errorf("could not get current secret: %v", err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("secret '%s' already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) | 			c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) | ||||||
| 			if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { | 			if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { | ||||||
| 				secretUsername = constants.SuperuserKeyName | 				secretUsername = constants.SuperuserKeyName | ||||||
| 				userMap = c.systemUsers | 				userMap = c.systemUsers | ||||||
|  | @ -393,10 +393,10 @@ func (c *Cluster) applySecrets() error { | ||||||
| 			continue | 			continue | ||||||
| 		} else { | 		} else { | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not create secret for user '%s': %v", secretUsername, err) | 				return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) | ||||||
| 			} | 			} | ||||||
| 			c.Secrets[secret.UID] = secret | 			c.Secrets[secret.UID] = secret | ||||||
| 			c.logger.Debugf("Created new secret '%s', uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID) | 			c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -404,12 +404,12 @@ func (c *Cluster) applySecrets() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteSecret(secret *v1.Secret) error { | func (c *Cluster) deleteSecret(secret *v1.Secret) error { | ||||||
| 	c.logger.Debugf("Deleting secret '%s'", util.NameFromMeta(secret.ObjectMeta)) | 	c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) | ||||||
| 	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) | 	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("secret '%s' has been deleted", util.NameFromMeta(secret.ObjectMeta)) | 	c.logger.Infof("secret %q has been deleted", util.NameFromMeta(secret.ObjectMeta)) | ||||||
| 	delete(c.Secrets, secret.UID) | 	delete(c.Secrets, secret.UID) | ||||||
| 
 | 
 | ||||||
| 	return err | 	return err | ||||||
|  |  | ||||||
|  | @ -95,7 +95,7 @@ func (c *Cluster) syncService(role PostgresRole) error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create missing %s service: %v", role, err) | 			return fmt.Errorf("could not create missing %s service: %v", role, err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) | 		c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  | @ -110,7 +110,7 @@ func (c *Cluster) syncService(role PostgresRole) error { | ||||||
| 	if err := c.updateService(role, desiredSvc); err != nil { | 	if err := c.updateService(role, desiredSvc); err != nil { | ||||||
| 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err) | 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err) | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) | 	c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | @ -122,7 +122,7 @@ func (c *Cluster) syncEndpoint() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create missing endpoint: %v", err) | 			return fmt.Errorf("could not create missing endpoint: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing endpoint '%s'", util.NameFromMeta(ep.ObjectMeta)) | 		c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -151,7 +151,7 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("cluster is not ready: %v", err) | 			return fmt.Errorf("cluster is not ready: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing statefulset '%s'", util.NameFromMeta(ss.ObjectMeta)) | 		c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) | ||||||
| 		if !rollUpdate { | 		if !rollUpdate { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -77,11 +77,11 @@ func metadataAnnotationsPatch(annotations map[string]string) string { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { | func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { | ||||||
| 	if isUpdate { | 	if isUpdate { | ||||||
| 		c.logger.Infof("statefulset '%s' has been changed", | 		c.logger.Infof("statefulset %q has been changed", | ||||||
| 			util.NameFromMeta(old.ObjectMeta), | 			util.NameFromMeta(old.ObjectMeta), | ||||||
| 		) | 		) | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infof("statefulset '%s' is not in the desired state and needs to be updated", | 		c.logger.Infof("statefulset %q is not in the desired state and needs to be updated", | ||||||
| 			util.NameFromMeta(old.ObjectMeta), | 			util.NameFromMeta(old.ObjectMeta), | ||||||
| 		) | 		) | ||||||
| 	} | 	} | ||||||
|  | @ -89,18 +89,18 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate | ||||||
| 
 | 
 | ||||||
| 	if len(reasons) > 0 { | 	if len(reasons) > 0 { | ||||||
| 		for _, reason := range reasons { | 		for _, reason := range reasons { | ||||||
| 			c.logger.Infof("Reason: %s", reason) | 			c.logger.Infof("Reason: %q", reason) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { | func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { | ||||||
| 	if isUpdate { | 	if isUpdate { | ||||||
| 		c.logger.Infof("%s service '%s' has been changed", | 		c.logger.Infof("%s service %q has been changed", | ||||||
| 			role, util.NameFromMeta(old.ObjectMeta), | 			role, util.NameFromMeta(old.ObjectMeta), | ||||||
| 		) | 		) | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infof("%s service '%s  is not in the desired state and needs to be updated", | 		c.logger.Infof("%s service %q is not in the desired state and needs to be updated", | ||||||
| 			role, util.NameFromMeta(old.ObjectMeta), | 			role, util.NameFromMeta(old.ObjectMeta), | ||||||
| 		) | 		) | ||||||
| 	} | 	} | ||||||
|  | @ -127,7 +127,7 @@ func (c *Cluster) getOAuthToken() (string, error) { | ||||||
| 		Get(c.OpConfig.OAuthTokenSecretName.Name, meta_v1.GetOptions{}) | 		Get(c.OpConfig.OAuthTokenSecretName.Name, meta_v1.GetOptions{}) | ||||||
| 
 | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName) | 		c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) | ||||||
| 		return "", fmt.Errorf("could not get credentials secret: %v", err) | 		return "", fmt.Errorf("could not get credentials secret: %v", err) | ||||||
| 	} | 	} | ||||||
| 	data := credentialsSecret.Data | 	data := credentialsSecret.Data | ||||||
|  | @ -153,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { | ||||||
| 		return []string{}, fmt.Errorf("could not get oauth token: %v", err) | 		return []string{}, fmt.Errorf("could not get oauth token: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) | 	teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("could not get team info: %v", err) | 		return nil, fmt.Errorf("could not get team info: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -36,7 +36,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	for _, pvc := range pvcs { | 	for _, pvc := range pvcs { | ||||||
| 		c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) | 		c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) | ||||||
| 		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { | 		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { | ||||||
| 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) | 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) | ||||||
| 		} | 		} | ||||||
|  | @ -63,10 +63,10 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { | ||||||
| 		if lastDash > 0 && lastDash < len(pvc.Name)-1 { | 		if lastDash > 0 && lastDash < len(pvc.Name)-1 { | ||||||
| 			pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) | 			pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) | 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) | ||||||
| 			} | 			} | ||||||
| 			if int32(pvcNumber) > lastPodIndex { | 			if int32(pvcNumber) > lastPodIndex { | ||||||
| 				c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) | 				c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | @ -119,22 +119,22 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) | 			c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) | ||||||
| 			if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { | 			if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { | ||||||
| 				return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) | 				return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) | 			c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) | ||||||
| 			podName := getPodNameFromPersistentVolume(pv) | 			podName := getPodNameFromPersistentVolume(pv) | ||||||
| 			if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | 			if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | ||||||
| 				return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) | 				return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) | 			c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) | ||||||
| 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | ||||||
| 			c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) | 			c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) | ||||||
| 			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { | 			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { | ||||||
| 				return fmt.Errorf("could not update persistent volume: %s", err) | 				return fmt.Errorf("could not update persistent volume: %q", err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("successfully updated persistent volume %s", pv.Name) | 			c.logger.Debugf("successfully updated persistent volume %q", pv.Name) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if len(pvs) > 0 && totalCompatible == 0 { | 	if len(pvs) > 0 && totalCompatible == 0 { | ||||||
|  |  | ||||||
|  | @ -5,7 +5,7 @@ import ( | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/Sirupsen/logrus" | 	"github.com/Sirupsen/logrus" | ||||||
| 	"k8s.io/client-go/kubernetes" | 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
| 	"k8s.io/client-go/tools/cache" | 	"k8s.io/client-go/tools/cache" | ||||||
|  | @ -14,21 +14,26 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/teams" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type Config struct { | type Config struct { | ||||||
| 	RestConfig          *rest.Config | 	RestConfig          *rest.Config | ||||||
| 	KubeClient          *kubernetes.Clientset |  | ||||||
| 	RestClient          *rest.RESTClient |  | ||||||
| 	TeamsAPIClient      *teams.API |  | ||||||
| 	InfrastructureRoles map[string]spec.PgUser | 	InfrastructureRoles map[string]spec.PgUser | ||||||
|  | 
 | ||||||
|  | 	NoDatabaseAccess bool | ||||||
|  | 	NoTeamsAPI       bool | ||||||
|  | 	ConfigMapName    spec.NamespacedName | ||||||
|  | 	Namespace        string | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type Controller struct { | type Controller struct { | ||||||
| 	Config | 	config   Config | ||||||
| 	opConfig *config.Config | 	opConfig *config.Config | ||||||
|  | 
 | ||||||
| 	logger     *logrus.Entry | 	logger     *logrus.Entry | ||||||
|  | 	KubeClient k8sutil.KubernetesClient | ||||||
|  | 	RestClient rest.Interface | ||||||
| 
 | 
 | ||||||
| 	clustersMu sync.RWMutex | 	clustersMu sync.RWMutex | ||||||
| 	clusters   map[spec.NamespacedName]*cluster.Cluster | 	clusters   map[spec.NamespacedName]*cluster.Cluster | ||||||
|  | @ -39,22 +44,15 @@ type Controller struct { | ||||||
| 	podCh              chan spec.PodEvent | 	podCh              chan spec.PodEvent | ||||||
| 
 | 
 | ||||||
| 	clusterEventQueues  []*cache.FIFO | 	clusterEventQueues  []*cache.FIFO | ||||||
| 
 |  | ||||||
| 	lastClusterSyncTime int64 | 	lastClusterSyncTime int64 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { | func NewController(controllerConfig *Config) *Controller { | ||||||
| 	logger := logrus.New() | 	logger := logrus.New() | ||||||
| 
 | 
 | ||||||
| 	if operatorConfig.DebugLogging { |  | ||||||
| 		logger.Level = logrus.DebugLevel |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) |  | ||||||
| 
 |  | ||||||
| 	return &Controller{ | 	return &Controller{ | ||||||
| 		Config:   *controllerConfig, | 		config:   *controllerConfig, | ||||||
| 		opConfig: operatorConfig, | 		opConfig: &config.Config{}, | ||||||
| 		logger:   logger.WithField("pkg", "controller"), | 		logger:   logger.WithField("pkg", "controller"), | ||||||
| 		clusters: make(map[spec.NamespacedName]*cluster.Cluster), | 		clusters: make(map[spec.NamespacedName]*cluster.Cluster), | ||||||
| 		stopChs:  make(map[spec.NamespacedName]chan struct{}), | 		stopChs:  make(map[spec.NamespacedName]chan struct{}), | ||||||
|  | @ -62,45 +60,76 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | func (c *Controller) initClients() { | ||||||
|  | 	client, err := k8sutil.ClientSet(c.config.RestConfig) | ||||||
|  | 	if err != nil { | ||||||
|  | 		c.logger.Fatalf("couldn't create client: %v", err) | ||||||
|  | 	} | ||||||
|  | 	c.KubeClient = k8sutil.NewFromKubernetesInterface(client) | ||||||
| 
 | 
 | ||||||
| 	c.initController() | 	c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) | ||||||
| 
 | 	if err != nil { | ||||||
| 	wg.Add(4) | 		c.logger.Fatalf("couldn't create rest client: %v", err) | ||||||
| 	go c.runPodInformer(stopCh, wg) | 	} | ||||||
| 	go c.runPostgresqlInformer(stopCh, wg) |  | ||||||
| 	go c.podEventsDispatcher(stopCh, wg) |  | ||||||
| 	go c.clusterResync(stopCh, wg) |  | ||||||
| 
 |  | ||||||
| 	for i := range c.clusterEventQueues { |  | ||||||
| 		wg.Add(1) |  | ||||||
| 		go c.processClusterEventsQueue(stopCh, i, wg) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Started working in background") | func (c *Controller) initOperatorConfig() { | ||||||
|  | 	configMapData := make(map[string]string) | ||||||
|  | 
 | ||||||
|  | 	if c.config.ConfigMapName != (spec.NamespacedName{}) { | ||||||
|  | 		configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). | ||||||
|  | 			Get(c.config.ConfigMapName.Name, meta_v1.GetOptions{}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			panic(err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		configMapData = configMap.Data | ||||||
|  | 	} else { | ||||||
|  | 		c.logger.Infoln("No ConfigMap specified. Loading default values") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
 | ||||||
|  | 		configMapData["namespace"] = c.config.Namespace | ||||||
|  | 	} | ||||||
|  | 	if c.config.NoDatabaseAccess { | ||||||
|  | 		configMapData["enable_database_access"] = "false" | ||||||
|  | 	} | ||||||
|  | 	if c.config.NoTeamsAPI { | ||||||
|  | 		configMapData["enable_teams_api"] = "false" | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.opConfig = config.NewFromMap(configMapData) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) initController() { | func (c *Controller) initController() { | ||||||
|  | 	c.initClients() | ||||||
|  | 	c.initOperatorConfig() | ||||||
|  | 
 | ||||||
|  | 	c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) | ||||||
|  | 
 | ||||||
|  | 	if c.opConfig.DebugLogging { | ||||||
|  | 		c.logger.Level = logrus.DebugLevel | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	if err := c.createTPR(); err != nil { | 	if err := c.createTPR(); err != nil { | ||||||
| 		c.logger.Fatalf("could not register ThirdPartyResource: %v", err) | 		c.logger.Fatalf("could not register ThirdPartyResource: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if infraRoles, err := c.getInfrastructureRoles(); err != nil { | 	if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { | ||||||
| 		c.logger.Warningf("could not get infrastructure roles: %v", err) | 		c.logger.Warningf("could not get infrastructure roles: %v", err) | ||||||
| 	} else { | 	} else { | ||||||
| 		c.InfrastructureRoles = infraRoles | 		c.config.InfrastructureRoles = infraRoles | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Postgresqls
 | 	// Postgresqls
 | ||||||
| 	clusterLw := &cache.ListWatch{ | 	c.postgresqlInformer = cache.NewSharedIndexInformer( | ||||||
|  | 		&cache.ListWatch{ | ||||||
| 			ListFunc:  c.clusterListFunc, | 			ListFunc:  c.clusterListFunc, | ||||||
| 			WatchFunc: c.clusterWatchFunc, | 			WatchFunc: c.clusterWatchFunc, | ||||||
| 	} | 		}, | ||||||
| 	c.postgresqlInformer = cache.NewSharedIndexInformer( |  | ||||||
| 		clusterLw, |  | ||||||
| 		&spec.Postgresql{}, | 		&spec.Postgresql{}, | ||||||
| 		constants.QueueResyncPeriodTPR, | 		constants.QueueResyncPeriodTPR, | ||||||
| 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | 		cache.Indexers{}) | ||||||
| 
 | 
 | ||||||
| 	c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | 	c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||
| 		AddFunc:    c.postgresqlAdd, | 		AddFunc:    c.postgresqlAdd, | ||||||
|  | @ -139,14 +168,26 @@ func (c *Controller) initController() { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
| 	defer wg.Done() | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
| 
 | 
 | ||||||
| 	c.podInformer.Run(stopCh) | 	c.initController() | ||||||
|  | 
 | ||||||
|  | 	go c.runInformers(stopCh) | ||||||
|  | 
 | ||||||
|  | 	for i := range c.clusterEventQueues { | ||||||
|  | 		go c.processClusterEventsQueue(i) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | 	c.logger.Info("Started working in background") | ||||||
| 	defer wg.Done() | } | ||||||
| 
 | 
 | ||||||
| 	c.postgresqlInformer.Run(stopCh) | func (c *Controller) runInformers(stopCh <-chan struct{}) { | ||||||
|  | 	go c.postgresqlInformer.Run(stopCh) | ||||||
|  | 	go c.podInformer.Run(stopCh) | ||||||
|  | 	go c.podEventsDispatcher(stopCh) | ||||||
|  | 	go c.clusterResync(stopCh) | ||||||
|  | 
 | ||||||
|  | 	<-stopCh | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -24,7 +24,7 @@ func (c *Controller) podListFunc(options meta_v1.ListOptions) (runtime.Object, e | ||||||
| 		TimeoutSeconds:  options.TimeoutSeconds, | 		TimeoutSeconds:  options.TimeoutSeconds, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return c.KubeClient.CoreV1().Pods(c.opConfig.Namespace).List(opts) | 	return c.KubeClient.Pods(c.opConfig.Namespace).List(opts) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { | func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { | ||||||
|  | @ -39,7 +39,7 @@ func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, | ||||||
| 		TimeoutSeconds:  options.TimeoutSeconds, | 		TimeoutSeconds:  options.TimeoutSeconds, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return c.KubeClient.CoreV1Client.Pods(c.opConfig.Namespace).Watch(opts) | 	return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) podAdd(obj interface{}) { | func (c *Controller) podAdd(obj interface{}) { | ||||||
|  | @ -111,7 +111,7 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGr | ||||||
| 			c.clustersMu.RUnlock() | 			c.clustersMu.RUnlock() | ||||||
| 
 | 
 | ||||||
| 			if ok { | 			if ok { | ||||||
| 				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | 				c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) | ||||||
| 				cluster.ReceivePodEvent(event) | 				cluster.ReceivePodEvent(event) | ||||||
| 			} | 			} | ||||||
| 		case <-stopCh: | 		case <-stopCh: | ||||||
|  |  | ||||||
|  | @ -1,19 +1,17 @@ | ||||||
| package controller | package controller | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/apimachinery/pkg/api/meta" |  | ||||||
| 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/fields" |  | ||||||
| 	"k8s.io/apimachinery/pkg/runtime" | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/apimachinery/pkg/watch" | 	"k8s.io/apimachinery/pkg/watch" | ||||||
| 	"k8s.io/client-go/pkg/api" |  | ||||||
| 	"k8s.io/client-go/tools/cache" | 	"k8s.io/client-go/tools/cache" | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" | 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" | ||||||
|  | @ -37,44 +35,35 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { | func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { | ||||||
| 	c.logger.Info("Getting list of currently running clusters") | 	var list spec.PostgresqlList | ||||||
|  | 	var activeClustersCnt, failedClustersCnt int | ||||||
| 
 | 
 | ||||||
| 	req := c.RestClient.Get(). | 	req := c.RestClient. | ||||||
| 		RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). | 		Get(). | ||||||
| 		VersionedParams(&options, api.ParameterCodec). | 		Namespace(c.opConfig.Namespace). | ||||||
| 		FieldsSelectorParam(fields.Everything()) | 		Resource(constants.ResourceName). | ||||||
| 
 | 		VersionedParams(&options, meta_v1.ParameterCodec) | ||||||
| 	object, err := req.Do().Get() |  | ||||||
| 
 | 
 | ||||||
|  | 	b, err := req.DoRaw() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) | 		return nil, err | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	objList, err := meta.ExtractList(object) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) |  | ||||||
| 	} | 	} | ||||||
|  | 	err = json.Unmarshal(b, &list) | ||||||
| 
 | 
 | ||||||
| 	if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { | 	if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { | ||||||
| 		c.logger.Debugln("skipping resync of clusters") | 		c.logger.Debugln("skipping resync of clusters") | ||||||
| 		return object, err | 		return &list, err | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	var activeClustersCnt, failedClustersCnt int |  | ||||||
| 	for _, obj := range objList { |  | ||||||
| 		pg, ok := obj.(*spec.Postgresql) |  | ||||||
| 		if !ok { |  | ||||||
| 			return nil, fmt.Errorf("could not cast object to postgresql") |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	for _, pg := range list.Items { | ||||||
| 		if pg.Error != nil { | 		if pg.Error != nil { | ||||||
| 			failedClustersCnt++ | 			failedClustersCnt++ | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		c.queueClusterEvent(nil, pg, spec.EventSync) | 		c.queueClusterEvent(nil, &pg, spec.EventSync) | ||||||
| 		activeClustersCnt++ | 		activeClustersCnt++ | ||||||
| 	} | 	} | ||||||
| 	if len(objList) > 0 { | 	if len(list.Items) > 0 { | ||||||
| 		if failedClustersCnt > 0 && activeClustersCnt == 0 { | 		if failedClustersCnt > 0 && activeClustersCnt == 0 { | ||||||
| 			c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) | 			c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) | ||||||
| 		} else if failedClustersCnt == 0 && activeClustersCnt > 0 { | 		} else if failedClustersCnt == 0 && activeClustersCnt > 0 { | ||||||
|  | @ -88,15 +77,48 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec | ||||||
| 
 | 
 | ||||||
| 	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) | 	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) | ||||||
| 
 | 
 | ||||||
| 	return object, err | 	return &list, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type tprDecoder struct { | ||||||
|  | 	dec   *json.Decoder | ||||||
|  | 	close func() error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *tprDecoder) Close() { | ||||||
|  | 	d.close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { | ||||||
|  | 	var e struct { | ||||||
|  | 		Type   watch.EventType | ||||||
|  | 		Object spec.Postgresql | ||||||
|  | 	} | ||||||
|  | 	if err := d.dec.Decode(&e); err != nil { | ||||||
|  | 		return watch.Error, nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return e.Type, &e.Object, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { | func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { | ||||||
| 	req := c.RestClient.Get(). | 	options.Watch = true | ||||||
| 		RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). | 	r, err := c.RestClient. | ||||||
| 		VersionedParams(&options, api.ParameterCodec). | 		Get(). | ||||||
| 		FieldsSelectorParam(fields.Everything()) | 		Namespace(c.opConfig.Namespace). | ||||||
| 	return req.Watch() | 		Resource(constants.ResourceName). | ||||||
|  | 		VersionedParams(&options, meta_v1.ParameterCodec). | ||||||
|  | 		FieldsSelectorParam(nil). | ||||||
|  | 		Stream() | ||||||
|  | 
 | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return watch.NewStreamWatcher(&tprDecoder{ | ||||||
|  | 		dec:   json.NewDecoder(r), | ||||||
|  | 		close: r.Close, | ||||||
|  | 	}), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) processEvent(obj interface{}) error { | func (c *Controller) processEvent(obj interface{}) error { | ||||||
|  | @ -121,14 +143,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	switch event.EventType { | 	switch event.EventType { | ||||||
| 	case spec.EventAdd: | 	case spec.EventAdd: | ||||||
| 		if clusterFound { | 		if clusterFound { | ||||||
| 			logger.Debugf("Cluster '%s' already exists", clusterName) | 			logger.Debugf("Cluster %q already exists", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Creation of the '%s' cluster started", clusterName) | 		logger.Infof("Creation of the %q cluster started", clusterName) | ||||||
| 
 | 
 | ||||||
| 		stopCh := make(chan struct{}) | 		stopCh := make(chan struct{}) | ||||||
| 		cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | 		cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) | ||||||
| 		cl.Run(stopCh) | 		cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 		c.clustersMu.Lock() | 		c.clustersMu.Lock() | ||||||
|  | @ -143,31 +165,31 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster '%s' has been created", clusterName) | 		logger.Infof("Cluster %q has been created", clusterName) | ||||||
| 	case spec.EventUpdate: | 	case spec.EventUpdate: | ||||||
| 		logger.Infof("Update of the '%s' cluster started", clusterName) | 		logger.Infof("Update of the %q cluster started", clusterName) | ||||||
| 
 | 
 | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Warnf("Cluster '%s' does not exist", clusterName) | 			logger.Warnf("Cluster %q does not exist", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if err := cl.Update(event.NewSpec); err != nil { | 		if err := cl.Update(event.NewSpec); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not update cluster: %s", err) | 			cl.Error = fmt.Errorf("could not update cluster: %v", err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			logger.Errorf("%v", cl.Error) | ||||||
| 
 | 
 | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
| 		logger.Infof("Cluster '%s' has been updated", clusterName) | 		logger.Infof("Cluster %q has been updated", clusterName) | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		logger.Infof("Deletion of the '%s' cluster started", clusterName) | 		logger.Infof("Deletion of the %q cluster started", clusterName) | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Errorf("Unknown cluster: %s", clusterName) | 			logger.Errorf("Unknown cluster: %q", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Delete(); err != nil { | 		if err := cl.Delete(); err != nil { | ||||||
| 			logger.Errorf("could not delete cluster '%s': %s", clusterName, err) | 			logger.Errorf("could not delete cluster %q: %v", clusterName, err) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		close(c.stopChs[clusterName]) | 		close(c.stopChs[clusterName]) | ||||||
|  | @ -177,14 +199,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		delete(c.stopChs, clusterName) | 		delete(c.stopChs, clusterName) | ||||||
| 		c.clustersMu.Unlock() | 		c.clustersMu.Unlock() | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster '%s' has been deleted", clusterName) | 		logger.Infof("Cluster %q has been deleted", clusterName) | ||||||
| 	case spec.EventSync: | 	case spec.EventSync: | ||||||
| 		logger.Infof("Syncing of the '%s' cluster started", clusterName) | 		logger.Infof("Syncing of the %q cluster started", clusterName) | ||||||
| 
 | 
 | ||||||
| 		// no race condition because a cluster is always processed by single worker
 | 		// no race condition because a cluster is always processed by single worker
 | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			stopCh := make(chan struct{}) | 			stopCh := make(chan struct{}) | ||||||
| 			cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | 			cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) | ||||||
| 			cl.Run(stopCh) | 			cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 			c.clustersMu.Lock() | 			c.clustersMu.Lock() | ||||||
|  | @ -194,13 +216,13 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Sync(); err != nil { | 		if err := cl.Sync(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) | 			cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			logger.Errorf("%v", cl.Error) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster '%s' has been synced", clusterName) | 		logger.Infof("Cluster %q has been synced", clusterName) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -241,7 +263,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if clusterError != nil && eventType != spec.EventDelete { | 	if clusterError != nil && eventType != spec.EventDelete { | ||||||
| 		c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %v)", eventType, clusterName, clusterError) | 		c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -258,7 +280,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | ||||||
| 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | ||||||
| 		c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) | 		c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) | ||||||
| 	} | 	} | ||||||
| 	c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued", eventType, clusterName) | 	c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) postgresqlAdd(obj interface{}) { | func (c *Controller) postgresqlAdd(obj interface{}) { | ||||||
|  |  | ||||||
|  | @ -17,15 +17,12 @@ import ( | ||||||
| 
 | 
 | ||||||
| func (c *Controller) makeClusterConfig() cluster.Config { | func (c *Controller) makeClusterConfig() cluster.Config { | ||||||
| 	infrastructureRoles := make(map[string]spec.PgUser) | 	infrastructureRoles := make(map[string]spec.PgUser) | ||||||
| 	for k, v := range c.InfrastructureRoles { | 	for k, v := range c.config.InfrastructureRoles { | ||||||
| 		infrastructureRoles[k] = v | 		infrastructureRoles[k] = v | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return cluster.Config{ | 	return cluster.Config{ | ||||||
| 		KubeClient:          c.KubeClient, | 		RestConfig:          c.config.RestConfig, | ||||||
| 		RestClient:          c.RestClient, |  | ||||||
| 		RestConfig:          c.RestConfig, |  | ||||||
| 		TeamsAPIClient:      c.TeamsAPIClient, |  | ||||||
| 		OpConfig:            config.Copy(c.opConfig), | 		OpConfig:            config.Copy(c.opConfig), | ||||||
| 		InfrastructureRoles: infrastructureRoles, | 		InfrastructureRoles: infrastructureRoles, | ||||||
| 	} | 	} | ||||||
|  | @ -49,33 +46,32 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) createTPR() error { | func (c *Controller) createTPR() error { | ||||||
| 	TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) | 	tpr := thirdPartyResource(constants.TPRName) | ||||||
| 	tpr := thirdPartyResource(TPRName) |  | ||||||
| 
 | 
 | ||||||
| 	_, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) | 	_, err := c.KubeClient.ThirdPartyResources().Create(tpr) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | 		if !k8sutil.ResourceAlreadyExists(err) { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) | 		c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName) | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) | 		c.logger.Infof("ThirdPartyResource %q' has been registered", constants.TPRName) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) | 	return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { | func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) { | ||||||
| 	if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) { | 	if *rolesSecret == (spec.NamespacedName{}) { | ||||||
| 		// we don't have infrastructure roles defined, bail out
 | 		// we don't have infrastructure roles defined, bail out
 | ||||||
| 		return nil, nil | 		return nil, nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	infraRolesSecret, err := c.KubeClient. | 	infraRolesSecret, err := c.KubeClient. | ||||||
| 		Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace). | 		Secrets(rolesSecret.Namespace). | ||||||
| 		Get(c.opConfig.InfrastructureRolesSecretName.Name, meta_v1.GetOptions{}) | 		Get(rolesSecret.Name, meta_v1.GetOptions{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) | 		c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret) | ||||||
| 		return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) | 		return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -103,7 +99,7 @@ Users: | ||||||
| 				case "inrole": | 				case "inrole": | ||||||
| 					t.MemberOf = append(t.MemberOf, s) | 					t.MemberOf = append(t.MemberOf, s) | ||||||
| 				default: | 				default: | ||||||
| 					c.logger.Warnf("Unknown key %s", p) | 					c.logger.Warnf("Unknown key %q", p) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -0,0 +1,154 @@ | ||||||
|  | package controller | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"reflect" | ||||||
|  | 	"testing" | ||||||
|  | 
 | ||||||
|  | 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	v1core "k8s.io/client-go/kubernetes/typed/core/v1" | ||||||
|  | 	"k8s.io/client-go/pkg/api/v1" | ||||||
|  | 
 | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	testInfrastructureRolesSecretName = "infrastructureroles-test" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type mockSecret struct { | ||||||
|  | 	v1core.SecretInterface | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *mockSecret) Get(name string, options meta_v1.GetOptions) (*v1.Secret, error) { | ||||||
|  | 	if name != testInfrastructureRolesSecretName { | ||||||
|  | 		return nil, fmt.Errorf("NotFound") | ||||||
|  | 	} | ||||||
|  | 	secret := &v1.Secret{} | ||||||
|  | 	secret.Name = mockController.opConfig.ClusterNameLabel | ||||||
|  | 	secret.Data = map[string][]byte{ | ||||||
|  | 		"user1":     []byte("testrole"), | ||||||
|  | 		"password1": []byte("testpassword"), | ||||||
|  | 		"inrole1":   []byte("testinrole"), | ||||||
|  | 	} | ||||||
|  | 	return secret, nil | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type MockSecretGetter struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface { | ||||||
|  | 	return &mockSecret{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newMockKubernetesClient() k8sutil.KubernetesClient { | ||||||
|  | 	return k8sutil.KubernetesClient{SecretsGetter: &MockSecretGetter{}} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newMockController() *Controller { | ||||||
|  | 	controller := NewController(&Config{}) | ||||||
|  | 	controller.opConfig.ClusterNameLabel = "cluster-name" | ||||||
|  | 	controller.opConfig.InfrastructureRolesSecretName = | ||||||
|  | 		spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} | ||||||
|  | 	controller.opConfig.Workers = 4 | ||||||
|  | 	controller.KubeClient = newMockKubernetesClient() | ||||||
|  | 	return controller | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var mockController = newMockController() | ||||||
|  | 
 | ||||||
|  | func TestPodClusterName(t *testing.T) { | ||||||
|  | 	var testTable = []struct { | ||||||
|  | 		in       *v1.Pod | ||||||
|  | 		expected spec.NamespacedName | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			&v1.Pod{}, | ||||||
|  | 			spec.NamespacedName{}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			&v1.Pod{ | ||||||
|  | 				ObjectMeta: meta_v1.ObjectMeta{ | ||||||
|  | 					Namespace: v1.NamespaceDefault, | ||||||
|  | 					Labels: map[string]string{ | ||||||
|  | 						mockController.opConfig.ClusterNameLabel: "testcluster", | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			spec.NamespacedName{v1.NamespaceDefault, "testcluster"}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range testTable { | ||||||
|  | 		resp := mockController.podClusterName(test.in) | ||||||
|  | 		if resp != test.expected { | ||||||
|  | 			t.Errorf("expected response %v does not match the actual %v", test.expected, resp) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestClusterWorkerID(t *testing.T) { | ||||||
|  | 	var testTable = []struct { | ||||||
|  | 		in       spec.NamespacedName | ||||||
|  | 		expected uint32 | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			in:       spec.NamespacedName{"foo", "bar"}, | ||||||
|  | 			expected: 2, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			in:       spec.NamespacedName{"default", "testcluster"}, | ||||||
|  | 			expected: 3, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range testTable { | ||||||
|  | 		resp := mockController.clusterWorkerID(test.in) | ||||||
|  | 		if resp != test.expected { | ||||||
|  | 			t.Errorf("expected response %v does not match the actual %v", test.expected, resp) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestGetInfrastructureRoles(t *testing.T) { | ||||||
|  | 	var testTable = []struct { | ||||||
|  | 		secretName    spec.NamespacedName | ||||||
|  | 		expectedRoles map[string]spec.PgUser | ||||||
|  | 		expectedError error | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			spec.NamespacedName{}, | ||||||
|  | 			nil, | ||||||
|  | 			nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			spec.NamespacedName{v1.NamespaceDefault, "null"}, | ||||||
|  | 			nil, | ||||||
|  | 			fmt.Errorf(`could not get infrastructure roles secret: NotFound`), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}, | ||||||
|  | 			map[string]spec.PgUser{ | ||||||
|  | 				"testrole": { | ||||||
|  | 					"testrole", | ||||||
|  | 					"testpassword", | ||||||
|  | 					nil, | ||||||
|  | 					[]string{"testinrole"}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			nil, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range testTable { | ||||||
|  | 		roles, err := mockController.getInfrastructureRoles(&test.secretName) | ||||||
|  | 		if err != test.expectedError { | ||||||
|  | 			if err != nil && test.expectedError != nil && err.Error() == test.expectedError.Error() { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			t.Errorf("expected error '%v' does not match the actual error '%v'", test.expectedError, err) | ||||||
|  | 		} | ||||||
|  | 		if !reflect.DeepEqual(roles, test.expectedRoles) { | ||||||
|  | 			t.Errorf("expected roles output %v does not match the actual %v", test.expectedRoles, roles) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | @ -87,7 +87,7 @@ type PostgresSpec struct { | ||||||
| 
 | 
 | ||||||
| 	TeamID              string   `json:"teamId"` | 	TeamID              string   `json:"teamId"` | ||||||
| 	AllowedSourceRanges []string `json:"allowedSourceRanges"` | 	AllowedSourceRanges []string `json:"allowedSourceRanges"` | ||||||
| 	// EnableLoadBalancer  is a pointer, since it is importat to know if that parameters is omited from the manifest
 | 	// EnableLoadBalancer  is a pointer, since it is importat to know if that parameters is omitted from the manifest
 | ||||||
| 	UseLoadBalancer     *bool                `json:"useLoadBalancer,omitempty"` | 	UseLoadBalancer     *bool                `json:"useLoadBalancer,omitempty"` | ||||||
| 	ReplicaLoadBalancer bool                 `json:"replicaLoadBalancer,omitempty"` | 	ReplicaLoadBalancer bool                 `json:"replicaLoadBalancer,omitempty"` | ||||||
| 	NumberOfInstances   int32                `json:"numberOfInstances"` | 	NumberOfInstances   int32                `json:"numberOfInstances"` | ||||||
|  |  | ||||||
|  | @ -362,7 +362,7 @@ func TestClusterName(t *testing.T) { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if name != tt.clusterName { | 		if name != tt.clusterName { | ||||||
| 			t.Errorf("Expected cluserName: %s, got: %s", tt.clusterName, name) | 			t.Errorf("Expected cluserName: %q, got: %q", tt.clusterName, name) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -399,7 +399,7 @@ func TestMarshalMaintenanceWindow(t *testing.T) { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !bytes.Equal(s, tt.in) { | 		if !bytes.Equal(s, tt.in) { | ||||||
| 			t.Errorf("Expected Marshal: %s, got: %s", string(tt.in), string(s)) | 			t.Errorf("Expected Marshal: %q, got: %q", string(tt.in), string(s)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -434,7 +434,7 @@ func TestMarshal(t *testing.T) { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if !bytes.Equal(m, tt.marshal) { | 		if !bytes.Equal(m, tt.marshal) { | ||||||
| 			t.Errorf("Marshal Postgresql expected: %s, got: %s", string(tt.marshal), string(m)) | 			t.Errorf("Marshal Postgresql expected: %q, got: %q", string(tt.marshal), string(m)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,11 +1,10 @@ | ||||||
| package spec | package spec | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"database/sql" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"strings" | 	"strings" | ||||||
| 
 | 
 | ||||||
| 	"database/sql" |  | ||||||
| 
 |  | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | @ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) { | ||||||
| 		var actual NamespacedName | 		var actual NamespacedName | ||||||
| 		err := actual.Decode(tt) | 		err := actual.Decode(tt) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			t.Errorf("Error expected for '%s', got: %#v", tt, actual) | 			t.Errorf("Error expected for %q, got: %#v", tt, actual) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -4,10 +4,7 @@ import "time" | ||||||
| 
 | 
 | ||||||
| // General kubernetes-related constants
 | // General kubernetes-related constants
 | ||||||
| const ( | const ( | ||||||
| 	ListClustersURITemplate     = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
 | 	K8sAPIPath                  = "/apis" | ||||||
| 	WatchClustersURITemplate    = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
 |  | ||||||
| 	K8sVersion                  = "v1" |  | ||||||
| 	K8sAPIPath                  = "/api" |  | ||||||
| 	StatefulsetDeletionInterval = 1 * time.Second | 	StatefulsetDeletionInterval = 1 * time.Second | ||||||
| 	StatefulsetDeletionTimeout  = 30 * time.Second | 	StatefulsetDeletionTimeout  = 30 * time.Second | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -2,7 +2,7 @@ package constants | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	PasswordLength         = 64 | 	PasswordLength         = 64 | ||||||
| 	UserSecretTemplate     = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
 | 	UserSecretTemplate     = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName
 | ||||||
| 	SuperuserKeyName       = "superuser" | 	SuperuserKeyName       = "superuser" | ||||||
| 	ReplicationUserKeyName = "replication" | 	ReplicationUserKeyName = "replication" | ||||||
| 	RoleFlagSuperuser      = "SUPERUSER" | 	RoleFlagSuperuser      = "SUPERUSER" | ||||||
|  |  | ||||||
|  | @ -2,9 +2,10 @@ package constants | ||||||
| 
 | 
 | ||||||
| // Different properties of the PostgreSQL Third Party Resources
 | // Different properties of the PostgreSQL Third Party Resources
 | ||||||
| const ( | const ( | ||||||
| 	TPRName        = "postgresql" | 	TPRKind        = "postgresql" | ||||||
| 	TPRVendor      = "acid.zalan.do" | 	TPRGroup       = "acid.zalan.do" | ||||||
| 	TPRDescription = "Managed PostgreSQL clusters" | 	TPRDescription = "Managed PostgreSQL clusters" | ||||||
| 	TPRApiVersion  = "v1" | 	TPRApiVersion  = "v1" | ||||||
| 	ResourceName   = TPRName + "s" | 	TPRName        = TPRKind + "." + TPRGroup | ||||||
|  | 	ResourceName   = TPRKind + "s" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | @ -37,5 +37,5 @@ func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func( | ||||||
| 		(strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { | 		(strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	return fmt.Errorf("unrecognized output: %s, assuming error", out) | 	return fmt.Errorf("unrecognized output: %q, assuming error", out) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,32 +1,60 @@ | ||||||
| package k8sutil | package k8sutil | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" |  | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
| 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" |  | ||||||
| 	"k8s.io/apimachinery/pkg/runtime" |  | ||||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||||
| 	"k8s.io/apimachinery/pkg/runtime/serializer" | 	"k8s.io/apimachinery/pkg/runtime/serializer" | ||||||
| 	"k8s.io/client-go/kubernetes" | 	"k8s.io/client-go/kubernetes" | ||||||
|  | 	v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" | ||||||
|  | 	v1core "k8s.io/client-go/kubernetes/typed/core/v1" | ||||||
|  | 	extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" | ||||||
| 	"k8s.io/client-go/pkg/api" | 	"k8s.io/client-go/pkg/api" | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
| 	"k8s.io/client-go/tools/clientcmd" | 	"k8s.io/client-go/tools/clientcmd" | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" |  | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | type KubernetesClient struct { | ||||||
|  | 	v1core.SecretsGetter | ||||||
|  | 	v1core.ServicesGetter | ||||||
|  | 	v1core.EndpointsGetter | ||||||
|  | 	v1core.PodsGetter | ||||||
|  | 	v1core.PersistentVolumesGetter | ||||||
|  | 	v1core.PersistentVolumeClaimsGetter | ||||||
|  | 	v1core.ConfigMapsGetter | ||||||
|  | 	v1beta1.StatefulSetsGetter | ||||||
|  | 	extensions.ThirdPartyResourcesGetter | ||||||
|  | 	RESTClient rest.Interface | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { | ||||||
|  | 	c = KubernetesClient{} | ||||||
|  | 	c.PodsGetter = src.CoreV1() | ||||||
|  | 	c.ServicesGetter = src.CoreV1() | ||||||
|  | 	c.EndpointsGetter = src.CoreV1() | ||||||
|  | 	c.SecretsGetter = src.CoreV1() | ||||||
|  | 	c.ConfigMapsGetter = src.CoreV1() | ||||||
|  | 	c.PersistentVolumeClaimsGetter = src.CoreV1() | ||||||
|  | 	c.PersistentVolumesGetter = src.CoreV1() | ||||||
|  | 	c.StatefulSetsGetter = src.AppsV1beta1() | ||||||
|  | 	c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() | ||||||
|  | 	c.RESTClient = src.CoreV1().RESTClient() | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { | func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { | ||||||
| 	if outOfCluster { | 	if outOfCluster { | ||||||
| 		return clientcmd.BuildConfigFromFlags("", kubeConfig) | 		return clientcmd.BuildConfigFromFlags("", kubeConfig) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return rest.InClusterConfig() | 	return rest.InClusterConfig() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) { | func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) { | ||||||
| 	return kubernetes.NewForConfig(config) | 	return kubernetes.NewForConfig(config) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -38,35 +66,24 @@ func ResourceNotFound(err error) bool { | ||||||
| 	return apierrors.IsNotFound(err) | 	return apierrors.IsNotFound(err) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { | func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { | ||||||
| 	c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} | 	cfg.GroupVersion = &schema.GroupVersion{ | ||||||
| 	c.APIPath = constants.K8sAPIPath | 		Group:   constants.TPRGroup, | ||||||
| 	c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} |  | ||||||
| 
 |  | ||||||
| 	schemeBuilder := runtime.NewSchemeBuilder( |  | ||||||
| 		func(scheme *runtime.Scheme) error { |  | ||||||
| 			scheme.AddKnownTypes( |  | ||||||
| 				schema.GroupVersion{ |  | ||||||
| 					Group:   constants.TPRVendor, |  | ||||||
| 		Version: constants.TPRApiVersion, | 		Version: constants.TPRApiVersion, | ||||||
| 				}, |  | ||||||
| 				&spec.Postgresql{}, |  | ||||||
| 				&spec.PostgresqlList{}, |  | ||||||
| 				&meta_v1.ListOptions{}, |  | ||||||
| 				&meta_v1.DeleteOptions{}, |  | ||||||
| 			) |  | ||||||
| 			return nil |  | ||||||
| 		}) |  | ||||||
| 	if err := schemeBuilder.AddToScheme(api.Scheme); err != nil { |  | ||||||
| 		return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) |  | ||||||
| 	} | 	} | ||||||
|  | 	cfg.APIPath = constants.K8sAPIPath | ||||||
|  | 	cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} | ||||||
| 
 | 
 | ||||||
| 	return rest.RESTClientFor(c) | 	return rest.RESTClientFor(&cfg) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { | func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { | ||||||
| 	return retryutil.Retry(interval, timeout, func() (bool, error) { | 	return retryutil.Retry(interval, timeout, func() (bool, error) { | ||||||
| 		_, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() | 		_, err := restclient. | ||||||
|  | 			Get(). | ||||||
|  | 			Namespace(ns). | ||||||
|  | 			Resource(constants.ResourceName). | ||||||
|  | 			DoRaw() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if ResourceNotFound(err) { // not set up yet. wait more.
 | 			if ResourceNotFound(err) { // not set up yet. wait more.
 | ||||||
| 				return false, nil | 				return false, nil | ||||||
|  |  | ||||||
|  | @ -66,11 +66,11 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque | ||||||
| 		switch r.Kind { | 		switch r.Kind { | ||||||
| 		case spec.PGSyncUserAdd: | 		case spec.PGSyncUserAdd: | ||||||
| 			if err := s.createPgUser(r.User, db); err != nil { | 			if err := s.createPgUser(r.User, db); err != nil { | ||||||
| 				return fmt.Errorf("could not create user '%s': %v", r.User.Name, err) | 				return fmt.Errorf("could not create user %q: %v", r.User.Name, err) | ||||||
| 			} | 			} | ||||||
| 		case spec.PGsyncUserAlter: | 		case spec.PGsyncUserAlter: | ||||||
| 			if err := s.alterPgUser(r.User, db); err != nil { | 			if err := s.alterPgUser(r.User, db); err != nil { | ||||||
| 				return fmt.Errorf("could not alter user '%s': %v", r.User.Name, err) | 				return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) | ||||||
| 			} | 			} | ||||||
| 		default: | 		default: | ||||||
| 			return fmt.Errorf("unrecognized operation: %v", r.Kind) | 			return fmt.Errorf("unrecognized operation: %v", r.Kind) | ||||||
|  | @ -100,7 +100,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err | ||||||
| 
 | 
 | ||||||
| 	_, err = db.Query(query) // TODO: Try several times
 | 	_, err = db.Query(query) // TODO: Try several times
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		err = fmt.Errorf("dB error: %s, query: %v", err, query) | 		err = fmt.Errorf("dB error: %v, query: %q", err, query) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -122,7 +122,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err | ||||||
| 
 | 
 | ||||||
| 	_, err = db.Query(query) // TODO: Try several times
 | 	_, err = db.Query(query) // TODO: Try several times
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		err = fmt.Errorf("dB error: %s query %v", err, query) | 		err = fmt.Errorf("dB error: %v query %q", err, query) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -73,7 +73,7 @@ func TestPGUserPassword(t *testing.T) { | ||||||
| 	for _, tt := range pgUsers { | 	for _, tt := range pgUsers { | ||||||
| 		pwd := PGUserPassword(tt.in) | 		pwd := PGUserPassword(tt.in) | ||||||
| 		if pwd != tt.out { | 		if pwd != tt.out { | ||||||
| 			t.Errorf("PgUserPassword expected: %s, got: %s", tt.out, pwd) | 			t.Errorf("PgUserPassword expected: %q, got: %q", tt.out, pwd) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -81,7 +81,7 @@ func TestPGUserPassword(t *testing.T) { | ||||||
| func TestPrettyDiff(t *testing.T) { | func TestPrettyDiff(t *testing.T) { | ||||||
| 	for _, tt := range prettyDiffTest { | 	for _, tt := range prettyDiffTest { | ||||||
| 		if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { | 		if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { | ||||||
| 			t.Errorf("PrettyDiff expected: %s, got: %s", tt.out, actual) | 			t.Errorf("PrettyDiff expected: %q, got: %q", tt.out, actual) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -42,11 +42,11 @@ func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool | ||||||
| func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { | func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { | ||||||
| 	volumeID := pv.Spec.AWSElasticBlockStore.VolumeID | 	volumeID := pv.Spec.AWSElasticBlockStore.VolumeID | ||||||
| 	if volumeID == "" { | 	if volumeID == "" { | ||||||
| 		return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) | 		return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) | ||||||
| 	} | 	} | ||||||
| 	idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 | 	idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 | ||||||
| 	if idx == 0 { | 	if idx == 0 { | ||||||
| 		return "", fmt.Errorf("malfored EBS volume id %s", volumeID) | 		return "", fmt.Errorf("malfored EBS volume id %q", volumeID) | ||||||
| 	} | 	} | ||||||
| 	return volumeID[idx:], nil | 	return volumeID[idx:], nil | ||||||
| } | } | ||||||
|  | @ -60,7 +60,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | ||||||
| 	} | 	} | ||||||
| 	vol := volumeOutput.Volumes[0] | 	vol := volumeOutput.Volumes[0] | ||||||
| 	if *vol.VolumeId != volumeId { | 	if *vol.VolumeId != volumeId { | ||||||
| 		return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) | 		return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId) | ||||||
| 	} | 	} | ||||||
| 	if *vol.Size == newSize { | 	if *vol.Size == newSize { | ||||||
| 		// nothing to do
 | 		// nothing to do
 | ||||||
|  | @ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | ||||||
| 
 | 
 | ||||||
| 	state := *output.VolumeModification.ModificationState | 	state := *output.VolumeModification.ModificationState | ||||||
| 	if state == constants.EBSVolumeStateFailed { | 	if state == constants.EBSVolumeStateFailed { | ||||||
| 		return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) | 		return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId) | ||||||
| 	} | 	} | ||||||
| 	if state == "" { | 	if state == "" { | ||||||
| 		return fmt.Errorf("received empty modification status") | 		return fmt.Errorf("received empty modification status") | ||||||
|  | @ -91,10 +91,10 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | ||||||
| 				return false, fmt.Errorf("could not describe volume modification: %v", err) | 				return false, fmt.Errorf("could not describe volume modification: %v", err) | ||||||
| 			} | 			} | ||||||
| 			if len(out.VolumesModifications) != 1 { | 			if len(out.VolumesModifications) != 1 { | ||||||
| 				return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) | 				return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeId) | ||||||
| 			} | 			} | ||||||
| 			if *out.VolumesModifications[0].VolumeId != volumeId { | 			if *out.VolumesModifications[0].VolumeId != volumeId { | ||||||
| 				return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", | 				return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q", | ||||||
| 					*out.VolumesModifications[0].VolumeId, volumeId) | 					*out.VolumesModifications[0].VolumeId, volumeId) | ||||||
| 			} | 			} | ||||||
| 			return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil | 			return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue