Increase logging verbosity, restructure code
This commit is contained in:
parent
3aaa05fb96
commit
79a6726d4d
|
|
@ -99,24 +99,6 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
|
||||||
<-stopCh
|
<-stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) needsRollingUpdate(otherSpec *spec.Postgresql) bool {
|
|
||||||
//TODO: add more checks
|
|
||||||
if c.Spec.Version != otherSpec.Spec.Version {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(c.Spec.Resources, otherSpec.Spec.Resources) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
newSs := genStatefulSet(c.ClusterName(), otherSpec.Spec, c.etcdHost, c.dockerImage)
|
|
||||||
diff, res := statefulsetsEqual(c.Statefulset, newSs)
|
|
||||||
if diff && res {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cluster) SetStatus(status spec.PostgresStatus) {
|
func (c *Cluster) SetStatus(status spec.PostgresStatus) {
|
||||||
b, err := json.Marshal(status)
|
b, err := json.Marshal(status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -198,41 +180,113 @@ func (c *Cluster) Create() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c Cluster) sameServiceWith(service *v1.Service) bool {
|
||||||
|
//TODO: improve comparison
|
||||||
|
return reflect.DeepEqual(c.Service.Spec.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Cluster) sameVolumeWith(volume spec.Volume) bool {
|
||||||
|
return reflect.DeepEqual(c.Spec.Volume, volume)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (equal, needsRollUpdate bool) {
|
||||||
|
equal = true
|
||||||
|
needsRollUpdate = false
|
||||||
|
//TODO: improve me
|
||||||
|
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
|
||||||
|
equal = false
|
||||||
|
}
|
||||||
|
if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
|
||||||
|
equal = false
|
||||||
|
needsRollUpdate = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
|
||||||
|
c.logger.Warnf("StatefulSet '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
container1 := c.Statefulset.Spec.Template.Spec.Containers[0]
|
||||||
|
container2 := statefulSet.Spec.Template.Spec.Containers[0]
|
||||||
|
if container1.Image != container2.Image {
|
||||||
|
equal = false
|
||||||
|
needsRollUpdate = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(container1.Ports, container2.Ports) {
|
||||||
|
equal = false
|
||||||
|
needsRollUpdate = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(container1.Resources, container2.Resources) {
|
||||||
|
equal = false
|
||||||
|
needsRollUpdate = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(container1.Env, container2.Env) {
|
||||||
|
equal = false
|
||||||
|
needsRollUpdate = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
||||||
c.logger.Infof("Cluster update from version %s to %s",
|
c.logger.Infof("Cluster update from version %s to %s",
|
||||||
c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion)
|
c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion)
|
||||||
|
|
||||||
rollingUpdate := c.needsRollingUpdate(newSpec)
|
|
||||||
if rollingUpdate {
|
|
||||||
c.logger.Infof("Pods need to be recreated")
|
|
||||||
}
|
|
||||||
|
|
||||||
newStatefulSet := genStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage)
|
|
||||||
|
|
||||||
newService := resources.Service(c.ClusterName(), c.TeamName(), newSpec.Spec.AllowedSourceRanges)
|
newService := resources.Service(c.ClusterName(), c.TeamName(), newSpec.Spec.AllowedSourceRanges)
|
||||||
if !servicesEqual(newService, c.Service) {
|
if !c.sameServiceWith(newService) {
|
||||||
c.logger.Infof("Service needs to be upated")
|
c.logger.Infof("LoadBalancer configuration has changed for Service '%s': %+v -> %+v",
|
||||||
|
util.NameFromMeta(c.Service.ObjectMeta),
|
||||||
|
c.Service.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges,
|
||||||
|
)
|
||||||
if err := c.updateService(newService); err != nil {
|
if err := c.updateService(newService); err != nil {
|
||||||
return fmt.Errorf("Can't update Service: %s", err)
|
return fmt.Errorf("Can't update Service: %s", err)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Infof("Service has been updated")
|
c.logger.Infof("Service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(newSpec.Spec.Volume, c.Spec.Volume) {
|
if !c.sameVolumeWith(newSpec.Spec.Volume) {
|
||||||
|
c.logger.Infof("Volume specification has been changed")
|
||||||
//TODO: update PVC
|
//TODO: update PVC
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: mind the case of updating allowedSourceRanges
|
newStatefulSet := genStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage)
|
||||||
if err := c.updateStatefulSet(newStatefulSet); err != nil {
|
sameSS, rollingUpdate := c.compareStatefulSetWith(newStatefulSet)
|
||||||
return fmt.Errorf("Can't upate StatefulSet: %s", err)
|
|
||||||
|
if !sameSS {
|
||||||
|
c.logger.Infof("StatefulSet '%s' has been changed: %+v -> %+v",
|
||||||
|
util.NameFromMeta(c.Statefulset.ObjectMeta),
|
||||||
|
c.Statefulset.Spec, newStatefulSet.Spec,
|
||||||
|
)
|
||||||
|
//TODO: mind the case of updating allowedSourceRanges
|
||||||
|
if err := c.updateStatefulSet(newStatefulSet); err != nil {
|
||||||
|
return fmt.Errorf("Can't upate StatefulSet: %s", err)
|
||||||
|
}
|
||||||
|
c.logger.Infof("StatefulSet '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
|
||||||
|
c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed",
|
||||||
|
c.Spec.PgVersion, newSpec.Spec.PgVersion)
|
||||||
|
//TODO: rewrite pg version in tpr spec
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(c.Spec.Resources, newSpec.Spec.Resources) { // Kubernetes resources: cpu, mem
|
||||||
|
rollingUpdate = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if rollingUpdate {
|
if rollingUpdate {
|
||||||
|
c.logger.Infof("Rolling update is needed")
|
||||||
// TODO: wait for actual streaming to the replica
|
// TODO: wait for actual streaming to the replica
|
||||||
if err := c.recreatePods(); err != nil {
|
if err := c.recreatePods(); err != nil {
|
||||||
return fmt.Errorf("Can't recreate Pods: %s", err)
|
return fmt.Errorf("Can't recreate Pods: %s", err)
|
||||||
}
|
}
|
||||||
|
c.logger.Infof("Rolling update has been finished")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ func genStatefulSet(clusterName spec.ClusterName, cSpec spec.PostgresSpec, etcdH
|
||||||
volumeSize := cSpec.Volume.Size
|
volumeSize := cSpec.Volume.Size
|
||||||
volumeStorageClass := cSpec.Volume.StorageClass
|
volumeStorageClass := cSpec.Volume.StorageClass
|
||||||
resourceList := resources.ResourceList(cSpec.Resources)
|
resourceList := resources.ResourceList(cSpec.Resources)
|
||||||
template := resources.PodTemplate(clusterName, resourceList, dockerImage, cSpec.Version, etcdHost)
|
template := resources.PodTemplate(clusterName, resourceList, cSpec.PgVersion, dockerImage, etcdHost)
|
||||||
volumeClaimTemplate := resources.VolumeClaimTemplate(volumeSize, volumeStorageClass)
|
volumeClaimTemplate := resources.VolumeClaimTemplate(volumeSize, volumeStorageClass)
|
||||||
|
|
||||||
return resources.StatefulSet(clusterName, template, volumeClaimTemplate, cSpec.NumberOfInstances)
|
return resources.StatefulSet(clusterName, template, volumeClaimTemplate, cSpec.NumberOfInstances)
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ func (c *Cluster) syncService() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
desiredSvc := resources.Service(c.ClusterName(), c.Spec.TeamId, cSpec.AllowedSourceRanges)
|
desiredSvc := resources.Service(c.ClusterName(), c.Spec.TeamId, cSpec.AllowedSourceRanges)
|
||||||
if servicesEqual(c.Service, desiredSvc) {
|
if c.sameServiceWith(desiredSvc) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
c.logger.Infof("Service '%s' needs to be updated", util.NameFromMeta(desiredSvc.ObjectMeta))
|
c.logger.Infof("Service '%s' needs to be updated", util.NameFromMeta(desiredSvc.ObjectMeta))
|
||||||
|
|
@ -100,7 +100,7 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
desiredSS := genStatefulSet(c.ClusterName(), cSpec, c.etcdHost, c.dockerImage)
|
desiredSS := genStatefulSet(c.ClusterName(), cSpec, c.etcdHost, c.dockerImage)
|
||||||
equalSS, rollUpdate := statefulsetsEqual(c.Statefulset, desiredSS)
|
equalSS, rollUpdate := c.compareStatefulSetWith(desiredSS)
|
||||||
if equalSS {
|
if equalSS {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,59 +45,6 @@ func normalizeUserFlags(userFlags []string) (flags []string, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func statefulsetsEqual(ss1, ss2 *v1beta1.StatefulSet) (equal bool, needsRollUpdate bool) {
|
|
||||||
equal = true
|
|
||||||
needsRollUpdate = false
|
|
||||||
//TODO: improve me
|
|
||||||
if *ss1.Spec.Replicas != *ss2.Spec.Replicas {
|
|
||||||
equal = false
|
|
||||||
}
|
|
||||||
if len(ss1.Spec.Template.Spec.Containers) != len(ss1.Spec.Template.Spec.Containers) {
|
|
||||||
equal = false
|
|
||||||
needsRollUpdate = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(ss1.Spec.Template.Spec.Containers) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
container1 := ss1.Spec.Template.Spec.Containers[0]
|
|
||||||
container2 := ss2.Spec.Template.Spec.Containers[0]
|
|
||||||
if container1.Image != container2.Image {
|
|
||||||
equal = false
|
|
||||||
needsRollUpdate = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(container1.Ports, container2.Ports) {
|
|
||||||
equal = false
|
|
||||||
needsRollUpdate = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(container1.Resources, container2.Resources) {
|
|
||||||
equal = false
|
|
||||||
needsRollUpdate = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(container1.Env, container2.Env) {
|
|
||||||
equal = false
|
|
||||||
needsRollUpdate = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func servicesEqual(svc1, svc2 *v1.Service) bool {
|
|
||||||
//TODO: check of Ports
|
|
||||||
//TODO: improve me
|
|
||||||
if reflect.DeepEqual(svc1.Spec.LoadBalancerSourceRanges, svc2.Spec.LoadBalancerSourceRanges) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func podMatchesTemplate(pod *v1.Pod, ss *v1beta1.StatefulSet) bool {
|
func podMatchesTemplate(pod *v1.Pod, ss *v1beta1.StatefulSet) bool {
|
||||||
//TODO: improve me
|
//TODO: improve me
|
||||||
if len(pod.Spec.Containers) != 1 {
|
if len(pod.Spec.Containers) != 1 {
|
||||||
|
|
|
||||||
|
|
@ -130,7 +130,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
|
||||||
//TODO: Do not update cluster which is currently creating
|
//TODO: Do not update cluster which is currently creating
|
||||||
|
|
||||||
if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
|
if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
|
||||||
c.logger.Debugf("Skipping update with no resource version change")
|
c.logger.Infof("Skipping update with no resource version change")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pgCluster := c.clusters[clusterName] // current
|
pgCluster := c.clusters[clusterName] // current
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ type Volume struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type PostgresqlParam struct {
|
type PostgresqlParam struct {
|
||||||
Version string `json:"version"`
|
PgVersion string `json:"version"`
|
||||||
Parameters map[string]string `json:"parameters"`
|
Parameters map[string]string `json:"parameters"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ func ResourceList(resources spec.Resources) *v1.ResourceList {
|
||||||
return &resourceList
|
return &resourceList
|
||||||
}
|
}
|
||||||
|
|
||||||
func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, dockerImage, pgVersion, etcdHost string) *v1.PodTemplateSpec {
|
func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, pgVersion string, dockerImage, etcdHost string) *v1.PodTemplateSpec {
|
||||||
envVars := []v1.EnvVar{
|
envVars := []v1.EnvVar{
|
||||||
{
|
{
|
||||||
Name: "SCOPE",
|
Name: "SCOPE",
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue