Merge branch 'master' into feature/connection-pooler

This commit is contained in:
Dmitrii Dolgov 2020-02-17 13:11:57 +01:00
commit 4add317b48
10 changed files with 101 additions and 91 deletions

View File

@ -9,7 +9,7 @@ metadata:
app.kubernetes.io/instance: {{ .Release.Name }}
---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ template "postgres-operator-ui.name" . }}
@ -17,7 +17,7 @@ metadata:
app.kubernetes.io/name: {{ template "postgres-operator-ui.name" . }}
helm.sh/chart: {{ template "postgres-operator-ui.chart" . }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/instance: {{ .Release.Name }}
rules:
- apiGroups:
- acid.zalan.do
@ -78,4 +78,4 @@ subjects:
# note: the cluster role binding needs to be defined
# for every namespace the operator-ui service account lives in.
name: {{ template "postgres-operator-ui.name" . }}
namespace: {{ .Release.Namespace }}
namespace: {{ .Release.Namespace }}

View File

@ -1,5 +1,5 @@
{{ if .Values.rbac.create }}
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "postgres-operator.serviceAccountName" . }}

View File

@ -64,6 +64,55 @@ class EndToEndTestCase(unittest.TestCase):
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_enable_load_balancer(self):
'''
Test if services are updated when enabling/disabling load balancers
'''
k8s = self.k8s
cluster_label = 'version=acid-minimal-cluster'
# enable load balancer services
pg_patch_enable_lbs = {
"spec": {
"enableMasterLoadBalancer": True,
"enableReplicaLoadBalancer": True
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs)
# wait for service recreation
time.sleep(60)
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
self.assertEqual(master_svc_type, 'LoadBalancer',
"Expected LoadBalancer service type for master, found {}".format(master_svc_type))
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
self.assertEqual(repl_svc_type, 'LoadBalancer',
"Expected LoadBalancer service type for replica, found {}".format(repl_svc_type))
# disable load balancer services again
pg_patch_disable_lbs = {
"spec": {
"enableMasterLoadBalancer": False,
"enableReplicaLoadBalancer": False
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs)
# wait for service recreation
time.sleep(60)
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
self.assertEqual(master_svc_type, 'ClusterIP',
"Expected ClusterIP service type for master, found {}".format(master_svc_type))
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
self.assertEqual(repl_svc_type, 'ClusterIP',
"Expected ClusterIP service type for replica, found {}".format(repl_svc_type))
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_min_resource_limits(self):
'''
@ -395,6 +444,13 @@ class K8s:
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:

View File

@ -5,7 +5,7 @@ metadata:
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: zalando-postgres-operator
@ -114,6 +114,7 @@ rules:
- delete
- get
- patch
- update
# to CRUD the StatefulSet which controls the Postgres cluster instances
- apiGroups:
- apps

View File

@ -30,7 +30,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/patroni"
"github.com/zalando/postgres-operator/pkg/util/teams"
"github.com/zalando/postgres-operator/pkg/util/users"
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
)
var (
@ -46,7 +46,7 @@ type Config struct {
RestConfig *rest.Config
InfrastructureRoles map[string]spec.PgUser // inherited from the controller
PodServiceAccount *v1.ServiceAccount
PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding
PodServiceAccountRoleBinding *rbacv1.RoleBinding
}
type ConnectionPoolObjects struct {

View File

@ -494,6 +494,11 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
}
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
var (
svc *v1.Service
err error
)
c.setProcessName("updating %v service", role)
if c.Services[role] == nil {
@ -501,70 +506,6 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
}
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
if newService.Spec.Type != c.Services[role].Spec.Type {
// service type has changed, need to replace the service completely.
// we cannot use just patch the current service, since it may contain attributes incompatible with the new type.
var (
currentEndpoint *v1.Endpoints
err error
)
if role == Master {
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
// the addresses stored in it before the service is deleted (deletion of the service removes the endpoint)
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
}
}
err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions)
if err != nil {
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
}
// wait until the service is truly deleted
c.logger.Debugf("waiting for service to be deleted")
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
_, err2 := c.KubeClient.Services(serviceName.Namespace).Get(serviceName.Name, metav1.GetOptions{})
if err2 == nil {
return false, nil
}
if k8sutil.ResourceNotFound(err2) {
return true, nil
}
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
}
// make sure we clear the stored service and endpoint status if the subsequent create fails.
c.Services[role] = nil
c.Endpoints[role] = nil
if role == Master {
// create the new endpoint using the addresses obtained from the previous one
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
if err != nil {
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
}
c.Endpoints[role] = ep
}
svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
if err != nil {
return fmt.Errorf("could not create service %q: %v", serviceName, err)
}
c.Services[role] = svc
return nil
}
// update the service annotation in order to propagate ELB notation.
if len(newService.ObjectMeta.Annotations) > 0 {
@ -582,18 +523,30 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
}
}
patchData, err := specPatch(newService.Spec)
if err != nil {
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
// now, patch the service spec, but when disabling LoadBalancers do update instead
// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
oldServiceType := c.Services[role].Spec.Type
newServiceType := newService.Spec.Type
if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
newService.ResourceVersion = c.Services[role].ResourceVersion
newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP
svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService)
if err != nil {
return fmt.Errorf("could not update service %q: %v", serviceName, err)
}
} else {
patchData, err := specPatch(newService.Spec)
if err != nil {
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
// update the service spec
svc, err := c.KubeClient.Services(serviceName.Namespace).Patch(
serviceName.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
serviceName.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
}
}
c.Services[role] = svc

View File

@ -132,7 +132,7 @@ func (c *Cluster) syncServices() error {
c.logger.Debugf("syncing %s service", role)
if err := c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpont: %v", role, err)
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
}
if err := c.syncService(role); err != nil {

View File

@ -7,7 +7,7 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
@ -57,7 +57,7 @@ type Controller struct {
workerLogs map[uint32]ringlog.RingLogger
PodServiceAccount *v1.ServiceAccount
PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding
PodServiceAccountRoleBinding *rbacv1.RoleBinding
}
// NewController creates a new controller
@ -198,7 +198,7 @@ func (c *Controller) initRoleBinding() {
if c.opConfig.PodServiceAccountRoleBindingDefinition == "" {
c.opConfig.PodServiceAccountRoleBindingDefinition = fmt.Sprintf(`
{
"apiVersion": "rbac.authorization.k8s.io/v1beta1",
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "RoleBinding",
"metadata": {
"name": "%s"
@ -227,7 +227,7 @@ func (c *Controller) initRoleBinding() {
case groupVersionKind.Kind != "RoleBinding":
panic(fmt.Errorf("role binding definition in the operator config map defines another type of resource: %v", groupVersionKind.Kind))
default:
c.PodServiceAccountRoleBinding = obj.(*rbacv1beta1.RoleBinding)
c.PodServiceAccountRoleBinding = obj.(*rbacv1.RoleBinding)
c.PodServiceAccountRoleBinding.Namespace = ""
c.logger.Info("successfully parsed")

View File

@ -20,7 +20,7 @@ import (
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
rbacv1beta1 "k8s.io/client-go/kubernetes/typed/rbac/v1beta1"
rbacv1 "k8s.io/client-go/kubernetes/typed/rbac/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@ -46,7 +46,7 @@ type KubernetesClient struct {
corev1.ServiceAccountsGetter
appsv1.StatefulSetsGetter
appsv1.DeploymentsGetter
rbacv1beta1.RoleBindingsGetter
rbacv1.RoleBindingsGetter
policyv1beta1.PodDisruptionBudgetsGetter
apiextbeta1.CustomResourceDefinitionsGetter
clientbatchv1beta1.CronJobsGetter
@ -139,7 +139,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.DeploymentsGetter = client.AppsV1()
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1beta1()
kubeClient.RoleBindingsGetter = client.RbacV1()
kubeClient.CronJobsGetter = client.BatchV1beta1()
apiextClient, err := apiextclient.NewForConfig(cfg)

View File

@ -5,7 +5,7 @@ metadata:
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: postgres-operator-ui