helmfile/pkg/kubedog/tracker.go

296 lines
7.5 KiB
Go

package kubedog
import (
"context"
"fmt"
"math"
"os"
"strings"
"sync"
"time"
"github.com/werf/kubedog-for-werf-helm/pkg/tracker"
"github.com/werf/kubedog-for-werf-helm/pkg/trackers/rollout/multitrack"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"github.com/helmfile/helmfile/pkg/resource"
)
type cacheKey struct {
kubeContext string
kubeconfig string
qps float32
burst int
}
type clientCacheEntry struct {
clientSet kubernetes.Interface
dynamicClient dynamic.Interface
restConfig *rest.Config
discovery discovery.CachedDiscoveryInterface
mapper meta.RESTMapper
}
var (
kubeInitMu sync.Mutex
clientCache = make(map[cacheKey]clientCacheEntry)
)
type Tracker struct {
logger *zap.SugaredLogger
clientSet kubernetes.Interface
dynamicClient dynamic.Interface
discovery discovery.CachedDiscoveryInterface
mapper meta.RESTMapper
trackOptions *TrackOptions
filter *resource.ResourceFilter
namespace string
}
type TrackerConfig struct {
Logger *zap.SugaredLogger
Namespace string
KubeContext string
Kubeconfig string
TrackOptions *TrackOptions
KubedogQPS *float32
KubedogBurst *int
}
func NewTracker(config *TrackerConfig) (*Tracker, error) {
logger := config.Logger
if logger == nil {
logger = zap.NewNop().Sugar()
}
kubeconfig := config.Kubeconfig
if kubeconfig == "" {
kubeconfig = os.Getenv("KUBECONFIG")
}
options := config.TrackOptions
if options == nil {
options = NewTrackOptions()
}
qps := options.QPS
if config.KubedogQPS != nil {
qps = *config.KubedogQPS
}
burst := options.Burst
if config.KubedogBurst != nil {
burst = *config.KubedogBurst
}
if qps <= 0 || math.IsInf(float64(qps), 0) || math.IsNaN(float64(qps)) {
return nil, fmt.Errorf("invalid kubedog QPS %v: must be > 0 and finite", qps)
}
if burst < 1 {
return nil, fmt.Errorf("invalid kubedog burst %v: must be >= 1", burst)
}
cacheEntry, err := getOrCreateClients(config.KubeContext, kubeconfig, qps, burst)
if err != nil {
return nil, fmt.Errorf("failed to initialize kubernetes clients: %w", err)
}
var filter *resource.ResourceFilter
if options.Filter != nil {
filter = resource.NewResourceFilter(options.Filter, logger)
}
return &Tracker{
logger: logger,
clientSet: cacheEntry.clientSet,
dynamicClient: cacheEntry.dynamicClient,
discovery: cacheEntry.discovery,
mapper: cacheEntry.mapper,
trackOptions: options,
filter: filter,
namespace: config.Namespace,
}, nil
}
func getOrCreateClients(kubeContext, kubeconfig string, qps float32, burst int) (clientCacheEntry, error) {
key := cacheKey{
kubeContext: kubeContext,
kubeconfig: kubeconfig,
qps: qps,
burst: burst,
}
kubeInitMu.Lock()
if cache, ok := clientCache[key]; ok {
kubeInitMu.Unlock()
return cache, nil
}
kubeInitMu.Unlock()
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
if kubeconfig != "" {
loadingRules.ExplicitPath = kubeconfig
}
overrides := &clientcmd.ConfigOverrides{}
if kubeContext != "" {
overrides.CurrentContext = kubeContext
}
cc := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
restConfig, err := cc.ClientConfig()
if err != nil {
return clientCacheEntry{}, fmt.Errorf("failed to load kubeconfig: %w", err)
}
restConfig.QPS = qps
restConfig.Burst = burst
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return clientCacheEntry{}, fmt.Errorf("failed to create kubernetes client: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return clientCacheEntry{}, fmt.Errorf("failed to create dynamic client: %w", err)
}
discoveryClient := memory.NewMemCacheClient(clientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
cache := clientCacheEntry{
clientSet: clientSet,
dynamicClient: dynamicClient,
restConfig: restConfig,
discovery: discoveryClient,
mapper: mapper,
}
kubeInitMu.Lock()
defer kubeInitMu.Unlock()
if existingCache, ok := clientCache[key]; ok {
return existingCache, nil
}
clientCache[key] = cache
return cache, nil
}
func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Resource) error {
if len(resources) == 0 {
t.logger.Info("No resources to track")
return nil
}
filtered := t.filterResources(resources)
if len(filtered) == 0 {
t.logger.Info("No resources to track after filtering")
return nil
}
t.logger.Infof("Tracking %d resources with kubedog (filtered from %d total)", len(filtered), len(resources))
specs := multitrack.MultitrackSpecs{}
for _, res := range filtered {
namespace := res.Namespace
if namespace == "" {
namespace = t.namespace
}
switch strings.ToLower(res.Kind) {
case "deployment", "deploy":
specs.Deployments = append(specs.Deployments, multitrack.MultitrackSpec{
ResourceName: res.Name,
Namespace: namespace,
SkipLogs: !t.trackOptions.Logs,
})
case "statefulset", "sts":
specs.StatefulSets = append(specs.StatefulSets, multitrack.MultitrackSpec{
ResourceName: res.Name,
Namespace: namespace,
SkipLogs: !t.trackOptions.Logs,
})
case "daemonset", "ds":
specs.DaemonSets = append(specs.DaemonSets, multitrack.MultitrackSpec{
ResourceName: res.Name,
Namespace: namespace,
SkipLogs: !t.trackOptions.Logs,
})
case "job":
specs.Jobs = append(specs.Jobs, multitrack.MultitrackSpec{
ResourceName: res.Name,
Namespace: namespace,
SkipLogs: !t.trackOptions.Logs,
})
case "canary":
specs.Canaries = append(specs.Canaries, multitrack.MultitrackSpec{
ResourceName: res.Name,
Namespace: namespace,
SkipLogs: !t.trackOptions.Logs,
})
default:
t.logger.Debugf("Skipping unsupported kind %s for resource %s/%s", res.Kind, namespace, res.Name)
}
}
totalResources := len(specs.Deployments) + len(specs.StatefulSets) +
len(specs.DaemonSets) + len(specs.Jobs) + len(specs.Canaries)
if totalResources == 0 {
t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)")
return nil
}
t.logger.Infof("Tracking breakdown: Deployments=%d, StatefulSets=%d, DaemonSets=%d, Jobs=%d, Canaries=%d",
len(specs.Deployments), len(specs.StatefulSets), len(specs.DaemonSets),
len(specs.Jobs), len(specs.Canaries))
opts := multitrack.MultitrackOptions{
Options: tracker.Options{
ParentContext: ctx,
Timeout: t.trackOptions.Timeout,
LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince),
},
StatusProgressPeriod: 5 * time.Second,
DynamicClient: t.dynamicClient,
DiscoveryClient: t.discovery,
Mapper: t.mapper,
}
err := multitrack.Multitrack(t.clientSet, specs, opts)
if err != nil {
return fmt.Errorf("tracking failed: %w", err)
}
t.logger.Info("All resources tracked successfully")
return nil
}
func (t *Tracker) filterResources(resources []*resource.Resource) []*resource.Resource {
if t.filter == nil {
return resources
}
var result []*resource.Resource
for _, res := range resources {
if t.filter.ShouldTrack(res) {
result = append(result, res)
} else {
t.logger.Debugf("Skipping resource %s/%s (kind: %s) based on configuration", res.Namespace, res.Name, res.Kind)
}
}
return result
}