Pod rolling update
Introduce a Pod events channel; add parsing of the MaintenanceWindows section; skip deleting the etcd key on cluster delete; use an external etcd host; watch TPRs and pods only in the namespace of the operator pod.
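The heart of the change is the pod events channel: each cluster owns one events channel plus a map of per-pod subscriber channels, and a dispatcher goroutine fans events out to whichever goroutine is currently waiting on that pod. A minimal sketch of the pattern (illustrative names, not the committed code):

    // dispatcher fans pod events out to per-pod subscribers.
    type podName struct{ Namespace, Name string }

    type podEvent struct {
        Name      podName
        EventType string // e.g. "DELETE"
    }

    type dispatcher struct {
        events      chan podEvent
        subscribers map[podName]chan podEvent
    }

    func (d *dispatcher) run(stop <-chan struct{}) {
        for {
            select {
            case e := <-d.events:
                if ch, ok := d.subscribers[e.Name]; ok {
                    go func() { ch <- e }() // forward without blocking the loop
                }
            case <-stop:
                return
            }
        }
    }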
This commit is contained in:
parent 2b8956bd33
commit ae77fa15e8
@@ -57,6 +57,7 @@ func ControllerConfig() *controller.Config {

func main() {
    log.SetOutput(os.Stdout)
    log.Printf("Spilo operator %s\n", version)
    log.Printf("MY_POD_NAMESPACE=%s\n", podNamespace)

    sigs := make(chan os.Signal, 1)
    stop := make(chan struct{})
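The hunk only shows the two channels being created. A typical continuation, assuming the standard os/signal and syscall imports (a sketch, not part of this diff), turns SIGINT/SIGTERM into a close of the stop channel:

    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        log.Printf("received signal %v, shutting down", <-sigs)
        close(stop) // lets every goroutine selecting on stop exit
    }()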
@@ -1,7 +1,7 @@

FROM alpine
MAINTAINER Team ACID @ Zalando <team-acid@zalando.de>

# We need root certificates for dealing with teams api over https
# We need root certificates to deal with teams api over https
RUN apk --no-cache add ca-certificates

COPY build/* /
@@ -1,5 +1,5 @@

hash: 8ffa20f78f1084d76efd6765df697e41edbe154a139a22292563cfc0eb50cd51
updated: 2017-02-16T18:54:58.087728447+01:00
updated: 2017-02-27T16:30:41.210217992+01:00
imports:
- name: cloud.google.com/go
  version: 3b1ae45394a234c385be014e9a488f2bb6eef821
@@ -9,7 +9,7 @@ imports:

- name: github.com/blang/semver
  version: 31b736133b98f26d5e078ec9eb591666edfd091f
- name: github.com/coreos/etcd
  version: 8ba2897a21e4fc51b298ca553d251318425f93ae
  version: 714e7ec8db7f8398880197be10771fe89c480ee5
  subpackages:
  - client
  - pkg/pathutil
@@ -92,7 +92,7 @@ imports:

- name: github.com/PuerkitoBio/urlesc
  version: 5bd2802263f21d8788851d5305584c82a5c75d7e
- name: github.com/Sirupsen/logrus
  version: c078b1e43f58d563c74cebe63c85789e76ddb627
  version: 0208149b40d863d2c1a2f8fe5753096a9cf2cc8b
- name: github.com/spf13/pflag
  version: 08b1a584251b5b62f458943640fc8ebd4d50aaa5
- name: github.com/ugorji/go
@@ -2,6 +2,7 @@ apiVersion: "zalando.org/v1"

kind: PlatformCredentialsSet
metadata:
  name: postgresql-operator
  namespace: acid
spec:
  application: postgresql-operator
  tokens:
@@ -13,4 +13,13 @@ spec:

      serviceAccountName: operator
      containers:
      - name: postgres-operator
        image: postgres-operator:0.1
        image: pierone.example.com/acid/postgres-operator:0.1
        env:
        - name: MY_POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
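The two env entries use the Kubernetes downward API: the pod's own namespace and name are injected into the container's environment, which is how the operator learns which namespace to watch. Reading it back on the Go side is a one-liner; the fallback below is an assumption for running outside a cluster:

    // podNamespace returns the namespace injected via the downward API.
    func podNamespace() string {
        if ns := os.Getenv("MY_POD_NAMESPACE"); ns != "" {
            return ns
        }
        return "default" // assumed fallback for local runs
    }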
@@ -21,7 +21,6 @@ spec:

  - 127.0.0.1/32

  # Expert section
  pamUsersSecret: human-users
  postgresql:
    version: "9.6"
    parameters:
@@ -3,32 +3,32 @@ package cluster

// Postgres ThirdPartyResource object i.e. Spilo

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "reflect"
    "regexp"
    "strings"
    "sync"

    "github.com/Sirupsen/logrus"
    etcdclient "github.com/coreos/etcd/client"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api"
    "k8s.io/client-go/pkg/api/v1"
    "k8s.io/client-go/pkg/labels"
    "k8s.io/client-go/pkg/apis/apps/v1beta1"
    "k8s.io/client-go/pkg/types"
    "k8s.io/client-go/rest"

    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
)

var (
    superuserName       = "postgres"
    replicationUsername = "replication"

    alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z0-9]*$")
    alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9]*$")
)

//TODO: remove struct duplication
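The regexp change tightens username validation: the old pattern ^[a-zA-Z0-9]*$ accepted the empty string and names starting with a digit, while ^[a-zA-Z][a-zA-Z0-9]*$ requires at least one character and a leading letter. A quick check of the difference:

    re := regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9]*$")
    fmt.Println(re.MatchString("alice"))  // true
    fmt.Println(re.MatchString("9lives")) // false: must start with a letter
    fmt.Println(re.MatchString(""))       // false: at least one character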
@@ -40,351 +40,279 @@ type Config struct {

    TeamsAPIClient *teams.TeamsAPI
}

type pgUser struct {
    name     string
    password string
    flags    []string
type KubeResources struct {
    Services     map[types.UID]*v1.Service
    Endpoints    map[types.UID]*v1.Endpoints
    Secrets      map[types.UID]*v1.Secret
    Statefulsets map[types.UID]*v1beta1.StatefulSet
    //Pods are treated separately
}

type Cluster struct {
    config      Config
    logger      *logrus.Entry
    etcdHost    string
    dockerImage string
    cluster     *spec.Postgresql
    pgUsers     map[string]pgUser

    pgDb *sql.DB
    mu   sync.Mutex
    KubeResources
    spec.Postgresql
    config         Config
    logger         *logrus.Entry
    etcdHost       string
    dockerImage    string
    pgUsers        map[string]spec.PgUser
    podEvents      chan spec.PodEvent
    podSubscribers map[spec.PodName]chan spec.PodEvent
    pgDb           *sql.DB
    mu             sync.Mutex
}

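The new Cluster struct embeds KubeResources and spec.Postgresql, so their fields are promoted: later code can write c.Metadata, c.Spec or c.Statefulsets directly. A self-contained illustration of that promotion (toy types, not the operator's):

    package main

    import "fmt"

    type Metadata struct{ Name string }
    type Postgresql struct{ Metadata Metadata }
    type KubeResources struct{ Statefulsets map[string]bool }

    type Cluster struct {
        KubeResources
        Postgresql
    }

    func main() {
        c := Cluster{Postgresql: Postgresql{Metadata{Name: "demo"}}}
        fmt.Println(c.Metadata.Name) // "demo", promoted from the embedded struct
    }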
func New(cfg Config, spec *spec.Postgresql) *Cluster {
    lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", spec.Metadata.Name)
func New(cfg Config, pgSpec spec.Postgresql) *Cluster {
    lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
    kubeResources := KubeResources{
        Services:     make(map[types.UID]*v1.Service),
        Endpoints:    make(map[types.UID]*v1.Endpoints),
        Secrets:      make(map[types.UID]*v1.Secret),
        Statefulsets: make(map[types.UID]*v1beta1.StatefulSet),
    }

    cluster := &Cluster{
        config:      cfg,
        cluster:     spec,
        logger:      lg,
        etcdHost:    constants.EtcdHost,
        dockerImage: constants.SpiloImage,
        config:         cfg,
        Postgresql:     pgSpec,
        logger:         lg,
        etcdHost:       constants.EtcdHost,
        dockerImage:    constants.SpiloImage,
        pgUsers:        make(map[string]spec.PgUser),
        podEvents:      make(chan spec.PodEvent),
        podSubscribers: make(map[spec.PodName]chan spec.PodEvent),
        KubeResources:  kubeResources,
    }
    cluster.init()

    return cluster
}

func (c *Cluster) getReadonlyToken() (string, error) {
    // for some reason PlatformCredentialsSet creates secrets only in the default namespace
    credentialsSecret, err := c.config.KubeClient.Secrets(v1.NamespaceDefault).Get("postgresql-operator")
func (c *Cluster) ClusterName() spec.ClusterName {
    return spec.ClusterName{
        Name:      c.Metadata.Name,
        Namespace: c.Metadata.Namespace,
    }
}

func (c *Cluster) Run(stopCh <-chan struct{}) {
    go c.podEventsDispatcher(stopCh)

    <-stopCh
}

func (c *Cluster) NeedsRollingUpdate(otherSpec *spec.Postgresql) bool {
    //TODO: add more checks
    if c.Spec.Version != otherSpec.Spec.Version {
        return true
    }

    if !reflect.DeepEqual(c.Spec.Resources, otherSpec.Spec.Resources) {
        return true
    }

    return false
}

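A plausible call site (an assumption, since the controller side of this flow is not in this hunk) compares old and new TPR objects and passes the verdict to Update:

    // hypothetical controller-side usage
    rolling := cl.NeedsRollingUpdate(newSpec)
    if err := cl.Update(newSpec, rolling); err != nil {
        c.logger.Errorf("Can't update cluster: %s", err)
    }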
func (c *Cluster) MustSetStatus(status spec.PostgresStatus) {
    b, err := json.Marshal(status)
    if err != nil {
        c.logger.Fatalf("Can't marshal status: %s", err)
    }
    request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods

    _, err = c.config.RestClient.Patch(api.MergePatchType).
        RequestURI(c.Metadata.GetSelfLink()).
        Body(request).
        DoRaw()

    if k8sutil.ResourceNotFound(err) {
        c.logger.Warningf("Can't set status for the non-existing cluster")
        return
    }

    if err != nil {
        return "", fmt.Errorf("Can't get credentials secret: %s", err)
        c.logger.Fatalf("Can't set status for cluster '%s': %s", c.ClusterName(), err)
    }
    data := credentialsSecret.Data

    if string(data["read-only-token-type"]) != "Bearer" {
        return "", fmt.Errorf("Wrong token type: %s", data["read-only-token-type"])
    }

    return string(data["read-only-token-secret"]), nil

}

func (c *Cluster) getTeamMembers() ([]string, error) {
    token, err := c.getReadonlyToken()
    if err != nil {
        return nil, fmt.Errorf("Can't get oauth token: %s", err)
    }

    c.config.TeamsAPIClient.OauthToken = token
    teamInfo, err := c.config.TeamsAPIClient.TeamInfo((*c.cluster.Spec).TeamId)
    if err != nil {
        return nil, fmt.Errorf("Can't get team info: %s", err)
    }

    return teamInfo.Members, nil
}

func (c *Cluster) labelsSet() labels.Set {
    return labels.Set{
        "application":   "spilo",
        "spilo-cluster": (*c.cluster).Metadata.Name,
    }
}

func (c *Cluster) credentialSecretName(username string) string {
    return fmt.Sprintf(
        "%s.%s.credentials.%s.%s",
        username,
        (*c.cluster).Metadata.Name,
        constants.TPRName,
        constants.TPRVendor)
}

func isValidUsername(username string) bool {
    return alphaNumericRegexp.MatchString(username)
}

func normalizeUserFlags(userFlags []string) (flags []string, err error) {
    uniqueFlags := make(map[string]bool)

    for _, flag := range userFlags {
        if !alphaNumericRegexp.MatchString(flag) {
            err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
            return
        } else {
            flag = strings.ToUpper(flag)
            if _, ok := uniqueFlags[flag]; !ok {
                uniqueFlags[flag] = true
            }
        }
    }

    flags = []string{}
    for k := range uniqueFlags {
        flags = append(flags, k)
    }

    return
}

func (c *Cluster) init() {
    users := (*c.cluster.Spec).Users
    c.pgUsers = make(map[string]pgUser, len(users)+2) // + [superuser and replication]

    c.pgUsers[superuserName] = pgUser{
        name:     superuserName,
        password: util.RandomPassword(constants.PasswordLength),
    }

    c.pgUsers[replicationUsername] = pgUser{
        name:     replicationUsername,
        password: util.RandomPassword(constants.PasswordLength),
    }

    for username, userFlags := range users {
        if !isValidUsername(username) {
            c.logger.Warningf("Invalid username: '%s'", username)
            continue
        }

        flags, err := normalizeUserFlags(userFlags)
        if err != nil {
            c.logger.Warningf("Invalid flags for user '%s': %s", username, err)
        }

        c.pgUsers[username] = pgUser{
            name:     username,
            password: util.RandomPassword(constants.PasswordLength),
            flags:    flags,
        }
    }

    teamMembers, err := c.getTeamMembers()
    if err != nil {
        c.logger.Errorf("Can't get list of team members: %s", err)
    } else {
        for _, username := range teamMembers {
            c.pgUsers[username] = pgUser{name: username}
        }
    }
}

func (c *Cluster) waitPodDelete() error {
    ls := c.labelsSet()

    listOptions := v1.ListOptions{
        LabelSelector: ls.String(),
    }
    return retryutil.Retry(
        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
        func() (bool, error) {
            pods, err := c.config.KubeClient.Pods((*c.cluster).Metadata.Namespace).List(listOptions)
            if err != nil {
                return false, err
            }

            return len(pods.Items) == 0, nil
        })
}

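waitPodDelete polls through retryutil.Retry, whose implementation is not part of this diff. Judging only from its call sites (interval, number of attempts, predicate returning done/error), a matching helper would look roughly like this sketch:

    // Assumed shape of retryutil.Retry, inferred from the call sites above.
    func Retry(interval time.Duration, maxRetries int, f func() (bool, error)) error {
        for i := 0; i < maxRetries; i++ {
            done, err := f()
            if err != nil {
                return err
            }
            if done {
                return nil
            }
            time.Sleep(interval)
        }
        return fmt.Errorf("still not done after %d retries", maxRetries)
    }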
func (c *Cluster) waitStatefulsetReady() error {
    return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
        func() (bool, error) {
            listOptions := v1.ListOptions{
                LabelSelector: c.labelsSet().String(),
            }
            ss, err := c.config.KubeClient.StatefulSets((*c.cluster).Metadata.Namespace).List(listOptions)
            if err != nil {
                return false, err
            }

            if len(ss.Items) != 1 {
                return false, fmt.Errorf("StatefulSet is not found")
            }

            return *ss.Items[0].Spec.Replicas == ss.Items[0].Status.Replicas, nil
        })
}

func (c *Cluster) waitPodLabelsReady() error {
    ls := c.labelsSet()
    namespace := (*c.cluster).Metadata.Namespace

    listOptions := v1.ListOptions{
        LabelSelector: ls.String(),
    }
    masterListOption := v1.ListOptions{
        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "master"}).String(),
    }
    replicaListOption := v1.ListOptions{
        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(),
    }
    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
    if err != nil {
        return err
    }
    podsNumber := len(pods.Items)

    return retryutil.Retry(
        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
        func() (bool, error) {
            masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
            if err != nil {
                return false, err
            }
            replicaPods, err := c.config.KubeClient.Pods(namespace).List(replicaListOption)
            if err != nil {
                return false, err
            }
            if len(masterPods.Items) > 1 {
                return false, fmt.Errorf("Too many masters")
            }

            return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
        })
}

func (c *Cluster) Create() error {
    c.createEndpoint()
    c.createService()
    c.applySecrets()
    c.createStatefulSet()
    //TODO: service will create endpoint implicitly
    ep, err := c.createEndpoint()
    if err != nil {
        return fmt.Errorf("Can't create endpoint: %s", err)
    }
    c.logger.Infof("Endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta))

    service, err := c.createService()
    if err != nil {
        return fmt.Errorf("Can't create service: %s", err)
    } else {
        c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta))
    }

    c.initSystemUsers()
    err = c.initRobotUsers()
    if err != nil {
        return fmt.Errorf("Can't init robot users: %s", err)
    }

    err = c.initHumanUsers()
    if err != nil {
        return fmt.Errorf("Can't init human users: %s", err)
    }

    err = c.applySecrets()
    if err != nil {
        return fmt.Errorf("Can't create secrets: %s", err)
    } else {
        c.logger.Infof("Secrets have been successfully created")
    }

    ss, err := c.createStatefulSet()
    if err != nil {
        return fmt.Errorf("Can't create StatefulSet: %s", err)
    } else {
        c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta))
    }

    c.logger.Info("Waiting for cluster to become ready")
    err := c.waitClusterReady()
    err = c.waitClusterReady()
    if err != nil {
        c.logger.Errorf("Failed to create cluster: %s", err)
        return err
    }
    c.logger.Info("Cluster is ready")

    err = c.initDbConn()
    if err != nil {
        return fmt.Errorf("Failed to init db connection: %s", err)
        return fmt.Errorf("Can't init db connection: %s", err)
    }

    c.createUsers()
    err = c.createUsers()
    if err != nil {
        return fmt.Errorf("Can't create users: %s", err)
    } else {
        c.logger.Infof("Users have been successfully created")
    }

    c.ListResources()

    return nil
}

func (c *Cluster) waitClusterReady() error {
    // TODO: wait for the first Pod only
    err := c.waitStatefulsetReady()
func (c *Cluster) Update(newSpec *spec.Postgresql, rollingUpdate bool) error {
    nSpec := newSpec.Spec
    clusterName := c.ClusterName()
    resourceList := resources.ResourceList(nSpec.Resources)
    template := resources.PodTemplate(clusterName, resourceList, c.dockerImage, nSpec.Version, c.etcdHost)
    statefulSet := resources.StatefulSet(clusterName, template, nSpec.NumberOfInstances)

    //TODO: mind the case of updating allowedSourceRanges
    err := c.updateStatefulSet(statefulSet)
    if err != nil {
        return fmt.Errorf("Stateful set error: %s", err)
        return fmt.Errorf("Can't update cluster: %s", err)
    }

    // TODO: wait only for master
    err = c.waitPodLabelsReady()
    if err != nil {
        return fmt.Errorf("Pod labels error: %s", err)
    if rollingUpdate {
        err = c.recreatePods()
        // TODO: wait for actual streaming to the replica
        if err != nil {
            return fmt.Errorf("Can't recreate pods: %s", err)
        }
    }

    return nil
}

func (c *Cluster) Delete() error {
    clusterName := (*c.cluster).Metadata.Name
    namespace := (*c.cluster).Metadata.Namespace
    orphanDependents := false
    deleteOptions := &v1.DeleteOptions{
        OrphanDependents: &orphanDependents,
    }

    listOptions := v1.ListOptions{
        LabelSelector: c.labelsSet().String(),
    }

    kubeClient := c.config.KubeClient

    podList, err := kubeClient.Pods(namespace).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of pods: %s", err)
    }

    err = kubeClient.StatefulSets(namespace).Delete(clusterName, deleteOptions)
    if err != nil {
        return fmt.Errorf("Can't delete statefulset: %s", err)
    }
    c.logger.Infof("Statefulset '%s' has been deleted", util.FullObjectName(namespace, clusterName))

    for _, pod := range podList.Items {
        err = kubeClient.Pods(namespace).Delete(pod.Name, deleteOptions)
    for _, obj := range c.Statefulsets {
        err := c.deleteStatefulSet(obj)
        if err != nil {
            return fmt.Errorf("Error while deleting pod '%s': %s", util.FullObjectName(pod.Namespace, pod.Name), err)
            c.logger.Errorf("Can't delete StatefulSet: %s", err)
        } else {
            c.logger.Infof("StatefulSet '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
        }

        c.logger.Infof("Pod '%s' has been deleted", util.FullObjectName(pod.Namespace, pod.Name))
    }

    serviceList, err := kubeClient.Services(namespace).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of the services: %s", err)
    }

    for _, service := range serviceList.Items {
        err = kubeClient.Services(namespace).Delete(service.Name, deleteOptions)
    for _, obj := range c.Secrets {
        err := c.deleteSecret(obj)
        if err != nil {
            return fmt.Errorf("Can't delete service '%s': %s", util.FullObjectName(service.Namespace, service.Name), err)
            c.logger.Errorf("Can't delete secret: %s", err)
        } else {
            c.logger.Infof("Secret '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
        }

        c.logger.Infof("Service '%s' has been deleted", util.FullObjectName(service.Namespace, service.Name))
    }

    secretsList, err := kubeClient.Secrets(namespace).List(listOptions)
    if err != nil {
        return err
    }
    for _, secret := range secretsList.Items {
        err = kubeClient.Secrets(namespace).Delete(secret.Name, deleteOptions)
    for _, obj := range c.Endpoints {
        err := c.deleteEndpoint(obj)
        if err != nil {
            return fmt.Errorf("Can't delete secret '%s': %s", util.FullObjectName(secret.Namespace, secret.Name), err)
            c.logger.Errorf("Can't delete endpoint: %s", err)
        } else {
            c.logger.Infof("Endpoint '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
        }

        c.logger.Infof("Secret '%s' has been deleted", util.FullObjectName(secret.Namespace, secret.Name))
    }

    c.waitPodDelete()

    etcdKey := fmt.Sprintf("/service/%s", clusterName)

    resp, err := c.config.EtcdClient.Delete(context.Background(),
        etcdKey,
        &etcdclient.DeleteOptions{Recursive: true})
    for _, obj := range c.Services {
        err := c.deleteService(obj)
        if err != nil {
            c.logger.Errorf("Can't delete service: %s", err)
        } else {
            c.logger.Infof("Service '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
        }
    }

    err := c.deletePods()
    if err != nil {
        return fmt.Errorf("Can't delete etcd key: %s", err)
        return fmt.Errorf("Can't delete pods: %s", err)
    }

    return nil
}

func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
    c.podEvents <- event
}

func (c *Cluster) initSystemUsers() {
    c.pgUsers[constants.SuperuserName] = spec.PgUser{
        Name:     constants.SuperuserName,
        Password: util.RandomPassword(constants.PasswordLength),
    }

    c.pgUsers[constants.ReplicationUsername] = spec.PgUser{
        Name:     constants.ReplicationUsername,
        Password: util.RandomPassword(constants.PasswordLength),
    }
}

func (c *Cluster) initRobotUsers() error {
    for username, userFlags := range c.Spec.Users {
        if !isValidUsername(username) {
            return fmt.Errorf("Invalid username: '%s'", username)
        }

        flags, err := normalizeUserFlags(userFlags)
        if err != nil {
            return fmt.Errorf("Invalid flags for user '%s': %s", username, err)
        }

        c.pgUsers[username] = spec.PgUser{
            Name:     username,
            Password: util.RandomPassword(constants.PasswordLength),
            Flags:    flags,
        }
    }

    return nil
}

func (c *Cluster) initHumanUsers() error {
    teamMembers, err := c.getTeamMembers()
    if err != nil {
        return fmt.Errorf("Can't get list of team members: %s", err)
    } else {
        for _, username := range teamMembers {
            c.pgUsers[username] = spec.PgUser{Name: username}
        }
    }

    if resp == nil {
        c.logger.Warningf("No response from etcd cluster")
    }

    c.logger.Infof("Etcd key '%s' has been deleted", etcdKey)

    //TODO: Ensure objects are deleted

    return nil
}

@@ -3,18 +3,23 @@ package cluster

import (
    "database/sql"
    "fmt"
    "strings"

    _ "github.com/lib/pq"
    "strings"

    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
)

var createUserSQL = `SET LOCAL synchronous_commit = 'local'; CREATE ROLE "%s" %s PASSWORD %s;`

func (c *Cluster) pgConnectionString() string {
    hostname := fmt.Sprintf("%s.%s.svc.cluster.local", (*c.cluster).Metadata.Name, (*c.cluster).Metadata.Namespace)
    password := c.pgUsers[superuserName].password
    hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace)
    password := c.pgUsers[constants.SuperuserName].Password

    return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s'",
        hostname,
        superuserName,
        constants.SuperuserName,
        strings.Replace(password, "$", "\\$", -1))
}

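For a cluster named testcluster in namespace acid (hypothetical values), pgConnectionString yields a DSN of this shape, with the superuser password pulled from c.pgUsers and dollar signs escaped:

    host='testcluster.acid.svc.cluster.local' dbname=postgres sslmode=require user='postgres' password='<secret>'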
@@ -39,3 +44,40 @@ func (c *Cluster) initDbConn() error {

    return nil
}

func (c *Cluster) createPgUser(user spec.PgUser) (isHuman bool, err error) {
    var flags []string = user.Flags

    if user.Password == "" {
        isHuman = true
        flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", constants.PamRoleName))
    } else {
        isHuman = false
    }

    addLoginFlag := true
    for _, v := range flags {
        if v == "NOLOGIN" {
            addLoginFlag = false
            break
        }
    }
    if addLoginFlag {
        flags = append(flags, "LOGIN")
    }

    userFlags := strings.Join(flags, " ")
    userPassword := fmt.Sprintf("'%s'", user.Password)
    if user.Password == "" {
        userPassword = "NULL"
    }
    query := fmt.Sprintf(createUserSQL, user.Name, userFlags, userPassword)

    _, err = c.pgDb.Query(query) // TODO: Try several times
    if err != nil {
        err = fmt.Errorf("DB error: %s", err)
        return
    }

    return
}

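Substituting into the createUserSQL template above, a robot user foo with flag CREATEDB (hypothetical values) produces the first statement, and a password-less human user jdoe the second; <pam-role> stands in for constants.PamRoleName, whose value is not part of this diff:

    SET LOCAL synchronous_commit = 'local'; CREATE ROLE "foo" CREATEDB LOGIN PASSWORD 'generated-password';
    SET LOCAL synchronous_commit = 'local'; CREATE ROLE "jdoe" IN ROLE "<pam-role>" LOGIN PASSWORD NULL;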
@@ -0,0 +1,169 @@

package cluster

import (
    "fmt"

    "k8s.io/client-go/pkg/api/v1"

    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
)

func (c *Cluster) clusterPods() ([]v1.Pod, error) {
    ns := c.Metadata.Namespace
    listOptions := v1.ListOptions{
        LabelSelector: c.labelsSet().String(),
    }

    pods, err := c.config.KubeClient.Pods(ns).List(listOptions)
    if err != nil {
        return nil, fmt.Errorf("Can't get list of pods: %s", err)
    }

    return pods.Items, nil
}

func (c *Cluster) deletePods() error {
    pods, err := c.clusterPods()
    if err != nil {
        return err
    }

    for _, obj := range pods {
        err := c.deletePod(&obj)
        if err != nil {
            c.logger.Errorf("Can't delete pod: %s", err)
        } else {
            c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
        }
    }

    return nil
}

func (c *Cluster) deletePod(pod *v1.Pod) error {
    podName := spec.PodName{
        Namespace: pod.Namespace,
        Name:      pod.Name,
    }

    ch := make(chan spec.PodEvent)
    if _, ok := c.podSubscribers[podName]; ok {
        panic("Pod '" + podName.String() + "' is already subscribed")
    }
    c.podSubscribers[podName] = ch
    defer func() {
        close(ch)
        delete(c.podSubscribers, podName)
    }()

    err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
    if err != nil {
        return err
    }

    err = c.waitForPodDeletion(ch)
    if err != nil {
        return err
    }

    return nil
}

func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error {
    podName := spec.PodName{
        Namespace: pod.Namespace,
        Name:      pod.Name,
    }

    orphanDependents := false
    deleteOptions := &v1.DeleteOptions{
        OrphanDependents: &orphanDependents,
    }

    ch := make(chan spec.PodEvent)
    if _, ok := c.podSubscribers[podName]; ok {
        panic("Pod '" + podName.String() + "' is already subscribed")
    }
    c.podSubscribers[podName] = ch
    defer func() {
        close(ch)
        delete(c.podSubscribers, podName)
    }()

    err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
    if err != nil {
        return fmt.Errorf("Can't delete pod: %s", err)
    }
    err = c.waitForPodDeletion(ch)
    if err != nil {
        return err
    }
    err = c.waitForPodLabel(ch, spiloRole)
    if err != nil {
        return err
    }
    c.logger.Infof("Pod '%s' is ready", podName)

    return nil
}

func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) {
    c.logger.Infof("Watching '%s' cluster", c.ClusterName())
    for {
        select {
        case event := <-c.podEvents:
            if subscriber, ok := c.podSubscribers[event.PodName]; ok {
                c.logger.Debugf("New event for '%s' pod", event.PodName)
                go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
            } else {
                c.logger.Debugf("Skipping event for an unwatched pod '%s'", event.PodName)
            }
        case <-stopCh:
            return
        }
    }
}

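The TODO in podEventsDispatcher asks whether a goroutine per event is the right way to do a non-blocking send. The usual alternative is a select with a default branch, which drops the event instead of spawning a goroutine that may block on a busy subscriber; which trade-off is correct depends on whether events may be lost:

    select {
    case subscriber <- event:
        // delivered
    default:
        // subscriber not ready: drop (or buffer) the event instead of
        // blocking the dispatcher or leaking a goroutine
    }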
func (c *Cluster) recreatePods() error {
    ls := c.labelsSet()
    namespace := c.Metadata.Namespace

    listOptions := v1.ListOptions{
        LabelSelector: ls.String(),
    }
    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of pods: %s", err)
    } else {
        c.logger.Infof("There are %d pods in the cluster to recreate", len(pods.Items))
    }

    var masterPod v1.Pod
    for _, pod := range pods.Items {
        role, ok := pod.Labels["spilo-role"]
        if !ok {
            continue
        }

        if role == "master" {
            masterPod = pod
            continue
        }

        err = c.recreatePod(pod, "replica")
        if err != nil {
            return fmt.Errorf("Can't recreate replica pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
        }
    }

    //TODO: do manual failover
    //TODO: specify master, leave new master empty
    c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta))
    err = c.recreatePod(masterPod, "replica")
    if err != nil {
        return fmt.Errorf("Can't recreate master pod '%s': %s", util.NameFromMeta(masterPod.ObjectMeta), err)
    }

    return nil
}

@@ -2,330 +2,258 @@ package cluster

import (
    "fmt"
    "strings"

    "k8s.io/client-go/pkg/api/resource"
    "k8s.io/client-go/pkg/api/v1"
    "k8s.io/client-go/pkg/apis/apps/v1beta1"
    "k8s.io/client-go/pkg/util/intstr"

    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
)

var createUserSQL = `DO $$
BEGIN
    SET LOCAL synchronous_commit = 'local';
    CREATE ROLE "%s" %s PASSWORD %s;
END;
$$`

func (c *Cluster) createStatefulSet() {
    meta := (*c.cluster).Metadata

    envVars := []v1.EnvVar{
        {
            Name:  "SCOPE",
            Value: meta.Name,
        },
        {
            Name:  "PGROOT",
            Value: "/home/postgres/pgdata/pgroot",
        },
        {
            Name:  "ETCD_HOST",
            Value: c.etcdHost,
        },
        {
            Name: "POD_IP",
            ValueFrom: &v1.EnvVarSource{
                FieldRef: &v1.ObjectFieldSelector{
                    APIVersion: "v1",
                    FieldPath:  "status.podIP",
                },
            },
        },
        {
            Name: "POD_NAMESPACE",
            ValueFrom: &v1.EnvVarSource{
                FieldRef: &v1.ObjectFieldSelector{
                    APIVersion: "v1",
                    FieldPath:  "metadata.namespace",
                },
            },
        },
        {
            Name: "PGPASSWORD_SUPERUSER",
            ValueFrom: &v1.EnvVarSource{
                SecretKeyRef: &v1.SecretKeySelector{
                    LocalObjectReference: v1.LocalObjectReference{
                        Name: c.credentialSecretName(superuserName),
                    },
                    Key: "password",
                },
            },
        },
        {
            Name: "PGPASSWORD_STANDBY",
            ValueFrom: &v1.EnvVarSource{
                SecretKeyRef: &v1.SecretKeySelector{
                    LocalObjectReference: v1.LocalObjectReference{
                        Name: c.credentialSecretName(replicationUsername),
                    },
                    Key: "password",
                },
            },
        },
        {
            Name:  "PAM_OAUTH2",                 //TODO: get from the operator tpr spec
            Value: constants.PamConfiguration,   //space before uid is obligatory
        },
        {
            Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec
            Value: fmt.Sprintf(`
postgresql:
  bin_dir: /usr/lib/postgresql/%s/bin
bootstrap:
  initdb:
  - auth-host: md5
  - auth-local: trust
  users:
    %s:
      password: NULL
      options:
      - createdb
      - nologin
  pg_hba:
  - hostnossl all all all reject
  - hostssl all +%s all pam
  - hostssl all all all md5`, (*c.cluster.Spec).Version, constants.PamRoleName, constants.PamRoleName),
        },
    }

    resourceList := v1.ResourceList{}

    if cpu := (*c.cluster).Spec.Resources.Cpu; cpu != "" {
        resourceList[v1.ResourceCPU] = resource.MustParse(cpu)
    }

    if memory := (*c.cluster).Spec.Resources.Memory; memory != "" {
        resourceList[v1.ResourceMemory] = resource.MustParse(memory)
    }

    container := v1.Container{
        Name:            meta.Name,
        Image:           c.dockerImage,
        ImagePullPolicy: v1.PullAlways,
        Resources: v1.ResourceRequirements{
            Requests: resourceList,
        },
        Ports: []v1.ContainerPort{
            {
                ContainerPort: 8008,
                Protocol:      v1.ProtocolTCP,
            },
            {
                ContainerPort: 5432,
                Protocol:      v1.ProtocolTCP,
            },
        },
        VolumeMounts: []v1.VolumeMount{
            {
                Name:      "pgdata",
                MountPath: "/home/postgres/pgdata", //TODO: fetch from manifest
            },
        },
        Env: envVars,
    }

    terminateGracePeriodSeconds := int64(30)

    podSpec := v1.PodSpec{
        TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
        Volumes: []v1.Volume{
            {
                Name:         "pgdata",
                VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
            },
        },
        Containers: []v1.Container{container},
    }

    template := v1.PodTemplateSpec{
        ObjectMeta: v1.ObjectMeta{
            Labels:      c.labelsSet(),
            Namespace:   meta.Namespace,
            Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"},
        },
        Spec: podSpec,
    }

    statefulSet := &v1beta1.StatefulSet{
        ObjectMeta: v1.ObjectMeta{
            Name:      meta.Name,
            Namespace: meta.Namespace,
            Labels:    c.labelsSet(),
        },
        Spec: v1beta1.StatefulSetSpec{
            Replicas:    &c.cluster.Spec.NumberOfInstances,
            ServiceName: meta.Name,
            Template:    template,
        },
    }

    _, err := c.config.KubeClient.StatefulSets(meta.Namespace).Create(statefulSet)
    if err != nil {
        c.logger.Errorf("Can't create statefulset: %s", err)
    } else {
        c.logger.Infof("Statefulset has been created: '%s'", util.FullObjectNameFromMeta(statefulSet.ObjectMeta))
    }
var orphanDependents = false
var deleteOptions = &v1.DeleteOptions{
    OrphanDependents: &orphanDependents,
}

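With Version "9.6" the SPILO_CONFIGURATION template above renders to roughly the YAML below; <pam-role> stands in for constants.PamRoleName, whose value is not part of this diff:

    postgresql:
      bin_dir: /usr/lib/postgresql/9.6/bin
    bootstrap:
      initdb:
      - auth-host: md5
      - auth-local: trust
      users:
        <pam-role>:
          password: NULL
          options:
          - createdb
          - nologin
      pg_hba:
      - hostnossl all all all reject
      - hostssl all +<pam-role> all pam
      - hostssl all all all md5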
func (c *Cluster) applySecrets() {
    var err error
    namespace := (*c.cluster).Metadata.Namespace
    for username, pgUser := range c.pgUsers {
        //Skip users with no password i.e. human users (they'll be authenticated using pam)
        if pgUser.password == "" {
func (c *Cluster) LoadResources() error {
    ns := c.Metadata.Namespace
    listOptions := v1.ListOptions{
        LabelSelector: c.labelsSet().String(),
    }

    services, err := c.config.KubeClient.Services(ns).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of services: %s", err)
    }
    for i, service := range services.Items {
        if _, ok := c.Services[service.UID]; ok {
            continue
        }
        secret := v1.Secret{
            ObjectMeta: v1.ObjectMeta{
                Name:      c.credentialSecretName(username),
                Namespace: namespace,
                Labels:    c.labelsSet(),
            },
            Type: v1.SecretTypeOpaque,
            Data: map[string][]byte{
                "username": []byte(pgUser.name),
                "password": []byte(pgUser.password),
            },
        }
        _, err = c.config.KubeClient.Secrets(namespace).Create(&secret)
        if k8sutil.IsKubernetesResourceAlreadyExistError(err) {
            c.logger.Infof("Skipping update of '%s'", secret.Name)

            curSecrets, err := c.config.KubeClient.Secrets(namespace).Get(c.credentialSecretName(username))
            if err != nil {
                c.logger.Errorf("Can't get current secret: %s", err)
            }
            user := pgUser
            user.password = string(curSecrets.Data["password"])
            c.pgUsers[username] = user
            c.logger.Infof("Password fetched for user '%s' from the secrets", username)

            continue
        } else {
            if err != nil {
                c.logger.Errorf("Error while creating secret: %s", err)
            } else {
                c.logger.Infof("Secret created: '%s'", util.FullObjectNameFromMeta(secret.ObjectMeta))
            }
        }
    }
}

func (c *Cluster) createService() {
    meta := (*c.cluster).Metadata

    _, err := c.config.KubeClient.Services(meta.Namespace).Get(meta.Name)
    if !k8sutil.ResourceNotFound(err) {
        c.logger.Infof("Service '%s' already exists", meta.Name)
        return
        c.Services[service.UID] = &services.Items[i]
    }

    service := v1.Service{
        ObjectMeta: v1.ObjectMeta{
            Name:      meta.Name,
            Namespace: meta.Namespace,
            Labels:    c.labelsSet(),
        },
        Spec: v1.ServiceSpec{
            Type:  v1.ServiceTypeLoadBalancer,
            Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
            LoadBalancerSourceRanges: (*c.cluster).Spec.AllowedSourceRanges,
        },
    }

    _, err = c.config.KubeClient.Services(meta.Namespace).Create(&service)
    endpoints, err := c.config.KubeClient.Endpoints(ns).List(listOptions)
    if err != nil {
        c.logger.Errorf("Error while creating service: %+v", err)
    } else {
        c.logger.Infof("Service created: '%s'", util.FullObjectNameFromMeta(service.ObjectMeta))
        return fmt.Errorf("Can't get list of endpoints: %s", err)
    }
}

func (c *Cluster) createEndpoint() {
    meta := (*c.cluster).Metadata

    _, err := c.config.KubeClient.Endpoints(meta.Namespace).Get(meta.Name)
    if !k8sutil.ResourceNotFound(err) {
        c.logger.Infof("Endpoint '%s' already exists", meta.Name)
        return
    }

    endpoint := v1.Endpoints{
        ObjectMeta: v1.ObjectMeta{
            Name:      meta.Name,
            Namespace: meta.Namespace,
            Labels:    c.labelsSet(),
        },
    }

    _, err = c.config.KubeClient.Endpoints(meta.Namespace).Create(&endpoint)
    if err != nil {
        c.logger.Errorf("Error while creating endpoint: %+v", err)
    } else {
        c.logger.Infof("Endpoint created: %s", endpoint.Name)
    }
}

func (c *Cluster) createUser(user pgUser) {
    var userType string
    var flags []string = user.flags

    if user.password == "" {
        userType = "human"
        flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", constants.PamRoleName))
    } else {
        userType = "app"
    }

    addLoginFlag := true
    for _, v := range flags {
        if v == "NOLOGIN" {
            addLoginFlag = false
            break
        }
    }
    if addLoginFlag {
        flags = append(flags, "LOGIN")
    }

    userFlags := strings.Join(flags, " ")
    userPassword := fmt.Sprintf("'%s'", user.password)
    if user.password == "" {
        userPassword = "NULL"
    }
    query := fmt.Sprintf(createUserSQL, user.name, userFlags, userPassword)

    _, err := c.pgDb.Query(query)
    if err != nil {
        c.logger.Errorf("Can't create %s user '%s': %s", userType, user.name, err)
    } else {
        c.logger.Infof("Created %s user '%s' with %s flags", userType, user.name, flags)
    }
}

func (c *Cluster) createUsers() error {
    for username, user := range c.pgUsers {
        if username == superuserName || username == replicationUsername {
    for i, endpoint := range endpoints.Items {
        if _, ok := c.Endpoints[endpoint.UID]; ok {
            continue
        }
        c.Endpoints[endpoint.UID] = &endpoints.Items[i]
        c.logger.Debugf("Endpoint loaded, uid: %s", endpoint.UID)
    }

        c.createUser(user)
    secrets, err := c.config.KubeClient.Secrets(ns).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of secrets: %s", err)
    }
    for i, secret := range secrets.Items {
        if _, ok := c.Secrets[secret.UID]; ok {
            continue
        }
        c.Secrets[secret.UID] = &secrets.Items[i]
        c.logger.Debugf("Secret loaded, uid: %s", secret.UID)
    }

    statefulSets, err := c.config.KubeClient.StatefulSets(ns).List(listOptions)
    if err != nil {
        return fmt.Errorf("Can't get list of stateful sets: %s", err)
    }
    for i, statefulSet := range statefulSets.Items {
        if _, ok := c.Statefulsets[statefulSet.UID]; ok {
            continue
        }
        c.Statefulsets[statefulSet.UID] = &statefulSets.Items[i]
        c.logger.Debugf("StatefulSet loaded, uid: %s", statefulSet.UID)
    }

    return nil
}

func (c *Cluster) ListResources() error {
    for _, obj := range c.Statefulsets {
        c.logger.Infof("StatefulSet: %s", util.NameFromMeta(obj.ObjectMeta))
    }

    for _, obj := range c.Secrets {
        c.logger.Infof("Secret: %s", util.NameFromMeta(obj.ObjectMeta))
    }

    for _, obj := range c.Endpoints {
        c.logger.Infof("Endpoint: %s", util.NameFromMeta(obj.ObjectMeta))
    }

    for _, obj := range c.Services {
        c.logger.Infof("Service: %s", util.NameFromMeta(obj.ObjectMeta))
    }

    pods, err := c.clusterPods()
    if err != nil {
        return fmt.Errorf("Can't get pods: %s", err)
    }

    for _, obj := range pods {
        c.logger.Infof("Pod: %s", util.NameFromMeta(obj.ObjectMeta))
    }

    return nil
}

func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
    cSpec := c.Spec
    clusterName := c.ClusterName()
    resourceList := resources.ResourceList(cSpec.Resources)
    template := resources.PodTemplate(clusterName, resourceList, c.dockerImage, cSpec.Version, c.etcdHost)
    statefulSet := resources.StatefulSet(clusterName, template, cSpec.NumberOfInstances)

    statefulSet, err := c.config.KubeClient.StatefulSets(statefulSet.Namespace).Create(statefulSet)
    if k8sutil.ResourceAlreadyExists(err) {
        return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSet.ObjectMeta))
    }
    if err != nil {
        return nil, err
    }
    c.Statefulsets[statefulSet.UID] = statefulSet
    c.logger.Debugf("Created new StatefulSet, uid: %s", statefulSet.UID)

    return statefulSet, nil
}

func (c *Cluster) updateStatefulSet(statefulSet *v1beta1.StatefulSet) error {
    statefulSet, err := c.config.KubeClient.StatefulSets(statefulSet.Namespace).Update(statefulSet)
    if err == nil { // cache the updated object only when the API call succeeded
        c.Statefulsets[statefulSet.UID] = statefulSet
    }

    return err
}

func (c *Cluster) deleteStatefulSet(statefulSet *v1beta1.StatefulSet) error {
    err := c.config.KubeClient.
        StatefulSets(statefulSet.Namespace).
        Delete(statefulSet.Name, deleteOptions)

    if err != nil {
        return err
    }
    delete(c.Statefulsets, statefulSet.UID)

    return nil
}

func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
    endpoint := resources.Endpoint(c.ClusterName())

    endpoint, err := c.config.KubeClient.Endpoints(endpoint.Namespace).Create(endpoint)
    if k8sutil.ResourceAlreadyExists(err) {
        return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpoint.ObjectMeta))
    }
    if err != nil {
        return nil, err
    }
    c.Endpoints[endpoint.UID] = endpoint
    c.logger.Debugf("Created new endpoint, uid: %s", endpoint.UID)

    return endpoint, nil
}

func (c *Cluster) deleteEndpoint(endpoint *v1.Endpoints) error {
    err := c.config.KubeClient.Endpoints(endpoint.Namespace).Delete(endpoint.Name, deleteOptions)
    if err != nil {
        return err
    }
    delete(c.Endpoints, endpoint.UID)

    return nil
}

func (c *Cluster) createService() (*v1.Service, error) {
    service := resources.Service(c.ClusterName(), c.Spec.AllowedSourceRanges)

    service, err := c.config.KubeClient.Services(service.Namespace).Create(service)
    if k8sutil.ResourceAlreadyExists(err) {
        return nil, fmt.Errorf("Service '%s' already exists", util.NameFromMeta(service.ObjectMeta))
    }
    if err != nil {
        return nil, err
    }
    c.Services[service.UID] = service
    c.logger.Debugf("Created new service, uid: %s", service.UID)

    return service, nil
}

func (c *Cluster) deleteService(service *v1.Service) error {
    err := c.config.KubeClient.Services(service.Namespace).Delete(service.Name, deleteOptions)
    if err != nil {
        return err
    }
    delete(c.Services, service.UID)

    return nil
}

func (c *Cluster) createUsers() error {
    for username, user := range c.pgUsers {
        if username == constants.SuperuserName || username == constants.ReplicationUsername {
            continue
        }

        isHuman, err := c.createPgUser(user)
        var userType string
        if isHuman {
            userType = "human"
        } else {
            userType = "robot"
        }
        if err != nil {
            return fmt.Errorf("Can't create %s user '%s': %s", userType, username, err)
        }
    }

    return nil
}

func (c *Cluster) applySecrets() error {
    secrets, err := resources.UserSecrets(c.ClusterName(), c.pgUsers)

    if err != nil {
        return fmt.Errorf("Can't get user secrets")
    }

    for username, secret := range secrets {
        secret, err := c.config.KubeClient.Secrets(secret.Namespace).Create(secret)
        if k8sutil.ResourceAlreadyExists(err) {
            curSecrets, err := c.config.KubeClient.Secrets(secret.Namespace).Get(secret.Name)
            if err != nil {
                return fmt.Errorf("Can't get current secret: %s", err)
            }
            pwdUser := c.pgUsers[username]
            pwdUser.Password = string(curSecrets.Data["password"])
            c.pgUsers[username] = pwdUser

            continue
        } else {
            if err != nil {
                return fmt.Errorf("Can't create secret for user '%s': %s", username, err)
            }
            c.Secrets[secret.UID] = secret
            c.logger.Debugf("Created new secret, uid: %s", secret.UID)
        }
    }

    return nil
}

func (c *Cluster) deleteSecret(secret *v1.Secret) error {
    err := c.config.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions)
    if err != nil {
        return err
    }
    delete(c.Secrets, secret.UID)

    return err
}

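Keying the KubeResources maps by types.UID rather than by name makes LoadResources safe to call repeatedly and lets the delete helpers remove exactly the object that was tracked, even if a same-named resource is recreated. The pattern in miniature (illustrative only):

    seen := make(map[types.UID]*v1.Service)
    for i, svc := range items {
        if _, ok := seen[svc.UID]; ok {
            continue // already tracked; a second pass is a no-op
        }
        seen[svc.UID] = &items[i] // address the slice element, not the loop variable
    }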
@@ -0,0 +1,188 @@

package cluster

import (
    "context"
    "fmt"
    "strings"
    "time"

    etcdclient "github.com/coreos/etcd/client"
    "k8s.io/client-go/pkg/api/v1"
    "k8s.io/client-go/pkg/labels"

    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
)

func isValidUsername(username string) bool {
    return alphaNumericRegexp.MatchString(username)
}

func normalizeUserFlags(userFlags []string) (flags []string, err error) {
    uniqueFlags := make(map[string]bool)

    for _, flag := range userFlags {
        if !alphaNumericRegexp.MatchString(flag) {
            err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
            return
        } else {
            flag = strings.ToUpper(flag)
            if _, ok := uniqueFlags[flag]; !ok {
                uniqueFlags[flag] = true
            }
        }
    }

    flags = []string{}
    for k := range uniqueFlags {
        flags = append(flags, k)
    }

    return
}

func (c *Cluster) getTeamMembers() ([]string, error) {
    teamInfo, err := c.config.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
    if err != nil {
        return nil, fmt.Errorf("Can't get team info: %s", err)
    }

    return teamInfo.Members, nil
}

func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string) error {
    for {
        select {
        case podEvent := <-podEvents:
            podLabels := podEvent.CurPod.Labels
            c.logger.Debugf("Pod has following labels: %+v", podLabels)
            val, ok := podLabels["spilo-role"]
            if ok && val == spiloRole {
                return nil
            }
        case <-time.After(constants.PodLabelWaitTimeout):
            return fmt.Errorf("Pod label wait timeout")
        }
    }
}

func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
    for {
        select {
        case podEvent := <-podEvents:
            if podEvent.EventType == spec.PodEventDelete {
                return nil
            }
        case <-time.After(constants.PodDeletionWaitTimeout):
            return fmt.Errorf("Pod deletion wait timeout")
        }
    }
}

func (c *Cluster) waitStatefulsetReady() error {
    return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
        func() (bool, error) {
            listOptions := v1.ListOptions{
                LabelSelector: c.labelsSet().String(),
            }
            ss, err := c.config.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions)
            if err != nil {
                return false, err
            }

            if len(ss.Items) != 1 {
                return false, fmt.Errorf("StatefulSet is not found")
            }

            return *ss.Items[0].Spec.Replicas == ss.Items[0].Status.Replicas, nil
        })
}

func (c *Cluster) waitPodLabelsReady() error {
    ls := c.labelsSet()
    namespace := c.Metadata.Namespace

    listOptions := v1.ListOptions{
        LabelSelector: ls.String(),
    }
    masterListOption := v1.ListOptions{
        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "master"}).String(),
    }
    replicaListOption := v1.ListOptions{
        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(),
    }
    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
    if err != nil {
        return err
    }
    podsNumber := len(pods.Items)

    return retryutil.Retry(
        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
        func() (bool, error) {
            masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
            if err != nil {
                return false, err
            }
            replicaPods, err := c.config.KubeClient.Pods(namespace).List(replicaListOption)
            if err != nil {
                return false, err
            }
            if len(masterPods.Items) > 1 {
                return false, fmt.Errorf("Too many masters")
            }

            return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
        })
}

func (c *Cluster) waitClusterReady() error {
    // TODO: wait for the first Pod only
    err := c.waitStatefulsetReady()
    if err != nil {
        return fmt.Errorf("Stateful set error: %s", err)
    }

    // TODO: wait only for master
    err = c.waitPodLabelsReady()
    if err != nil {
        return fmt.Errorf("Pod labels error: %s", err)
    }

    return nil
}

func (c *Cluster) labelsSet() labels.Set {
    return labels.Set{
        "application":   "spilo",
        "spilo-cluster": c.Metadata.Name,
    }
}

func (c *Cluster) credentialSecretName(username string) string {
    return fmt.Sprintf(constants.UserSecretTemplate,
        username,
        c.Metadata.Name,
        constants.TPRName,
        constants.TPRVendor)
}

func (c *Cluster) deleteEtcdKey() error {
    etcdKey := fmt.Sprintf("/service/%s", c.Metadata.Name)

    //TODO: retry multiple times
    resp, err := c.config.EtcdClient.Delete(context.Background(),
        etcdKey,
        &etcdclient.DeleteOptions{Recursive: true})

    if err != nil {
        return fmt.Errorf("Can't delete etcd key: %s", err)
    }

    if resp == nil {
        return fmt.Errorf("No response from etcd cluster")
    }

    return nil
}

@@ -1,23 +1,18 @@ package controller

import (
    "fmt"
    "sync"

    "github.com/Sirupsen/logrus"
    etcdclient "github.com/coreos/etcd/client"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api/v1"
    v1beta1extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
    "k8s.io/client-go/pkg/fields"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"

    "github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
    "github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
)

@@ -32,25 +27,23 @@ type Config struct {

type Controller struct {
    config Config
    logger *logrus.Entry
    events chan *Event
    clusters  map[string]*cluster.Cluster
    stopChMap map[string]chan struct{}
    clusters    map[spec.ClusterName]*cluster.Cluster
    stopChMap   map[spec.ClusterName]chan struct{}
    waitCluster sync.WaitGroup

    postgresqlInformer cache.SharedIndexInformer
}
    podInformer        cache.SharedIndexInformer

type Event struct {
    Type   string
    Object *spec.Postgresql
    podCh chan spec.PodEvent
}

func New(cfg *Config) *Controller {
    return &Controller{
        config: *cfg,
        logger: logrus.WithField("pkg", "controller"),
        clusters:  make(map[string]*cluster.Cluster),
        stopChMap: make(map[string]chan struct{}),
        clusters:  make(map[spec.ClusterName]*cluster.Cluster),
        stopChMap: make(map[spec.ClusterName]chan struct{}),
        podCh:     make(chan spec.PodEvent),
    }
}

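Switching the map keys from string to spec.ClusterName works because ClusterName is, per the ClusterName() method earlier in the diff, a struct of two strings, hence comparable and usable as a map key. It disambiguates same-named clusters in different namespaces:

    type ClusterName struct{ Namespace, Name string }

    clusters := make(map[ClusterName]bool)
    clusters[ClusterName{"acid", "testcluster"}] = true // hypothetical names
    // {"acid", "testcluster"} and {"test", "testcluster"} are distinct keys.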
@@ -65,127 +58,65 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {

        return
    }

    go c.watchTpr(stopCh)
    c.logger.Infof("'%s' namespace will be watched", c.config.PodNamespace)
    go c.runInformers(stopCh)

    c.logger.Info("Started working in background")
}

func (c *Controller) watchTpr(stopCh <-chan struct{}) {
    go c.postgresqlInformer.Run(stopCh)

    <-stopCh
}

func (c *Controller) createTPR() error {
    TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor)
    tpr := &v1beta1extensions.ThirdPartyResource{
        ObjectMeta: v1.ObjectMeta{
            Name: TPRName,
            //PodNamespace: c.config.PodNamespace, //ThirdPartyResources are cluster-wide
        },
        Versions: []v1beta1extensions.APIVersion{
            {Name: constants.TPRApiVersion},
        },
        Description: constants.TPRDescription,
    }

    _, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
    if err != nil {
        if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
            return err
        } else {
            c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName)
        }
    } else {
        c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
    }

    restClient := c.config.RestClient

    return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.PodNamespace)
}

func (c *Controller) makeClusterConfig() cluster.Config {
    return cluster.Config{
        ControllerNamespace: c.config.PodNamespace,
        KubeClient:          c.config.KubeClient,
        RestClient:          c.config.RestClient,
        EtcdClient:          c.config.EtcdClient,
        TeamsAPIClient:      c.config.TeamsAPIClient,
    }
}

func (c *Controller) initController() {
    err := c.createTPR()
    if err != nil {
        c.logger.Fatalf("Can't register ThirdPartyResource: %s", err)
    }

    token, err := c.getOAuthToken()
    if err != nil {
        c.logger.Errorf("Can't get OAuth token: %s", err)
    } else {
        c.config.TeamsAPIClient.OAuthToken = token
    }

    // Postgresqls
    clusterLw := &cache.ListWatch{
        ListFunc:  c.clusterListFunc,
        WatchFunc: c.clusterWatchFunc,
    }
    c.postgresqlInformer = cache.NewSharedIndexInformer(
        cache.NewListWatchFromClient(c.config.RestClient, constants.ResourceName, v1.NamespaceAll, fields.Everything()),
        clusterLw,
        &spec.Postgresql{},
        constants.ResyncPeriod,
        constants.ResyncPeriodTPR,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

    c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.clusterAdd,
        UpdateFunc: c.clusterUpdate,
        DeleteFunc: c.clusterDelete,
        AddFunc:    c.postgresqlAdd,
        UpdateFunc: c.postgresqlUpdate,
        DeleteFunc: c.postgresqlDelete,
    })

    // Pods
    podLw := &cache.ListWatch{
        ListFunc:  c.podListFunc,
        WatchFunc: c.podWatchFunc,
    }

    c.podInformer = cache.NewSharedIndexInformer(
        podLw,
        &v1.Pod{},
        constants.ResyncPeriodPod,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

    c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.podAdd,
        UpdateFunc: c.podUpdate,
        DeleteFunc: c.podDelete,
    })
}

func (c *Controller) clusterAdd(obj interface{}) {
|
||||
pg := obj.(*spec.Postgresql)
|
||||
func (c *Controller) runInformers(stopCh <-chan struct{}) {
|
||||
go c.postgresqlInformer.Run(stopCh)
|
||||
go c.podInformer.Run(stopCh)
|
||||
go c.podEventsDispatcher(stopCh)
|
||||
|
||||
//TODO: why do we need to have this check
|
||||
if pg.Spec == nil {
|
||||
return
|
||||
}
|
||||
|
||||
clusterName := (*pg).Metadata.Name
|
||||
|
||||
cl := cluster.New(c.makeClusterConfig(), pg)
|
||||
err := cl.Create()
|
||||
if err != nil {
|
||||
c.logger.Errorf("Can't create cluster: %s", err)
|
||||
return
|
||||
}
|
||||
c.stopChMap[clusterName] = make(chan struct{})
|
||||
c.clusters[clusterName] = cl
|
||||
|
||||
c.logger.Infof("Postgresql cluster '%s' has been created", util.FullObjectNameFromMeta((*pg).Metadata))
|
||||
}
|
||||
|
||||
func (c *Controller) clusterUpdate(prev, cur interface{}) {
|
||||
pgPrev := prev.(*spec.Postgresql)
|
||||
pgCur := cur.(*spec.Postgresql)
|
||||
|
||||
if pgPrev.Spec == nil || pgCur.Spec == nil {
|
||||
return
|
||||
}
|
||||
if pgPrev.Metadata.ResourceVersion == pgCur.Metadata.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Infof("Update: %+v -> %+v", *pgPrev, *pgCur)
|
||||
}
|
||||
|
||||
func (c *Controller) clusterDelete(obj interface{}) {
|
||||
pg := obj.(*spec.Postgresql)
|
||||
if pg.Spec == nil {
|
||||
return
|
||||
}
|
||||
clusterName := (*pg).Metadata.Name
|
||||
|
||||
cluster := cluster.New(c.makeClusterConfig(), pg)
|
||||
err := cluster.Delete()
|
||||
if err != nil {
|
||||
c.logger.Errorf("Can't delete cluster '%s': %s", util.FullObjectNameFromMeta((*pg).Metadata), err)
|
||||
return
|
||||
}
|
||||
|
||||
close(c.stopChMap[clusterName])
|
||||
delete(c.clusters, clusterName)
|
||||
|
||||
c.logger.Infof("Cluster has been deleted: '%s'", util.FullObjectNameFromMeta((*pg).Metadata))
|
||||
<-stopCh
|
||||
}
|
||||
|
|
|
|||
|
|
@@ -2,9 +2,10 @@ package controller

import (
	"fmt"
	"time"

	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
	etcdclient "github.com/coreos/etcd/client"
	"time"
)

func (c *Controller) initEtcdClient() error {
@@ -0,0 +1,134 @@
package controller

import (
	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/runtime"
	"k8s.io/client-go/pkg/watch"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
)

func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error) {
	var labelSelector string
	var fieldSelector string

	if options.LabelSelector != nil {
		labelSelector = options.LabelSelector.String()
	}

	if options.FieldSelector != nil {
		fieldSelector = options.FieldSelector.String()
	}
	opts := v1.ListOptions{
		LabelSelector: labelSelector,
		FieldSelector: fieldSelector,
		Watch: options.Watch,
		ResourceVersion: options.ResourceVersion,
		TimeoutSeconds: options.TimeoutSeconds,
	}

	return c.config.KubeClient.CoreV1().Pods(c.config.PodNamespace).List(opts)
}

func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) {
	var labelSelector string
	var fieldSelector string

	if options.LabelSelector != nil {
		labelSelector = options.LabelSelector.String()
	}

	if options.FieldSelector != nil {
		fieldSelector = options.FieldSelector.String()
	}

	opts := v1.ListOptions{
		LabelSelector: labelSelector,
		FieldSelector: fieldSelector,
		Watch: options.Watch,
		ResourceVersion: options.ResourceVersion,
		TimeoutSeconds: options.TimeoutSeconds,
	}

	return c.config.KubeClient.CoreV1Client.Pods(c.config.PodNamespace).Watch(opts)
}

func PodNameFromMeta(meta v1.ObjectMeta) spec.PodName {
	return spec.PodName{
		Namespace: meta.Namespace,
		Name: meta.Name,
	}
}

func (c *Controller) podAdd(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	podEvent := spec.PodEvent{
		ClusterName: util.PodClusterName(pod),
		PodName: PodNameFromMeta(pod.ObjectMeta),
		CurPod: pod,
		EventType: spec.PodEventAdd,
	}

	c.podCh <- podEvent
}

func (c *Controller) podUpdate(prev, cur interface{}) {
	prevPod, ok := prev.(*v1.Pod)
	if !ok {
		return
	}

	curPod, ok := cur.(*v1.Pod)
	if !ok {
		return
	}

	podEvent := spec.PodEvent{
		ClusterName: util.PodClusterName(curPod),
		PodName: PodNameFromMeta(curPod.ObjectMeta),
		PrevPod: prevPod,
		CurPod: curPod,
		EventType: spec.PodEventUpdate,
	}

	c.podCh <- podEvent
}

func (c *Controller) podDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	podEvent := spec.PodEvent{
		ClusterName: util.PodClusterName(pod),
		PodName: PodNameFromMeta(pod.ObjectMeta),
		CurPod: pod,
		EventType: spec.PodEventDelete,
	}

	c.podCh <- podEvent
}

func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
	c.logger.Infof("Watching all pod events")
	for {
		select {
		case event := <-c.podCh:
			if subscriber, ok := c.clusters[event.ClusterName]; ok {
				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName)
				go subscriber.ReceivePodEvent(event)
			} else {
				c.logger.Debugf("Skipping pod unrelated to any cluster: %s", event.PodName)
			}
		case <-stopCh:
			return
		}
	}
}
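Note: the dispatcher only forwards events; ReceivePodEvent lives in the cluster package and is not part of this hunk. A minimal sketch of what such a consumer could look like, assuming the cluster keeps its own pod-event channel (the field and method bodies here are illustrative, not the committed implementation):

package cluster

import "github.bus.zalan.do/acid/postgres-operator/pkg/spec"

// Assumed field: a per-cluster channel fed by the controller's dispatcher.
type Cluster struct {
	podEvents chan spec.PodEvent
}

// ReceivePodEvent is called by the controller in a fresh goroutine,
// so blocking on an unbuffered channel here is acceptable.
func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
	c.podEvents <- event
}

// Run drains the channel; during a rolling update this is where the
// cluster would wait for a recreated pod to come back.
func (c *Cluster) Run(stopCh <-chan struct{}) {
	for {
		select {
		case event := <-c.podEvents:
			_ = event // react to ADD/UPDATE/DELETE here
		case <-stopCh:
			return
		}
	}
}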
@@ -0,0 +1,189 @@
package controller

import (
	"fmt"
	"reflect"

	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/pkg/api/meta"
	"k8s.io/client-go/pkg/fields"
	"k8s.io/client-go/pkg/runtime"
	"k8s.io/client-go/pkg/watch"

	"github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
)

func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
	c.logger.Info("Getting list of currently running clusters")
	object, err := c.config.RestClient.Get().
		Namespace(c.config.PodNamespace).
		Resource(constants.ResourceName).
		VersionedParams(&options, api.ParameterCodec).
		FieldsSelectorParam(fields.Everything()).
		Do().
		Get()

	if err != nil {
		return nil, fmt.Errorf("Can't get list of postgresql objects: %s", err)
	}

	objList, err := meta.ExtractList(object)
	if err != nil {
		return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err)
	}

	clusterConfig := c.makeClusterConfig()
	for _, obj := range objList {
		pg, ok := obj.(*spec.Postgresql)
		if !ok {
			return nil, fmt.Errorf("Can't cast object to postgresql")
		}
		clusterName := spec.ClusterName{
			Namespace: pg.Metadata.Namespace,
			Name: pg.Metadata.Name,
		}

		cl := cluster.New(clusterConfig, *pg)

		stopCh := make(chan struct{})
		c.stopChMap[clusterName] = stopCh
		c.clusters[clusterName] = cl
		cl.LoadResources()
		cl.ListResources()

		go cl.Run(stopCh)
	}
	if len(c.clusters) > 0 {
		c.logger.Infof("There are %d clusters currently running", len(c.clusters))
	} else {
		c.logger.Infof("No clusters running")
	}

	return object, err
}

func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
	return c.config.RestClient.Get().
		Prefix("watch").
		Namespace(c.config.PodNamespace).
		Resource(constants.ResourceName).
		VersionedParams(&options, api.ParameterCodec).
		FieldsSelectorParam(fields.Everything()).
		Watch()
}

func (c *Controller) postgresqlAdd(obj interface{}) {
	pg, ok := obj.(*spec.Postgresql)
	if !ok {
		c.logger.Errorf("Can't cast to postgresql spec")
		return
	}

	clusterName := spec.ClusterName{
		Namespace: pg.Metadata.Namespace,
		Name: pg.Metadata.Name,
	}

	_, ok = c.clusters[clusterName]
	if ok {
		c.logger.Infof("Cluster '%s' already exists", clusterName)
		return
	}

	c.logger.Infof("Creation of a new Postgresql cluster '%s' started", clusterName)
	cl := cluster.New(c.makeClusterConfig(), *pg)
	cl.MustSetStatus(spec.ClusterStatusCreating)
	err := cl.Create()
	if err != nil {
		c.logger.Errorf("Can't create cluster: %s", err)
		cl.MustSetStatus(spec.ClusterStatusAddFailed)
		return
	}
	cl.MustSetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?

	stopCh := make(chan struct{})
	c.stopChMap[clusterName] = stopCh
	c.clusters[clusterName] = cl
	go cl.Run(stopCh)

	c.logger.Infof("Postgresql cluster '%s' has been created", clusterName)
}

func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
	pgPrev, ok := prev.(*spec.Postgresql)
	if !ok {
		c.logger.Errorf("Can't cast to postgresql spec")
		return
	}
	pgNew, ok := cur.(*spec.Postgresql)
	if !ok {
		c.logger.Errorf("Can't cast to postgresql spec")
		return
	}

	clusterName := spec.ClusterName{
		Namespace: pgNew.Metadata.Namespace,
		Name: pgNew.Metadata.Name,
	}

	//TODO: Do not update a cluster that is currently being created

	if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
		c.logger.Debugf("Skipping update with no resource version change")
		return
	}
	pgCluster := c.clusters[clusterName] // current

	if reflect.DeepEqual(pgPrev.Spec, pgNew.Spec) {
		c.logger.Infof("Skipping update with no spec change")
		return
	}

	c.logger.Infof("Cluster update: %s(version: %s) -> %s(version: %s)",
		util.NameFromMeta(pgPrev.Metadata), pgPrev.Metadata.ResourceVersion,
		util.NameFromMeta(pgNew.Metadata), pgNew.Metadata.ResourceVersion)

	rollingUpdate := pgCluster.NeedsRollingUpdate(pgNew)
	if rollingUpdate {
		c.logger.Infof("Pods need to be recreated")
	}

	pgCluster.MustSetStatus(spec.ClusterStatusUpdating)
	err := pgCluster.Update(pgNew, rollingUpdate)
	if err != nil {
		pgCluster.MustSetStatus(spec.ClusterStatusUpdateFailed)
		c.logger.Errorf("Can't update cluster: %s", err)
	} else {
		c.logger.Infof("Cluster has been updated")
	}
}

func (c *Controller) postgresqlDelete(obj interface{}) {
	pgCur, ok := obj.(*spec.Postgresql)
	if !ok {
		c.logger.Errorf("Can't cast to postgresql spec")
		return
	}
	clusterName := spec.ClusterName{
		Namespace: pgCur.Metadata.Namespace,
		Name: pgCur.Metadata.Name,
	}
	pgCluster, ok := c.clusters[clusterName]
	if !ok {
		c.logger.Errorf("Unknown cluster: %s", clusterName)
		return
	}

	c.logger.Infof("Cluster delete: %s", util.NameFromMeta(pgCur.Metadata))
	err := pgCluster.Delete()
	if err != nil {
		c.logger.Errorf("Can't delete cluster '%s': %s", clusterName, err)
		return
	}

	close(c.stopChMap[clusterName])
	delete(c.clusters, clusterName)

	c.logger.Infof("Cluster '%s' has been successfully deleted", clusterName)
}
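NeedsRollingUpdate itself is defined in the cluster package and not shown in this diff. A rough sketch of the decision it has to make, under the assumption that the cluster keeps its current spec in a Spec field (names are illustrative, not the committed code): anything baked into the pod template forces pod recreation, while StatefulSet-level changes such as scaling do not.

package cluster

import (
	"reflect"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
)

// Illustrative sketch only; the committed method may differ.
func (c *Cluster) NeedsRollingUpdate(pgNew *spec.Postgresql) bool {
	oldSpec, newSpec := c.Spec, pgNew.Spec // c.Spec: assumed current spec

	// Version and resource changes end up in the pod template, so every
	// member has to be recreated one by one.
	if !reflect.DeepEqual(oldSpec.PostgresqlParam, newSpec.PostgresqlParam) {
		return true
	}
	if !reflect.DeepEqual(oldSpec.Resources, newSpec.Resources) {
		return true
	}

	// Scaling only touches the StatefulSet replica count.
	return false
}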
@@ -0,0 +1,57 @@
package controller

import (
	"fmt"

	"github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
	"k8s.io/client-go/pkg/api"
)

func (c *Controller) makeClusterConfig() cluster.Config {
	return cluster.Config{
		ControllerNamespace: c.config.PodNamespace,
		KubeClient: c.config.KubeClient,
		RestClient: c.config.RestClient,
		EtcdClient: c.config.EtcdClient,
		TeamsAPIClient: c.config.TeamsAPIClient,
	}
}

func (c *Controller) getOAuthToken() (string, error) {
	// Temporarily fetch the postgresql-operator secret from NamespaceDefault
	credentialsSecret, err := c.config.KubeClient.Secrets(api.NamespaceDefault).Get(constants.OAuthTokenSecretName)

	if err != nil {
		return "", fmt.Errorf("Can't get credentials secret: %s", err)
	}
	data := credentialsSecret.Data

	if string(data["read-only-token-type"]) != "Bearer" {
		return "", fmt.Errorf("Wrong token type: %s", data["read-only-token-type"])
	}

	return string(data["read-only-token-secret"]), nil
}

func (c *Controller) createTPR() error {
	TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor)
	tpr := resources.ThirdPartyResource(TPRName)

	_, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
	if err != nil {
		if !k8sutil.ResourceAlreadyExists(err) {
			return err
		}
		c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName)
	} else {
		c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
	}

	restClient := c.config.RestClient

	return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.PodNamespace)
}
@@ -2,20 +2,24 @@ package spec

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
	"time"

	"k8s.io/client-go/pkg/api/meta"
	"k8s.io/client-go/pkg/api/unversioned"
	"k8s.io/client-go/pkg/api/v1"
)

var alphaRegexp = regexp.MustCompile("^[a-zA-Z]*$")

type MaintenanceWindow struct {
	StartTime string
	EndTime string
	//StartTime time.Time // Start time
	//StartWeekday time.Weekday // Start weekday
	//
	//EndTime time.Time // End time
	//EndWeekday time.Weekday // End weekday
	StartTime time.Time // Start time
	StartWeekday time.Weekday // Start weekday

	EndTime time.Time // End time
	EndWeekday time.Weekday // End weekday
}

type Volume struct {
@@ -44,47 +48,37 @@ type Patroni struct {

type UserFlags []string

type PostgresSpec struct {
	Resources `json:"resources,omitempty"`
	Patroni `json:"patroni,omitempty"`
	PostgresqlParam `json:"postgresql"`
	Volume `json:"volume,omitempty"`
type PostgresStatus string

	TeamId string `json:"teamId"`
	AllowedSourceRanges []string `json:"allowedSourceRanges"`
	NumberOfInstances int32 `json:"numberOfInstances"`
	Users map[string]UserFlags `json:"users"`
	MaintenanceWindows []string `json:"maintenanceWindows,omitempty"`
	PamUsersSecret string `json:"pamUsersSecret,omitempty"`

	EtcdHost string
	DockerImage string
}

type PostgresStatus struct {
	// Phase is the cluster running phase
	Phase string `json:"phase"`
	Reason string `json:"reason"`

	// ControlPaused indicates that the operator has paused control of the cluster.
	ControlPaused bool `json:"controlPaused"`

	// Size is the current size of the cluster
	Size int `json:"size"`
	// CurrentVersion is the current cluster version
	CurrentVersion string `json:"currentVersion"`
	// TargetVersion is the version the cluster is upgrading to.
	// If the cluster is not upgrading, TargetVersion is empty.
	TargetVersion string `json:"targetVersion"`
}
const (
	ClusterStatusUnknown PostgresStatus = ""
	ClusterStatusCreating = "Creating"
	ClusterStatusUpdating = "Updating"
	ClusterStatusUpdateFailed = "UpdateFailed"
	ClusterStatusAddFailed = "CreateFailed"
	ClusterStatusRunning = "Running"
)

// PostgreSQL Third Party (resource) Object
type Postgresql struct {
	unversioned.TypeMeta `json:",inline"`
	Metadata v1.ObjectMeta `json:"metadata"`

	Spec *PostgresSpec `json:"spec"`
	Status *PostgresStatus `json:"status"`
	Spec PostgresSpec `json:"spec"`
	Status PostgresStatus `json:"status"`
}

type PostgresSpec struct {
	PostgresqlParam `json:"postgresql"`
	Volume `json:"volume,omitempty"`
	Patroni `json:"patroni,omitempty"`
	Resources `json:"resources,omitempty"`

	TeamId string `json:"teamId"`
	AllowedSourceRanges []string `json:"allowedSourceRanges"`
	NumberOfInstances int32 `json:"numberOfInstances"`
	Users map[string]UserFlags `json:"users"`
	MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"`
}

type PostgresqlList struct {
@@ -94,6 +88,85 @@ type PostgresqlList struct {
	Items []Postgresql `json:"items"`
}

func parseTime(s string) (t time.Time, wd time.Weekday, wdProvided bool, err error) {
	var timeLayout string

	parts := strings.Split(s, ":")
	if len(parts) == 3 {
		if len(parts[0]) != 3 || !alphaRegexp.MatchString(parts[0]) {
			err = fmt.Errorf("Weekday must be 3 characters long")
			return
		}
		timeLayout = "Mon:15:04"
		wdProvided = true
	} else {
		wdProvided = false
		timeLayout = "15:04"
	}

	tp, err := time.Parse(timeLayout, s)
	if err != nil {
		return
	}

	wd = tp.Weekday()
	t = tp.UTC()

	return
}

func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) {
	var startWd, endWd string
	if m.StartWeekday == time.Sunday && m.EndWeekday == time.Saturday {
		startWd = ""
		endWd = ""
	} else {
		startWd = m.StartWeekday.String()[:3] + ":"
		endWd = m.EndWeekday.String()[:3] + ":"
	}

	return []byte(fmt.Sprintf("\"%s%s-%s%s\"",
		startWd, m.StartTime.Format("15:04"),
		endWd, m.EndTime.Format("15:04"))), nil
}

func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error {
	var (
		got MaintenanceWindow
		weekdayProvidedFrom bool
		weekdayProvidedTo bool
		err error
	)

	parts := strings.Split(string(data[1:len(data)-1]), "-")
	if len(parts) != 2 {
		return fmt.Errorf("Incorrect maintenance window format")
	}

	got.StartTime, got.StartWeekday, weekdayProvidedFrom, err = parseTime(parts[0])
	if err != nil {
		return err
	}

	got.EndTime, got.EndWeekday, weekdayProvidedTo, err = parseTime(parts[1])
	if err != nil {
		return err
	}

	if got.EndTime.Before(got.StartTime) {
		return fmt.Errorf("'From' time must be prior to the 'To' time")
	}

	if !weekdayProvidedFrom || !weekdayProvidedTo {
		got.StartWeekday = time.Sunday
		got.EndWeekday = time.Saturday
	}

	*m = got

	return nil
}

func (p *Postgresql) GetObjectKind() unversioned.ObjectKind {
	return &p.TypeMeta
}
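For reference, a small usage sketch of the (un)marshalling above, assuming the spec package is importable under the path used in this repo:

package main

import (
	"encoding/json"
	"fmt"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
)

func main() {
	var w spec.MaintenanceWindow

	// Weekday-bound window, parsed with the "Mon:15:04" layout on both sides.
	if err := json.Unmarshal([]byte(`"Mon:10:00-Mon:12:00"`), &w); err != nil {
		fmt.Println(err)
	}

	// Time-only window; weekdays default to the full Sunday..Saturday range.
	_ = json.Unmarshal([]byte(`"01:00-03:30"`), &w)

	// MarshalJSON omits weekday prefixes for the full-week range,
	// so the second window round-trips back to "01:00-03:30".
	out, _ := json.Marshal(&w)
	fmt.Println(string(out))
}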
@@ -0,0 +1,40 @@
package spec

import (
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/types"
)

type PodEventType string

type PodName types.NamespacedName

const (
	PodEventAdd PodEventType = "ADD"
	PodEventUpdate PodEventType = "UPDATE"
	PodEventDelete PodEventType = "DELETE"
)

type PodEvent struct {
	ClusterName ClusterName
	PodName PodName
	PrevPod *v1.Pod
	CurPod *v1.Pod
	EventType PodEventType
}

func (p PodName) String() string {
	return types.NamespacedName(p).String()
}

type ClusterName types.NamespacedName

func (c ClusterName) String() string {
	return types.NamespacedName(c).String()
}

type PgUser struct {
	Name string
	Password string
	Flags []string
}
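Both name types delegate to types.NamespacedName, whose String method renders "namespace/name", so log lines always show both parts. A tiny illustrative example (values are made up):

package main

import (
	"fmt"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
)

func main() {
	pod := spec.PodName{Namespace: "acid", Name: "testcluster-0"}
	cluster := spec.ClusterName{Namespace: "acid", Name: "testcluster"}
	fmt.Println(pod, cluster) // acid/testcluster-0 acid/testcluster
}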
@@ -12,15 +12,25 @@ const (
	ResourceCheckInterval = 3 * time.Second
	ResourceCheckTimeout = 10 * time.Minute

	ResourceName = TPRName + "s"
	ResyncPeriod = 5 * time.Minute
	PodLabelWaitTimeout = 10 * time.Minute
	PodDeletionWaitTimeout = 10 * time.Minute

	ResourceName = TPRName + "s"
	ResyncPeriodTPR = 5 * time.Minute
	ResyncPeriodPod = 5 * time.Minute

	SuperuserName = "postgres"
	ReplicationUsername = "replication"

	//TODO: move to the operator spec
	EtcdHost = "etcd-client.default.svc.cluster.local:2379"
	SpiloImage = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12"
	PamRoleName = "zalandos"
	PamConfiguration = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"
	EtcdHost = "etcd-client.default.svc.cluster.local:2379"
	SpiloImage = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12"
	PamRoleName = "zalandos"
	PamConfiguration = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"
	PasswordLength = 64
	TeamsAPIUrl = "https://teams.example.com/api/"
	UserSecretTemplate = "%s.%s.credentials.%s.%s"

	PasswordLength = 64
	TeamsAPIUrl = "https://teams.example.com/api/"
	OAuthTokenSecretName = "postgresql-operator"
	ServiceAccountName = "operator"
)
@@ -33,7 +33,7 @@ func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err er
	return kubernetes.NewForConfig(config)
}

func IsKubernetesResourceAlreadyExistError(err error) bool {
func ResourceAlreadyExists(err error) bool {
	return apierrors.IsAlreadyExists(err)
}
@@ -0,0 +1,265 @@
package resources

import (
	"fmt"

	"k8s.io/client-go/pkg/api/resource"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/apis/apps/v1beta1"
	extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
	"k8s.io/client-go/pkg/labels"
	"k8s.io/client-go/pkg/util/intstr"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
)

const (
	superuserName = "postgres"
	replicationUsername = "replication"
)

func credentialSecretName(clusterName, username string) string {
	return fmt.Sprintf(
		constants.UserSecretTemplate,
		username,
		clusterName,
		constants.TPRName,
		constants.TPRVendor)
}

func labelsSet(clusterName string) labels.Set {
	return labels.Set{
		"application": "spilo",
		"spilo-cluster": clusterName,
	}
}

func ResourceList(resources spec.Resources) *v1.ResourceList {
	resourceList := v1.ResourceList{}
	if resources.Cpu != "" {
		resourceList[v1.ResourceCPU] = resource.MustParse(resources.Cpu)
	}

	if resources.Memory != "" {
		resourceList[v1.ResourceMemory] = resource.MustParse(resources.Memory)
	}

	return &resourceList
}

func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, dockerImage, pgVersion, etcdHost string) *v1.PodTemplateSpec {
	envVars := []v1.EnvVar{
		{
			Name: "SCOPE",
			Value: cluster.Name,
		},
		{
			Name: "PGROOT",
			Value: "/home/postgres/pgdata/pgroot",
		},
		{
			Name: "ETCD_HOST",
			Value: etcdHost,
		},
		{
			Name: "POD_IP",
			ValueFrom: &v1.EnvVarSource{
				FieldRef: &v1.ObjectFieldSelector{
					APIVersion: "v1",
					FieldPath: "status.podIP",
				},
			},
		},
		{
			Name: "POD_NAMESPACE",
			ValueFrom: &v1.EnvVarSource{
				FieldRef: &v1.ObjectFieldSelector{
					APIVersion: "v1",
					FieldPath: "metadata.namespace",
				},
			},
		},
		{
			Name: "PGPASSWORD_SUPERUSER",
			ValueFrom: &v1.EnvVarSource{
				SecretKeyRef: &v1.SecretKeySelector{
					LocalObjectReference: v1.LocalObjectReference{
						Name: credentialSecretName(cluster.Name, superuserName),
					},
					Key: "password",
				},
			},
		},
		{
			Name: "PGPASSWORD_STANDBY",
			ValueFrom: &v1.EnvVarSource{
				SecretKeyRef: &v1.SecretKeySelector{
					LocalObjectReference: v1.LocalObjectReference{
						Name: credentialSecretName(cluster.Name, replicationUsername),
					},
					Key: "password",
				},
			},
		},
		{
			Name: "PAM_OAUTH2", //TODO: get from the operator tpr spec
			Value: constants.PamConfiguration, //space before uid is obligatory
		},
		{
			Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec
			Value: fmt.Sprintf(`
postgresql:
  bin_dir: /usr/lib/postgresql/%s/bin
bootstrap:
  initdb:
  - auth-host: md5
  - auth-local: trust
  users:
    %s:
      password: NULL
      options:
        - createdb
        - nologin
  pg_hba:
  - hostnossl all all all reject
  - hostssl all +%s all pam
  - hostssl all all all md5`, pgVersion, constants.PamRoleName, constants.PamRoleName),
		},
	}

	container := v1.Container{
		Name: cluster.Name,
		Image: dockerImage,
		ImagePullPolicy: v1.PullAlways,
		Resources: v1.ResourceRequirements{
			Requests: *resourceList,
		},
		Ports: []v1.ContainerPort{
			{
				ContainerPort: 8008,
				Protocol: v1.ProtocolTCP,
			},
			{
				ContainerPort: 5432,
				Protocol: v1.ProtocolTCP,
			},
		},
		VolumeMounts: []v1.VolumeMount{
			{
				Name: "pgdata",
				MountPath: "/home/postgres/pgdata", //TODO: fetch from the manifest
			},
		},
		Env: envVars,
	}
	terminateGracePeriodSeconds := int64(30)

	podSpec := v1.PodSpec{
		ServiceAccountName: constants.ServiceAccountName,
		TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
		Volumes: []v1.Volume{
			{
				Name: "pgdata",
				VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
			},
		},
		Containers: []v1.Container{container},
	}

	template := v1.PodTemplateSpec{
		ObjectMeta: v1.ObjectMeta{
			Labels: labelsSet(cluster.Name),
			Namespace: cluster.Namespace,
			Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"},
		},
		Spec: podSpec,
	}

	return &template
}

func StatefulSet(cluster spec.ClusterName, podTemplate *v1.PodTemplateSpec, numberOfInstances int32) *v1beta1.StatefulSet {
	statefulSet := &v1beta1.StatefulSet{
		ObjectMeta: v1.ObjectMeta{
			Name: cluster.Name,
			Namespace: cluster.Namespace,
			Labels: labelsSet(cluster.Name),
		},
		Spec: v1beta1.StatefulSetSpec{
			Replicas: &numberOfInstances,
			ServiceName: cluster.Name,
			Template: *podTemplate,
		},
	}

	return statefulSet
}

func UserSecrets(cluster spec.ClusterName, pgUsers map[string]spec.PgUser) (secrets map[string]*v1.Secret, err error) {
	secrets = make(map[string]*v1.Secret, len(pgUsers))
	namespace := cluster.Namespace
	for username, pgUser := range pgUsers {
		//Skip users with no password, i.e. human users (they'll be authenticated via pam)
		if pgUser.Password == "" {
			continue
		}
		secret := v1.Secret{
			ObjectMeta: v1.ObjectMeta{
				Name: credentialSecretName(cluster.Name, username),
				Namespace: namespace,
				Labels: labelsSet(cluster.Name),
			},
			Type: v1.SecretTypeOpaque,
			Data: map[string][]byte{
				"username": []byte(pgUser.Name),
				"password": []byte(pgUser.Password),
			},
		}
		secrets[username] = &secret
	}

	return
}

func Service(cluster spec.ClusterName, allowedSourceRanges []string) *v1.Service {
	service := &v1.Service{
		ObjectMeta: v1.ObjectMeta{
			Name: cluster.Name,
			Namespace: cluster.Namespace,
			Labels: labelsSet(cluster.Name),
		},
		Spec: v1.ServiceSpec{
			Type: v1.ServiceTypeLoadBalancer,
			Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
			LoadBalancerSourceRanges: allowedSourceRanges,
		},
	}

	return service
}

func Endpoint(cluster spec.ClusterName) *v1.Endpoints {
	endpoints := &v1.Endpoints{
		ObjectMeta: v1.ObjectMeta{
			Name: cluster.Name,
			Namespace: cluster.Namespace,
			Labels: labelsSet(cluster.Name),
		},
	}

	return endpoints
}

func ThirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
	return &extv1beta.ThirdPartyResource{
		ObjectMeta: v1.ObjectMeta{
			//ThirdPartyResources are cluster-wide
			Name: TPRName,
		},
		Versions: []extv1beta.APIVersion{
			{Name: constants.TPRApiVersion},
		},
		Description: constants.TPRDescription,
	}
}
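Taken together, these helpers assemble every object a cluster needs. A rough sketch of how the cluster code might wire them up when creating a new cluster (illustrative only; pgSpec.PostgresqlParam.Version is an assumed field name, the manifest key being "version"):

package cluster

import (
	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
)

// Illustrative composition; the actual wiring lives elsewhere in pkg/cluster.
func buildObjects(clusterName spec.ClusterName, pgSpec spec.PostgresSpec) {
	resourceList := resources.ResourceList(pgSpec.Resources)
	template := resources.PodTemplate(clusterName, resourceList,
		constants.SpiloImage, pgSpec.PostgresqlParam.Version, constants.EtcdHost)

	statefulSet := resources.StatefulSet(clusterName, template, pgSpec.NumberOfInstances)
	service := resources.Service(clusterName, pgSpec.AllowedSourceRanges)
	endpoint := resources.Endpoint(clusterName)

	_, _, _ = statefulSet, service, endpoint // created via the KubeClient afterwards
}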
@@ -38,7 +38,7 @@ type Team struct {
type TeamsAPI struct {
	url string
	httpClient *http.Client
	OauthToken string
	OAuthToken string
}

func NewTeamsAPI(url string) *TeamsAPI {
@@ -57,7 +57,7 @@ func (t *TeamsAPI) TeamInfo(teamId string) (*Team, error) {
		return nil, err
	}

	req.Header.Add("Authorization", "Bearer "+t.OauthToken)
	req.Header.Add("Authorization", "Bearer "+t.OAuthToken)
	resp, err := t.httpClient.Do(req)
	if err != nil {
		return nil, err
@@ -1,11 +1,12 @@
package util

import (
	"fmt"
	"math/rand"
	"time"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/pkg/types"
)

var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
@@ -23,15 +24,22 @@ func RandomPassword(n int) string {
	return string(b)
}

func FullObjectNameFromMeta(meta v1.ObjectMeta) string {
	return FullObjectName(meta.Namespace, meta.Name)
}

//TODO: Remove in favour of FullObjectNameFromMeta
func FullObjectName(ns, name string) string {
	if ns == "" {
		ns = "default"
func NameFromMeta(meta v1.ObjectMeta) types.NamespacedName {
	obj := types.NamespacedName{
		Namespace: meta.Namespace,
		Name: meta.Name,
	}

	return fmt.Sprintf("%s / %s", ns, name)
	return obj
}

func PodClusterName(pod *v1.Pod) spec.ClusterName {
	if name, ok := pod.Labels["spilo-cluster"]; ok {
		return spec.ClusterName{
			Namespace: pod.Namespace,
			Name: name,
		}
	}

	return spec.ClusterName{}
}
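PodClusterName is what ties an incoming pod event back to a cluster: it relies on the spilo-cluster label that labelsSet stamps onto every pod. A small test-style sketch of its behavior (not part of the commit):

package util

import (
	"testing"

	"k8s.io/client-go/pkg/api/v1"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
)

func TestPodClusterName(t *testing.T) {
	pod := &v1.Pod{}
	pod.Namespace = "acid"
	pod.Labels = map[string]string{"spilo-cluster": "testcluster"}

	got := PodClusterName(pod)
	if got.Namespace != "acid" || got.Name != "testcluster" {
		t.Errorf("unexpected cluster name: %s", got)
	}

	// A pod without the label maps to the zero ClusterName, which the
	// dispatcher then skips as unrelated to any cluster.
	if PodClusterName(&v1.Pod{}) != (spec.ClusterName{}) {
		t.Error("expected empty cluster name for unlabeled pod")
	}
}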