Pass cluster name as a logger field
This commit is contained in:
		
						commit
						13bf0c6bea
					
				|  | @ -79,8 +79,6 @@ type compareStatefulsetResult struct { | ||||||
| 
 | 
 | ||||||
| // New creates a new cluster. This function should be called from a controller.
 | // New creates a new cluster. This function should be called from a controller.
 | ||||||
| func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) |  | ||||||
| 	k8sResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)} |  | ||||||
| 	orphanDependents := true | 	orphanDependents := true | ||||||
| 
 | 
 | ||||||
| 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | ||||||
|  | @ -95,18 +93,18 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql | ||||||
| 	cluster := &Cluster{ | 	cluster := &Cluster{ | ||||||
| 		Config:           cfg, | 		Config:           cfg, | ||||||
| 		Postgresql:       pgSpec, | 		Postgresql:       pgSpec, | ||||||
| 		logger:           lg, |  | ||||||
| 		pgUsers:          make(map[string]spec.PgUser), | 		pgUsers:          make(map[string]spec.PgUser), | ||||||
| 		systemUsers:      make(map[string]spec.PgUser), | 		systemUsers:      make(map[string]spec.PgUser), | ||||||
| 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | ||||||
| 		kubeResources:    k8sResources, | 		kubeResources:    kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)}, | ||||||
| 		masterLess:       false, | 		masterLess:       false, | ||||||
| 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | ||||||
| 		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, | 		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||||
| 		podEventsQueue:   podEventsQueue, | 		podEventsQueue:   podEventsQueue, | ||||||
| 		KubeClient:       kubeClient, | 		KubeClient:       kubeClient, | ||||||
| 		teamsAPIClient:   teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger), | 		teamsAPIClient:   teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger), | ||||||
| 	} | 	} | ||||||
|  | 	cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) | ||||||
| 
 | 
 | ||||||
| 	return cluster | 	return cluster | ||||||
| } | } | ||||||
|  | @ -139,7 +137,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err) | 		c.logger.Warningf("could not set status for the cluster: %v", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -205,7 +203,7 @@ func (c *Cluster) Create() error { | ||||||
| 	if err = c.initUsers(); err != nil { | 	if err = c.initUsers(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("Users have been initialized") | 	c.logger.Infof("users have been initialized") | ||||||
| 
 | 
 | ||||||
| 	if err = c.applySecrets(); err != nil { | 	if err = c.applySecrets(); err != nil { | ||||||
| 		return fmt.Errorf("could not create secrets: %v", err) | 		return fmt.Errorf("could not create secrets: %v", err) | ||||||
|  | @ -218,10 +216,10 @@ func (c *Cluster) Create() error { | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Waiting for cluster being ready") | 	c.logger.Info("waiting for the cluster being ready") | ||||||
| 
 | 
 | ||||||
| 	if err = c.waitStatefulsetPodsReady(); err != nil { | 	if err = c.waitStatefulsetPodsReady(); err != nil { | ||||||
| 		c.logger.Errorf("Failed to create cluster: %v", err) | 		c.logger.Errorf("failed to create cluster: %v", err) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("pods are ready") | 	c.logger.Infof("pods are ready") | ||||||
|  | @ -231,10 +229,10 @@ func (c *Cluster) Create() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create users: %v", err) | 			return fmt.Errorf("could not create users: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Users have been successfully created") | 		c.logger.Infof("users have been successfully created") | ||||||
| 	} else { | 	} else { | ||||||
| 		if c.masterLess { | 		if c.masterLess { | ||||||
| 			c.logger.Warnln("Cluster is masterless") | 			c.logger.Warnln("cluster is masterless") | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -411,7 +409,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 	defer c.mu.Unlock() | 	defer c.mu.Unlock() | ||||||
| 
 | 
 | ||||||
| 	c.setStatus(spec.ClusterStatusUpdating) | 	c.setStatus(spec.ClusterStatusUpdating) | ||||||
| 	c.logger.Debugf("Cluster update from version %q to %q", | 	c.logger.Debugf("cluster update from version %q to %q", | ||||||
| 		c.ResourceVersion, newSpec.ResourceVersion) | 		c.ResourceVersion, newSpec.ResourceVersion) | ||||||
| 
 | 
 | ||||||
| 	/* Make sure we update when this function exists */ | 	/* Make sure we update when this function exists */ | ||||||
|  | @ -481,25 +479,25 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
 | 	if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
 | ||||||
| 		c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed", | 		c.logger.Warnf("postgresql version change(%q -> %q) is not allowed", | ||||||
| 			c.Spec.PgVersion, newSpec.Spec.PgVersion) | 			c.Spec.PgVersion, newSpec.Spec.PgVersion) | ||||||
| 		//TODO: rewrite pg version in tpr spec
 | 		//TODO: rewrite pg version in tpr spec
 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if cmp.rollingUpdate { | 	if cmp.rollingUpdate { | ||||||
| 		c.logger.Infof("Rolling update is needed") | 		c.logger.Infof("rolling update is needed") | ||||||
| 		// TODO: wait for actual streaming to the replica
 | 		// TODO: wait for actual streaming to the replica
 | ||||||
| 		if err := c.recreatePods(); err != nil { | 		if err := c.recreatePods(); err != nil { | ||||||
| 			c.setStatus(spec.ClusterStatusUpdateFailed) | 			c.setStatus(spec.ClusterStatusUpdateFailed) | ||||||
| 			return fmt.Errorf("could not recreate pods: %v", err) | 			return fmt.Errorf("could not recreate pods: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Rolling update has been finished") | 		c.logger.Infof("rolling update has been finished") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { | 	if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { | ||||||
| 		c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) | 		c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) | ||||||
| 		if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | 		if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | ||||||
| 			return fmt.Errorf("Could not update volumes: %v", err) | 			return fmt.Errorf("could not update volumes: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("volumes have been updated successfully") | 		c.logger.Infof("volumes have been updated successfully") | ||||||
| 	} | 	} | ||||||
|  | @ -575,7 +573,7 @@ func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) { | ||||||
| 			return | 			return | ||||||
| 		default: | 		default: | ||||||
| 			if _, err := c.podEventsQueue.Pop(cache.PopProcessFunc(c.processPodEvent)); err != nil { | 			if _, err := c.podEventsQueue.Pop(cache.PopProcessFunc(c.processPodEvent)); err != nil { | ||||||
| 				c.logger.Errorf("error when processing pod event queeue %v", err) | 				c.logger.Errorf("error when processing pod event queue %v", err) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -145,7 +145,7 @@ PATRONI_INITDB_PARAMS: | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 			default: | 			default: | ||||||
| 				c.logger.Warnf("Unsupported type for initdb configuration item %s: %T", defaultParam) | 				c.logger.Warnf("unsupported type for initdb configuration item %s: %T", defaultParam, defaultParam) | ||||||
| 				continue PATRONI_INITDB_PARAMS | 				continue PATRONI_INITDB_PARAMS | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | @ -199,7 +199,7 @@ PATRONI_INITDB_PARAMS: | ||||||
| 	} | 	} | ||||||
| 	result, err := json.Marshal(config) | 	result, err := json.Marshal(config) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err) | 		c.logger.Errorf("cannot convert spilo configuration into JSON: %v", err) | ||||||
| 		return "" | 		return "" | ||||||
| 	} | 	} | ||||||
| 	return string(result) | 	return string(result) | ||||||
|  |  | ||||||
|  | @ -34,7 +34,7 @@ func (c *Cluster) pgConnectionString() string { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) databaseAccessDisabled() bool { | func (c *Cluster) databaseAccessDisabled() bool { | ||||||
| 	if !c.OpConfig.EnableDBAccess { | 	if !c.OpConfig.EnableDBAccess { | ||||||
| 		c.logger.Debugf("Database access is disabled") | 		c.logger.Debugf("database access is disabled") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return !c.OpConfig.EnableDBAccess | 	return !c.OpConfig.EnableDBAccess | ||||||
|  |  | ||||||
|  | @ -26,7 +26,7 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deletePods() error { | func (c *Cluster) deletePods() error { | ||||||
| 	c.logger.Debugln("Deleting pods") | 	c.logger.Debugln("deleting pods") | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|  | @ -35,7 +35,7 @@ func (c *Cluster) deletePods() error { | ||||||
| 	for _, obj := range pods { | 	for _, obj := range pods { | ||||||
| 		podName := util.NameFromMeta(obj.ObjectMeta) | 		podName := util.NameFromMeta(obj.ObjectMeta) | ||||||
| 
 | 
 | ||||||
| 		c.logger.Debugf("Deleting pod %q", podName) | 		c.logger.Debugf("deleting pod %q", podName) | ||||||
| 		if err := c.deletePod(podName); err != nil { | 		if err := c.deletePod(podName); err != nil { | ||||||
| 			c.logger.Errorf("could not delete pod %q: %v", podName, err) | 			c.logger.Errorf("could not delete pod %q: %v", podName, err) | ||||||
| 		} else { | 		} else { | ||||||
|  | @ -45,7 +45,7 @@ func (c *Cluster) deletePods() error { | ||||||
| 	if len(pods) > 0 { | 	if len(pods) > 0 { | ||||||
| 		c.logger.Debugln("pods have been deleted") | 		c.logger.Debugln("pods have been deleted") | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Debugln("No pods to delete") | 		c.logger.Debugln("no pods to delete") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -71,7 +71,7 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { | ||||||
| 	defer c.podSubscribersMu.Unlock() | 	defer c.podSubscribersMu.Unlock() | ||||||
| 
 | 
 | ||||||
| 	if _, ok := c.podSubscribers[podName]; !ok { | 	if _, ok := c.podSubscribers[podName]; !ok { | ||||||
| 		panic("Subscriber for pod '" + podName.String() + "' is not found") | 		panic("subscriber for pod '" + podName.String() + "' is not found") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	close(c.podSubscribers[podName]) | 	close(c.podSubscribers[podName]) | ||||||
|  | @ -124,7 +124,7 @@ func (c *Cluster) recreatePods() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not get the list of pods: %v", err) | 		return fmt.Errorf("could not get the list of pods: %v", err) | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("There are %d pods in the cluster to recreate", len(pods.Items)) | 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) | ||||||
| 
 | 
 | ||||||
| 	var masterPod v1.Pod | 	var masterPod v1.Pod | ||||||
| 	for _, pod := range pods.Items { | 	for _, pod := range pods.Items { | ||||||
|  | @ -140,11 +140,11 @@ func (c *Cluster) recreatePods() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if masterPod.Name == "" { | 	if masterPod.Name == "" { | ||||||
| 		c.logger.Warningln("No master pod in the cluster") | 		c.logger.Warningln("no master pod in the cluster") | ||||||
| 	} else { | 	} else { | ||||||
| 		//TODO: do manual failover
 | 		//TODO: do manual failover
 | ||||||
| 		//TODO: specify master, leave new master empty
 | 		//TODO: specify master, leave new master empty
 | ||||||
| 		c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) | 		c.logger.Infof("recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 		if err := c.recreatePod(masterPod); err != nil { | 		if err := c.recreatePod(masterPod); err != nil { | ||||||
| 			return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) | 			return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) | ||||||
|  |  | ||||||
|  | @ -80,19 +80,19 @@ func (c *Cluster) loadResources() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) listResources() error { | func (c *Cluster) listResources() error { | ||||||
| 	if c.Statefulset != nil { | 	if c.Statefulset != nil { | ||||||
| 		c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | 		c.logger.Infof("found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range c.Secrets { | 	for _, obj := range c.Secrets { | ||||||
| 		c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Endpoint != nil { | 	if c.Endpoint != nil { | ||||||
| 		c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | 		c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for role, service := range c.Service { | 	for role, service := range c.Service { | ||||||
| 		c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | 		c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
|  | @ -101,7 +101,7 @@ func (c *Cluster) listResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pods { | 	for _, obj := range pods { | ||||||
| 		c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
|  | @ -110,7 +110,7 @@ func (c *Cluster) listResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pvcs { | 	for _, obj := range pvcs { | ||||||
| 		c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -129,7 +129,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	c.Statefulset = statefulSet | 	c.Statefulset = statefulSet | ||||||
| 	c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) | 	c.logger.Debugf("created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) | ||||||
| 
 | 
 | ||||||
| 	return statefulSet, nil | 	return statefulSet, nil | ||||||
| } | } | ||||||
|  | @ -140,7 +140,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { | ||||||
| 	} | 	} | ||||||
| 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta) | 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta) | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Updating statefulset") | 	c.logger.Debugf("updating statefulset") | ||||||
| 
 | 
 | ||||||
| 	patchData, err := specPatch(newStatefulSet.Spec) | 	patchData, err := specPatch(newStatefulSet.Spec) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | @ -166,7 +166,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta) | 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta) | ||||||
| 	c.logger.Debugf("Replacing statefulset") | 	c.logger.Debugf("replacing statefulset") | ||||||
| 
 | 
 | ||||||
| 	// Delete the current statefulset without deleting the pods
 | 	// Delete the current statefulset without deleting the pods
 | ||||||
| 	orphanDepencies := true | 	orphanDepencies := true | ||||||
|  | @ -179,7 +179,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| 	// make sure we clear the stored statefulset status if the subsequent create fails.
 | 	// make sure we clear the stored statefulset status if the subsequent create fails.
 | ||||||
| 	c.Statefulset = nil | 	c.Statefulset = nil | ||||||
| 	// wait until the statefulset is truly deleted
 | 	// wait until the statefulset is truly deleted
 | ||||||
| 	c.logger.Debugf("Waiting for the statefulset to be deleted") | 	c.logger.Debugf("waiting for the statefulset to be deleted") | ||||||
| 
 | 
 | ||||||
| 	err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout, | 	err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout, | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
|  | @ -199,7 +199,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| 	// check that all the previous replicas were picked up.
 | 	// check that all the previous replicas were picked up.
 | ||||||
| 	if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && | 	if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && | ||||||
| 		createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas { | 		createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas { | ||||||
| 		c.logger.Warnf("Number of pods for the old and updated Statefulsets is not identical") | 		c.logger.Warnf("number of pods for the old and updated Statefulsets is not identical") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.Statefulset = createdStatefulset | 	c.Statefulset = createdStatefulset | ||||||
|  | @ -207,7 +207,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteStatefulSet() error { | func (c *Cluster) deleteStatefulSet() error { | ||||||
| 	c.logger.Debugln("Deleting statefulset") | 	c.logger.Debugln("deleting statefulset") | ||||||
| 	if c.Statefulset == nil { | 	if c.Statefulset == nil { | ||||||
| 		return fmt.Errorf("there is no statefulset in the cluster") | 		return fmt.Errorf("there is no statefulset in the cluster") | ||||||
| 	} | 	} | ||||||
|  | @ -321,9 +321,9 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteService(role postgresRole) error { | func (c *Cluster) deleteService(role postgresRole) error { | ||||||
| 	c.logger.Debugf("Deleting service %s", role) | 	c.logger.Debugf("deleting service %s", role) | ||||||
| 	if c.Service[role] == nil { | 	if c.Service[role] == nil { | ||||||
| 		return fmt.Errorf("There is no %s service in the cluster", role) | 		return fmt.Errorf("there is no %s service in the cluster", role) | ||||||
| 	} | 	} | ||||||
| 	service := c.Service[role] | 	service := c.Service[role] | ||||||
| 	err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) | 	err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) | ||||||
|  | @ -351,7 +351,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteEndpoint() error { | func (c *Cluster) deleteEndpoint() error { | ||||||
| 	c.logger.Debugln("Deleting endpoint") | 	c.logger.Debugln("deleting endpoint") | ||||||
| 	if c.Endpoint == nil { | 	if c.Endpoint == nil { | ||||||
| 		return fmt.Errorf("there is no endpoint in the cluster") | 		return fmt.Errorf("there is no endpoint in the cluster") | ||||||
| 	} | 	} | ||||||
|  | @ -396,7 +396,7 @@ func (c *Cluster) applySecrets() error { | ||||||
| 				return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) | 				return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) | ||||||
| 			} | 			} | ||||||
| 			c.Secrets[secret.UID] = secret | 			c.Secrets[secret.UID] = secret | ||||||
| 			c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) | 			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -404,7 +404,7 @@ func (c *Cluster) applySecrets() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteSecret(secret *v1.Secret) error { | func (c *Cluster) deleteSecret(secret *v1.Secret) error { | ||||||
| 	c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) | 	c.logger.Debugf("deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) | ||||||
| 	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) | 	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|  |  | ||||||
|  | @ -24,7 +24,7 @@ func (c *Cluster) Sync() error { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Syncing secrets") | 	c.logger.Debugf("syncing secrets") | ||||||
| 
 | 
 | ||||||
| 	//TODO: mind the secrets of the deleted/new users
 | 	//TODO: mind the secrets of the deleted/new users
 | ||||||
| 	if err := c.applySecrets(); err != nil { | 	if err := c.applySecrets(); err != nil { | ||||||
|  | @ -33,14 +33,14 @@ func (c *Cluster) Sync() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Syncing endpoints") | 	c.logger.Debugf("syncing endpoints") | ||||||
| 	if err := c.syncEndpoint(); err != nil { | 	if err := c.syncEndpoint(); err != nil { | ||||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | 		if !k8sutil.ResourceAlreadyExists(err) { | ||||||
| 			return fmt.Errorf("could not sync endpoints: %v", err) | 			return fmt.Errorf("could not sync endpoints: %v", err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Syncing services") | 	c.logger.Debugf("syncing services") | ||||||
| 	for _, role := range []postgresRole{master, replica} { | 	for _, role := range []postgresRole{master, replica} { | ||||||
| 		if role == replica && !c.Spec.ReplicaLoadBalancer { | 		if role == replica && !c.Spec.ReplicaLoadBalancer { | ||||||
| 			if c.Service[role] != nil { | 			if c.Service[role] != nil { | ||||||
|  | @ -58,7 +58,7 @@ func (c *Cluster) Sync() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Syncing statefulsets") | 	c.logger.Debugf("syncing statefulsets") | ||||||
| 	if err := c.syncStatefulSet(); err != nil { | 	if err := c.syncStatefulSet(); err != nil { | ||||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | 		if !k8sutil.ResourceAlreadyExists(err) { | ||||||
| 			return fmt.Errorf("could not sync statefulsets: %v", err) | 			return fmt.Errorf("could not sync statefulsets: %v", err) | ||||||
|  | @ -66,13 +66,13 @@ func (c *Cluster) Sync() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !c.databaseAccessDisabled() { | 	if !c.databaseAccessDisabled() { | ||||||
| 		c.logger.Debugf("Syncing roles") | 		c.logger.Debugf("syncing roles") | ||||||
| 		if err := c.syncRoles(true); err != nil { | 		if err := c.syncRoles(true); err != nil { | ||||||
| 			return fmt.Errorf("could not sync roles: %v", err) | 			return fmt.Errorf("could not sync roles: %v", err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debugf("Syncing persistent volumes") | 	c.logger.Debugf("syncing persistent volumes") | ||||||
| 	if err := c.syncVolumes(); err != nil { | 	if err := c.syncVolumes(); err != nil { | ||||||
| 		return fmt.Errorf("could not sync persistent volumes: %v", err) | 		return fmt.Errorf("could not sync persistent volumes: %v", err) | ||||||
| 	} | 	} | ||||||
|  | @ -88,7 +88,7 @@ func (c *Cluster) syncService(role postgresRole) error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create missing %s service: %v", role, err) | 			return fmt.Errorf("could not create missing %s service: %v", role, err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) | 		c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) | ||||||
| 
 | 
 | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  | @ -115,7 +115,7 @@ func (c *Cluster) syncEndpoint() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create missing endpoint: %v", err) | 			return fmt.Errorf("could not create missing endpoint: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) | 		c.logger.Infof("created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -133,7 +133,7 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if len(pods) > 0 { | 		if len(pods) > 0 { | ||||||
| 			c.logger.Infof("Found pods without the statefulset: trigger rolling update") | 			c.logger.Infof("found pods without the statefulset: trigger rolling update") | ||||||
| 			rollUpdate = true | 			rollUpdate = true | ||||||
| 		} | 		} | ||||||
| 		ss, err := c.createStatefulSet() | 		ss, err := c.createStatefulSet() | ||||||
|  | @ -144,7 +144,7 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("cluster is not ready: %v", err) | 			return fmt.Errorf("cluster is not ready: %v", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) | 		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) | ||||||
| 		if !rollUpdate { | 		if !rollUpdate { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  | @ -173,11 +173,11 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !cmp.rollingUpdate { | 		if !cmp.rollingUpdate { | ||||||
| 			c.logger.Debugln("No rolling update is needed") | 			c.logger.Debugln("no rolling update is needed") | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	c.logger.Debugln("Performing rolling update") | 	c.logger.Debugln("performing rolling update") | ||||||
| 	if err := c.recreatePods(); err != nil { | 	if err := c.recreatePods(); err != nil { | ||||||
| 		return fmt.Errorf("could not recreate pods: %v", err) | 		return fmt.Errorf("could not recreate pods: %v", err) | ||||||
| 	} | 	} | ||||||
|  | @ -226,7 +226,7 @@ func (c *Cluster) syncVolumes() error { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | 	if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | ||||||
| 		return fmt.Errorf("Could not sync volumes: %v", err) | 		return fmt.Errorf("could not sync volumes: %v", err) | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("volumes have been synced successfully") | 	c.logger.Infof("volumes have been synced successfully") | ||||||
| 	return nil | 	return nil | ||||||
|  |  | ||||||
|  | @ -89,7 +89,7 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate | ||||||
| 
 | 
 | ||||||
| 	if len(reasons) > 0 { | 	if len(reasons) > 0 { | ||||||
| 		for _, reason := range reasons { | 		for _, reason := range reasons { | ||||||
| 			c.logger.Infof("Reason: %q", reason) | 			c.logger.Infof("reason: %q", reason) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -107,15 +107,15 @@ func (c *Cluster) logServiceChanges(role postgresRole, old, new *v1.Service, isU | ||||||
| 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) | 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) | ||||||
| 
 | 
 | ||||||
| 	if reason != "" { | 	if reason != "" { | ||||||
| 		c.logger.Infof("Reason: %s", reason) | 		c.logger.Infof("reason: %s", reason) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) { | func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) { | ||||||
| 	c.logger.Infof("Volume specification has been changed") | 	c.logger.Infof("volume specification has been changed") | ||||||
| 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old, new)) | 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old, new)) | ||||||
| 	if reason != "" { | 	if reason != "" { | ||||||
| 		c.logger.Infof("Reason: %s", reason) | 		c.logger.Infof("reason: %s", reason) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -127,7 +127,7 @@ func (c *Cluster) getOAuthToken() (string, error) { | ||||||
| 		Get(c.OpConfig.OAuthTokenSecretName.Name, metav1.GetOptions{}) | 		Get(c.OpConfig.OAuthTokenSecretName.Name, metav1.GetOptions{}) | ||||||
| 
 | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) | 		c.logger.Debugf("oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) | ||||||
| 		return "", fmt.Errorf("could not get credentials secret: %v", err) | 		return "", fmt.Errorf("could not get credentials secret: %v", err) | ||||||
| 	} | 	} | ||||||
| 	data := credentialsSecret.Data | 	data := credentialsSecret.Data | ||||||
|  | @ -144,7 +144,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { | ||||||
| 		return nil, fmt.Errorf("no teamId specified") | 		return nil, fmt.Errorf("no teamId specified") | ||||||
| 	} | 	} | ||||||
| 	if !c.OpConfig.EnableTeamsAPI { | 	if !c.OpConfig.EnableTeamsAPI { | ||||||
| 		c.logger.Debug("Team API is disabled, returning empty list of members") | 		c.logger.Debug("team API is disabled, returning empty list of members") | ||||||
| 		return []string{}, nil | 		return []string{}, nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -30,13 +30,13 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deletePersistenVolumeClaims() error { | func (c *Cluster) deletePersistenVolumeClaims() error { | ||||||
| 	c.logger.Debugln("Deleting PVCs") | 	c.logger.Debugln("deleting PVCs") | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	for _, pvc := range pvcs { | 	for _, pvc := range pvcs { | ||||||
| 		c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) | 		c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) | ||||||
| 		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { | 		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { | ||||||
| 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) | 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) | ||||||
| 		} | 		} | ||||||
|  | @ -44,7 +44,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { | ||||||
| 	if len(pvcs) > 0 { | 	if len(pvcs) > 0 { | ||||||
| 		c.logger.Debugln("PVCs have been deleted") | 		c.logger.Debugln("PVCs have been deleted") | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Debugln("No PVCs to delete") | 		c.logger.Debugln("no PVCs to delete") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -66,7 +66,7 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { | ||||||
| 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) | 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) | ||||||
| 			} | 			} | ||||||
| 			if int32(pvcNumber) > lastPodIndex { | 			if int32(pvcNumber) > lastPodIndex { | ||||||
| 				c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name) | 				c.logger.Debugf("skipping persistent volume %q corresponding to a non-running pods", pvc.Name) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -88,7 +88,7 @@ func (c *Controller) initOperatorConfig() { | ||||||
| 
 | 
 | ||||||
| 		configMapData = configMap.Data | 		configMapData = configMap.Data | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infoln("No ConfigMap specified. Loading default values") | 		c.logger.Infoln("no ConfigMap specified. Loading default values") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
 | 	if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
 | ||||||
|  | @ -108,7 +108,7 @@ func (c *Controller) initController() { | ||||||
| 	c.initClients() | 	c.initClients() | ||||||
| 	c.initOperatorConfig() | 	c.initOperatorConfig() | ||||||
| 
 | 
 | ||||||
| 	c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) | 	c.logger.Infof("config: %s", c.opConfig.MustMarshal()) | ||||||
| 
 | 
 | ||||||
| 	if c.opConfig.DebugLogging { | 	if c.opConfig.DebugLogging { | ||||||
| 		c.logger.Logger.Level = logrus.DebugLevel | 		c.logger.Logger.Level = logrus.DebugLevel | ||||||
|  | @ -185,7 +185,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
| 		go c.processClusterEventsQueue(i, stopCh, wg) | 		go c.processClusterEventsQueue(i, stopCh, wg) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Started working in background") | 	c.logger.Info("started working in background") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
|  |  | ||||||
|  | @ -45,7 +45,10 @@ func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spe | ||||||
| 	cluster, ok := c.clusters[clusterName] | 	cluster, ok := c.clusters[clusterName] | ||||||
| 	c.clustersMu.RUnlock() | 	c.clustersMu.RUnlock() | ||||||
| 	if ok { | 	if ok { | ||||||
| 		c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, clusterName) | 		c.logger.WithField("cluster-name", clusterName). | ||||||
|  | 			Debugf("sending %q event of pod %q to the cluster channel", | ||||||
|  | 				event.EventType, | ||||||
|  | 				event.PodName) | ||||||
| 		cluster.ReceivePodEvent(event) | 		cluster.ReceivePodEvent(event) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -68,14 +68,14 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object | ||||||
| 	} | 	} | ||||||
| 	if len(list.Items) > 0 { | 	if len(list.Items) > 0 { | ||||||
| 		if failedClustersCnt > 0 && activeClustersCnt == 0 { | 		if failedClustersCnt > 0 && activeClustersCnt == 0 { | ||||||
| 			c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) | 			c.logger.Infof("there are no clusters running. %d are in the failed state", failedClustersCnt) | ||||||
| 		} else if failedClustersCnt == 0 && activeClustersCnt > 0 { | 		} else if failedClustersCnt == 0 && activeClustersCnt > 0 { | ||||||
| 			c.logger.Infof("There are %d clusters running", activeClustersCnt) | 			c.logger.Infof("there are %d clusters running", activeClustersCnt) | ||||||
| 		} else { | 		} else { | ||||||
| 			c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) | 			c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) | ||||||
| 		} | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infof("No clusters running") | 		c.logger.Infof("no clusters running") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) | 	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) | ||||||
|  | @ -131,13 +131,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return fmt.Errorf("could not cast to ClusterEvent") | 		return fmt.Errorf("could not cast to ClusterEvent") | ||||||
| 	} | 	} | ||||||
| 	logger := c.logger.WithField("worker", event.WorkerID) | 	lg := c.logger.WithField("worker", event.WorkerID) | ||||||
| 
 | 
 | ||||||
| 	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { | 	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { | ||||||
| 		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta) | 		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta) | ||||||
| 	} else { | 	} else { | ||||||
| 		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta) | 		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta) | ||||||
| 	} | 	} | ||||||
|  | 	lg = lg.WithField("cluster-name", clusterName) | ||||||
| 
 | 
 | ||||||
| 	c.clustersMu.RLock() | 	c.clustersMu.RLock() | ||||||
| 	cl, clusterFound := c.clusters[clusterName] | 	cl, clusterFound := c.clusters[clusterName] | ||||||
|  | @ -146,14 +147,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	switch event.EventType { | 	switch event.EventType { | ||||||
| 	case spec.EventAdd: | 	case spec.EventAdd: | ||||||
| 		if clusterFound { | 		if clusterFound { | ||||||
| 			logger.Debugf("Cluster %q already exists", clusterName) | 			lg.Debugf("cluster already exists") | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Creation of the %q cluster started", clusterName) | 		lg.Infof("creation of the cluster started") | ||||||
| 
 | 
 | ||||||
| 		stopCh := make(chan struct{}) | 		stopCh := make(chan struct{}) | ||||||
| 		cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) | 		cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg) | ||||||
| 		cl.Run(stopCh) | 		cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 		c.clustersMu.Lock() | 		c.clustersMu.Lock() | ||||||
|  | @ -163,36 +164,36 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Create(); err != nil { | 		if err := cl.Create(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			lg.Error(cl.Error) | ||||||
| 
 | 
 | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster %q has been created", clusterName) | 		lg.Infoln("cluster has been created") | ||||||
| 	case spec.EventUpdate: | 	case spec.EventUpdate: | ||||||
| 		logger.Infof("Update of the %q cluster started", clusterName) | 		lg.Infoln("update of the cluster started") | ||||||
| 
 | 
 | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Warnf("Cluster %q does not exist", clusterName) | 			lg.Warnln("cluster does not exist") | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if err := cl.Update(event.NewSpec); err != nil { | 		if err := cl.Update(event.NewSpec); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not update cluster: %v", err) | 			cl.Error = fmt.Errorf("could not update cluster: %v", err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			lg.Error(cl.Error) | ||||||
| 
 | 
 | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
| 		logger.Infof("Cluster %q has been updated", clusterName) | 		lg.Infoln("cluster has been updated") | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		logger.Infof("Deletion of the %q cluster started", clusterName) | 		lg.Infof("deletion of the %q cluster started", clusterName) | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Errorf("Unknown cluster: %q", clusterName) | 			lg.Errorf("unknown cluster: %q", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Delete(); err != nil { | 		if err := cl.Delete(); err != nil { | ||||||
| 			logger.Errorf("could not delete cluster %q: %v", clusterName, err) | 			lg.Errorf("could not delete cluster: %v", err) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		close(c.stopChs[clusterName]) | 		close(c.stopChs[clusterName]) | ||||||
|  | @ -202,14 +203,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		delete(c.stopChs, clusterName) | 		delete(c.stopChs, clusterName) | ||||||
| 		c.clustersMu.Unlock() | 		c.clustersMu.Unlock() | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster %q has been deleted", clusterName) | 		lg.Infof("cluster has been deleted") | ||||||
| 	case spec.EventSync: | 	case spec.EventSync: | ||||||
| 		logger.Infof("Syncing of the %q cluster started", clusterName) | 		lg.Infof("syncing of the cluster started") | ||||||
| 
 | 
 | ||||||
| 		// no race condition because a cluster is always processed by single worker
 | 		// no race condition because a cluster is always processed by single worker
 | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			stopCh := make(chan struct{}) | 			stopCh := make(chan struct{}) | ||||||
| 			cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) | 			cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg) | ||||||
| 			cl.Run(stopCh) | 			cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 			c.clustersMu.Lock() | 			c.clustersMu.Lock() | ||||||
|  | @ -220,12 +221,12 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Sync(); err != nil { | 		if err := cl.Sync(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) | 			cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			lg.Error(cl.Error) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster %q has been synced", clusterName) | 		lg.Infof("cluster has been synced") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -273,7 +274,9 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if clusterError != nil && eventType != spec.EventDelete { | 	if clusterError != nil && eventType != spec.EventDelete { | ||||||
| 		c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError) | 		c.logger. | ||||||
|  | 			WithField("cluster-name", clusterName). | ||||||
|  | 			Debugf("skipping %q event for the invalid cluster: %v", eventType, clusterError) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -287,10 +290,11 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | ||||||
| 	} | 	} | ||||||
| 	//TODO: if we delete cluster, discard all the previous events for the cluster
 | 	//TODO: if we delete cluster, discard all the previous events for the cluster
 | ||||||
| 
 | 
 | ||||||
|  | 	lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) | ||||||
| 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | ||||||
| 		c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) | 		lg.Errorf("error when queueing cluster event: %v", clusterEvent) | ||||||
| 	} | 	} | ||||||
| 	c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName) | 	lg.Infof("%q event has been queued", eventType) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) postgresqlAdd(obj interface{}) { | func (c *Controller) postgresqlAdd(obj interface{}) { | ||||||
|  |  | ||||||
|  | @ -53,9 +53,9 @@ func (c *Controller) createTPR() error { | ||||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | 		if !k8sutil.ResourceAlreadyExists(err) { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName) | 		c.logger.Infof("thirdPartyResource %q is already registered", constants.TPRName) | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Infof("ThirdPartyResource %q' has been registered", constants.TPRName) | 		c.logger.Infof("thirdPartyResource %q' has been registered", constants.TPRName) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) | 	return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) | ||||||
|  | @ -71,7 +71,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r | ||||||
| 		Secrets(rolesSecret.Namespace). | 		Secrets(rolesSecret.Namespace). | ||||||
| 		Get(rolesSecret.Name, metav1.GetOptions{}) | 		Get(rolesSecret.Name, metav1.GetOptions{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret) | 		c.logger.Debugf("infrastructure roles secret name: %q", *rolesSecret) | ||||||
| 		return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) | 		return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -99,7 +99,7 @@ Users: | ||||||
| 				case "inrole": | 				case "inrole": | ||||||
| 					t.MemberOf = append(t.MemberOf, s) | 					t.MemberOf = append(t.MemberOf, s) | ||||||
| 				default: | 				default: | ||||||
| 					c.logger.Warnf("Unknown key %q", p) | 					c.logger.Warnf("unknown key %q", p) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -93,7 +93,7 @@ func (n *NamespacedName) Decode(value string) error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if name.Name == "" { | 	if name.Name == "" { | ||||||
| 		return fmt.Errorf("Incorrect namespaced name") | 		return fmt.Errorf("incorrect namespaced name") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	*n = NamespacedName(name) | 	*n = NamespacedName(name) | ||||||
|  |  | ||||||
|  | @ -22,10 +22,10 @@ func TestNamespacedNameDecode(t *testing.T) { | ||||||
| 		var actual NamespacedName | 		var actual NamespacedName | ||||||
| 		err := actual.Decode(tt.s) | 		err := actual.Decode(tt.s) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("Decode error: %v", err) | 			t.Errorf("decode error: %v", err) | ||||||
| 		} | 		} | ||||||
| 		if actual != tt.expected { | 		if actual != tt.expected { | ||||||
| 			t.Errorf("Expected: %v, got %#v", tt.expected, actual) | 			t.Errorf("expected: %v, got %#v", tt.expected, actual) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -36,10 +36,10 @@ func TestNamespacedNameMarshal(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 		m, err := actual.MarshalJSON() | 		m, err := actual.MarshalJSON() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("Marshal error: %v", err) | 			t.Errorf("marshal error: %v", err) | ||||||
| 		} | 		} | ||||||
| 		if bytes.Equal(m, tt.expectedMarshal) { | 		if bytes.Equal(m, tt.expectedMarshal) { | ||||||
| 			t.Errorf("Expected marshal: %v, got %#v", tt.expected, actual) | 			t.Errorf("expected marshal: %v, got %#v", tt.expected, actual) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) { | ||||||
| 		var actual NamespacedName | 		var actual NamespacedName | ||||||
| 		err := actual.Decode(tt) | 		err := actual.Decode(tt) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			t.Errorf("Error expected for %q, got: %#v", tt, actual) | 			t.Errorf("error expected for %q, got: %#v", tt, actual) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -51,7 +51,7 @@ type API struct { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewTeamsAPI creates an object to query the team API.
 | // NewTeamsAPI creates an object to query the team API.
 | ||||||
| func NewTeamsAPI(url string, log *logrus.Logger) *API { | func NewTeamsAPI(url string, log *logrus.Entry) *API { | ||||||
| 	t := API{ | 	t := API{ | ||||||
| 		url:        strings.TrimRight(url, "/"), | 		url:        strings.TrimRight(url, "/"), | ||||||
| 		httpClient: &http.Client{}, | 		httpClient: &http.Client{}, | ||||||
|  | @ -69,7 +69,7 @@ func (t *API) TeamInfo(teamID, token string) (tm *Team, err error) { | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	url := fmt.Sprintf("%s/teams/%s", t.url, teamID) | 	url := fmt.Sprintf("%s/teams/%s", t.url, teamID) | ||||||
| 	t.logger.Debugf("Request url: %s", url) | 	t.logger.Debugf("request url: %s", url) | ||||||
| 	req, err = http.NewRequest("GET", url, nil) | 	req, err = http.NewRequest("GET", url, nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
|  |  | ||||||
|  | @ -11,7 +11,7 @@ import ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( | var ( | ||||||
| 	logger = logrus.New() | 	logger = logrus.New().WithField("pkg", "teamsapi") | ||||||
| 	token  = "ec45b1cfbe7100c6315d183a3eb6cec0M2U1LWJkMzEtZDgzNzNmZGQyNGM3IiwiYXV0aF90aW1lIjoxNDkzNzMwNzQ1LCJpc3MiOiJodHRwcz" | 	token  = "ec45b1cfbe7100c6315d183a3eb6cec0M2U1LWJkMzEtZDgzNzNmZGQyNGM3IiwiYXV0aF90aW1lIjoxNDkzNzMwNzQ1LCJpc3MiOiJodHRwcz" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -146,11 +146,11 @@ func TestInfo(t *testing.T) { | ||||||
| 		func() { | 		func() { | ||||||
| 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||
| 				if r.Header.Get("Authorization") != "Bearer "+token { | 				if r.Header.Get("Authorization") != "Bearer "+token { | ||||||
| 					t.Errorf("Authorization token is wrong or not provided") | 					t.Errorf("authorization token is wrong or not provided") | ||||||
| 				} | 				} | ||||||
| 				w.WriteHeader(tc.inCode) | 				w.WriteHeader(tc.inCode) | ||||||
| 				if _, err := fmt.Fprint(w, tc.in); err != nil { | 				if _, err := fmt.Fprint(w, tc.in); err != nil { | ||||||
| 					t.Errorf("Error writing teams api response %v", err) | 					t.Errorf("error writing teams api response %v", err) | ||||||
| 				} | 				} | ||||||
| 			})) | 			})) | ||||||
| 			defer ts.Close() | 			defer ts.Close() | ||||||
|  | @ -158,12 +158,12 @@ func TestInfo(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 			actual, err := api.TeamInfo("acid", token) | 			actual, err := api.TeamInfo("acid", token) | ||||||
| 			if err != nil && err.Error() != tc.err.Error() { | 			if err != nil && err.Error() != tc.err.Error() { | ||||||
| 				t.Errorf("Expected error: %v, got: %v", tc.err, err) | 				t.Errorf("expected error: %v, got: %v", tc.err, err) | ||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			if !reflect.DeepEqual(actual, tc.out) { | 			if !reflect.DeepEqual(actual, tc.out) { | ||||||
| 				t.Errorf("Expected %#v, got: %#v", tc.out, actual) | 				t.Errorf("expected %#v, got: %#v", tc.out, actual) | ||||||
| 			} | 			} | ||||||
| 		}() | 		}() | ||||||
| 	} | 	} | ||||||
|  | @ -205,7 +205,7 @@ func TestHttpClientClose(t *testing.T) { | ||||||
| 	_, err := api.TeamInfo("acid", token) | 	_, err := api.TeamInfo("acid", token) | ||||||
| 	expError := fmt.Errorf("error when closing response: close error") | 	expError := fmt.Errorf("error when closing response: close error") | ||||||
| 	if err.Error() != expError.Error() { | 	if err.Error() != expError.Error() { | ||||||
| 		t.Errorf("Expected error: %v, got: %v", expError, err) | 		t.Errorf("expected error: %v, got: %v", expError, err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -214,12 +214,12 @@ func TestRequest(t *testing.T) { | ||||||
| 		api := NewTeamsAPI(tc.url, logger) | 		api := NewTeamsAPI(tc.url, logger) | ||||||
| 		resp, err := api.TeamInfo("acid", token) | 		resp, err := api.TeamInfo("acid", token) | ||||||
| 		if resp != nil { | 		if resp != nil { | ||||||
| 			t.Errorf("Response expected to be nil") | 			t.Errorf("response expected to be nil") | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err.Error() != tc.err.Error() { | 		if err.Error() != tc.err.Error() { | ||||||
| 			t.Errorf("Expected error: %v, got: %v", tc.err, err) | 			t.Errorf("expected error: %v, got: %v", tc.err, err) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -48,7 +48,7 @@ func TestRandomPassword(t *testing.T) { | ||||||
| 	const pwdLength = 10 | 	const pwdLength = 10 | ||||||
| 	pwd := RandomPassword(pwdLength) | 	pwd := RandomPassword(pwdLength) | ||||||
| 	if a := len(pwd); a != pwdLength { | 	if a := len(pwd); a != pwdLength { | ||||||
| 		t.Errorf("Password length expected: %d, got: %d", pwdLength, a) | 		t.Errorf("password length expected: %d, got: %d", pwdLength, a) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue