From bda57b741fb97e2cae4e84cc9ca2141dff235d87 Mon Sep 17 00:00:00 2001 From: yxxhero <11087727+yxxhero@users.noreply.github.com> Date: Sat, 2 May 2026 08:17:48 +0800 Subject: [PATCH] build(deps): replace werf/kubedog-for-werf-helm with werf/kubedog (#2568) * build(deps): replace werf/kubedog-for-werf-helm with werf/kubedog Replace the fork github.com/werf/kubedog-for-werf-helm with the upstream github.com/werf/kubedog. The fork was a temporary compatibility shim; the upstream repository now includes the necessary k8s API fixes. Signed-off-by: yxxhero * fix(kubedog): initialize InformerFactory to prevent nil pointer panic The upstream werf/kubedog now requires an InformerFactory for its resource trackers (deployment, statefulset, daemonset, job, canary), but the multitrack layer still passes nil. Bypass the broken multitrack feed layer by creating resource trackers directly with a properly initialized InformerFactory. Signed-off-by: yxxhero * fix(lint): correct misspelling of canceled Signed-off-by: yxxhero --------- Signed-off-by: yxxhero --- go.mod | 5 +- go.sum | 7 +- pkg/kubedog/tracker.go | 317 +++++++++++++++++++++++++++++++++-------- 3 files changed, 262 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 73601744..5caf1f49 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/tatsushid/go-prettytable v0.0.0-20141013043238-ed2d14c29939 github.com/tj/assert v0.0.3 github.com/variantdev/dag v1.1.0 - github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6 + github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac github.com/zclconf/go-cty v1.18.1 github.com/zclconf/go-cty-yaml v1.2.0 go.szostok.io/version v1.2.0 @@ -180,7 +180,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect - github.com/chanced/caps v1.0.2 // indirect github.com/cloudflare/circl v1.6.3 // indirect github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/containerd/containerd v1.7.30 // indirect @@ -309,8 +308,6 @@ require ( github.com/werf/logboek v0.6.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect - github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect - github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xlab/treeprint v1.2.0 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/yandex-cloud/go-genproto v0.69.0 // indirect diff --git a/go.sum b/go.sum index 1e25c93b..10477c13 100644 --- a/go.sum +++ b/go.sum @@ -217,8 +217,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA= -github.com/chanced/caps v1.0.2 h1:RELvNN4lZajqSXJGzPaU7z8B4LK2+o2Oc/upeWdgMOA= -github.com/chanced/caps v1.0.2/go.mod h1:SJhRzeYLKJ3OmzyQXhdZ7Etj7lqqWoPtQ1zcSJRtQjs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8= github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4= @@ -779,8 +777,8 @@ github.com/urfave/cli v1.22.17 h1:SYzXoiPfQjHBbkYxbew5prZHS1TOLT3ierW8SYLqtVQ= github.com/urfave/cli v1.22.17/go.mod h1:b0ht0aqgH/6pBYzzxURyrM4xXNgsoT/n2ZzwQiEhNVo= github.com/variantdev/dag v1.1.0 h1:xodYlSng33KWGvIGMpKUyLcIZRXKiNUx612mZJqYrDg= github.com/variantdev/dag v1.1.0/go.mod h1:pH1TQsNSLj2uxMo9NNl9zdGy01Wtn+/2MT96BrKmVyE= -github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6 h1:lpgQPTCp+wNJfTqJWtR6A5gRA4e4m/eRJFV7V18XCoA= -github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6/go.mod h1:PA9xGVKX9Il6sCgvPrcB3/FahRme3bXRz4BuylvAssc= +github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac h1:kGp4G79ZiV61SxyLeh6kErzv+u5YBaAxp23oL6T/IJo= +github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac/go.mod h1:gu4EY4hxtiYVDy5o6WE2lRZS0YWqrOV0HS//GTYyrUE= github.com/werf/logboek v0.6.1 h1:oEe6FkmlKg0z0n80oZjLplj6sXcBeLleCkjfOOZEL2g= github.com/werf/logboek v0.6.1/go.mod h1:Gez5J4bxekyr6MxTmIJyId1F61rpO+0/V4vjCIEIZmk= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -788,7 +786,6 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= diff --git a/pkg/kubedog/tracker.go b/pkg/kubedog/tracker.go index 6dcb2bc4..867116f8 100644 --- a/pkg/kubedog/tracker.go +++ b/pkg/kubedog/tracker.go @@ -9,8 +9,14 @@ import ( "sync" "time" - "github.com/werf/kubedog-for-werf-helm/pkg/tracker" - "github.com/werf/kubedog-for-werf-helm/pkg/trackers/rollout/multitrack" + "github.com/werf/kubedog/pkg/informer" + "github.com/werf/kubedog/pkg/tracker" + "github.com/werf/kubedog/pkg/tracker/canary" + "github.com/werf/kubedog/pkg/tracker/daemonset" + "github.com/werf/kubedog/pkg/tracker/deployment" + "github.com/werf/kubedog/pkg/tracker/job" + "github.com/werf/kubedog/pkg/tracker/statefulset" + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/discovery" @@ -20,6 +26,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" + watchtools "k8s.io/client-go/tools/watch" "github.com/helmfile/helmfile/pkg/resource" ) @@ -187,6 +194,12 @@ func getOrCreateClients(kubeContext, kubeconfig string, qps float32, burst int) return cache, nil } +type trackTarget struct { + kind string + name string + namespace string +} + func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Resource) error { if len(resources) == 0 { t.logger.Info("No resources to track") @@ -201,81 +214,269 @@ func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Reso t.logger.Infof("Tracking %d resources with kubedog (filtered from %d total)", len(filtered), len(resources)) - specs := multitrack.MultitrackSpecs{} + targets := t.buildTargets(filtered) + if len(targets) == 0 { + t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)") + return nil + } - for _, res := range filtered { + t.logger.Infof("Tracking breakdown: %s", t.targetSummary(targets)) + + watchErrCh := make(chan error, len(targets)) + informerFactory := informer.NewConcurrentInformerFactory( + ctx.Done(), + watchErrCh, + t.dynamicClient, + informer.ConcurrentInformerFactoryOptions{}, + ) + + opts := tracker.Options{ + ParentContext: ctx, + Timeout: t.trackOptions.Timeout, + LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince), + } + + var wg sync.WaitGroup + errCh := make(chan error, len(targets)) + + for _, target := range targets { + wg.Add(1) + go func(tgt trackTarget) { + defer wg.Done() + if err := t.trackSingleResource(tgt, informerFactory, opts); err != nil { + errCh <- fmt.Errorf("%s/%s tracking failed: %w", tgt.kind, tgt.name, err) + } + }(target) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case err := <-errCh: + return fmt.Errorf("tracking failed: %w", err) + case <-done: + t.logger.Info("All resources tracked successfully") + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled: %w", ctx.Err()) + } +} + +func (t *Tracker) trackSingleResource(target trackTarget, informerFactory *util.Concurrent[*informer.InformerFactory], opts tracker.Options) error { + parentContext := opts.ParentContext + if parentContext == nil { + parentContext = context.Background() + } + ctx, cancel := watchtools.ContextWithOptionalTimeout(parentContext, opts.Timeout) + defer cancel() + + trackErrCh := make(chan error, 1) + doneCh := make(chan struct{}) + + switch target.kind { + case "deploy": + tr := deployment.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runDeploymentTracker(ctx, tr, trackErrCh, doneCh) + return t.waitDeploymentTracker(ctx, tr, trackErrCh, doneCh) + case "sts": + tr := statefulset.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runStatefulSetTracker(ctx, tr, trackErrCh, doneCh) + return t.waitStatefulSetTracker(ctx, tr, trackErrCh, doneCh) + case "ds": + tr := daemonset.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runDaemonSetTracker(ctx, tr, trackErrCh, doneCh) + return t.waitDaemonSetTracker(ctx, tr, trackErrCh, doneCh) + case "job": + tr := job.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runJobTracker(ctx, tr, trackErrCh, doneCh) + return t.waitJobTracker(ctx, tr, trackErrCh, doneCh) + case "canary": + tr := canary.NewTracker(target.name, target.namespace, t.clientSet, t.dynamicClient, informerFactory, opts) + go t.runCanaryTracker(ctx, tr, trackErrCh, doneCh) + return t.waitCanaryTracker(ctx, tr, trackErrCh, doneCh) + default: + return fmt.Errorf("unsupported resource kind: %s", target.kind) + } +} + +func (t *Tracker) runDeploymentTracker(ctx context.Context, tr *deployment.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitDeploymentTracker(ctx context.Context, tr *deployment.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("Deployment %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("deployment %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for deployment %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runStatefulSetTracker(ctx context.Context, tr *statefulset.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitStatefulSetTracker(ctx context.Context, tr *statefulset.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("StatefulSet %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("statefulset %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for statefulset %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runDaemonSetTracker(ctx context.Context, tr *daemonset.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitDaemonSetTracker(ctx context.Context, tr *daemonset.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("DaemonSet %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("daemonset %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for daemonset %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runJobTracker(ctx context.Context, tr *job.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitJobTracker(ctx context.Context, tr *job.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Succeeded: + t.logger.Debugf("Job %s/%s succeeded", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("job %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for job %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runCanaryTracker(ctx context.Context, tr *canary.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitCanaryTracker(ctx context.Context, tr *canary.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Succeeded: + t.logger.Debugf("Canary %s/%s succeeded", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("canary %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for canary %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) buildTargets(resources []*resource.Resource) []trackTarget { + var targets []trackTarget + for _, res := range resources { namespace := res.Namespace if namespace == "" { namespace = t.namespace } + kind := "" switch strings.ToLower(res.Kind) { case "deployment", "deploy": - specs.Deployments = append(specs.Deployments, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "deploy" case "statefulset", "sts": - specs.StatefulSets = append(specs.StatefulSets, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "sts" case "daemonset", "ds": - specs.DaemonSets = append(specs.DaemonSets, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "ds" case "job": - specs.Jobs = append(specs.Jobs, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "job" case "canary": - specs.Canaries = append(specs.Canaries, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "canary" default: t.logger.Debugf("Skipping unsupported kind %s for resource %s/%s", res.Kind, namespace, res.Name) + continue } + + targets = append(targets, trackTarget{ + kind: kind, + name: res.Name, + namespace: namespace, + }) } + return targets +} - totalResources := len(specs.Deployments) + len(specs.StatefulSets) + - len(specs.DaemonSets) + len(specs.Jobs) + len(specs.Canaries) - - if totalResources == 0 { - t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)") - return nil +func (t *Tracker) targetSummary(targets []trackTarget) string { + counts := make(map[string]int) + for _, tgt := range targets { + counts[tgt.kind]++ } - - t.logger.Infof("Tracking breakdown: Deployments=%d, StatefulSets=%d, DaemonSets=%d, Jobs=%d, Canaries=%d", - len(specs.Deployments), len(specs.StatefulSets), len(specs.DaemonSets), - len(specs.Jobs), len(specs.Canaries)) - - opts := multitrack.MultitrackOptions{ - Options: tracker.Options{ - ParentContext: ctx, - Timeout: t.trackOptions.Timeout, - LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince), - }, - StatusProgressPeriod: 5 * time.Second, - DynamicClient: t.dynamicClient, - DiscoveryClient: t.discovery, - Mapper: t.mapper, + parts := make([]string, 0, len(counts)) + for kind, count := range counts { + parts = append(parts, fmt.Sprintf("%ss=%d", kind, count)) } - - err := multitrack.Multitrack(t.clientSet, specs, opts) - if err != nil { - return fmt.Errorf("tracking failed: %w", err) - } - - t.logger.Info("All resources tracked successfully") - return nil + return strings.Join(parts, ", ") } func (t *Tracker) filterResources(resources []*resource.Resource) []*resource.Resource {