diff --git a/controllers/horizontal_runner_autoscaler_webhook.go b/controllers/horizontal_runner_autoscaler_webhook.go index 55bec2a8..62e76f78 100644 --- a/controllers/horizontal_runner_autoscaler_webhook.go +++ b/controllers/horizontal_runner_autoscaler_webhook.go @@ -29,7 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" - gogithub "github.com/google/go-github/v36/github" + gogithub "github.com/google/go-github/v37/github" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" diff --git a/controllers/horizontal_runner_autoscaler_webhook_on_check_run.go b/controllers/horizontal_runner_autoscaler_webhook_on_check_run.go index 0aa7e53e..4153cd8e 100644 --- a/controllers/horizontal_runner_autoscaler_webhook_on_check_run.go +++ b/controllers/horizontal_runner_autoscaler_webhook_on_check_run.go @@ -3,7 +3,7 @@ package controllers import ( "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" "github.com/actions-runner-controller/actions-runner-controller/pkg/actionsglob" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" ) func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) MatchCheckRunEvent(event *github.CheckRunEvent) func(scaleUpTrigger v1alpha1.ScaleUpTrigger) bool { diff --git a/controllers/horizontal_runner_autoscaler_webhook_on_pull_request.go b/controllers/horizontal_runner_autoscaler_webhook_on_pull_request.go index 95475e90..a728b2fb 100644 --- a/controllers/horizontal_runner_autoscaler_webhook_on_pull_request.go +++ b/controllers/horizontal_runner_autoscaler_webhook_on_pull_request.go @@ -2,7 +2,7 @@ package controllers import ( "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" ) func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) MatchPullRequestEvent(event *github.PullRequestEvent) func(scaleUpTrigger v1alpha1.ScaleUpTrigger) bool { diff --git a/controllers/horizontal_runner_autoscaler_webhook_on_push.go b/controllers/horizontal_runner_autoscaler_webhook_on_push.go index e7e39b31..2e041513 100644 --- a/controllers/horizontal_runner_autoscaler_webhook_on_push.go +++ b/controllers/horizontal_runner_autoscaler_webhook_on_push.go @@ -2,7 +2,7 @@ package controllers import ( "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" ) func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) MatchPushEvent(event *github.PushEvent) func(scaleUpTrigger v1alpha1.ScaleUpTrigger) bool { diff --git a/controllers/horizontal_runner_autoscaler_webhook_test.go b/controllers/horizontal_runner_autoscaler_webhook_test.go index e54025ff..d97850c6 100644 --- a/controllers/horizontal_runner_autoscaler_webhook_test.go +++ b/controllers/horizontal_runner_autoscaler_webhook_test.go @@ -15,7 +15,7 @@ import ( actionsv1alpha1 "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" "github.com/go-logr/logr" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" diff --git a/controllers/integration_test.go b/controllers/integration_test.go index f3f6002b..deadc086 100644 --- a/controllers/integration_test.go +++ b/controllers/integration_test.go @@ -8,7 +8,7 @@ import ( "time" github2 "github.com/actions-runner-controller/actions-runner-controller/github" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" "github.com/actions-runner-controller/actions-runner-controller/github/fake" diff --git a/controllers/runner_controller.go b/controllers/runner_controller.go index 870afa29..63490916 100644 --- a/controllers/runner_controller.go +++ b/controllers/runner_controller.go @@ -24,7 +24,7 @@ import ( "time" "github.com/actions-runner-controller/actions-runner-controller/hash" - gogithub "github.com/google/go-github/v36/github" + gogithub "github.com/google/go-github/v37/github" "k8s.io/apimachinery/pkg/util/wait" "github.com/go-logr/logr" diff --git a/controllers/runner_pod_controller.go b/controllers/runner_pod_controller.go index 70de5924..0489faed 100644 --- a/controllers/runner_pod_controller.go +++ b/controllers/runner_pod_controller.go @@ -22,7 +22,7 @@ import ( "fmt" "time" - gogithub "github.com/google/go-github/v36/github" + gogithub "github.com/google/go-github/v37/github" "k8s.io/apimachinery/pkg/util/wait" "github.com/go-logr/logr" diff --git a/controllers/runnerreplicaset_controller.go b/controllers/runnerreplicaset_controller.go index 2465b0e0..7aff1fd5 100644 --- a/controllers/runnerreplicaset_controller.go +++ b/controllers/runnerreplicaset_controller.go @@ -23,7 +23,7 @@ import ( "reflect" "time" - gogithub "github.com/google/go-github/v36/github" + gogithub "github.com/google/go-github/v37/github" "github.com/go-logr/logr" kerrors "k8s.io/apimachinery/pkg/api/errors" diff --git a/github/fake/runners.go b/github/fake/runners.go index c785b365..986f9c2a 100644 --- a/github/fake/runners.go +++ b/github/fake/runners.go @@ -8,7 +8,7 @@ import ( "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" "github.com/gorilla/mux" ) diff --git a/github/github.go b/github/github.go index 017efefb..75e75138 100644 --- a/github/github.go +++ b/github/github.go @@ -12,7 +12,7 @@ import ( "github.com/actions-runner-controller/actions-runner-controller/github/metrics" "github.com/bradleyfalzon/ghinstallation" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" "golang.org/x/oauth2" ) diff --git a/github/github_test.go b/github/github_test.go index 6ba3aa0b..54e9349d 100644 --- a/github/github_test.go +++ b/github/github_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/actions-runner-controller/actions-runner-controller/github/fake" - "github.com/google/go-github/v36/github" + "github.com/google/go-github/v37/github" ) var server *httptest.Server diff --git a/go.mod b/go.mod index c7e017aa..e3d9b7fd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/go-logr/logr v0.4.0 github.com/google/go-cmp v0.5.6 - github.com/google/go-github/v36 v36.0.0 + github.com/google/go-github/v37 v37.0.0 github.com/gorilla/mux v1.8.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/onsi/ginkgo v1.16.4 @@ -23,3 +23,5 @@ require ( sigs.k8s.io/controller-runtime v0.9.0 sigs.k8s.io/yaml v1.2.0 ) + +replace github.com/google/go-github/v37 => github.com/google/go-github/v37 v37.0.1-0.20210713230028-465df60a8ec3 diff --git a/go.sum b/go.sum index 68b0f4d4..4a972f07 100644 --- a/go.sum +++ b/go.sum @@ -174,8 +174,8 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github/v29 v29.0.2 h1:opYN6Wc7DOz7Ku3Oh4l7prmkOMwEcQxpFtxdU8N8Pts= github.com/google/go-github/v29 v29.0.2/go.mod h1:CHKiKKPHJ0REzfwc14QMklvtHwCveD0PxlMjLlzAM5E= -github.com/google/go-github/v36 v36.0.0 h1:ndCzM616/oijwufI7nBRa+5eZHLldT+4yIB68ib5ogs= -github.com/google/go-github/v36 v36.0.0/go.mod h1:LFlKC047IOqiglRGNqNb9s/iAPTnnjtlshm+bxp+kwk= +github.com/google/go-github/v37 v37.0.1-0.20210713230028-465df60a8ec3 h1:YVfdOQRQ95EjQz0qpGdw9LIzJUflL4FV0EEX3fZ7fH8= +github.com/google/go-github/v37 v37.0.1-0.20210713230028-465df60a8ec3/go.mod h1:LM7in3NmXDrX58GbEHy7FtNLbI2JijX93RnMKvWG3m4= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/pkg/hookdeliveryforwarder/README.md b/pkg/hookdeliveryforwarder/README.md new file mode 100644 index 00000000..a6414449 --- /dev/null +++ b/pkg/hookdeliveryforwarder/README.md @@ -0,0 +1,8 @@ +`hookdeliveryforwarder` is currently in a Proof-of-Concept phase, not complete, and not supported. +That being said, we are likely accept bug reports with concrete reproduction steps, but usage/support issues. + +To use this, you need to write some Kubernetes manifest and a container image for deployment. + +For other information, please see the original pull request introduced it. + +https://github.com/actions-runner-controller/actions-runner-controller/pull/682 diff --git a/pkg/hookdeliveryforwarder/checkpointer.go b/pkg/hookdeliveryforwarder/checkpointer.go new file mode 100644 index 00000000..73b8aeaa --- /dev/null +++ b/pkg/hookdeliveryforwarder/checkpointer.go @@ -0,0 +1,30 @@ +package hookdeliveryforwarder + +import "time" + +type Checkpointer interface { + GetOrCreate(hookID int64) (*State, error) + Update(hookID int64, pos *State) error +} + +type InMemoryCheckpointer struct { + t time.Time + id int64 +} + +func (p *InMemoryCheckpointer) GetOrCreate(hookID int64) (*State, error) { + return &State{DeliveredAt: p.t}, nil +} + +func (p *InMemoryCheckpointer) Update(hookID int64, pos *State) error { + p.t = pos.DeliveredAt + p.id = pos.ID + + return nil +} + +func NewInMemoryLogPositionProvider() Checkpointer { + return &InMemoryCheckpointer{ + t: time.Now(), + } +} diff --git a/pkg/hookdeliveryforwarder/cmd/main.go b/pkg/hookdeliveryforwarder/cmd/main.go new file mode 100644 index 00000000..03bb2d36 --- /dev/null +++ b/pkg/hookdeliveryforwarder/cmd/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/actions-runner-controller/actions-runner-controller/pkg/hookdeliveryforwarder" + "github.com/actions-runner-controller/actions-runner-controller/pkg/hookdeliveryforwarder/configmap" + "github.com/go-logr/logr" + zaplib "go.uber.org/zap" + + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +const ( + logLevelDebug = "debug" + logLevelInfo = "info" + logLevelWarn = "warn" + logLevelError = "error" +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + _ = clientgoscheme.AddToScheme(scheme) +} + +func main() { + var ( + logLevel string + + checkpointerConfig configmap.Config + ) + + flag.StringVar(&logLevel, "log-level", logLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`) + + checkpointerConfig.InitFlags(flag.CommandLine) + + config := &hookdeliveryforwarder.Config{} + + config.InitFlags((flag.CommandLine)) + + flag.Parse() + + logger := newZapLogger(logLevel) + + checkpointerConfig.Scheme = scheme + checkpointerConfig.Logger = logger + + p, mgr, err := configmap.New(&checkpointerConfig) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + + // TODO: Set to something that is backed by a CRD so that + // restarting the forwarder doesn't result in missing deliveries. + config.Checkpointer = p + + ctx := hookdeliveryforwarder.SetupSignalHandler() + + go func() { + if err := mgr.Start(ctx); err != nil { + fmt.Fprintf(os.Stderr, "problem running manager: %v\n", err) + os.Exit(1) + } + }() + + hookdeliveryforwarder.Run(ctx, config) +} + +func newZapLogger(logLevel string) logr.Logger { + return zap.New(func(o *zap.Options) { + switch logLevel { + case logLevelDebug: + o.Development = true + case logLevelInfo: + lvl := zaplib.NewAtomicLevelAt(zaplib.InfoLevel) + o.Level = &lvl + case logLevelWarn: + lvl := zaplib.NewAtomicLevelAt(zaplib.WarnLevel) + o.Level = &lvl + case logLevelError: + lvl := zaplib.NewAtomicLevelAt(zaplib.ErrorLevel) + o.Level = &lvl + } + }) +} diff --git a/pkg/hookdeliveryforwarder/config.go b/pkg/hookdeliveryforwarder/config.go new file mode 100644 index 00000000..57de67a7 --- /dev/null +++ b/pkg/hookdeliveryforwarder/config.go @@ -0,0 +1,116 @@ +package hookdeliveryforwarder + +import ( + "context" + "errors" + "flag" + "fmt" + "net/http" + "os" + "sync" + + "github.com/actions-runner-controller/actions-runner-controller/github" + "github.com/kelseyhightower/envconfig" +) + +type Config struct { + Rules StringSlice + MetricsAddr string + GitHubConfig github.Config + Checkpointer Checkpointer +} + +func (config *Config) InitFlags(fs *flag.FlagSet) { + if err := envconfig.Process("github", &config.GitHubConfig); err != nil { + fmt.Fprintln(os.Stderr, "Error: Environment variable read failed.") + } + + flag.StringVar(&config.MetricsAddr, "metrics-addr", ":8000", "The address the metric endpoint binds to.") + flag.Var(&config.Rules, "rule", "The rule denotes from where webhook deliveries forwarded and to where they are forwarded. Must be formatted REPO=TARGET where REPO can be just the organization name for a repostory hook or \"owner/repo\" for a repository hook.") + flag.StringVar(&config.GitHubConfig.Token, "github-token", config.GitHubConfig.Token, "The personal access token of GitHub.") + flag.Int64Var(&config.GitHubConfig.AppID, "github-app-id", config.GitHubConfig.AppID, "The application ID of GitHub App.") + flag.Int64Var(&config.GitHubConfig.AppInstallationID, "github-app-installation-id", config.GitHubConfig.AppInstallationID, "The installation ID of GitHub App.") + flag.StringVar(&config.GitHubConfig.AppPrivateKey, "github-app-private-key", config.GitHubConfig.AppPrivateKey, "The path of a private key file to authenticate as a GitHub App") +} + +func Run(ctx context.Context, config *Config) { + c := config.GitHubConfig + + ghClient, err := c.NewClient() + if err != nil { + fmt.Fprintln(os.Stderr, "Error: Client creation failed.", err) + os.Exit(1) + } + + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(ctx) + + fwd, err := New(ghClient, []string(config.Rules)) + if err != nil { + fmt.Fprintf(os.Stderr, "problem initializing forwarder: %v\n", err) + os.Exit(1) + } + + if config.Checkpointer != nil { + fwd.Checkpointer = config.Checkpointer + } + + mux := http.NewServeMux() + mux.HandleFunc("/readyz", fwd.HandleReadyz) + + srv := http.Server{ + Addr: config.MetricsAddr, + Handler: mux, + } + + wg.Add(1) + go func() { + defer cancel() + defer wg.Done() + + if err := fwd.Run(ctx); err != nil { + fmt.Fprintf(os.Stderr, "problem running forwarder: %v\n", err) + } + }() + + wg.Add(1) + go func() { + defer cancel() + defer wg.Done() + + go func() { + <-ctx.Done() + + srv.Shutdown(context.Background()) + }() + + if err := srv.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + fmt.Fprintf(os.Stderr, "problem running http server: %v\n", err) + } + } + }() + + go func() { + <-ctx.Done() + cancel() + }() + + wg.Wait() +} + +type StringSlice []string + +func (s *StringSlice) String() string { + if s == nil { + return "" + } + + return fmt.Sprintf("%+v", []string(*s)) +} + +func (s *StringSlice) Set(value string) error { + *s = append(*s, value) + return nil +} diff --git a/pkg/hookdeliveryforwarder/configmap/checkpointer.go b/pkg/hookdeliveryforwarder/configmap/checkpointer.go new file mode 100644 index 00000000..78037dd3 --- /dev/null +++ b/pkg/hookdeliveryforwarder/configmap/checkpointer.go @@ -0,0 +1,100 @@ +package configmap + +import ( + "context" + "encoding/json" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/actions-runner-controller/actions-runner-controller/pkg/hookdeliveryforwarder" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ConfigMapCheckpointer struct { + Name string + NS string + Client client.Client +} + +type state struct { + DeliveredAt time.Time `json:"delivered_at"` + ID int64 `json:"id"` +} + +func (p *ConfigMapCheckpointer) GetOrCreate(hookID int64) (*hookdeliveryforwarder.State, error) { + var cm corev1.ConfigMap + + if err := p.Client.Get(context.Background(), types.NamespacedName{Namespace: p.NS, Name: p.Name}, &cm); err != nil { + if !kerrors.IsNotFound(err) { + return nil, err + } + + cm.Name = p.Name + cm.Namespace = p.NS + + if err := p.Client.Create(context.Background(), &cm); err != nil { + return nil, err + } + } + + idStr := fmt.Sprintf("hook_%d", hookID) + + var unmarshalled state + + data, ok := cm.Data[idStr] + + if ok { + if err := json.Unmarshal([]byte(data), &unmarshalled); err != nil { + return nil, err + } + } + + pos := &hookdeliveryforwarder.State{ + DeliveredAt: unmarshalled.DeliveredAt, + ID: unmarshalled.ID, + } + + if pos.DeliveredAt.IsZero() { + pos.DeliveredAt = time.Now() + } + + return pos, nil +} + +func (p *ConfigMapCheckpointer) Update(hookID int64, pos *hookdeliveryforwarder.State) error { + var cm corev1.ConfigMap + + if err := p.Client.Get(context.Background(), types.NamespacedName{Namespace: p.NS, Name: p.Name}, &cm); err != nil { + return err + } + + var posData state + + posData.DeliveredAt = pos.DeliveredAt + posData.ID = pos.ID + + idStr := fmt.Sprintf("hook_%d", hookID) + + data, err := json.Marshal(posData) + if err != nil { + return err + } + + copy := cm.DeepCopy() + + if copy.Data == nil { + copy.Data = map[string]string{} + } + + copy.Data[idStr] = string(data) + + if err := p.Client.Patch(context.Background(), copy, client.MergeFrom(&cm)); err != nil { + return err + } + + return nil +} diff --git a/pkg/hookdeliveryforwarder/configmap/config.go b/pkg/hookdeliveryforwarder/configmap/config.go new file mode 100644 index 00000000..7059c8a7 --- /dev/null +++ b/pkg/hookdeliveryforwarder/configmap/config.go @@ -0,0 +1,42 @@ +package configmap + +import ( + "flag" + "fmt" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type Config struct { + Name string + Namespace string + Logger logr.Logger + Scheme *runtime.Scheme +} + +func (c *Config) InitFlags(fs *flag.FlagSet) { + fs.StringVar(&c.Name, "configmap-name", "gh-webhook-forwarder", `The name of the Kubernetes ConfigMap to which store state for check-pointing.`) + fs.StringVar(&c.Namespace, "namespace", "default", `The Kubernetes namespace to store configmap for check-pointing.`) +} + +func New(checkpointerConfig *Config) (*ConfigMapCheckpointer, manager.Manager, error) { + ctrl.SetLogger(checkpointerConfig.Logger) + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: checkpointerConfig.Scheme, + LeaderElectionID: "hookdeliveryforwarder", + Port: 9443, + }) + if err != nil { + return nil, nil, fmt.Errorf("unable to start manager: %v", err) + } + + return &ConfigMapCheckpointer{ + Client: mgr.GetClient(), + Name: checkpointerConfig.Name, + NS: checkpointerConfig.Namespace, + }, mgr, nil +} diff --git a/pkg/hookdeliveryforwarder/forwarder.go b/pkg/hookdeliveryforwarder/forwarder.go new file mode 100644 index 00000000..1415b85e --- /dev/null +++ b/pkg/hookdeliveryforwarder/forwarder.go @@ -0,0 +1,253 @@ +package hookdeliveryforwarder + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + "os" + "sort" + "strings" + "time" + + "github.com/actions-runner-controller/actions-runner-controller/github" + gogithub "github.com/google/go-github/v37/github" +) + +type Forwarder struct { + Repo string + Target string + + Hook gogithub.Hook + + PollingDelay time.Duration + + Client *github.Client + + Checkpointer Checkpointer + + logger +} + +type persistentError struct { + Err error +} + +func (e persistentError) Error() string { + return fmt.Sprintf("%v", e.Err) +} + +func (f *Forwarder) Run(ctx context.Context) error { + pollingDelay := 10 * time.Second + if f.PollingDelay > 0 { + pollingDelay = f.PollingDelay + } + + segments := strings.Split(f.Repo, "/") + + owner := segments[0] + + var repo string + + if len(segments) > 1 { + repo = segments[1] + } + + hooksAPI := newHooksAPI(f.Client.Client, owner, repo) + + hooks, _, err := hooksAPI.ListHooks(ctx, nil) + if err != nil { + f.Errorf("Failed listing hooks: %v", err) + + return err + } + + var hook *gogithub.Hook + + for i := range hooks { + hook = hooks[i] + break + } + + if hook == nil { + hookConfig := &f.Hook + + if _, ok := hookConfig.Config["url"]; !ok { + return persistentError{Err: fmt.Errorf("config.url is missing in the hook config")} + } + + if _, ok := hookConfig.Config["content_type"]; !ok { + hookConfig.Config["content_type"] = "json" + } + + if _, ok := hookConfig.Config["insecure_ssl"]; !ok { + hookConfig.Config["insecure_ssl"] = 0 + } + + if _, ok := hookConfig.Config["secret"]; !ok { + hookConfig.Config["secret"] = os.Getenv("GITHUB_HOOK_SECRET") + } + + if len(hookConfig.Events) == 0 { + hookConfig.Events = []string{"check_run", "push"} + } + + if hookConfig.Active == nil { + hookConfig.Active = gogithub.Bool(true) + } + + h, _, err := hooksAPI.CreateHook(ctx, hookConfig) + if err != nil { + f.Errorf("Failed creating hook: %v", err) + + return persistentError{Err: err} + } + + hook = h + } + + f.Logf("Using this hook for receiving deliveries to be forwarded: %+v", *hook) + + hookDeliveries := newHookDeliveriesAPI(f.Client.Client, owner, repo, hook.GetID()) + + cur, err := f.Checkpointer.GetOrCreate(hook.GetID()) + if err != nil { + f.Errorf("Failed to get or create log position: %v", err) + + return persistentError{Err: err} + } + +LOOP: + for { + var ( + err error + payloads [][]byte + ) + + payloads, cur, err = f.getUnprocessedDeliveries(ctx, hookDeliveries, *cur) + if err != nil { + f.Errorf("failed getting unprocessed deliveries: %v", err) + + if errors.Is(err, context.Canceled) { + return err + } + } + + for _, p := range payloads { + if _, err := http.Post(f.Target, "application/json", bytes.NewReader(p)); err != nil { + f.Errorf("failed forwarding delivery: %v", err) + + retryDelay := 5 * time.Second + t := time.NewTimer(retryDelay) + + select { + case <-t.C: + t.Stop() + case <-ctx.Done(): + t.Stop() + + return ctx.Err() + } + + continue LOOP + } else { + f.Logf("Successfully POSTed the payload to %s", f.Target) + } + } + + if err := f.Checkpointer.Update(hook.GetID(), cur); err != nil { + return fmt.Errorf("failed updating checkpoint: %w", err) + } + + t := time.NewTimer(pollingDelay) + + select { + case <-t.C: + t.Stop() + case <-ctx.Done(): + t.Stop() + + return ctx.Err() + } + } +} + +type State struct { + DeliveredAt time.Time + ID int64 +} + +func (f *Forwarder) getUnprocessedDeliveries(ctx context.Context, hookDeliveries *hookDeliveriesAPI, pos State) ([][]byte, *State, error) { + var ( + opts gogithub.ListCursorOptions + ) + + opts.PerPage = 2 + + var deliveries []*gogithub.HookDelivery + +OUTER: + for { + ds, resp, err := hookDeliveries.ListHookDeliveries(ctx, &opts) + if err != nil { + return nil, nil, err + } + + opts.Cursor = resp.Cursor + + for _, d := range ds { + d, _, err := hookDeliveries.GetHookDelivery(ctx, d.GetID()) + if err != nil { + return nil, nil, err + } + + payload, err := d.ParseRequestPayload() + if err != nil { + return nil, nil, err + } + + id := d.GetID() + deliveredAt := d.GetDeliveredAt() + + if !pos.DeliveredAt.IsZero() && deliveredAt.Before(pos.DeliveredAt) { + f.Logf("%s is before %s so skipping all the remaining deliveries", deliveredAt, pos.DeliveredAt) + break OUTER + } + + if pos.ID != 0 && id <= pos.ID { + break OUTER + } + + deliveries = append(deliveries, d) + + f.Logf("Received %T at %s: %v", payload, deliveredAt, payload) + + if deliveredAt.After(pos.DeliveredAt) { + pos.DeliveredAt = deliveredAt.Time + } + + if id > pos.ID { + pos.ID = id + } + } + + if opts.Cursor == "" { + break + } + + time.Sleep(1 * time.Second) + } + + sort.Slice(deliveries, func(a, b int) bool { + return deliveries[b].GetDeliveredAt().After(deliveries[a].GetDeliveredAt().Time) + }) + + var payloads [][]byte + + for _, d := range deliveries { + payloads = append(payloads, *d.Request.RawPayload) + } + + return payloads, &pos, nil +} diff --git a/pkg/hookdeliveryforwarder/hooks.go b/pkg/hookdeliveryforwarder/hooks.go new file mode 100644 index 00000000..acbf7a8d --- /dev/null +++ b/pkg/hookdeliveryforwarder/hooks.go @@ -0,0 +1,46 @@ +package hookdeliveryforwarder + +import ( + "context" + + gogithub "github.com/google/go-github/v37/github" +) + +type hooksAPI struct { + ListHooks func(ctx context.Context, opts *gogithub.ListOptions) ([]*gogithub.Hook, *gogithub.Response, error) + CreateHook func(ctx context.Context, hook *gogithub.Hook) (*gogithub.Hook, *gogithub.Response, error) +} + +func newHooksAPI(client *gogithub.Client, org, repo string) *hooksAPI { + var hooksAPI *hooksAPI + + if repo != "" { + hooksAPI = repoHooksAPI(client.Repositories, org, repo) + } else { + hooksAPI = orgHooksAPI(client.Organizations, org) + } + + return hooksAPI +} + +func repoHooksAPI(svc *gogithub.RepositoriesService, org, repo string) *hooksAPI { + return &hooksAPI{ + ListHooks: func(ctx context.Context, opts *gogithub.ListOptions) ([]*gogithub.Hook, *gogithub.Response, error) { + return svc.ListHooks(ctx, org, repo, opts) + }, + CreateHook: func(ctx context.Context, hook *gogithub.Hook) (*gogithub.Hook, *gogithub.Response, error) { + return svc.CreateHook(ctx, org, repo, hook) + }, + } +} + +func orgHooksAPI(svc *gogithub.OrganizationsService, org string) *hooksAPI { + return &hooksAPI{ + ListHooks: func(ctx context.Context, opts *gogithub.ListOptions) ([]*gogithub.Hook, *gogithub.Response, error) { + return svc.ListHooks(ctx, org, opts) + }, + CreateHook: func(ctx context.Context, hook *gogithub.Hook) (*gogithub.Hook, *gogithub.Response, error) { + return svc.CreateHook(ctx, org, hook) + }, + } +} diff --git a/pkg/hookdeliveryforwarder/hooks_deliveries.go b/pkg/hookdeliveryforwarder/hooks_deliveries.go new file mode 100644 index 00000000..4eff3611 --- /dev/null +++ b/pkg/hookdeliveryforwarder/hooks_deliveries.go @@ -0,0 +1,46 @@ +package hookdeliveryforwarder + +import ( + "context" + + gogithub "github.com/google/go-github/v37/github" +) + +type hookDeliveriesAPI struct { + GetHookDelivery func(ctx context.Context, id int64) (*gogithub.HookDelivery, *gogithub.Response, error) + ListHookDeliveries func(ctx context.Context, opts *gogithub.ListCursorOptions) ([]*gogithub.HookDelivery, *gogithub.Response, error) +} + +func newHookDeliveriesAPI(client *gogithub.Client, org, repo string, hookID int64) *hookDeliveriesAPI { + var hookDeliveries *hookDeliveriesAPI + + if repo != "" { + hookDeliveries = repoHookDeliveriesAPI(client.Repositories, org, repo, hookID) + } else { + hookDeliveries = orgHookDeliveriesAPI(client.Organizations, org, hookID) + } + + return hookDeliveries +} + +func repoHookDeliveriesAPI(svc *gogithub.RepositoriesService, org, repo string, hookID int64) *hookDeliveriesAPI { + return &hookDeliveriesAPI{ + GetHookDelivery: func(ctx context.Context, id int64) (*gogithub.HookDelivery, *gogithub.Response, error) { + return svc.GetHookDelivery(ctx, org, repo, hookID, id) + }, + ListHookDeliveries: func(ctx context.Context, opts *gogithub.ListCursorOptions) ([]*gogithub.HookDelivery, *gogithub.Response, error) { + return svc.ListHookDeliveries(ctx, org, repo, hookID, opts) + }, + } +} + +func orgHookDeliveriesAPI(svc *gogithub.OrganizationsService, org string, hookID int64) *hookDeliveriesAPI { + return &hookDeliveriesAPI{ + GetHookDelivery: func(ctx context.Context, id int64) (*gogithub.HookDelivery, *gogithub.Response, error) { + return svc.GetHookDelivery(ctx, org, hookID, id) + }, + ListHookDeliveries: func(ctx context.Context, opts *gogithub.ListCursorOptions) ([]*gogithub.HookDelivery, *gogithub.Response, error) { + return svc.ListHookDeliveries(ctx, org, hookID, opts) + }, + } +} diff --git a/pkg/hookdeliveryforwarder/logger.go b/pkg/hookdeliveryforwarder/logger.go new file mode 100644 index 00000000..33b12761 --- /dev/null +++ b/pkg/hookdeliveryforwarder/logger.go @@ -0,0 +1,17 @@ +package hookdeliveryforwarder + +import ( + "fmt" + "os" +) + +type logger struct { +} + +func (f logger) Logf(format string, args ...interface{}) { + fmt.Fprintf(os.Stdout, format+"\n", args...) +} + +func (f logger) Errorf(format string, args ...interface{}) { + fmt.Fprintf(os.Stderr, format+"\n", args...) +} diff --git a/pkg/hookdeliveryforwarder/multiforwarder.go b/pkg/hookdeliveryforwarder/multiforwarder.go new file mode 100644 index 00000000..22829b17 --- /dev/null +++ b/pkg/hookdeliveryforwarder/multiforwarder.go @@ -0,0 +1,143 @@ +package hookdeliveryforwarder + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + + "github.com/actions-runner-controller/actions-runner-controller/github" + gogithub "github.com/google/go-github/v37/github" +) + +type MultiForwarder struct { + client *github.Client + + Rules []Rule + + Checkpointer Checkpointer + + logger +} + +type RuleConfig struct { + Repo []string `json:"from"` + Target string `json:"to"` + Hook gogithub.Hook `json:"hook"` +} + +type Rule struct { + Repo string + Target string + Hook gogithub.Hook +} + +func New(client *github.Client, rules []string) (*MultiForwarder, error) { + var srv MultiForwarder + + for _, r := range rules { + var rule RuleConfig + + if err := json.Unmarshal([]byte(r), &rule); err != nil { + return nil, fmt.Errorf("failed unmarshalling %s: %w", r, err) + } + + if len(rule.Repo) == 0 { + return nil, fmt.Errorf("there must be one or more sources configured via `--repo \"from=SOURCE1,SOURCE2,... to=DEST1,DEST2,...\". got %q", r) + } + + if rule.Target == "" { + return nil, fmt.Errorf("there must be one destination configured via `--repo \"from=SOURCE to=DEST1,DEST2,...\". got %q", r) + } + + for _, repo := range rule.Repo { + srv.Rules = append(srv.Rules, Rule{ + Repo: repo, + Target: rule.Target, + Hook: rule.Hook, + }) + } + } + + srv.client = client + srv.Checkpointer = NewInMemoryLogPositionProvider() + + return &srv, nil +} + +func (f *MultiForwarder) Run(ctx context.Context) error { + var wg sync.WaitGroup + + errs := make(chan error, len(f.Rules)) + + for _, r := range f.Rules { + r := r + wg.Add(1) + go func() { + defer wg.Done() + + errs <- f.run(ctx, r) + }() + } + + wg.Wait() + + select { + case err := <-errs: + return err + default: + return nil + } +} + +func (f *MultiForwarder) run(ctx context.Context, rule Rule) error { + i := &Forwarder{ + Repo: rule.Repo, + Target: rule.Target, + Hook: rule.Hook, + Client: f.client, + Checkpointer: f.Checkpointer, + } + + return i.Run(ctx) +} + +func (f *MultiForwarder) HandleReadyz(w http.ResponseWriter, r *http.Request) { + var ( + ok bool + + err error + ) + + defer func() { + if !ok { + w.WriteHeader(http.StatusInternalServerError) + + if err != nil { + msg := err.Error() + if _, err := w.Write([]byte(msg)); err != nil { + f.Errorf("failed writing http error response: %v", err) + } + } + } + }() + + defer func() { + if r.Body != nil { + r.Body.Close() + } + }() + + // respond ok to GET / e.g. for health check + if r.Method == http.MethodGet { + fmt.Fprintln(w, "webhook server is running") + return + } + + w.WriteHeader(http.StatusOK) + + if _, err := w.Write([]byte("ok")); err != nil { + f.Errorf("failed writing http response: %v", err) + } +} diff --git a/pkg/hookdeliveryforwarder/signal.go b/pkg/hookdeliveryforwarder/signal.go new file mode 100644 index 00000000..73060bff --- /dev/null +++ b/pkg/hookdeliveryforwarder/signal.go @@ -0,0 +1,48 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hookdeliveryforwarder + +import ( + "context" + "os" + "os/signal" + "syscall" +) + +var onlyOneSignalHandler = make(chan struct{}) + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} + +// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler() context.Context { + close(onlyOneSignalHandler) // panics when called twice + + ctx, cancel := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + cancel() + <-c + os.Exit(1) // second signal. Exit directly. + }() + + return ctx +}