align sync and update logs (#2738)
parent cc9074c184
commit a08d1679f2
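
This change aligns log calls across the sync and update code paths: the plain Debug/Info methods replace the Debugf/Debugln variants where nothing is interpolated, routine sync chatter moves down to debug level, and messages about state-changing actions (rolling updates, volume resizes, secret creation) move up to info. A minimal sketch of the convention the diff follows, assuming the sirupsen/logrus logger the operator already uses; the field value is illustrative:

package main

import "github.com/sirupsen/logrus"

func main() {
	log := logrus.New()
	log.SetLevel(logrus.DebugLevel)
	logger := log.WithField("cluster", "acid-minimal-cluster") // field value is illustrative

	// Plain variants take the message as-is when nothing is interpolated.
	logger.Debug("syncing secrets")

	// The f-variants stay where values are formatted into the message.
	logger.Infof("%d / %d pod(s) still need to be rotated", 1, 3)
}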
					
@@ -1014,7 +1014,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

 		initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
 		if initUsers {
-			c.logger.Debugf("initialize users")
+			c.logger.Debug("initialize users")
 			if err := c.initUsers(); err != nil {
 				c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
 				userInitFailed = true
@@ -1023,7 +1023,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 			}
 		}
 		if initUsers || annotationsChanged {
-			c.logger.Debugf("syncing secrets")
+			c.logger.Debug("syncing secrets")
 			//TODO: mind the secrets of the deleted/new users
 			if err := c.syncSecrets(); err != nil {
 				c.logger.Errorf("could not sync secrets: %v", err)
@@ -1065,7 +1065,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

 		// create if it did not exist
 		if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
-			c.logger.Debugf("creating backup cron job")
+			c.logger.Debug("creating backup cron job")
 			if err := c.createLogicalBackupJob(); err != nil {
 				c.logger.Errorf("could not create a k8s cron job for logical backups: %v", err)
 				updateFailed = true
@@ -1075,7 +1075,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

 		// delete if no longer needed
 		if oldSpec.Spec.EnableLogicalBackup && !newSpec.Spec.EnableLogicalBackup {
-			c.logger.Debugf("deleting backup cron job")
+			c.logger.Debug("deleting backup cron job")
 			if err := c.deleteLogicalBackupJob(); err != nil {
 				c.logger.Errorf("could not delete a k8s cron job for logical backups: %v", err)
 				updateFailed = true
@@ -1095,7 +1095,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

 	// Roles and Databases
 	if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
-		c.logger.Debugf("syncing roles")
+		c.logger.Debug("syncing roles")
 		if err := c.syncRoles(); err != nil {
 			c.logger.Errorf("could not sync roles: %v", err)
 			updateFailed = true
@@ -591,7 +591,7 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
 	// Lack of connection pooler objects is not a fatal error, just log it if
 	// it was present before in the manifest
 	if c.ConnectionPooler[role] == nil || role == "" {
-		c.logger.Debugf("no connection pooler to delete")
+		c.logger.Debug("no connection pooler to delete")
 		return nil
 	}

@@ -622,7 +622,7 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
 	// Repeat the same for the service object
 	service := c.ConnectionPooler[role].Service
 	if service == nil {
-		c.logger.Debugf("no connection pooler service object to delete")
+		c.logger.Debug("no connection pooler service object to delete")
 	} else {

 		err = c.KubeClient.
@@ -111,7 +111,7 @@ func (c *Cluster) pgConnectionString(dbname string) string {

 func (c *Cluster) databaseAccessDisabled() bool {
 	if !c.OpConfig.EnableDBAccess {
-		c.logger.Debugf("database access is disabled")
+		c.logger.Debug("database access is disabled")
 	}

 	return !c.OpConfig.EnableDBAccess
@@ -116,7 +116,7 @@ func (c *Cluster) majorVersionUpgrade() error {
 			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
 			upgradeCommand := fmt.Sprintf("set -o pipefail && /usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)

-			c.logger.Debugf("checking if the spilo image runs with root or non-root (check for user id=0)")
+			c.logger.Debug("checking if the spilo image runs with root or non-root (check for user id=0)")
 			resultIdCheck, errIdCheck := c.ExecCommand(podName, "/bin/bash", "-c", "/usr/bin/id -u")
 			if errIdCheck != nil {
 				c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "checking user id to run upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, errIdCheck)
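
The hunk above only changes the log call, but for context: the operator execs id -u in the Postgres container to decide how to invoke the upgrade script. A hedged sketch of that check; the exec callback stands in for the operator's ExecCommand, and that it returns the command's stdout as a string is assumed from the hunk:

package example

import (
	"fmt"
	"strings"
)

// runsAsRoot interprets the output of "id -u" executed inside the Postgres
// container. The exec callback stands in for the operator's ExecCommand.
func runsAsRoot(exec func(pod string, command ...string) (string, error), pod string) (bool, error) {
	out, err := exec(pod, "/bin/bash", "-c", "/usr/bin/id -u")
	if err != nil {
		return false, fmt.Errorf("could not check user id on pod %s: %v", pod, err)
	}
	return strings.TrimSpace(out) == "0", nil // uid 0 means the spilo image runs as root
}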
@@ -59,7 +59,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
 		return nil
 	}

-	c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
+	c.logger.Infof("mark rolling update annotation for %s: reason %s", pod.Name, msg)
 	flag := make(map[string]string)
 	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)

@@ -110,7 +110,7 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 }

 func (c *Cluster) deletePods() error {
-	c.logger.Debugln("deleting pods")
+	c.logger.Debug("deleting pods")
 	pods, err := c.listPods()
 	if err != nil {
 		return err
@@ -127,9 +127,9 @@ func (c *Cluster) deletePods() error {
 		}
 	}
 	if len(pods) > 0 {
-		c.logger.Debugln("pods have been deleted")
+		c.logger.Debug("pods have been deleted")
 	} else {
-		c.logger.Debugln("no pods to delete")
+		c.logger.Debug("no pods to delete")
 	}

 	return nil
@@ -230,7 +230,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 		return fmt.Errorf("could not get node %q: %v", oldMaster.Spec.NodeName, err)
 	}
 	if !eol {
-		c.logger.Debugf("no action needed: master pod is already on a live node")
+		c.logger.Debug("no action needed: master pod is already on a live node")
 		return nil
 	}
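
markRollingUpdateFlagForPod builds the annotation map shown above and patches it onto the pod outside this hunk. A self-contained sketch of how such a marker annotation can be applied with client-go; the annotation key and client wiring are illustrative, not the operator's actual constants:

package example

import (
	"context"
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// markRollingUpdate applies a marker annotation to a pod with a JSON merge
// patch, mirroring the flow around markRollingUpdateFlagForPod. The key below
// is a placeholder for the operator's rollingUpdatePodAnnotationKey.
func markRollingUpdate(ctx context.Context, client kubernetes.Interface, namespace, podName string) error {
	patch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]string{
				"example.org/rolling-update-required": "true", // placeholder key
			},
		},
	})
	if err != nil {
		return err
	}
	_, err = client.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}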
@@ -187,7 +187,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 			c.logger.Warningf("could not scale down: %v", err)
 		}
 	}
-	c.logger.Debugf("updating statefulset")
+	c.logger.Debug("updating statefulset")

 	patchData, err := specPatch(newStatefulSet.Spec)
 	if err != nil {
@@ -218,7 +218,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	}

 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
-	c.logger.Debugf("replacing statefulset")
+	c.logger.Debug("replacing statefulset")

 	// Delete the current statefulset without deleting the pods
 	deletePropagationPolicy := metav1.DeletePropagationOrphan
@@ -232,7 +232,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	// make sure we clear the stored statefulset status if the subsequent create fails.
 	c.Statefulset = nil
 	// wait until the statefulset is truly deleted
-	c.logger.Debugf("waiting for the statefulset to be deleted")
+	c.logger.Debug("waiting for the statefulset to be deleted")

 	err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
@@ -266,7 +266,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *appsv1.StatefulSet) error {

 func (c *Cluster) deleteStatefulSet() error {
 	c.setProcessName("deleting statefulset")
-	c.logger.Debugln("deleting statefulset")
+	c.logger.Debug("deleting statefulset")
 	if c.Statefulset == nil {
 		c.logger.Debug("there is no statefulset in the cluster")
 		return nil
@@ -349,7 +349,8 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
 }

 func (c *Cluster) deleteService(role PostgresRole) error {
-	c.logger.Debugf("deleting service %s", role)
+	c.setProcessName("deleting service")
+	c.logger.Debugf("deleting %s service", role)

 	if c.Services[role] == nil {
 		c.logger.Debugf("No service for %s role was found, nothing to delete", role)
@@ -495,7 +496,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {

 func (c *Cluster) deleteEndpoint(role PostgresRole) error {
 	c.setProcessName("deleting endpoint")
-	c.logger.Debugln("deleting endpoint")
+	c.logger.Debugf("deleting %s endpoint", role)
 	if c.Endpoints[role] == nil {
 		c.logger.Debugf("there is no %s endpoint in the cluster", role)
 		return nil
@@ -543,7 +544,7 @@ func (c *Cluster) deletePatroniResources() error {

 func (c *Cluster) deletePatroniConfigMap(suffix string) error {
 	c.setProcessName("deleting Patroni config map")
-	c.logger.Debugln("deleting Patroni config map")
+	c.logger.Debugf("deleting %s Patroni config map", suffix)
 	cm := c.PatroniConfigMaps[suffix]
 	if cm == nil {
 		c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix)
@@ -565,7 +566,7 @@ func (c *Cluster) deletePatroniConfigMap(suffix string) error {

 func (c *Cluster) deletePatroniEndpoint(suffix string) error {
 	c.setProcessName("deleting Patroni endpoint")
-	c.logger.Debugln("deleting Patroni endpoint")
+	c.logger.Debugf("deleting %s Patroni endpoint", suffix)
 	ep := c.PatroniEndpoints[suffix]
 	if ep == nil {
 		c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix)
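
The retryutil.Retry call in replaceStatefulSet above waits until the old statefulset is really gone before the replacement is created. An equivalent sketch using client-go's wait helper instead of the operator's internal retryutil; the interval and timeout values are made up:

package example

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForStatefulSetDeletion polls until the named statefulset no longer
// exists, mirroring the "wait until the statefulset is truly deleted" step.
func waitForStatefulSetDeletion(client kubernetes.Interface, namespace, name string) error {
	return wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
		_, err := client.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // gone for real, safe to recreate
		}
		// err == nil means the statefulset still exists: keep polling; other errors abort
		return false, err
	})
}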
@@ -46,11 +46,13 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) (p

 func (c *Cluster) deleteStream(appId string) error {
 	c.setProcessName("deleting event stream")
+	c.logger.Debugf("deleting event stream with applicationId %s", appId)

 	err := c.KubeClient.FabricEventStreams(c.Streams[appId].Namespace).Delete(context.TODO(), c.Streams[appId].Name, metav1.DeleteOptions{})
 	if err != nil {
 		return fmt.Errorf("could not delete event stream %q with applicationId %s: %v", c.Streams[appId].Name, appId, err)
 	}
+	c.logger.Infof("event stream %q with applicationId %s has been successfully deleted", c.Streams[appId].Name, appId)
 	delete(c.Streams, appId)

 	return nil
@@ -308,7 +310,7 @@ func (c *Cluster) syncStreams() error {

 	_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
 	if k8sutil.ResourceNotFound(err) {
-		c.logger.Debugf("event stream CRD not installed, skipping")
+		c.logger.Debug("event stream CRD not installed, skipping")
 		return nil
 	}

@@ -473,7 +475,7 @@ func (c *Cluster) syncStream(appId string) error {
 			c.Streams[appId] = stream
 		}
 		if match, reason := c.compareStreams(&stream, desiredStreams); !match {
-			c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
+			c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
 			desiredStreams.ObjectMeta = stream.ObjectMeta
 			updatedStream, err := c.updateStreams(desiredStreams)
 			if err != nil {
@@ -550,7 +552,6 @@ func (c *Cluster) cleanupRemovedStreams(appIds []string) error {
 			if err != nil {
 				errors = append(errors, fmt.Sprintf("failed deleting event streams with applicationId %s: %v", appId, err))
 			}
-			c.logger.Infof("event streams with applicationId %s have been successfully deleted", appId)
 		}
 	}
@@ -300,6 +300,7 @@ func (c *Cluster) syncPatroniService() error {
 		err error
 	)
 	serviceName := fmt.Sprintf("%s-%s", c.Name, Patroni)
+	c.logger.Debugf("syncing %s service", serviceName)
 	c.setProcessName("syncing %s service", serviceName)

 	if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
@@ -311,7 +312,7 @@ func (c *Cluster) syncPatroniService() error {
 			c.setProcessName("updating %v service", serviceName)
 			svc, err = c.KubeClient.Services(c.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
 			if err != nil {
-				return fmt.Errorf("could not update %s endpoint: %v", serviceName, err)
+				return fmt.Errorf("could not update %s service: %v", serviceName, err)
 			}
 			c.Services[Patroni] = svc
 		}
@@ -537,7 +538,7 @@ func (c *Cluster) syncStatefulSet() error {
 		if err != nil {
 			return fmt.Errorf("could not generate statefulset: %v", err)
 		}
-		c.logger.Debugf("syncing statefulsets")
+		c.logger.Debug("syncing statefulsets")
 		// check if there are still pods with a rolling update flag
 		for _, pod := range pods {
 			if c.getRollingUpdateFlagFromPod(&pod) {
@@ -552,7 +553,7 @@ func (c *Cluster) syncStatefulSet() error {
 		}

 		if len(podsToRecreate) > 0 {
-			c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
+			c.logger.Infof("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
 		}

 		// statefulset is already there, make sure we use its definition in order to compare with the spec.
@@ -658,7 +659,7 @@ func (c *Cluster) syncStatefulSet() error {
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if len(podsToRecreate) > 0 {
 		if isSafeToRecreatePods {
-			c.logger.Debugln("performing rolling update")
+			c.logger.Info("performing rolling update")
 			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 			if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
 				return fmt.Errorf("could not recreate pods: %v", err)
@@ -971,7 +972,7 @@ func (c *Cluster) syncStandbyClusterConfiguration() error {
 	// carries the request to change configuration through
 	for _, pod := range pods {
 		podName := util.NameFromMeta(pod.ObjectMeta)
-		c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
+		c.logger.Infof("patching Postgres config via Patroni API on pod %s with following options: %s",
 			podName, standbyOptionsToSet)
 		if err = c.patroni.SetStandbyClusterParameters(&pod, standbyOptionsToSet); err == nil {
 			return nil
@@ -983,7 +984,7 @@ func (c *Cluster) syncStandbyClusterConfiguration() error {
 }

 func (c *Cluster) syncSecrets() error {
-	c.logger.Info("syncing secrets")
+	c.logger.Debug("syncing secrets")
 	c.setProcessName("syncing secrets")
 	generatedSecrets := c.generateUserSecrets()
 	retentionUsers := make([]string, 0)
@@ -993,7 +994,7 @@ func (c *Cluster) syncSecrets() error {
 		secret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{})
 		if err == nil {
 			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID)
+			c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID)
 			continue
 		}
 		if k8sutil.ResourceAlreadyExists(err) {
@@ -1134,7 +1135,7 @@ func (c *Cluster) updateSecret(
 	}

 	if updateSecret {
-		c.logger.Debugln(updateSecretMsg)
+		c.logger.Infof(updateSecretMsg)
 		if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil {
 			return fmt.Errorf("could not update secret %s: %v", secretName, err)
 		}
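
syncSecrets above now logs secret creation at info level and, when the create hits k8sutil.ResourceAlreadyExists, falls back to updating the existing secret. A standalone sketch of that create-then-update pattern with plain client-go error helpers; names are illustrative:

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// applySecret creates the desired secret, or replaces the data of an existing
// one when the create fails with AlreadyExists.
func applySecret(ctx context.Context, client kubernetes.Interface, desired *v1.Secret) error {
	_, err := client.CoreV1().Secrets(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
	if err == nil || !apierrors.IsAlreadyExists(err) {
		return err
	}
	existing, err := client.CoreV1().Secrets(desired.Namespace).Get(ctx, desired.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	existing.Data = desired.Data // keep live metadata, swap only the payload
	_, err = client.CoreV1().Secrets(desired.Namespace).Update(ctx, existing, metav1.UpdateOptions{})
	return err
}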
@@ -193,7 +193,7 @@ func logNiceDiff(log *logrus.Entry, old, new interface{}) {
 	nice := nicediff.Diff(string(o), string(n), true)
 	for _, s := range strings.Split(nice, "\n") {
 		// " is not needed in the value to understand
-		log.Debugf(strings.ReplaceAll(s, "\"", ""))
+		log.Debug(strings.ReplaceAll(s, "\"", ""))
 	}
 }

@@ -209,7 +209,7 @@ func (c *Cluster) logStatefulSetChanges(old, new *appsv1.StatefulSet, isUpdate b
 	logNiceDiff(c.logger, old.Spec, new.Spec)

 	if !reflect.DeepEqual(old.Annotations, new.Annotations) {
-		c.logger.Debugf("metadata.annotation are different")
+		c.logger.Debug("metadata.annotation are different")
 		logNiceDiff(c.logger, old.Annotations, new.Annotations)
 	}

@@ -280,7 +280,7 @@ func (c *Cluster) getTeamMembers(teamID string) ([]string, error) {
 	}

 	if !c.OpConfig.EnableTeamsAPI {
-		c.logger.Debugf("team API is disabled")
+		c.logger.Debug("team API is disabled")
 		return members, nil
 	}

@@ -416,7 +416,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error {
 		podsNumber = len(pods.Items)
 		c.logger.Debugf("Waiting for %d pods to become ready", podsNumber)
 	} else {
-		c.logger.Debugf("Waiting for any replica pod to become ready")
+		c.logger.Debug("Waiting for any replica pod to become ready")
 	}

 	err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncUnderlyingEBSVolume() error { | func (c *Cluster) syncUnderlyingEBSVolume() error { | ||||||
| 	c.logger.Infof("starting to sync EBS volumes: type, iops, throughput, and size") | 	c.logger.Debug("starting to sync EBS volumes: type, iops, throughput, and size") | ||||||
| 
 | 
 | ||||||
| 	var ( | 	var ( | ||||||
| 		err     error | 		err     error | ||||||
|  | @ -136,7 +136,7 @@ func (c *Cluster) syncUnderlyingEBSVolume() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) populateVolumeMetaData() error { | func (c *Cluster) populateVolumeMetaData() error { | ||||||
| 	c.logger.Infof("starting reading ebs meta data") | 	c.logger.Debug("starting reading ebs meta data") | ||||||
| 
 | 
 | ||||||
| 	pvs, err := c.listPersistentVolumes() | 	pvs, err := c.listPersistentVolumes() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | @ -165,7 +165,7 @@ func (c *Cluster) populateVolumeMetaData() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(currentVolumes) != len(c.EBSVolumes) && len(c.EBSVolumes) > 0 { | 	if len(currentVolumes) != len(c.EBSVolumes) && len(c.EBSVolumes) > 0 { | ||||||
| 		c.logger.Debugf("number of ebs volumes (%d) discovered differs from already known volumes (%d)", len(currentVolumes), len(c.EBSVolumes)) | 		c.logger.Infof("number of ebs volumes (%d) discovered differs from already known volumes (%d)", len(currentVolumes), len(c.EBSVolumes)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// reset map, operator is not responsible for dangling ebs volumes
 | 	// reset map, operator is not responsible for dangling ebs volumes
 | ||||||
|  | @ -205,18 +205,18 @@ func (c *Cluster) syncVolumeClaims() error { | ||||||
| 			if currentSize < manifestSize { | 			if currentSize < manifestSize { | ||||||
| 				pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize | 				pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize | ||||||
| 				needsUpdate = true | 				needsUpdate = true | ||||||
| 				c.logger.Debugf("persistent volume claim for volume %q needs to be resized", pvc.Name) | 				c.logger.Infof("persistent volume claim for volume %q needs to be resized", pvc.Name) | ||||||
| 			} else { | 			} else { | ||||||
| 				c.logger.Warningf("cannot shrink persistent volume") | 				c.logger.Warningf("cannot shrink persistent volume") | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if needsUpdate { | 		if needsUpdate { | ||||||
| 			c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) | 			c.logger.Infof("updating persistent volume claim definition for volume %q", pvc.Name) | ||||||
| 			if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { | 			if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { | ||||||
| 				return fmt.Errorf("could not update persistent volume claim: %q", err) | 				return fmt.Errorf("could not update persistent volume claim: %q", err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name) | 			c.logger.Infof("successfully updated persistent volume claim %q", pvc.Name) | ||||||
| 		} else { | 		} else { | ||||||
| 			c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name) | 			c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name) | ||||||
| 		} | 		} | ||||||
|  | @ -234,7 +234,7 @@ func (c *Cluster) syncVolumeClaims() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Infof("volume claims have been synced successfully") | 	c.logger.Debug("volume claims have been synced successfully") | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | @ -255,7 +255,7 @@ func (c *Cluster) syncEbsVolumes() error { | ||||||
| 		return fmt.Errorf("could not sync volumes: %v", err) | 		return fmt.Errorf("could not sync volumes: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Infof("volumes have been synced successfully") | 	c.logger.Debug("volumes have been synced successfully") | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | @ -274,7 +274,7 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deletePersistentVolumeClaims() error { | func (c *Cluster) deletePersistentVolumeClaims() error { | ||||||
| 	c.logger.Debugln("deleting PVCs") | 	c.logger.Debug("deleting PVCs") | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|  | @ -286,9 +286,9 @@ func (c *Cluster) deletePersistentVolumeClaims() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if len(pvcs) > 0 { | 	if len(pvcs) > 0 { | ||||||
| 		c.logger.Debugln("PVCs have been deleted") | 		c.logger.Debug("PVCs have been deleted") | ||||||
| 	} else { | 	} else { | ||||||
| 		c.logger.Debugln("no PVCs to delete") | 		c.logger.Debug("no PVCs to delete") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  | @ -382,22 +382,22 @@ func (c *Cluster) resizeVolumes() error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) | 		c.logger.Infof("updating persistent volume %q to %d", pv.Name, newSize) | ||||||
| 		if err := resizer.ResizeVolume(awsVolumeID, newSize); err != nil { | 		if err := resizer.ResizeVolume(awsVolumeID, newSize); err != nil { | ||||||
| 			return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeID, err) | 			return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeID, err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) | 		c.logger.Infof("resizing the filesystem on the volume %q", pv.Name) | ||||||
| 		podName := getPodNameFromPersistentVolume(pv) | 		podName := getPodNameFromPersistentVolume(pv) | ||||||
| 		if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | 		if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | ||||||
| 			return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) | 			return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) | 		c.logger.Infof("filesystem resize successful on volume %q", pv.Name) | ||||||
| 		pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | 		pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | ||||||
| 		c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) | 		c.logger.Infof("updating persistent volume definition for volume %q", pv.Name) | ||||||
| 		if _, err := c.KubeClient.PersistentVolumes().Update(context.TODO(), pv, metav1.UpdateOptions{}); err != nil { | 		if _, err := c.KubeClient.PersistentVolumes().Update(context.TODO(), pv, metav1.UpdateOptions{}); err != nil { | ||||||
| 			return fmt.Errorf("could not update persistent volume: %q", err) | 			return fmt.Errorf("could not update persistent volume: %q", err) | ||||||
| 		} | 		} | ||||||
| 		c.logger.Debugf("successfully updated persistent volume %q", pv.Name) | 		c.logger.Infof("successfully updated persistent volume %q", pv.Name) | ||||||
| 
 | 
 | ||||||
| 		if !compatible { | 		if !compatible { | ||||||
| 			c.logger.Warningf("volume %q is incompatible with all available resizing providers, consider switching storage_resize_mode to pvc or off", pv.Name) | 			c.logger.Warningf("volume %q is incompatible with all available resizing providers, consider switching storage_resize_mode to pvc or off", pv.Name) | ||||||
|  | @ -458,7 +458,7 @@ func (c *Cluster) executeEBSMigration() error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !hasGp2 { | 		if !hasGp2 { | ||||||
| 			c.logger.Infof("no EBS gp2 volumes left to migrate") | 			c.logger.Debugf("no EBS gp2 volumes left to migrate") | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
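
syncVolumeClaims above compares the claimed storage request with the manifest size and only ever grows it. A sketch of that comparison and update using client-go and resource.Quantity; the claim lookup and target size are illustrative:

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// growClaim raises the storage request of a PVC to the target size when the
// current request is smaller; shrinking is refused, as in syncVolumeClaims.
func growClaim(ctx context.Context, client kubernetes.Interface, namespace, name, target string) error {
	pvc, err := client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	newSize := resource.MustParse(target) // e.g. "20Gi"
	current := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	if current.Cmp(newSize) >= 0 {
		return nil // already as large or larger; a PVC cannot be shrunk
	}
	pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize
	_, err = client.CoreV1().PersistentVolumeClaims(namespace).Update(ctx, pvc, metav1.UpdateOptions{})
	return err
}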
@@ -143,7 +143,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
 	if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
 		return err
 	}
-	c.logger.Debugf("acquiring initial list of clusters")
+	c.logger.Debug("acquiring initial list of clusters")
 	for _, pg := range list.Items {
 		// XXX: check the cluster status field instead
 		if pg.Error != "" {
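
For context on listClusters above: ResourceVersion "0" in a list request lets the API server answer from its watch cache instead of forcing a quorum read, which is why the initial cluster list uses it. A sketch with the core pods client standing in for the operator's generated postgresql clientset:

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listFromCache lists with ResourceVersion "0", which tells the API server it
// may serve the result from its watch cache; pods stand in for the operator's
// postgresql clientset, which is not part of this diff.
func listFromCache(ctx context.Context, client kubernetes.Interface, namespace string) error {
	_, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ResourceVersion: "0"})
	return err
}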