Merge branch 'master' into support-many-namespaces

This commit is contained in:
Sergey Dudoladov 2018-02-16 15:06:10 +01:00
commit 088bf70e7d
9 changed files with 245 additions and 49 deletions

View File

@ -30,10 +30,14 @@ it manages and updates them with the new docker images; afterwards, all pods fro
This project is currently in active development. It is however already [used internally by Zalando](https://jobs.zalando.com/tech/blog/postgresql-in-a-time-of-kubernetes/) in order to run Postgres databases on Kubernetes in larger numbers for staging environments and a smaller number of production databases. In this environment the operator is deployed to multiple Kubernetes clusters, where users deploy manifests via our CI/CD infrastructure. This project is currently in active development. It is however already [used internally by Zalando](https://jobs.zalando.com/tech/blog/postgresql-in-a-time-of-kubernetes/) in order to run Postgres databases on Kubernetes in larger numbers for staging environments and a smaller number of production databases. In this environment the operator is deployed to multiple Kubernetes clusters, where users deploy manifests via our CI/CD infrastructure.
There is a talk about this project delivered by Josh Berkus on KubeCon 2017: [Kube-native Postgres](https://www.youtube.com/watch?v=Zn1vd7sQ_bc)
Please, report any issues discovered to https://github.com/zalando-incubator/postgres-operator/issues. Please, report any issues discovered to https://github.com/zalando-incubator/postgres-operator/issues.
## Talks
1. "Blue elephant on-demand: Postgres + Kubernetes" talk by Oleksii Kliukin and Jan Mussler, FOSDEM 2018: [video](https://fosdem.org/2018/schedule/event/blue_elephant_on_demand_postgres_kubernetes/) | [slides (pdf)](https://www.postgresql.eu/events/fosdem2018/sessions/session/1735/slides/59/FOSDEM%202018_%20Blue_Elephant_On_Demand.pdf)
2. "Kube-Native Postgres" talk by Josh Berkus, KubeCon 2017: [video](https://www.youtube.com/watch?v=Zn1vd7sQ_bc)
## Running and testing the operator ## Running and testing the operator
The best way to test the operator is to run it in [minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). The best way to test the operator is to run it in [minikube](https://kubernetes.io/docs/getting-started-guides/minikube/).
@ -57,6 +61,23 @@ to test your that your setup is working.
Note: if you use multiple Kubernetes clusters, you can switch to Minikube with `kubectl config use-context minikube` Note: if you use multiple Kubernetes clusters, you can switch to Minikube with `kubectl config use-context minikube`
### Select the namespace to deploy to
The operator can run in a namespace other than `default`. For example, to use the `test` namespace, run the following before deploying the operator's manifests:
kubectl create namespace test
kubectl config set-context minikube --namespace=test
All subsequent `kubectl` commands will work with the `test` namespace. The operator will run in this namespace and look up needed resources - such as its config map - there.
### Specify the namespace to watch
Watching a namespace for an operator means tracking requests to change Postgresql clusters in the namespace such as "increase the number of Postgresql replicas to 5" and reacting to the requests, in this example by actually scaling up.
By default, the operator watches the namespace it is deployed to. You can change this by altering the `WATCHED_NAMESPACE` env var in the operator deployment manifest or the `watched_namespace` field in the operator configmap. In the case both are set, the env var takes the precedence.
Note that for an operator to manage pods in the watched namespace, the operator's service account (as specified in the operator deployment manifest) has to have appropriate privileges to access the watched namespace. The watched namespace also needs to have a (possibly different) service account in the case database pods need to talk to the Kubernetes API (e.g. when using Kubernetes-native configuration of Patroni).
### Create ConfigMap ### Create ConfigMap
ConfigMap is used to store the configuration of the operator ConfigMap is used to store the configuration of the operator
@ -350,3 +371,27 @@ kubectl port-forward POD_NAME DLV_PORT:DLV_PORT
``` ```
$ dlv connect 127.0.0.1:DLV_PORT $ dlv connect 127.0.0.1:DLV_PORT
``` ```
### Unit tests
To run all unit tests, you can simply do:
```
$ go test ./...
```
For go 1.9 `vendor` directory would be excluded automatically. For previous
versions you can exclude it manually:
```
$ go test $(glide novendor)
```
In case if you need to debug your unit test, it's possible to use delve:
```
$ dlv test ./pkg/util/retryutil/
Type 'help' for list of commands.
(dlv) c
PASS
```

View File

@ -29,10 +29,14 @@ func init() {
configMapRawName := os.Getenv("CONFIG_MAP_NAME") configMapRawName := os.Getenv("CONFIG_MAP_NAME")
if configMapRawName != "" { if configMapRawName != "" {
err := config.ConfigMapName.Decode(configMapRawName) err := config.ConfigMapName.Decode(configMapRawName)
if err != nil { if err != nil {
log.Fatalf("incorrect config map name") log.Fatalf("incorrect config map name: %v", configMapRawName)
} }
log.Printf("Fully qualified configmap name: %v", config.ConfigMapName)
} }
} }

View File

@ -1,33 +1,55 @@
build_steps: version: "2017-09-20"
- desc: 'Install required build software' pipeline:
cmd: | - id: build-postgres-operator
apt-get install -y make git apt-transport-https ca-certificates curl type: script
- desc: 'Install go' env:
cmd: | GOPATH: /root/go
cd /tmp OPERATOR_TOP_DIR: /root/go/src/github.com/zalando-incubator
wget -q https://storage.googleapis.com/golang/go1.9.linux-amd64.tar.gz -O go.tar.gz commands:
tar -xf go.tar.gz - desc: 'Install required build software'
mv go /usr/local cmd: |
ln -s /usr/local/go/bin/go /usr/bin/go apt-get install -y make git apt-transport-https ca-certificates curl
go version - desc: 'Install go'
- desc: 'Install Docker' cmd: |
cmd: | cd /tmp
curl -sSL https://get.docker.com/ | sh wget -q https://storage.googleapis.com/golang/go1.9.linux-amd64.tar.gz -O go.tar.gz
- desc: 'Symlink sources into the GOPATH' tar -xf go.tar.gz
cmd: | mv go /usr/local
export GOPATH=$HOME/go ln -s /usr/local/go/bin/go /usr/bin/go
export OPERATOR_TOP_DIR=$GOPATH/src/github.com/zalando-incubator go version
mkdir -p $OPERATOR_TOP_DIR - desc: 'Install Docker'
ln -s $(pwd) $OPERATOR_TOP_DIR/postgres-operator cmd: |
- desc: 'Build & push docker image' curl -sSL https://get.docker.com/ | sh
cmd: | - desc: 'Symlink sources into the GOPATH'
export PATH=$PATH:$HOME/go/bin cmd: |
IS_PR_BUILD=${CDP_PULL_REQUEST_NUMBER+"true"} mkdir -p $OPERATOR_TOP_DIR
if [[ ${CDP_TARGET_BRANCH} == "master" && ${IS_PR_BUILD} != "true" ]] ln -s $(pwd) $OPERATOR_TOP_DIR/postgres-operator
then - desc: 'Build docker image'
IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator cmd: |
else export PATH=$PATH:$HOME/go/bin
IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator-test IS_PR_BUILD=${CDP_PULL_REQUEST_NUMBER+"true"}
fi if [[ ${CDP_TARGET_BRANCH} == "master" && ${IS_PR_BUILD} != "true" ]]
export IMAGE then
make tools deps docker push IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator
else
IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator-test
fi
export IMAGE
make tools deps docker
- desc: 'Run unit tests'
cmd: |
export PATH=$PATH:$HOME/go/bin
cd $OPERATOR_TOP_DIR/postgres-operator
go test ./...
- desc: 'Push docker image'
cmd: |
export PATH=$PATH:$HOME/go/bin
IS_PR_BUILD=${CDP_PULL_REQUEST_NUMBER+"true"}
if [[ ${CDP_TARGET_BRANCH} == "master" && ${IS_PR_BUILD} != "true" ]]
then
IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator
else
IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator-test
fi
export IMAGE
make push

View File

@ -101,8 +101,10 @@ func (c *Controller) initOperatorConfig() {
watchedNsEnvVar, isPresentInOperatorEnv := os.LookupEnv("WATCHED_NAMESPACE") watchedNsEnvVar, isPresentInOperatorEnv := os.LookupEnv("WATCHED_NAMESPACE")
if (!isPresentInOperatorConfigMap) && (!isPresentInOperatorEnv) { if (!isPresentInOperatorConfigMap) && (!isPresentInOperatorEnv) {
c.logger.Infoln("Neither the operator config map nor operator pod's environment define a namespace to watch. Fall back to watching the 'default' namespace.")
configMapData["watched_namespace"] = v1.NamespaceDefault c.logger.Infof("No namespace to watch specified. By convention, the operator falls back to watching the namespace it is deployed to: '%v' \n", spec.GetOperatorNamespace())
configMapData["watched_namespace"] = spec.GetOperatorNamespace()
} }
if (isPresentInOperatorConfigMap) && (!isPresentInOperatorEnv) { if (isPresentInOperatorConfigMap) && (!isPresentInOperatorEnv) {

View File

@ -3,6 +3,8 @@ package spec
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"io/ioutil"
"log"
"strings" "strings"
"time" "time"
@ -26,6 +28,8 @@ const (
EventUpdate EventType = "UPDATE" EventUpdate EventType = "UPDATE"
EventDelete EventType = "DELETE" EventDelete EventType = "DELETE"
EventSync EventType = "SYNC" EventSync EventType = "SYNC"
fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
) )
// ClusterEvent carries the payload of the Cluster TPR events. // ClusterEvent carries the payload of the Cluster TPR events.
@ -161,20 +165,38 @@ func (n NamespacedName) MarshalJSON() ([]byte, error) {
// Decode converts a (possibly unqualified) string into the namespaced name object. // Decode converts a (possibly unqualified) string into the namespaced name object.
func (n *NamespacedName) Decode(value string) error { func (n *NamespacedName) Decode(value string) error {
return n.DecodeWorker(value, GetOperatorNamespace())
}
// DecodeWorker separates the decode logic to (unit) test
// from obtaining the operator namespace that depends on k8s mounting files at runtime
func (n *NamespacedName) DecodeWorker(value, operatorNamespace string) error {
name := types.NewNamespacedNameFromString(value) name := types.NewNamespacedNameFromString(value)
if strings.Trim(value, string(types.Separator)) != "" && name == (types.NamespacedName{}) { if strings.Trim(value, string(types.Separator)) != "" && name == (types.NamespacedName{}) {
name.Name = value name.Name = value
name.Namespace = v1.NamespaceDefault name.Namespace = operatorNamespace
} else if name.Namespace == "" { } else if name.Namespace == "" {
name.Namespace = v1.NamespaceDefault name.Namespace = operatorNamespace
} }
if name.Name == "" { if name.Name == "" {
return fmt.Errorf("incorrect namespaced name") return fmt.Errorf("incorrect namespaced name: %v", value)
} }
*n = NamespacedName(name) *n = NamespacedName(name)
return nil return nil
} }
// GetOperatorNamespace assumes serviceaccount secret is mounted by kubernetes
// Placing this func here instead of pgk/util avoids circular import
func GetOperatorNamespace() string {
operatorNamespaceBytes, err := ioutil.ReadFile(fileWithNamespace)
if err != nil {
log.Fatalf("Unable to detect operator namespace from within its pod due to: %v", err)
}
return string(operatorNamespaceBytes)
}

View File

@ -5,22 +5,27 @@ import (
"testing" "testing"
) )
const (
mockOperatorNamespace = "acid"
)
var nnTests = []struct { var nnTests = []struct {
s string s string
expected NamespacedName expected NamespacedName
expectedMarshal []byte expectedMarshal []byte
}{ }{
{`acid/cluster`, NamespacedName{Namespace: "acid", Name: "cluster"}, []byte(`"acid/cluster"`)}, {`acid/cluster`, NamespacedName{Namespace: mockOperatorNamespace, Name: "cluster"}, []byte(`"acid/cluster"`)},
{`/name`, NamespacedName{Namespace: "default", Name: "name"}, []byte(`"default/name"`)}, {`/name`, NamespacedName{Namespace: mockOperatorNamespace, Name: "name"}, []byte(`"acid/name"`)},
{`test`, NamespacedName{Namespace: "default", Name: "test"}, []byte(`"default/test"`)}, {`test`, NamespacedName{Namespace: mockOperatorNamespace, Name: "test"}, []byte(`"acid/test"`)},
} }
var nnErr = []string{"test/", "/", "", "//"} var nnErr = []string{"test/", "/", "", "//"}
func TestNamespacedNameDecode(t *testing.T) { func TestNamespacedNameDecode(t *testing.T) {
for _, tt := range nnTests { for _, tt := range nnTests {
var actual NamespacedName var actual NamespacedName
err := actual.Decode(tt.s) err := actual.DecodeWorker(tt.s, mockOperatorNamespace)
if err != nil { if err != nil {
t.Errorf("decode error: %v", err) t.Errorf("decode error: %v", err)
} }
@ -28,6 +33,7 @@ func TestNamespacedNameDecode(t *testing.T) {
t.Errorf("expected: %v, got %#v", tt.expected, actual) t.Errorf("expected: %v, got %#v", tt.expected, actual)
} }
} }
} }
func TestNamespacedNameMarshal(t *testing.T) { func TestNamespacedNameMarshal(t *testing.T) {
@ -47,7 +53,7 @@ func TestNamespacedNameMarshal(t *testing.T) {
func TestNamespacedNameError(t *testing.T) { func TestNamespacedNameError(t *testing.T) {
for _, tt := range nnErr { for _, tt := range nnErr {
var actual NamespacedName var actual NamespacedName
err := actual.Decode(tt) err := actual.DecodeWorker(tt, mockOperatorNamespace)
if err == nil { if err == nil {
t.Errorf("error expected for %q, got: %#v", tt, actual) t.Errorf("error expected for %q, got: %#v", tt, actual)
} }

View File

@ -33,6 +33,7 @@ type KubernetesClient struct {
v1core.ConfigMapsGetter v1core.ConfigMapsGetter
v1core.NodesGetter v1core.NodesGetter
v1core.NamespacesGetter v1core.NamespacesGetter
v1core.ServiceAccountsGetter
v1beta1.StatefulSetsGetter v1beta1.StatefulSetsGetter
policyv1beta1.PodDisruptionBudgetsGetter policyv1beta1.PodDisruptionBudgetsGetter
apiextbeta1.CustomResourceDefinitionsGetter apiextbeta1.CustomResourceDefinitionsGetter
@ -73,6 +74,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.ServicesGetter = client.CoreV1() kubeClient.ServicesGetter = client.CoreV1()
kubeClient.EndpointsGetter = client.CoreV1() kubeClient.EndpointsGetter = client.CoreV1()
kubeClient.SecretsGetter = client.CoreV1() kubeClient.SecretsGetter = client.CoreV1()
kubeClient.ServiceAccountsGetter = client.CoreV1()
kubeClient.ConfigMapsGetter = client.CoreV1() kubeClient.ConfigMapsGetter = client.CoreV1()
kubeClient.PersistentVolumeClaimsGetter = client.CoreV1() kubeClient.PersistentVolumeClaimsGetter = client.CoreV1()
kubeClient.PersistentVolumesGetter = client.CoreV1() kubeClient.PersistentVolumesGetter = client.CoreV1()

View File

@ -5,14 +5,39 @@ import (
"time" "time"
) )
// Retry calls ConditionFunc until it returns boolean true, a timeout expires or an error occurs. type RetryTicker interface {
Stop()
Tick()
}
type Ticker struct {
ticker *time.Ticker
}
func (t *Ticker) Stop() { t.ticker.Stop() }
func (t *Ticker) Tick() { <-t.ticker.C }
// Retry calls ConditionFunc until either:
// * it returns boolean true
// * a timeout expires
// * an error occurs
func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)) error { func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)) error {
//TODO: make the retry exponential //TODO: make the retry exponential
if timeout < interval { if timeout < interval {
return fmt.Errorf("timout(%s) should be greater than interval(%v)", timeout, interval) return fmt.Errorf("timout(%s) should be greater than interval(%v)", timeout, interval)
} }
tick := &Ticker{time.NewTicker(interval)}
return RetryWorker(interval, timeout, tick, f)
}
func RetryWorker(
interval time.Duration,
timeout time.Duration,
tick RetryTicker,
f func() (bool, error)) error {
maxRetries := int(timeout / interval) maxRetries := int(timeout / interval)
tick := time.NewTicker(interval)
defer tick.Stop() defer tick.Stop()
for i := 0; ; i++ { for i := 0; ; i++ {
@ -26,7 +51,7 @@ func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)
if i+1 == maxRetries { if i+1 == maxRetries {
break break
} }
<-tick.C tick.Tick()
} }
return fmt.Errorf("still failing after %d retries", maxRetries) return fmt.Errorf("still failing after %d retries", maxRetries)
} }

View File

@ -0,0 +1,68 @@
package retryutil
import (
"errors"
"testing"
)
type mockTicker struct {
test *testing.T
counter int
}
func (t *mockTicker) Stop() {}
func (t *mockTicker) Tick() {
t.counter += 1
}
func TestRetryWorkerSuccess(t *testing.T) {
tick := &mockTicker{t, 0}
result := RetryWorker(10, 20, tick, func() (bool, error) {
return true, nil
})
if result != nil {
t.Errorf("Wrong result, expected: %#v, got: %#v", nil, result)
}
if tick.counter != 0 {
t.Errorf("Ticker was started once, but it shouldn't be")
}
}
func TestRetryWorkerOneFalse(t *testing.T) {
var counter = 0
tick := &mockTicker{t, 0}
result := RetryWorker(1, 3, tick, func() (bool, error) {
counter += 1
if counter <= 1 {
return false, nil
} else {
return true, nil
}
})
if result != nil {
t.Errorf("Wrong result, expected: %#v, got: %#v", nil, result)
}
if tick.counter != 1 {
t.Errorf("Ticker was started %#v, but supposed to be just once", tick.counter)
}
}
func TestRetryWorkerError(t *testing.T) {
fail := errors.New("Error")
tick := &mockTicker{t, 0}
result := RetryWorker(1, 3, tick, func() (bool, error) {
return false, fail
})
if result != fail {
t.Errorf("Wrong result, expected: %#v, got: %#v", fail, result)
}
}