296 lines
7.5 KiB
Go
296 lines
7.5 KiB
Go
package kubedog
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/werf/kubedog-for-werf-helm/pkg/tracker"
|
|
"github.com/werf/kubedog-for-werf-helm/pkg/trackers/rollout/multitrack"
|
|
"go.uber.org/zap"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/discovery/cached/memory"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/restmapper"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
|
|
"github.com/helmfile/helmfile/pkg/resource"
|
|
)
|
|
|
|
type cacheKey struct {
|
|
kubeContext string
|
|
kubeconfig string
|
|
qps float32
|
|
burst int
|
|
}
|
|
|
|
type clientCacheEntry struct {
|
|
clientSet kubernetes.Interface
|
|
dynamicClient dynamic.Interface
|
|
restConfig *rest.Config
|
|
discovery discovery.CachedDiscoveryInterface
|
|
mapper meta.RESTMapper
|
|
}
|
|
|
|
var (
|
|
kubeInitMu sync.Mutex
|
|
clientCache = make(map[cacheKey]clientCacheEntry)
|
|
)
|
|
|
|
type Tracker struct {
|
|
logger *zap.SugaredLogger
|
|
clientSet kubernetes.Interface
|
|
dynamicClient dynamic.Interface
|
|
discovery discovery.CachedDiscoveryInterface
|
|
mapper meta.RESTMapper
|
|
trackOptions *TrackOptions
|
|
filter *resource.ResourceFilter
|
|
namespace string
|
|
}
|
|
|
|
type TrackerConfig struct {
|
|
Logger *zap.SugaredLogger
|
|
Namespace string
|
|
KubeContext string
|
|
Kubeconfig string
|
|
TrackOptions *TrackOptions
|
|
KubedogQPS *float32
|
|
KubedogBurst *int
|
|
}
|
|
|
|
func NewTracker(config *TrackerConfig) (*Tracker, error) {
|
|
logger := config.Logger
|
|
if logger == nil {
|
|
logger = zap.NewNop().Sugar()
|
|
}
|
|
|
|
kubeconfig := config.Kubeconfig
|
|
if kubeconfig == "" {
|
|
kubeconfig = os.Getenv("KUBECONFIG")
|
|
}
|
|
|
|
options := config.TrackOptions
|
|
if options == nil {
|
|
options = NewTrackOptions()
|
|
}
|
|
|
|
qps := options.QPS
|
|
if config.KubedogQPS != nil {
|
|
qps = *config.KubedogQPS
|
|
}
|
|
|
|
burst := options.Burst
|
|
if config.KubedogBurst != nil {
|
|
burst = *config.KubedogBurst
|
|
}
|
|
|
|
if qps <= 0 || math.IsInf(float64(qps), 0) || math.IsNaN(float64(qps)) {
|
|
return nil, fmt.Errorf("invalid kubedog QPS %v: must be > 0 and finite", qps)
|
|
}
|
|
if burst < 1 {
|
|
return nil, fmt.Errorf("invalid kubedog burst %v: must be >= 1", burst)
|
|
}
|
|
|
|
cacheEntry, err := getOrCreateClients(config.KubeContext, kubeconfig, qps, burst)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize kubernetes clients: %w", err)
|
|
}
|
|
|
|
var filter *resource.ResourceFilter
|
|
if options.Filter != nil {
|
|
filter = resource.NewResourceFilter(options.Filter, logger)
|
|
}
|
|
|
|
return &Tracker{
|
|
logger: logger,
|
|
clientSet: cacheEntry.clientSet,
|
|
dynamicClient: cacheEntry.dynamicClient,
|
|
discovery: cacheEntry.discovery,
|
|
mapper: cacheEntry.mapper,
|
|
trackOptions: options,
|
|
filter: filter,
|
|
namespace: config.Namespace,
|
|
}, nil
|
|
}
|
|
|
|
func getOrCreateClients(kubeContext, kubeconfig string, qps float32, burst int) (clientCacheEntry, error) {
|
|
key := cacheKey{
|
|
kubeContext: kubeContext,
|
|
kubeconfig: kubeconfig,
|
|
qps: qps,
|
|
burst: burst,
|
|
}
|
|
|
|
kubeInitMu.Lock()
|
|
if cache, ok := clientCache[key]; ok {
|
|
kubeInitMu.Unlock()
|
|
return cache, nil
|
|
}
|
|
kubeInitMu.Unlock()
|
|
|
|
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
|
|
if kubeconfig != "" {
|
|
loadingRules.ExplicitPath = kubeconfig
|
|
}
|
|
|
|
overrides := &clientcmd.ConfigOverrides{}
|
|
if kubeContext != "" {
|
|
overrides.CurrentContext = kubeContext
|
|
}
|
|
|
|
cc := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
|
|
restConfig, err := cc.ClientConfig()
|
|
if err != nil {
|
|
return clientCacheEntry{}, fmt.Errorf("failed to load kubeconfig: %w", err)
|
|
}
|
|
|
|
restConfig.QPS = qps
|
|
restConfig.Burst = burst
|
|
|
|
clientSet, err := kubernetes.NewForConfig(restConfig)
|
|
if err != nil {
|
|
return clientCacheEntry{}, fmt.Errorf("failed to create kubernetes client: %w", err)
|
|
}
|
|
|
|
dynamicClient, err := dynamic.NewForConfig(restConfig)
|
|
if err != nil {
|
|
return clientCacheEntry{}, fmt.Errorf("failed to create dynamic client: %w", err)
|
|
}
|
|
|
|
discoveryClient := memory.NewMemCacheClient(clientSet.Discovery())
|
|
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
|
|
|
|
cache := clientCacheEntry{
|
|
clientSet: clientSet,
|
|
dynamicClient: dynamicClient,
|
|
restConfig: restConfig,
|
|
discovery: discoveryClient,
|
|
mapper: mapper,
|
|
}
|
|
|
|
kubeInitMu.Lock()
|
|
defer kubeInitMu.Unlock()
|
|
|
|
if existingCache, ok := clientCache[key]; ok {
|
|
return existingCache, nil
|
|
}
|
|
|
|
clientCache[key] = cache
|
|
|
|
return cache, nil
|
|
}
|
|
|
|
func (t *Tracker) TrackResources(ctx context.Context, resources []*resource.Resource) error {
|
|
if len(resources) == 0 {
|
|
t.logger.Info("No resources to track")
|
|
return nil
|
|
}
|
|
|
|
filtered := t.filterResources(resources)
|
|
if len(filtered) == 0 {
|
|
t.logger.Info("No resources to track after filtering")
|
|
return nil
|
|
}
|
|
|
|
t.logger.Infof("Tracking %d resources with kubedog (filtered from %d total)", len(filtered), len(resources))
|
|
|
|
specs := multitrack.MultitrackSpecs{}
|
|
|
|
for _, res := range filtered {
|
|
namespace := res.Namespace
|
|
if namespace == "" {
|
|
namespace = t.namespace
|
|
}
|
|
|
|
switch strings.ToLower(res.Kind) {
|
|
case "deployment", "deploy":
|
|
specs.Deployments = append(specs.Deployments, multitrack.MultitrackSpec{
|
|
ResourceName: res.Name,
|
|
Namespace: namespace,
|
|
SkipLogs: !t.trackOptions.Logs,
|
|
})
|
|
case "statefulset", "sts":
|
|
specs.StatefulSets = append(specs.StatefulSets, multitrack.MultitrackSpec{
|
|
ResourceName: res.Name,
|
|
Namespace: namespace,
|
|
SkipLogs: !t.trackOptions.Logs,
|
|
})
|
|
case "daemonset", "ds":
|
|
specs.DaemonSets = append(specs.DaemonSets, multitrack.MultitrackSpec{
|
|
ResourceName: res.Name,
|
|
Namespace: namespace,
|
|
SkipLogs: !t.trackOptions.Logs,
|
|
})
|
|
case "job":
|
|
specs.Jobs = append(specs.Jobs, multitrack.MultitrackSpec{
|
|
ResourceName: res.Name,
|
|
Namespace: namespace,
|
|
SkipLogs: !t.trackOptions.Logs,
|
|
})
|
|
case "canary":
|
|
specs.Canaries = append(specs.Canaries, multitrack.MultitrackSpec{
|
|
ResourceName: res.Name,
|
|
Namespace: namespace,
|
|
SkipLogs: !t.trackOptions.Logs,
|
|
})
|
|
default:
|
|
t.logger.Debugf("Skipping unsupported kind %s for resource %s/%s", res.Kind, namespace, res.Name)
|
|
}
|
|
}
|
|
|
|
totalResources := len(specs.Deployments) + len(specs.StatefulSets) +
|
|
len(specs.DaemonSets) + len(specs.Jobs) + len(specs.Canaries)
|
|
|
|
if totalResources == 0 {
|
|
t.logger.Info("No trackable resources found (only Deployment, StatefulSet, DaemonSet, Job, and Canary are supported)")
|
|
return nil
|
|
}
|
|
|
|
t.logger.Infof("Tracking breakdown: Deployments=%d, StatefulSets=%d, DaemonSets=%d, Jobs=%d, Canaries=%d",
|
|
len(specs.Deployments), len(specs.StatefulSets), len(specs.DaemonSets),
|
|
len(specs.Jobs), len(specs.Canaries))
|
|
|
|
opts := multitrack.MultitrackOptions{
|
|
Options: tracker.Options{
|
|
ParentContext: ctx,
|
|
Timeout: t.trackOptions.Timeout,
|
|
LogsFromTime: time.Now().Add(-t.trackOptions.LogsSince),
|
|
},
|
|
StatusProgressPeriod: 5 * time.Second,
|
|
DynamicClient: t.dynamicClient,
|
|
DiscoveryClient: t.discovery,
|
|
Mapper: t.mapper,
|
|
}
|
|
|
|
err := multitrack.Multitrack(t.clientSet, specs, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("tracking failed: %w", err)
|
|
}
|
|
|
|
t.logger.Info("All resources tracked successfully")
|
|
return nil
|
|
}
|
|
|
|
func (t *Tracker) filterResources(resources []*resource.Resource) []*resource.Resource {
|
|
if t.filter == nil {
|
|
return resources
|
|
}
|
|
|
|
var result []*resource.Resource
|
|
for _, res := range resources {
|
|
if t.filter.ShouldTrack(res) {
|
|
result = append(result, res)
|
|
} else {
|
|
t.logger.Debugf("Skipping resource %s/%s (kind: %s) based on configuration", res.Namespace, res.Name, res.Kind)
|
|
}
|
|
}
|
|
return result
|
|
}
|