diff --git a/go.mod b/go.mod index 73601744..5caf1f49 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/tatsushid/go-prettytable v0.0.0-20141013043238-ed2d14c29939 github.com/tj/assert v0.0.3 github.com/variantdev/dag v1.1.0 - github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6 + github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac github.com/zclconf/go-cty v1.18.1 github.com/zclconf/go-cty-yaml v1.2.0 go.szostok.io/version v1.2.0 @@ -180,7 +180,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect - github.com/chanced/caps v1.0.2 // indirect github.com/cloudflare/circl v1.6.3 // indirect github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/containerd/containerd v1.7.30 // indirect @@ -309,8 +308,6 @@ require ( github.com/werf/logboek v0.6.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect - github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect - github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xlab/treeprint v1.2.0 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/yandex-cloud/go-genproto v0.69.0 // indirect diff --git a/go.sum b/go.sum index 1e25c93b..10477c13 100644 --- a/go.sum +++ b/go.sum @@ -217,8 +217,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA= -github.com/chanced/caps v1.0.2 h1:RELvNN4lZajqSXJGzPaU7z8B4LK2+o2Oc/upeWdgMOA= -github.com/chanced/caps v1.0.2/go.mod h1:SJhRzeYLKJ3OmzyQXhdZ7Etj7lqqWoPtQ1zcSJRtQjs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8= github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4= @@ -779,8 +777,8 @@ github.com/urfave/cli v1.22.17 h1:SYzXoiPfQjHBbkYxbew5prZHS1TOLT3ierW8SYLqtVQ= github.com/urfave/cli v1.22.17/go.mod h1:b0ht0aqgH/6pBYzzxURyrM4xXNgsoT/n2ZzwQiEhNVo= github.com/variantdev/dag v1.1.0 h1:xodYlSng33KWGvIGMpKUyLcIZRXKiNUx612mZJqYrDg= github.com/variantdev/dag v1.1.0/go.mod h1:pH1TQsNSLj2uxMo9NNl9zdGy01Wtn+/2MT96BrKmVyE= -github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6 h1:lpgQPTCp+wNJfTqJWtR6A5gRA4e4m/eRJFV7V18XCoA= -github.com/werf/kubedog-for-werf-helm v0.0.0-20241217155728-9d45c48b82b6/go.mod h1:PA9xGVKX9Il6sCgvPrcB3/FahRme3bXRz4BuylvAssc= +github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac h1:kGp4G79ZiV61SxyLeh6kErzv+u5YBaAxp23oL6T/IJo= +github.com/werf/kubedog v0.13.1-0.20260217150136-ed58edf34eac/go.mod h1:gu4EY4hxtiYVDy5o6WE2lRZS0YWqrOV0HS//GTYyrUE= github.com/werf/logboek v0.6.1 h1:oEe6FkmlKg0z0n80oZjLplj6sXcBeLleCkjfOOZEL2g= github.com/werf/logboek v0.6.1/go.mod h1:Gez5J4bxekyr6MxTmIJyId1F61rpO+0/V4vjCIEIZmk= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -788,7 +786,6 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= diff --git a/pkg/kubedog/tracker.go b/pkg/kubedog/tracker.go index 6dcb2bc4..867116f8 100644 --- a/pkg/kubedog/tracker.go +++ b/pkg/kubedog/tracker.go @@ -9,8 +9,14 @@ import ( "sync" "time" - "github.com/werf/kubedog-for-werf-helm/pkg/tracker" - "github.com/werf/kubedog-for-werf-helm/pkg/trackers/rollout/multitrack" + "github.com/werf/kubedog/pkg/informer" + "github.com/werf/kubedog/pkg/tracker" + "github.com/werf/kubedog/pkg/tracker/canary" + "github.com/werf/kubedog/pkg/tracker/daemonset" + "github.com/werf/kubedog/pkg/tracker/deployment" + "github.com/werf/kubedog/pkg/tracker/job" + "github.com/werf/kubedog/pkg/tracker/statefulset" + "github.com/werf/kubedog/pkg/trackers/dyntracker/util" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/discovery" @@ -20,6 +26,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" + watchtools "k8s.io/client-go/tools/watch" "github.com/helmfile/helmfile/pkg/resource" ) @@ -187,6 +194,12 @@ func getOrCreateClients(kubeContext, kubeconfig string, qps float32, burst int) return cache, nil } +type trackTarget struct { + kind string + name string + namespace string +} + func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Resource) error { if len(resources) == 0 { t.logger.Info("No resources to track") @@ -201,81 +214,269 @@ func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Reso t.logger.Infof("Tracking %d resources with kubedog (filtered from %d total)", len(filtered), len(resources)) - specs := multitrack.MultitrackSpecs{} + targets := t.buildTargets(filtered) + if len(targets) == 0 { + t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)") + return nil + } - for _, res := range filtered { + t.logger.Infof("Tracking breakdown: %s", t.targetSummary(targets)) + + watchErrCh := make(chan error, len(targets)) + informerFactory := informer.NewConcurrentInformerFactory( + ctx.Done(), + watchErrCh, + t.dynamicClient, + informer.ConcurrentInformerFactoryOptions{}, + ) + + opts := tracker.Options{ + ParentContext: ctx, + Timeout: t.trackOptions.Timeout, + LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince), + } + + var wg sync.WaitGroup + errCh := make(chan error, len(targets)) + + for _, target := range targets { + wg.Add(1) + go func(tgt trackTarget) { + defer wg.Done() + if err := t.trackSingleResource(tgt, informerFactory, opts); err != nil { + errCh <- fmt.Errorf("%s/%s tracking failed: %w", tgt.kind, tgt.name, err) + } + }(target) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case err := <-errCh: + return fmt.Errorf("tracking failed: %w", err) + case <-done: + t.logger.Info("All resources tracked successfully") + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled: %w", ctx.Err()) + } +} + +func (t *Tracker) trackSingleResource(target trackTarget, informerFactory *util.Concurrent[*informer.InformerFactory], opts tracker.Options) error { + parentContext := opts.ParentContext + if parentContext == nil { + parentContext = context.Background() + } + ctx, cancel := watchtools.ContextWithOptionalTimeout(parentContext, opts.Timeout) + defer cancel() + + trackErrCh := make(chan error, 1) + doneCh := make(chan struct{}) + + switch target.kind { + case "deploy": + tr := deployment.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runDeploymentTracker(ctx, tr, trackErrCh, doneCh) + return t.waitDeploymentTracker(ctx, tr, trackErrCh, doneCh) + case "sts": + tr := statefulset.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runStatefulSetTracker(ctx, tr, trackErrCh, doneCh) + return t.waitStatefulSetTracker(ctx, tr, trackErrCh, doneCh) + case "ds": + tr := daemonset.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runDaemonSetTracker(ctx, tr, trackErrCh, doneCh) + return t.waitDaemonSetTracker(ctx, tr, trackErrCh, doneCh) + case "job": + tr := job.NewTracker(target.name, target.namespace, t.clientSet, informerFactory, opts) + go t.runJobTracker(ctx, tr, trackErrCh, doneCh) + return t.waitJobTracker(ctx, tr, trackErrCh, doneCh) + case "canary": + tr := canary.NewTracker(target.name, target.namespace, t.clientSet, t.dynamicClient, informerFactory, opts) + go t.runCanaryTracker(ctx, tr, trackErrCh, doneCh) + return t.waitCanaryTracker(ctx, tr, trackErrCh, doneCh) + default: + return fmt.Errorf("unsupported resource kind: %s", target.kind) + } +} + +func (t *Tracker) runDeploymentTracker(ctx context.Context, tr *deployment.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitDeploymentTracker(ctx context.Context, tr *deployment.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("Deployment %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("deployment %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for deployment %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runStatefulSetTracker(ctx context.Context, tr *statefulset.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitStatefulSetTracker(ctx context.Context, tr *statefulset.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("StatefulSet %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("statefulset %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for statefulset %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runDaemonSetTracker(ctx context.Context, tr *daemonset.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitDaemonSetTracker(ctx context.Context, tr *daemonset.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Ready: + t.logger.Debugf("DaemonSet %s/%s is ready", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("daemonset %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for daemonset %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runJobTracker(ctx context.Context, tr *job.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitJobTracker(ctx context.Context, tr *job.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Succeeded: + t.logger.Debugf("Job %s/%s succeeded", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("job %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for job %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) runCanaryTracker(ctx context.Context, tr *canary.Tracker, errCh chan<- error, doneCh chan<- struct{}) { + if err := tr.Track(ctx); err != nil { + errCh <- err + } else { + close(doneCh) + } +} + +func (t *Tracker) waitCanaryTracker(ctx context.Context, tr *canary.Tracker, trackErrCh <-chan error, doneCh <-chan struct{}) error { + for { + select { + case <-tr.Succeeded: + t.logger.Debugf("Canary %s/%s succeeded", tr.Namespace, tr.ResourceName) + return nil + case status := <-tr.Failed: + return fmt.Errorf("canary %s/%s failed: %s", tr.Namespace, tr.ResourceName, status.FailedReason) + case err := <-trackErrCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("tracking canceled for canary %s/%s: %w", tr.Namespace, tr.ResourceName, ctx.Err()) + } + } +} + +func (t *Tracker) buildTargets(resources []*resource.Resource) []trackTarget { + var targets []trackTarget + for _, res := range resources { namespace := res.Namespace if namespace == "" { namespace = t.namespace } + kind := "" switch strings.ToLower(res.Kind) { case "deployment", "deploy": - specs.Deployments = append(specs.Deployments, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "deploy" case "statefulset", "sts": - specs.StatefulSets = append(specs.StatefulSets, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "sts" case "daemonset", "ds": - specs.DaemonSets = append(specs.DaemonSets, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "ds" case "job": - specs.Jobs = append(specs.Jobs, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "job" case "canary": - specs.Canaries = append(specs.Canaries, multitrack.MultitrackSpec{ - ResourceName: res.Name, - Namespace: namespace, - SkipLogs: !t.trackOptions.Logs, - }) + kind = "canary" default: t.logger.Debugf("Skipping unsupported kind %s for resource %s/%s", res.Kind, namespace, res.Name) + continue } + + targets = append(targets, trackTarget{ + kind: kind, + name: res.Name, + namespace: namespace, + }) } + return targets +} - totalResources := len(specs.Deployments) + len(specs.StatefulSets) + - len(specs.DaemonSets) + len(specs.Jobs) + len(specs.Canaries) - - if totalResources == 0 { - t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)") - return nil +func (t *Tracker) targetSummary(targets []trackTarget) string { + counts := make(map[string]int) + for _, tgt := range targets { + counts[tgt.kind]++ } - - t.logger.Infof("Tracking breakdown: Deployments=%d, StatefulSets=%d, DaemonSets=%d, Jobs=%d, Canaries=%d", - len(specs.Deployments), len(specs.StatefulSets), len(specs.DaemonSets), - len(specs.Jobs), len(specs.Canaries)) - - opts := multitrack.MultitrackOptions{ - Options: tracker.Options{ - ParentContext: ctx, - Timeout: t.trackOptions.Timeout, - LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince), - }, - StatusProgressPeriod: 5 * time.Second, - DynamicClient: t.dynamicClient, - DiscoveryClient: t.discovery, - Mapper: t.mapper, + parts := make([]string, 0, len(counts)) + for kind, count := range counts { + parts = append(parts, fmt.Sprintf("%ss=%d", kind, count)) } - - err := multitrack.Multitrack(t.clientSet, specs, opts) - if err != nil { - return fmt.Errorf("tracking failed: %w", err) - } - - t.logger.Info("All resources tracked successfully") - return nil + return strings.Join(parts, ", ") } func (t *Tracker) filterResources(resources []*resource.Resource) []*resource.Resource {