Merge branch 'master' into feature/tests
# Conflicts: # pkg/util/teams/teams.go
This commit is contained in:
		
						commit
						a198442338
					
				|  | @ -25,7 +25,6 @@ data: | ||||||
|   resource_check_interval: 3s |   resource_check_interval: 3s | ||||||
|   resource_check_timeout: 10m |   resource_check_timeout: 10m | ||||||
|   resync_period: 5m |   resync_period: 5m | ||||||
|   resync_period_pod: 5m |  | ||||||
|   super_username: postgres |   super_username: postgres | ||||||
|   teams_api_url: http://fake-teams-api.default.svc.cluster.local |   teams_api_url: http://fake-teams-api.default.svc.cluster.local | ||||||
|   workers: "4" |   workers: "4" | ||||||
|  |  | ||||||
|  | @ -430,7 +430,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 					c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) | 					c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			// only proceeed further if both old and new load balancer were present
 | 			// only proceed further if both old and new load balancer were present
 | ||||||
| 			if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { | 			if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | @ -39,7 +39,7 @@ func (c *Cluster) Sync() error { | ||||||
| 			if c.Service[role] != nil { | 			if c.Service[role] != nil { | ||||||
| 				// delete the left over replica service
 | 				// delete the left over replica service
 | ||||||
| 				if err := c.deleteService(role); err != nil { | 				if err := c.deleteService(role); err != nil { | ||||||
| 					return fmt.Errorf("could not delete obsolete %s service: %v", role) | 					return fmt.Errorf("could not delete obsolete %s service: %v", role, err) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			continue | 			continue | ||||||
|  |  | ||||||
|  | @ -109,8 +109,8 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume | ||||||
| 					return fmt.Errorf("could not connect to the volume provider: %v", err) | 					return fmt.Errorf("could not connect to the volume provider: %v", err) | ||||||
| 				} | 				} | ||||||
| 				defer func() { | 				defer func() { | ||||||
| 					err2 := resizer.DisconnectFromProvider(); if err2 != nil { | 					if err := resizer.DisconnectFromProvider(); err != nil { | ||||||
| 						c.logger.Errorf("%v", err2) | 						c.logger.Errorf("%v", err) | ||||||
| 					} | 					} | ||||||
| 				}() | 				}() | ||||||
| 			} | 			} | ||||||
|  | @ -127,7 +127,7 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume | ||||||
| 			if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | 			if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | ||||||
| 				return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) | 				return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Debugf("filesystem resize successfull on volume %s", pv.Name) | 			c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) | ||||||
| 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | ||||||
| 			c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) | 			c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) | ||||||
| 			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { | 			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { | ||||||
|  |  | ||||||
|  | @ -13,6 +13,7 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" | 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/teams" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/teams" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -38,6 +39,8 @@ type Controller struct { | ||||||
| 	podCh              chan spec.PodEvent | 	podCh              chan spec.PodEvent | ||||||
| 
 | 
 | ||||||
| 	clusterEventQueues []*cache.FIFO | 	clusterEventQueues []*cache.FIFO | ||||||
|  | 
 | ||||||
|  | 	lastClusterSyncTime int64 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { | func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { | ||||||
|  | @ -93,7 +96,7 @@ func (c *Controller) initController() { | ||||||
| 	c.postgresqlInformer = cache.NewSharedIndexInformer( | 	c.postgresqlInformer = cache.NewSharedIndexInformer( | ||||||
| 		clusterLw, | 		clusterLw, | ||||||
| 		&spec.Postgresql{}, | 		&spec.Postgresql{}, | ||||||
| 		c.opConfig.ResyncPeriod, | 		constants.QueueResyncPeriodTPR, | ||||||
| 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | ||||||
| 
 | 
 | ||||||
| 	if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | 	if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||
|  | @ -113,7 +116,7 @@ func (c *Controller) initController() { | ||||||
| 	c.podInformer = cache.NewSharedIndexInformer( | 	c.podInformer = cache.NewSharedIndexInformer( | ||||||
| 		podLw, | 		podLw, | ||||||
| 		&v1.Pod{}, | 		&v1.Pod{}, | ||||||
| 		c.opConfig.ResyncPeriodPod, | 		constants.QueueResyncPeriodPod, | ||||||
| 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | ||||||
| 
 | 
 | ||||||
| 	if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | 	if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||
|  | @ -141,6 +144,7 @@ func (c *Controller) runInformers(stopCh <-chan struct{}) { | ||||||
| 	go c.postgresqlInformer.Run(stopCh) | 	go c.postgresqlInformer.Run(stopCh) | ||||||
| 	go c.podInformer.Run(stopCh) | 	go c.podInformer.Run(stopCh) | ||||||
| 	go c.podEventsDispatcher(stopCh) | 	go c.podEventsDispatcher(stopCh) | ||||||
|  | 	go c.clusterResync(stopCh) | ||||||
| 
 | 
 | ||||||
| 	<-stopCh | 	<-stopCh | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -3,6 +3,8 @@ package controller | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"sync/atomic" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/client-go/pkg/api" | 	"k8s.io/client-go/pkg/api" | ||||||
| 	"k8s.io/client-go/pkg/api/meta" | 	"k8s.io/client-go/pkg/api/meta" | ||||||
|  | @ -18,6 +20,19 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | func (c *Controller) clusterResync(stopCh <-chan struct{}) { | ||||||
|  | 	ticker := time.NewTicker(c.opConfig.ResyncPeriod) | ||||||
|  | 
 | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ticker.C: | ||||||
|  | 			c.clusterListFunc(api.ListOptions{ResourceVersion: "0"}) | ||||||
|  | 		case <-stopCh: | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { | func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { | ||||||
| 	c.logger.Info("Getting list of currently running clusters") | 	c.logger.Info("Getting list of currently running clusters") | ||||||
| 
 | 
 | ||||||
|  | @ -37,6 +52,11 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e | ||||||
| 		return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) | 		return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { | ||||||
|  | 		c.logger.Debugln("skipping resync of clusters") | ||||||
|  | 		return object, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	var activeClustersCnt, failedClustersCnt int | 	var activeClustersCnt, failedClustersCnt int | ||||||
| 	for _, obj := range objList { | 	for _, obj := range objList { | ||||||
| 		pg, ok := obj.(*spec.Postgresql) | 		pg, ok := obj.(*spec.Postgresql) | ||||||
|  | @ -63,6 +83,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e | ||||||
| 		c.logger.Infof("No clusters running") | 		c.logger.Infof("No clusters running") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) | ||||||
|  | 
 | ||||||
| 	return object, err | 	return object, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -135,13 +135,13 @@ func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { | ||||||
| 		return []byte(fmt.Sprintf("\"%s-%s\"", | 		return []byte(fmt.Sprintf("\"%s-%s\"", | ||||||
| 			m.StartTime.Format("15:04"), | 			m.StartTime.Format("15:04"), | ||||||
| 			m.EndTime.Format("15:04"))), nil | 			m.EndTime.Format("15:04"))), nil | ||||||
| 	} else { | 	} | ||||||
|  | 
 | ||||||
| 	return []byte(fmt.Sprintf("\"%s:%s-%s\"", | 	return []byte(fmt.Sprintf("\"%s:%s-%s\"", | ||||||
| 		m.Weekday.String()[:3], | 		m.Weekday.String()[:3], | ||||||
| 		m.StartTime.Format("15:04"), | 		m.StartTime.Format("15:04"), | ||||||
| 		m.EndTime.Format("15:04"))), nil | 		m.EndTime.Format("15:04"))), nil | ||||||
| } | } | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| // UnmarshalJSON convets a JSON to the maintenance window definition.
 | // UnmarshalJSON convets a JSON to the maintenance window definition.
 | ||||||
| func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { | func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { | ||||||
|  |  | ||||||
|  | @ -282,7 +282,7 @@ var postgresqlList = []struct { | ||||||
| 				Kind:       "List", | 				Kind:       "List", | ||||||
| 				APIVersion: "v1", | 				APIVersion: "v1", | ||||||
| 			}, | 			}, | ||||||
| 			Items: []Postgresql{Postgresql{ | 			Items: []Postgresql{{ | ||||||
| 				TypeMeta: unversioned.TypeMeta{ | 				TypeMeta: unversioned.TypeMeta{ | ||||||
| 					Kind:       "Postgresql", | 					Kind:       "Postgresql", | ||||||
| 					APIVersion: "acid.zalan.do/v1", | 					APIVersion: "acid.zalan.do/v1", | ||||||
|  | @ -382,7 +382,7 @@ func TestUnmarshalMaintenanceWindow(t *testing.T) { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !reflect.DeepEqual(m, tt.out) { | 		if !reflect.DeepEqual(m, tt.out) { | ||||||
| 			t.Errorf("Expected maintenace window: %#v, got: %#v", tt.out, m) | 			t.Errorf("Expected maintenance window: %#v, got: %#v", tt.out, m) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -15,7 +15,6 @@ type TPR struct { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type Resources struct { | type Resources struct { | ||||||
| 	ResyncPeriodPod        time.Duration     `name:"resync_period_pod" default:"5m"` |  | ||||||
| 	ResourceCheckInterval  time.Duration     `name:"resource_check_interval" default:"3s"` | 	ResourceCheckInterval  time.Duration     `name:"resource_check_interval" default:"3s"` | ||||||
| 	ResourceCheckTimeout   time.Duration     `name:"resource_check_timeout" default:"10m"` | 	ResourceCheckTimeout   time.Duration     `name:"resource_check_timeout" default:"10m"` | ||||||
| 	PodLabelWaitTimeout    time.Duration     `name:"pod_label_wait_timeout" default:"10m"` | 	PodLabelWaitTimeout    time.Duration     `name:"pod_label_wait_timeout" default:"10m"` | ||||||
|  |  | ||||||
|  | @ -10,4 +10,7 @@ const ( | ||||||
| 	K8sAPIPath                  = "/api" | 	K8sAPIPath                  = "/api" | ||||||
| 	StatefulsetDeletionInterval = 1 * time.Second | 	StatefulsetDeletionInterval = 1 * time.Second | ||||||
| 	StatefulsetDeletionTimeout  = 30 * time.Second | 	StatefulsetDeletionTimeout  = 30 * time.Second | ||||||
|  | 
 | ||||||
|  | 	QueueResyncPeriodPod = 5 * time.Minute | ||||||
|  | 	QueueResyncPeriodTPR = 5 * time.Minute | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | @ -25,13 +25,6 @@ var pgUsers = []struct { | ||||||
| 		MemberOf: []string{}}, | 		MemberOf: []string{}}, | ||||||
| 		"md592f413f3974bdf3799bb6fecb5f9f2c6"}} | 		"md592f413f3974bdf3799bb6fecb5f9f2c6"}} | ||||||
| 
 | 
 | ||||||
| var prettyTest = []struct { |  | ||||||
| 	in  interface{} |  | ||||||
| 	out string |  | ||||||
| }{ |  | ||||||
| 	{pgUsers, `[{{test password [] []} md587f77988ccb5aa917c93201ba314fcd4} {{test md592f413f3974bdf3799bb6fecb5f9f2c6 [] []} md592f413f3974bdf3799bb6fecb5f9f2c6}]`}, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| var prettyDiffTest = []struct { | var prettyDiffTest = []struct { | ||||||
| 	inA interface{} | 	inA interface{} | ||||||
| 	inB interface{} | 	inB interface{} | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue