Merge branch 'master' into master
This commit is contained in:
		
						commit
						c9ed36fd26
					
				|  | @ -1160,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { | ||||||
| 
 | 
 | ||||||
| 	// streams
 | 	// streams
 | ||||||
| 	if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { | 	if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { | ||||||
|  | 		c.logger.Debug("syncing streams") | ||||||
| 		if err := c.syncStreams(); err != nil { | 		if err := c.syncStreams(); err != nil { | ||||||
| 			c.logger.Errorf("could not sync streams: %v", err) | 			c.logger.Errorf("could not sync streams: %v", err) | ||||||
| 			updateFailed = true | 			updateFailed = true | ||||||
|  |  | ||||||
|  | @ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for slotName, slotAndPublication := range databaseSlotsList { | 	for slotName, slotAndPublication := range databaseSlotsList { | ||||||
| 		tables := slotAndPublication.Publication | 		newTables := slotAndPublication.Publication | ||||||
| 		tableNames := make([]string, len(tables)) | 		tableNames := make([]string, len(newTables)) | ||||||
| 		i := 0 | 		i := 0 | ||||||
| 		for t := range tables { | 		for t := range newTables { | ||||||
| 			tableName, schemaName := getTableSchema(t) | 			tableName, schemaName := getTableSchema(t) | ||||||
| 			tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) | 			tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) | ||||||
| 			i++ | 			i++ | ||||||
|  | @ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za | ||||||
| 		tableList := strings.Join(tableNames, ", ") | 		tableList := strings.Join(tableNames, ", ") | ||||||
| 
 | 
 | ||||||
| 		currentTables, exists := currentPublications[slotName] | 		currentTables, exists := currentPublications[slotName] | ||||||
|  | 		// if newTables is empty it means that it's definition was removed from streams section
 | ||||||
|  | 		// but when slot is defined in manifest we should sync publications, too
 | ||||||
|  | 		// by reusing current tables we make sure it is not
 | ||||||
|  | 		if len(newTables) == 0 { | ||||||
|  | 			tableList = currentTables | ||||||
|  | 		} | ||||||
| 		if !exists { | 		if !exists { | ||||||
| 			createPublications[slotName] = tableList | 			createPublications[slotName] = tableList | ||||||
| 		} else if currentTables != tableList { | 		} else if currentTables != tableList { | ||||||
|  | @ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	databaseSlots := make(map[string]map[string]zalandov1.Slot) | 	// create map with every database and empty slot defintion
 | ||||||
| 	slotsToSync := make(map[string]map[string]string) | 	// we need it to detect removal of streams from databases
 | ||||||
| 	requiredPatroniConfig := c.Spec.Patroni |  | ||||||
| 
 |  | ||||||
| 	if len(requiredPatroniConfig.Slots) > 0 { |  | ||||||
| 		for slotName, slotConfig := range requiredPatroniConfig.Slots { |  | ||||||
| 			slotsToSync[slotName] = slotConfig |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if err := c.initDbConn(); err != nil { | 	if err := c.initDbConn(); err != nil { | ||||||
| 		return fmt.Errorf("could not init database connection") | 		return fmt.Errorf("could not init database connection") | ||||||
| 	} | 	} | ||||||
|  | @ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not get list of databases: %v", err) | 		return fmt.Errorf("could not get list of databases: %v", err) | ||||||
| 	} | 	} | ||||||
| 	// get database name with empty list of slot, except template0 and template1
 | 	databaseSlots := make(map[string]map[string]zalandov1.Slot) | ||||||
| 	for dbName := range listDatabases { | 	for dbName := range listDatabases { | ||||||
| 		if dbName != "template0" && dbName != "template1" { | 		if dbName != "template0" && dbName != "template1" { | ||||||
| 			databaseSlots[dbName] = map[string]zalandov1.Slot{} | 			databaseSlots[dbName] = map[string]zalandov1.Slot{} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// need to take explicitly defined slots into account whey syncing Patroni config
 | ||||||
|  | 	slotsToSync := make(map[string]map[string]string) | ||||||
|  | 	requiredPatroniConfig := c.Spec.Patroni | ||||||
|  | 	if len(requiredPatroniConfig.Slots) > 0 { | ||||||
|  | 		for slotName, slotConfig := range requiredPatroniConfig.Slots { | ||||||
|  | 			slotsToSync[slotName] = slotConfig | ||||||
|  | 			if _, exists := databaseSlots[slotConfig["database"]]; exists { | ||||||
|  | 				databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{ | ||||||
|  | 					Slot:        slotConfig, | ||||||
|  | 					Publication: make(map[string]acidv1.StreamTable), | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// get list of required slots and publications, group by database
 | 	// get list of required slots and publications, group by database
 | ||||||
| 	for _, stream := range c.Spec.Streams { | 	for _, stream := range c.Spec.Streams { | ||||||
| 		if _, exists := databaseSlots[stream.Database]; !exists { | 		if _, exists := databaseSlots[stream.Database]; !exists { | ||||||
|  | @ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error { | ||||||
| 			"type":     "logical", | 			"type":     "logical", | ||||||
| 		} | 		} | ||||||
| 		slotName := getSlotName(stream.Database, stream.ApplicationId) | 		slotName := getSlotName(stream.Database, stream.ApplicationId) | ||||||
| 		if _, exists := databaseSlots[stream.Database][slotName]; !exists { | 		slotAndPublication, exists := databaseSlots[stream.Database][slotName] | ||||||
|  | 		if !exists { | ||||||
| 			databaseSlots[stream.Database][slotName] = zalandov1.Slot{ | 			databaseSlots[stream.Database][slotName] = zalandov1.Slot{ | ||||||
| 				Slot:        slot, | 				Slot:        slot, | ||||||
| 				Publication: stream.Tables, | 				Publication: stream.Tables, | ||||||
| 			} | 			} | ||||||
| 		} else { | 		} else { | ||||||
| 			slotAndPublication := databaseSlots[stream.Database][slotName] |  | ||||||
| 			streamTables := slotAndPublication.Publication | 			streamTables := slotAndPublication.Publication | ||||||
| 			for tableName, table := range stream.Tables { | 			for tableName, table := range stream.Tables { | ||||||
| 				if _, exists := streamTables[tableName]; !exists { | 				if _, exists := streamTables[tableName]; !exists { | ||||||
|  | @ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		streamExists = true | 		streamExists = true | ||||||
|  | 		c.Streams[appId] = &stream | ||||||
| 		desiredStreams := c.generateFabricEventStream(appId) | 		desiredStreams := c.generateFabricEventStream(appId) | ||||||
| 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { | 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { | ||||||
| 			c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) | 			c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) | ||||||
| 			stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences | 			stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences | ||||||
| 			c.setProcessName("updating event streams with applicationId %s", appId) | 			c.setProcessName("updating event streams with applicationId %s", appId) | ||||||
| 			stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) | 			updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) | 				return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) | ||||||
| 			} | 			} | ||||||
| 			c.Streams[appId] = stream | 			c.Streams[appId] = updatedStream | ||||||
| 		} | 		} | ||||||
| 		if match, reason := c.compareStreams(&stream, desiredStreams); !match { | 		if match, reason := c.compareStreams(&stream, desiredStreams); !match { | ||||||
| 			c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) | 			c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) | ||||||
|  |  | ||||||
|  | @ -153,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { | ||||||
| 		return fmt.Errorf("could not sync connection pooler: %v", err) | 		return fmt.Errorf("could not sync connection pooler: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(c.Spec.Streams) > 0 { | 	// sync if manifest stream count is different from stream CR count
 | ||||||
|  | 	// it can be that they are always different due to grouping of manifest streams
 | ||||||
|  | 	// but we would catch missed removals on update
 | ||||||
|  | 	if len(c.Spec.Streams) != len(c.Streams) { | ||||||
| 		c.logger.Debug("syncing streams") | 		c.logger.Debug("syncing streams") | ||||||
| 		if err = c.syncStreams(); err != nil { | 		if err = c.syncStreams(); err != nil { | ||||||
| 			err = fmt.Errorf("could not sync streams: %v", err) | 			err = fmt.Errorf("could not sync streams: %v", err) | ||||||
|  |  | ||||||
|  | @ -650,7 +650,7 @@ func Test_trimCronjobName(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestisInMaintenanceWindow(t *testing.T) { | func TestIsInMaintenanceWindow(t *testing.T) { | ||||||
| 	now := time.Now() | 	now := time.Now() | ||||||
| 	futureTimeStart := now.Add(1 * time.Hour) | 	futureTimeStart := now.Add(1 * time.Hour) | ||||||
| 	futureTimeStartFormatted := futureTimeStart.Format("15:04") | 	futureTimeStartFormatted := futureTimeStart.Format("15:04") | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue