Simple implementation of the tillerless mode (#531)

Ref #449
This commit is contained in:
Patrick Valsecchi 2019-04-05 12:02:37 +02:00 committed by KUOKA Yusuke
parent 72c43a2f46
commit 1acd07fa7e
13 changed files with 312 additions and 118 deletions

View File

@ -42,6 +42,7 @@ clean:
.PHONY: clean
pristine: generate fmt
git diff | cat
git ls-files --exclude-standard --modified --deleted --others | diff /dev/null -
.PHONY: pristine

View File

@ -43,6 +43,7 @@ repositories:
#default values to set for args along with dedicated keys that can be set by contributers, cli args take precedence over these
helmDefaults:
tillerNamespace: tiller-namespace #dedicated default key for tiller-namespace
tillerless: false #dedicated default key for tillerless
kubeContext: kube-context #dedicated default key for kube-context (--kube-context)
# additional and global args passed to helm
args:
@ -110,6 +111,10 @@ releases:
installed: true
# restores previous state in case of failed release
atomic: true
# name of the tiller namespace
tillerNamespace: vault
# if true, will use the helm-tiller plugin
tillerless: false
# enable TLS for request to Tiller
tls: true
# path to TLS CA certificate file (default "$HELM_HOME/ca.pem")
@ -557,6 +562,16 @@ Then the environment secret `foo.bar` can be referenced by the below template ex
{{ .Environment.Values.foo.bar }}
```
## Tillerless
With the [helm-tiller](https://github.com/rimusz/helm-tiller) plugin installed, you can work without tiller installed.
To enable this mode, you need to define `tillerless: true` and set the `tillerNamespace` in the `helmDefaults` section
or in the `releases` entries.
Since every commands is run with `helm tiller run ...`, you have to disable concurrency. Otherwise you'll get
mysterious errors about the tiller daemon.
## Separating helmfile.yaml into multiple independent files
Once your `helmfile.yaml` got to contain too many releases,

View File

@ -88,7 +88,7 @@ func (bus *Bus) Trigger(evt string, context map[string]interface{}) (bool, error
}
}
bytes, err := bus.Runner.Execute(command, args)
bytes, err := bus.Runner.Execute(command, args, map[string]string{})
bus.Logger.Debugf("hook[%s]: %s\n", name, string(bytes))
if err != nil {

View File

@ -13,7 +13,7 @@ var logger = helmexec.NewLogger(os.Stdout, "warn")
type runner struct {
}
func (r *runner) Execute(cmd string, args []string) ([]byte, error) {
func (r *runner) Execute(cmd string, args []string, env map[string]string) ([]byte, error) {
if cmd == "ng" {
return nil, fmt.Errorf("cmd failed due to invalid cmd: %s", cmd)
}

44
helmexec/context.go Normal file
View File

@ -0,0 +1,44 @@
package helmexec
import (
"os"
"path/filepath"
)
type HelmContext struct {
Tillerless bool
TillerNamespace string
WorkerIndex int
}
func (context *HelmContext) GetTillerlessArgs(helmBinary string) []string {
if context.Tillerless {
if context.TillerNamespace != "" {
return []string{"tiller", "run", context.TillerNamespace, "--", helmBinary}
} else {
return []string{"tiller", "run", "--", helmBinary}
}
} else {
return []string{}
}
}
func (context *HelmContext) getTillerlessEnv() map[string]string {
if context.Tillerless {
result := map[string]string{
"HELM_TILLER_SILENT": "true",
// Changing the TILLER port doesn't really work: https://github.com/helm/helm/issues/3159
// So this is not used for the moment.
// "HELM_TILLER_PORT": strconv.Itoa(44134 + context.WorkerIndex),
}
if config := os.Getenv("KUBECONFIG"); config != "" {
absConfig, err := filepath.Abs(config)
if err == nil {
result["KUBECONFIG"] = absConfig
}
}
return result
} else {
return map[string]string{}
}
}

View File

@ -70,60 +70,68 @@ func (helm *execer) AddRepo(name, repository, certfile, keyfile, username, passw
args = append(args, "--username", username, "--password", password)
}
helm.logger.Infof("Adding repo %v %v", name, repository)
out, err := helm.exec(args...)
out, err := helm.exec(args, map[string]string{})
helm.write(out)
return err
}
func (helm *execer) UpdateRepo() error {
helm.logger.Info("Updating repo")
out, err := helm.exec("repo", "update")
out, err := helm.exec([]string{"repo", "update"}, map[string]string{})
helm.write(out)
return err
}
func (helm *execer) UpdateDeps(chart string) error {
helm.logger.Infof("Updating dependency %v", chart)
out, err := helm.exec("dependency", "update", chart)
out, err := helm.exec([]string{"dependency", "update", chart}, map[string]string{})
helm.write(out)
return err
}
func (helm *execer) BuildDeps(chart string) error {
helm.logger.Infof("Building dependency %v", chart)
out, err := helm.exec("dependency", "build", chart)
out, err := helm.exec([]string{"dependency", "build", chart}, map[string]string{})
helm.write(out)
return err
}
func (helm *execer) SyncRelease(name, chart string, flags ...string) error {
func (helm *execer) SyncRelease(context HelmContext, name, chart string, flags ...string) error {
helm.logger.Infof("Upgrading %v", chart)
out, err := helm.exec(append([]string{"upgrade", "--install", "--reset-values", name, chart}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "upgrade", "--install", "--reset-values", name, chart), flags...), env)
helm.write(out)
return err
}
func (helm *execer) ReleaseStatus(name string, flags ...string) error {
func (helm *execer) ReleaseStatus(context HelmContext, name string, flags ...string) error {
helm.logger.Infof("Getting status %v", name)
out, err := helm.exec(append([]string{"status", name}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "status", name), flags...), env)
helm.write(out)
return err
}
func (helm *execer) List(filter string, flags ...string) (string, error) {
func (helm *execer) List(context HelmContext, filter string, flags ...string) (string, error) {
helm.logger.Infof("Listing releases matching %v", filter)
out, err := helm.exec(append([]string{"list", filter}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "list", filter), flags...), env)
helm.write(out)
return string(out), err
}
func (helm *execer) DecryptSecret(name string) (string, error) {
func (helm *execer) DecryptSecret(context HelmContext, name string, flags ...string) (string, error) {
// Prevents https://github.com/roboll/helmfile/issues/258
helm.decryptionMutex.Lock()
defer helm.decryptionMutex.Unlock()
helm.logger.Infof("Decrypting secret %v", name)
out, err := helm.exec(append([]string{"secrets", "dec", name})...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "secrets", "dec", name), flags...), env)
helm.write(out)
if err != nil {
return "", err
@ -162,47 +170,53 @@ func (helm *execer) DecryptSecret(name string) (string, error) {
}
func (helm *execer) TemplateRelease(chart string, flags ...string) error {
out, err := helm.exec(append([]string{"template", chart}, flags...)...)
out, err := helm.exec(append([]string{"template", chart}, flags...), map[string]string{})
helm.write(out)
return err
}
func (helm *execer) DiffRelease(name, chart string, flags ...string) error {
func (helm *execer) DiffRelease(context HelmContext, name, chart string, flags ...string) error {
helm.logger.Infof("Comparing %v %v", name, chart)
out, err := helm.exec(append([]string{"diff", "upgrade", "--allow-unreleased", name, chart}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "diff", "upgrade", "--allow-unreleased", name, chart), flags...), env)
helm.write(out)
return err
}
func (helm *execer) Lint(chart string, flags ...string) error {
helm.logger.Infof("Linting %v", chart)
out, err := helm.exec(append([]string{"lint", chart}, flags...)...)
out, err := helm.exec(append([]string{"lint", chart}, flags...), map[string]string{})
helm.write(out)
return err
}
func (helm *execer) Fetch(chart string, flags ...string) error {
helm.logger.Infof("Fetching %v", chart)
out, err := helm.exec(append([]string{"fetch", chart}, flags...)...)
out, err := helm.exec(append([]string{"fetch", chart}, flags...), map[string]string{})
helm.write(out)
return err
}
func (helm *execer) DeleteRelease(name string, flags ...string) error {
func (helm *execer) DeleteRelease(context HelmContext, name string, flags ...string) error {
helm.logger.Infof("Deleting %v", name)
out, err := helm.exec(append([]string{"delete", name}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "delete", name), flags...), env)
helm.write(out)
return err
}
func (helm *execer) TestRelease(name string, flags ...string) error {
func (helm *execer) TestRelease(context HelmContext, name string, flags ...string) error {
helm.logger.Infof("Testing %v", name)
out, err := helm.exec(append([]string{"test", name}, flags...)...)
preArgs := context.GetTillerlessArgs(helm.helmBinary)
env := context.getTillerlessEnv()
out, err := helm.exec(append(append(preArgs, "test", name), flags...), env)
helm.write(out)
return err
}
func (helm *execer) exec(args ...string) ([]byte, error) {
func (helm *execer) exec(args []string, env map[string]string) ([]byte, error) {
cmdargs := args
if len(helm.extra) > 0 {
cmdargs = append(cmdargs, helm.extra...)
@ -211,7 +225,7 @@ func (helm *execer) exec(args ...string) ([]byte, error) {
cmdargs = append(cmdargs, "--kube-context", helm.kubeContext)
}
helm.logger.Debugf("exec: %s %s", helm.helmBinary, strings.Join(cmdargs, " "))
return helm.runner.Execute(helm.helmBinary, cmdargs)
return helm.runner.Execute(helm.helmBinary, cmdargs, env)
}
func (helm *execer) write(out []byte) {

View File

@ -3,6 +3,7 @@ package helmexec
import (
"bytes"
"os"
"path"
"reflect"
"testing"
@ -16,7 +17,7 @@ type mockRunner struct {
err error
}
func (mock *mockRunner) Execute(cmd string, args []string) ([]byte, error) {
func (mock *mockRunner) Execute(cmd string, args []string, env map[string]string) ([]byte, error) {
return []byte{}, nil
}
@ -118,7 +119,7 @@ func Test_SyncRelease(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.SyncRelease("release", "chart", "--timeout 10", "--wait")
helm.SyncRelease(HelmContext{}, "release", "chart", "--timeout 10", "--wait")
expected := `Upgrading chart
exec: helm upgrade --install --reset-values release chart --timeout 10 --wait --kube-context dev
`
@ -127,7 +128,7 @@ exec: helm upgrade --install --reset-values release chart --timeout 10 --wait --
}
buffer.Reset()
helm.SyncRelease("release", "chart")
helm.SyncRelease(HelmContext{}, "release", "chart")
expected = `Upgrading chart
exec: helm upgrade --install --reset-values release chart --kube-context dev
`
@ -136,6 +137,20 @@ exec: helm upgrade --install --reset-values release chart --kube-context dev
}
}
func Test_SyncReleaseTillerless(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.SyncRelease(HelmContext{Tillerless: true, TillerNamespace: "foo"}, "release", "chart",
"--timeout 10", "--wait")
expected := `Upgrading chart
exec: helm tiller run foo -- helm upgrade --install --reset-values release chart --timeout 10 --wait --kube-context dev
`
if buffer.String() != expected {
t.Errorf("helmexec.SyncRelease()\nactual = %v\nexpect = %v", buffer.String(), expected)
}
}
func Test_UpdateDeps(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
@ -186,7 +201,7 @@ func Test_DecryptSecret(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.DecryptSecret("secretName")
helm.DecryptSecret(HelmContext{}, "secretName")
expected := `Decrypting secret secretName
exec: helm secrets dec secretName --kube-context dev
`
@ -199,7 +214,7 @@ func Test_DiffRelease(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.DiffRelease("release", "chart", "--timeout 10", "--wait")
helm.DiffRelease(HelmContext{}, "release", "chart", "--timeout 10", "--wait")
expected := `Comparing release chart
exec: helm diff upgrade --allow-unreleased release chart --timeout 10 --wait --kube-context dev
`
@ -208,7 +223,7 @@ exec: helm diff upgrade --allow-unreleased release chart --timeout 10 --wait --k
}
buffer.Reset()
helm.DiffRelease("release", "chart")
helm.DiffRelease(HelmContext{}, "release", "chart")
expected = `Comparing release chart
exec: helm diff upgrade --allow-unreleased release chart --kube-context dev
`
@ -217,11 +232,24 @@ exec: helm diff upgrade --allow-unreleased release chart --kube-context dev
}
}
func Test_DiffReleaseTillerless(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.DiffRelease(HelmContext{Tillerless: true}, "release", "chart", "--timeout 10", "--wait")
expected := `Comparing release chart
exec: helm tiller run -- helm diff upgrade --allow-unreleased release chart --timeout 10 --wait --kube-context dev
`
if buffer.String() != expected {
t.Errorf("helmexec.DiffRelease()\nactual = %v\nexpect = %v", buffer.String(), expected)
}
}
func Test_DeleteRelease(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.DeleteRelease("release")
helm.DeleteRelease(HelmContext{}, "release")
expected := `Deleting release
exec: helm delete release --kube-context dev
`
@ -233,7 +261,7 @@ func Test_DeleteRelease_Flags(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.DeleteRelease("release", "--purge")
helm.DeleteRelease(HelmContext{}, "release", "--purge")
expected := `Deleting release
exec: helm delete release --purge --kube-context dev
`
@ -246,7 +274,7 @@ func Test_TestRelease(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.TestRelease("release")
helm.TestRelease(HelmContext{}, "release")
expected := `Testing release
exec: helm test release --kube-context dev
`
@ -258,7 +286,7 @@ func Test_TestRelease_Flags(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.TestRelease("release", "--cleanup", "--timeout", "60")
helm.TestRelease(HelmContext{}, "release", "--cleanup", "--timeout", "60")
expected := `Testing release
exec: helm test release --cleanup --timeout 60 --kube-context dev
`
@ -271,7 +299,7 @@ func Test_ReleaseStatus(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "dev")
helm.ReleaseStatus("myRelease")
helm.ReleaseStatus(HelmContext{}, "myRelease")
expected := `Getting status myRelease
exec: helm status myRelease --kube-context dev
`
@ -284,28 +312,29 @@ func Test_exec(t *testing.T) {
var buffer bytes.Buffer
logger := NewLogger(&buffer, "debug")
helm := MockExecer(logger, "")
helm.exec("version")
env := map[string]string{}
helm.exec([]string{"version"}, env)
expected := "exec: helm version\n"
if buffer.String() != expected {
t.Errorf("helmexec.exec()\nactual = %v\nexpect = %v", buffer.String(), expected)
}
helm = MockExecer(logger, "dev")
ret, _ := helm.exec("diff")
ret, _ := helm.exec([]string{"diff"}, env)
if len(ret) != 0 {
t.Error("helmexec.exec() - expected empty return value")
}
buffer.Reset()
helm = MockExecer(logger, "dev")
helm.exec("diff", "release", "chart", "--timeout 10", "--wait")
helm.exec([]string{"diff", "release", "chart", "--timeout 10", "--wait"}, env)
expected = "exec: helm diff release chart --timeout 10 --wait --kube-context dev\n"
if buffer.String() != expected {
t.Errorf("helmexec.exec()\nactual = %v\nexpect = %v", buffer.String(), expected)
}
buffer.Reset()
helm.exec("version")
helm.exec([]string{"version"}, env)
expected = "exec: helm version --kube-context dev\n"
if buffer.String() != expected {
t.Errorf("helmexec.exec()\nactual = %v\nexpect = %v", buffer.String(), expected)
@ -313,7 +342,7 @@ func Test_exec(t *testing.T) {
buffer.Reset()
helm.SetExtraArgs("foo")
helm.exec("version")
helm.exec([]string{"version"}, env)
expected = "exec: helm version foo --kube-context dev\n"
if buffer.String() != expected {
t.Errorf("helmexec.exec()\nactual = %v\nexpect = %v", buffer.String(), expected)
@ -322,7 +351,7 @@ func Test_exec(t *testing.T) {
buffer.Reset()
helm = MockExecer(logger, "")
helm.SetHelmBinary("overwritten")
helm.exec("version")
helm.exec([]string{"version"}, env)
expected = "exec: overwritten version\n"
if buffer.String() != expected {
t.Errorf("helmexec.exec()\nactual = %v\nexpect = %v", buffer.String(), expected)
@ -376,3 +405,37 @@ func Test_LogLevels(t *testing.T) {
}
}
}
func Test_getTillerlessEnv(t *testing.T) {
context := HelmContext{Tillerless: true, TillerNamespace: "foo", WorkerIndex: 1}
os.Unsetenv("KUBECONFIG")
actual := context.getTillerlessEnv()
if val, found := actual["HELM_TILLER_SILENT"]; !found || val != "true" {
t.Errorf("getTillerlessEnv() HELM_TILLER_SILENT\nactual = %s\nexpect = true", val)
}
// This feature is disabled until it is fixed in helm
/*if val, found := actual["HELM_TILLER_PORT"]; !found || val != "44135" {
t.Errorf("getTillerlessEnv() HELM_TILLER_PORT\nactual = %s\nexpect = 44135", val)
}*/
if val, found := actual["KUBECONFIG"]; found {
t.Errorf("getTillerlessEnv() KUBECONFIG\nactual = %s\nexpect = nil", val)
}
os.Setenv("KUBECONFIG", "toto")
actual = context.getTillerlessEnv()
cwd, _ := os.Getwd()
expected := path.Join(cwd, "toto")
if val, found := actual["KUBECONFIG"]; !found || val != expected {
t.Errorf("getTillerlessEnv() KUBECONFIG\nactual = %s\nexpect = %s", val, expected)
}
os.Unsetenv("KUBECONFIG")
}
func Test_mergeEnv(t *testing.T) {
actual := env2map(mergeEnv([]string{"A=1", "B=c=d", "E=2"}, map[string]string{"B": "3", "F": "4"}))
expected := map[string]string{"A": "1", "B": "3", "E": "2", "F": "4"}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("mergeEnv()\nactual = %v\nexpect = %v", actual, expected)
}
}

View File

@ -9,14 +9,14 @@ type Interface interface {
UpdateRepo() error
BuildDeps(chart string) error
UpdateDeps(chart string) error
SyncRelease(name, chart string, flags ...string) error
DiffRelease(name, chart string, flags ...string) error
SyncRelease(context HelmContext, name, chart string, flags ...string) error
DiffRelease(context HelmContext, name, chart string, flags ...string) error
TemplateRelease(chart string, flags ...string) error
Fetch(chart string, flags ...string) error
Lint(chart string, flags ...string) error
ReleaseStatus(name string, flags ...string) error
DeleteRelease(name string, flags ...string) error
TestRelease(name string, flags ...string) error
List(filter string, flags ...string) (string, error)
DecryptSecret(name string) (string, error)
ReleaseStatus(context HelmContext, name string, flags ...string) error
DeleteRelease(context HelmContext, name string, flags ...string) error
TestRelease(context HelmContext, name string, flags ...string) error
List(context HelmContext, filter string, flags ...string) (string, error)
DecryptSecret(context HelmContext, name string, flags ...string) (string, error)
}

View File

@ -1,7 +1,9 @@
package helmexec
import (
"os"
"os/exec"
"strings"
)
const (
@ -11,7 +13,7 @@ const (
// Runner interface for shell commands
type Runner interface {
Execute(cmd string, args []string) ([]byte, error)
Execute(cmd string, args []string, env map[string]string) ([]byte, error)
}
// ShellRunner implemention for shell commands
@ -20,8 +22,34 @@ type ShellRunner struct {
}
// Execute a shell command
func (shell ShellRunner) Execute(cmd string, args []string) ([]byte, error) {
func (shell ShellRunner) Execute(cmd string, args []string, env map[string]string) ([]byte, error) {
preparedCmd := exec.Command(cmd, args...)
preparedCmd.Dir = shell.Dir
preparedCmd.Env = mergeEnv(os.Environ(), env)
return preparedCmd.CombinedOutput()
}
func mergeEnv(orig []string, new map[string]string) []string {
wanted := env2map(orig)
for k, v := range new {
wanted[k] = v
}
return map2env(wanted)
}
func map2env(wanted map[string]string) []string {
result := []string{}
for k, v := range wanted {
result = append(result, k+"="+v)
}
return result
}
func env2map(env []string) map[string]string {
wanted := map[string]string{}
for _, cur := range env {
pair := strings.SplitN(cur, "=", 2)
wanted[pair[0]] = pair[1]
}
return wanted
}

View File

@ -158,8 +158,9 @@ func (st *HelmState) loadEnv(name string, readFile func(string) ([]byte, error))
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil, err
}
decFile, err := helm.DecryptSecret(path)
release := &st.Releases[0]
flags := st.appendTillerFlags([]string{}, release)
decFile, err := helm.DecryptSecret(st.createHelmContext(release, 0), path, flags...)
if err != nil {
return nil, err
}

View File

@ -57,6 +57,7 @@ type HelmState struct {
type HelmSpec struct {
KubeContext string `yaml:"kubeContext"`
TillerNamespace string `yaml:"tillerNamespace"`
Tillerless bool `yaml:"tillerless"`
Args []string `yaml:"args"`
Verify bool `yaml:"verify"`
// Devel, when set to true, use development versions, too. Equivalent to version '>0.0.0-0'
@ -130,6 +131,7 @@ type ReleaseSpec struct {
ValuesPathPrefix string `yaml:"valuesPathPrefix"`
TillerNamespace string `yaml:"tillerNamespace"`
Tillerless *bool `yaml:"tillerless"`
TLS *bool `yaml:"tls"`
TLSCACert string `yaml:"tlsCACert"`
@ -226,11 +228,11 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu
}
close(jobs)
},
func(_ int) {
func(workerIndex int) {
for release := range jobs {
st.applyDefaultsTo(release)
flags, flagsErr := st.flagsForUpgrade(helm, release)
flags, flagsErr := st.flagsForUpgrade(helm, release, workerIndex)
if flagsErr != nil {
results <- syncPrepareResult{errors: []*ReleaseError{&ReleaseError{release, flagsErr}}}
continue
@ -277,8 +279,8 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu
return res, errs
}
func (st *HelmState) isReleaseInstalled(helm helmexec.Interface, release ReleaseSpec) (bool, error) {
out, err := helm.List("^"+release.Name+"$", st.tillerFlags(&release)...)
func (st *HelmState) isReleaseInstalled(context helmexec.HelmContext, helm helmexec.Interface, release ReleaseSpec) (bool, error) {
out, err := helm.List(context, "^"+release.Name+"$", st.tillerFlags(&release)...)
if err != nil {
return false, err
} else if out != "" {
@ -289,10 +291,9 @@ func (st *HelmState) isReleaseInstalled(helm helmexec.Interface, release Release
func (st *HelmState) DetectReleasesToBeDeleted(helm helmexec.Interface) ([]*ReleaseSpec, error) {
detected := []*ReleaseSpec{}
for i, _ := range st.Releases {
release := st.Releases[i]
for _, release := range st.Releases {
if !release.Desired() {
installed, err := st.isReleaseInstalled(helm, release)
installed, err := st.isReleaseInstalled(st.createHelmContext(&release, 0), helm, release)
if err != nil {
return nil, err
} else if installed {
@ -323,22 +324,23 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
}
close(jobQueue)
},
func(_ int) {
func(workerIndex int) {
for prep := range jobQueue {
release := prep.release
flags := prep.flags
chart := normalizeChart(st.basePath, release.Chart)
var relErr *ReleaseError
context := st.createHelmContext(release, workerIndex)
if !release.Desired() {
installed, err := st.isReleaseInstalled(helm, *release)
installed, err := st.isReleaseInstalled(context, helm, *release)
if err != nil {
relErr = &ReleaseError{release, err}
} else if installed {
if err := helm.DeleteRelease(release.Name, "--purge"); err != nil {
if err := helm.DeleteRelease(context, release.Name, "--purge"); err != nil {
relErr = &ReleaseError{release, err}
}
}
} else if err := helm.SyncRelease(release.Name, chart, flags...); err != nil {
} else if err := helm.SyncRelease(context, release.Name, chart, flags...); err != nil {
relErr = &ReleaseError{release, err}
}
@ -468,7 +470,7 @@ func (st *HelmState) TemplateReleases(helm helmexec.Interface, additionalValues
continue
}
flags, err := st.flagsForTemplate(helm, &release)
flags, err := st.flagsForTemplate(helm, &release, 0)
if err != nil {
errs = append(errs, err)
}
@ -528,7 +530,7 @@ func (st *HelmState) LintReleases(helm helmexec.Interface, additionalValues []st
continue
}
flags, err := st.flagsForLint(helm, &release)
flags, err := st.flagsForLint(helm, &release, 0)
if err != nil {
errs = append(errs, err)
}
@ -607,13 +609,13 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
}
close(jobs)
},
func(_ int) {
func(workerIndex int) {
for release := range jobs {
errs := []error{}
st.applyDefaultsTo(release)
flags, err := st.flagsForDiff(helm, release)
flags, err := st.flagsForDiff(helm, release, workerIndex)
if err != nil {
errs = append(errs, err)
}
@ -666,6 +668,23 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
return rs, errs
}
func (st *HelmState) createHelmContext(spec *ReleaseSpec, workerIndex int) helmexec.HelmContext {
namespace := st.HelmDefaults.TillerNamespace
if spec.TillerNamespace != "" {
namespace = spec.TillerNamespace
}
tillerless := st.HelmDefaults.Tillerless
if spec.Tillerless != nil {
tillerless = *spec.Tillerless
}
return helmexec.HelmContext{
Tillerless: tillerless,
TillerNamespace: namespace,
WorkerIndex: workerIndex,
}
}
// DiffReleases wrapper for executing helm diff on the releases
// It returns releases that had any changes
func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int, detailedExitCode, suppressSecrets bool, triggerCleanupEvents bool) ([]*ReleaseSpec, []error) {
@ -689,11 +708,11 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
}
close(jobQueue)
},
func(_ int) {
func(workerIndex int) {
for prep := range jobQueue {
flags := prep.flags
release := prep.release
if err := helm.DiffRelease(release.Name, normalizeChart(st.basePath, release.Chart), flags...); err != nil {
if err := helm.DiffRelease(st.createHelmContext(release, workerIndex), release.Name, normalizeChart(st.basePath, release.Chart), flags...); err != nil {
switch e := err.(type) {
case *exec.ExitError:
// Propagate any non-zero exit status from the external command like `helm` that is failed under the hood
@ -731,7 +750,7 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
}
func (st *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) []error {
return st.scatterGatherReleases(helm, workerLimit, func(release ReleaseSpec) error {
return st.scatterGatherReleases(helm, workerLimit, func(release ReleaseSpec, workerIndex int) error {
if !release.Desired() {
return nil
}
@ -739,13 +758,13 @@ func (st *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) [
flags := []string{}
flags = st.appendTillerFlags(flags, &release)
return helm.ReleaseStatus(release.Name, flags...)
return helm.ReleaseStatus(st.createHelmContext(&release, workerIndex), release.Name, flags...)
})
}
// DeleteReleases wrapper for executing helm delete on the releases
func (st *HelmState) DeleteReleases(helm helmexec.Interface, purge bool) []error {
return st.scatterGatherReleases(helm, len(st.Releases), func(release ReleaseSpec) error {
return st.scatterGatherReleases(helm, len(st.Releases), func(release ReleaseSpec, workerIndex int) error {
if !release.Desired() {
return nil
}
@ -755,13 +774,14 @@ func (st *HelmState) DeleteReleases(helm helmexec.Interface, purge bool) []error
flags = append(flags, "--purge")
}
flags = st.appendTillerFlags(flags, &release)
context := st.createHelmContext(&release, workerIndex)
installed, err := st.isReleaseInstalled(helm, release)
installed, err := st.isReleaseInstalled(context, helm, release)
if err != nil {
return err
}
if installed {
return helm.DeleteRelease(release.Name, flags...)
return helm.DeleteRelease(context, release.Name, flags...)
}
return nil
})
@ -769,7 +789,7 @@ func (st *HelmState) DeleteReleases(helm helmexec.Interface, purge bool) []error
// TestReleases wrapper for executing helm test on the releases
func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout int, concurrency int) []error {
return st.scatterGatherReleases(helm, concurrency, func(release ReleaseSpec) error {
return st.scatterGatherReleases(helm, concurrency, func(release ReleaseSpec, workerIndex int) error {
if !release.Desired() {
return nil
}
@ -781,7 +801,7 @@ func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout
flags = append(flags, "--timeout", strconv.Itoa(timeout))
flags = st.appendTillerFlags(flags, &release)
return helm.TestRelease(release.Name, flags...)
return helm.TestRelease(st.createHelmContext(&release, workerIndex), release.Name, flags...)
})
}
@ -993,38 +1013,44 @@ func (st *HelmState) appendTillerFlags(flags []string, release *ReleaseSpec) []s
func (st *HelmState) tillerFlags(release *ReleaseSpec) []string {
flags := []string{}
if release.TillerNamespace != "" {
flags = append(flags, "--tiller-namespace", release.TillerNamespace)
} else if st.HelmDefaults.TillerNamespace != "" {
flags = append(flags, "--tiller-namespace", st.HelmDefaults.TillerNamespace)
tillerless := st.HelmDefaults.Tillerless
if release.Tillerless != nil {
tillerless = *release.Tillerless
}
if !tillerless {
if release.TillerNamespace != "" {
flags = append(flags, "--tiller-namespace", release.TillerNamespace)
} else if st.HelmDefaults.TillerNamespace != "" {
flags = append(flags, "--tiller-namespace", st.HelmDefaults.TillerNamespace)
}
if release.TLS != nil && *release.TLS || release.TLS == nil && st.HelmDefaults.TLS {
flags = append(flags, "--tls")
}
if release.TLS != nil && *release.TLS || release.TLS == nil && st.HelmDefaults.TLS {
flags = append(flags, "--tls")
}
if release.TLSKey != "" {
flags = append(flags, "--tls-key", release.TLSKey)
} else if st.HelmDefaults.TLSKey != "" {
flags = append(flags, "--tls-key", st.HelmDefaults.TLSKey)
}
if release.TLSKey != "" {
flags = append(flags, "--tls-key", release.TLSKey)
} else if st.HelmDefaults.TLSKey != "" {
flags = append(flags, "--tls-key", st.HelmDefaults.TLSKey)
}
if release.TLSCert != "" {
flags = append(flags, "--tls-cert", release.TLSCert)
} else if st.HelmDefaults.TLSCert != "" {
flags = append(flags, "--tls-cert", st.HelmDefaults.TLSCert)
}
if release.TLSCert != "" {
flags = append(flags, "--tls-cert", release.TLSCert)
} else if st.HelmDefaults.TLSCert != "" {
flags = append(flags, "--tls-cert", st.HelmDefaults.TLSCert)
}
if release.TLSCACert != "" {
flags = append(flags, "--tls-ca-cert", release.TLSCACert)
} else if st.HelmDefaults.TLSCACert != "" {
flags = append(flags, "--tls-ca-cert", st.HelmDefaults.TLSCACert)
if release.TLSCACert != "" {
flags = append(flags, "--tls-ca-cert", release.TLSCACert)
} else if st.HelmDefaults.TLSCACert != "" {
flags = append(flags, "--tls-ca-cert", st.HelmDefaults.TLSCACert)
}
}
return flags
}
func (st *HelmState) flagsForUpgrade(helm helmexec.Interface, release *ReleaseSpec) ([]string, error) {
func (st *HelmState) flagsForUpgrade(helm helmexec.Interface, release *ReleaseSpec, workerIndex int) ([]string, error) {
flags := []string{}
if release.Version != "" {
flags = append(flags, "--version", release.Version)
@ -1064,25 +1090,25 @@ func (st *HelmState) flagsForUpgrade(helm helmexec.Interface, release *ReleaseSp
flags = st.appendTillerFlags(flags, release)
common, err := st.namespaceAndValuesFlags(helm, release)
common, err := st.namespaceAndValuesFlags(helm, release, workerIndex)
if err != nil {
return nil, err
}
return append(flags, common...), nil
}
func (st *HelmState) flagsForTemplate(helm helmexec.Interface, release *ReleaseSpec) ([]string, error) {
func (st *HelmState) flagsForTemplate(helm helmexec.Interface, release *ReleaseSpec, workerIndex int) ([]string, error) {
flags := []string{
"--name", release.Name,
}
common, err := st.namespaceAndValuesFlags(helm, release)
common, err := st.namespaceAndValuesFlags(helm, release, workerIndex)
if err != nil {
return nil, err
}
return append(flags, common...), nil
}
func (st *HelmState) flagsForDiff(helm helmexec.Interface, release *ReleaseSpec) ([]string, error) {
func (st *HelmState) flagsForDiff(helm helmexec.Interface, release *ReleaseSpec, workerIndex int) ([]string, error) {
flags := []string{}
if release.Version != "" {
flags = append(flags, "--version", release.Version)
@ -1094,7 +1120,7 @@ func (st *HelmState) flagsForDiff(helm helmexec.Interface, release *ReleaseSpec)
flags = st.appendTillerFlags(flags, release)
common, err := st.namespaceAndValuesFlags(helm, release)
common, err := st.namespaceAndValuesFlags(helm, release, workerIndex)
if err != nil {
return nil, err
}
@ -1110,8 +1136,8 @@ func (st *HelmState) isDevelopment(release *ReleaseSpec) bool {
return result
}
func (st *HelmState) flagsForLint(helm helmexec.Interface, release *ReleaseSpec) ([]string, error) {
return st.namespaceAndValuesFlags(helm, release)
func (st *HelmState) flagsForLint(helm helmexec.Interface, release *ReleaseSpec, workerIndex int) ([]string, error) {
return st.namespaceAndValuesFlags(helm, release, workerIndex)
}
func (st *HelmState) RenderValuesFileToBytes(path string) ([]byte, error) {
@ -1181,7 +1207,7 @@ func (st *HelmState) generateTemporaryValuesFiles(values []interface{}, missingF
return generatedFiles, nil
}
func (st *HelmState) namespaceAndValuesFlags(helm helmexec.Interface, release *ReleaseSpec) ([]string, error) {
func (st *HelmState) namespaceAndValuesFlags(helm helmexec.Interface, release *ReleaseSpec, workerIndex int) ([]string, error) {
flags := []string{}
if release.Namespace != "" {
flags = append(flags, "--namespace", release.Namespace)
@ -1230,7 +1256,8 @@ func (st *HelmState) namespaceAndValuesFlags(helm helmexec.Interface, release *R
}
}
valfile, err := helm.DecryptSecret(path)
decryptFlags := st.appendTillerFlags([]string{}, release)
valfile, err := helm.DecryptSecret(st.createHelmContext(release, workerIndex), path, decryptFlags...)
if err != nil {
return nil, err
}

View File

@ -40,7 +40,8 @@ func (st *HelmState) scatterGather(concurrency int, items int, produceInputs fun
waitGroup.Wait()
}
func (st *HelmState) scatterGatherReleases(helm helmexec.Interface, concurrency int, do func(ReleaseSpec) error) []error {
func (st *HelmState) scatterGatherReleases(helm helmexec.Interface, concurrency int,
do func(ReleaseSpec, int) error) []error {
var errs []error
inputs := st.Releases
@ -60,7 +61,7 @@ func (st *HelmState) scatterGatherReleases(helm helmexec.Interface, concurrency
},
func(id int) {
for release := range releases {
err := do(release)
err := do(release, id)
st.logger.Debugf("sending result for release: %s\n", release.Name)
results <- result{release: release, err: err}
st.logger.Debugf("sent result for release: %s\n", release.Name)

View File

@ -512,7 +512,7 @@ func TestHelmState_flagsForUpgrade(t *testing.T) {
HelmDefaults: tt.defaults,
}
helm := helmexec.New(logger, "default")
args, err := state.flagsForUpgrade(helm, tt.release)
args, err := state.flagsForUpgrade(helm, tt.release, 0)
if err != nil {
t.Errorf("unexpected error flagsForUpgade: %v", err)
}
@ -679,7 +679,7 @@ func (helm *mockHelmExec) AddRepo(name, repository, certfile, keyfile, username,
func (helm *mockHelmExec) UpdateRepo() error {
return nil
}
func (helm *mockHelmExec) SyncRelease(name, chart string, flags ...string) error {
func (helm *mockHelmExec) SyncRelease(context helmexec.HelmContext, name, chart string, flags ...string) error {
if strings.Contains(name, "error") {
return errors.New("error")
}
@ -687,28 +687,28 @@ func (helm *mockHelmExec) SyncRelease(name, chart string, flags ...string) error
helm.charts = append(helm.charts, chart)
return nil
}
func (helm *mockHelmExec) DiffRelease(name, chart string, flags ...string) error {
func (helm *mockHelmExec) DiffRelease(context helmexec.HelmContext, name, chart string, flags ...string) error {
helm.diffed = append(helm.diffed, mockRelease{name: name, flags: flags})
return nil
}
func (helm *mockHelmExec) ReleaseStatus(release string, flags ...string) error {
func (helm *mockHelmExec) ReleaseStatus(context helmexec.HelmContext, release string, flags ...string) error {
if strings.Contains(release, "error") {
return errors.New("error")
}
helm.releases = append(helm.releases, mockRelease{name: release, flags: flags})
return nil
}
func (helm *mockHelmExec) DeleteRelease(name string, flags ...string) error {
func (helm *mockHelmExec) DeleteRelease(context helmexec.HelmContext, name string, flags ...string) error {
helm.deleted = append(helm.deleted, mockRelease{name: name, flags: flags})
return nil
}
func (helm *mockHelmExec) List(filter string, flags ...string) (string, error) {
func (helm *mockHelmExec) List(context helmexec.HelmContext, filter string, flags ...string) (string, error) {
return helm.lists[listKey{filter: filter, flags: strings.Join(flags, "")}], nil
}
func (helm *mockHelmExec) DecryptSecret(name string) (string, error) {
func (helm *mockHelmExec) DecryptSecret(context helmexec.HelmContext, name string, flags ...string) (string, error) {
return "", nil
}
func (helm *mockHelmExec) TestRelease(name string, flags ...string) error {
func (helm *mockHelmExec) TestRelease(context helmexec.HelmContext, name string, flags ...string) error {
if strings.Contains(name, "error") {
return errors.New("error")
}