From 67c7b7a22850a33f49e9487e583e41852f056057 Mon Sep 17 00:00:00 2001 From: Yusuke Kuoka Date: Fri, 24 Sep 2021 00:40:08 +0000 Subject: [PATCH] Bump chart version to 0.13.1 with controller 0.20.1 --- charts/actions-runner-controller/Chart.yaml | 4 +- .../cmd/main.go | 136 ++++++++++++ .../githubwebhookdelivery.go | 202 ++++++++++++++++++ 3 files changed, 340 insertions(+), 2 deletions(-) create mode 100644 pkg/githubwebhookdeliveryforwarder/cmd/main.go create mode 100644 pkg/githubwebhookdeliveryforwarder/githubwebhookdelivery.go diff --git a/charts/actions-runner-controller/Chart.yaml b/charts/actions-runner-controller/Chart.yaml index 4a6a2b7c..f24931b4 100644 --- a/charts/actions-runner-controller/Chart.yaml +++ b/charts/actions-runner-controller/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.13.0 +version: 0.13.1 # Used as the default manager tag value when no tag property is provided in the values.yaml -appVersion: 0.20.0 +appVersion: 0.20.1 home: https://github.com/actions-runner-controller/actions-runner-controller diff --git a/pkg/githubwebhookdeliveryforwarder/cmd/main.go b/pkg/githubwebhookdeliveryforwarder/cmd/main.go new file mode 100644 index 00000000..fbb0c9c7 --- /dev/null +++ b/pkg/githubwebhookdeliveryforwarder/cmd/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/actions-runner-controller/actions-runner-controller/github" + "github.com/actions-runner-controller/actions-runner-controller/pkg/githubwebhookdeliveryforwarder" + "github.com/kelseyhightower/envconfig" +) + +func main() { + var ( + metricsAddr string + target string + repo string + ) + + var c github.Config + + if err := envconfig.Process("github", &c); err != nil { + fmt.Fprintln(os.Stderr, "Error: Environment variable read failed.") + } + + flag.StringVar(&metricsAddr, "metrics-addr", ":8000", "The address the metric endpoint binds to.") + flag.StringVar(&repo, "repo", "", "The owner/name of the repository that has the target hook. If specified, the forwarder will use the first hook configured on the repository as the source.") + flag.StringVar(&target, "target", "", "The URL of the forwarding target that receives all the forwarded webhooks.") + flag.StringVar(&c.Token, "github-token", c.Token, "The personal access token of GitHub.") + flag.Int64Var(&c.AppID, "github-app-id", c.AppID, "The application ID of GitHub App.") + flag.Int64Var(&c.AppInstallationID, "github-app-installation-id", c.AppInstallationID, "The installation ID of GitHub App.") + flag.StringVar(&c.AppPrivateKey, "github-app-private-key", c.AppPrivateKey, "The path of a private key file to authenticate as a GitHub App") + flag.Parse() + + ghClient, err := c.NewClient() + if err != nil { + fmt.Fprintln(os.Stderr, "Error: Client creation failed.", err) + os.Exit(1) + } + + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + fwd := githubwebhookdeliveryforwarder.New(ghClient, target) + fwd.Repo = repo + + mux := http.NewServeMux() + mux.HandleFunc("/readyz", fwd.HandleReadyz) + + srv := http.Server{ + Addr: metricsAddr, + Handler: mux, + } + + wg.Add(1) + go func() { + defer cancel() + defer wg.Done() + + if err := fwd.Run(ctx); err != nil { + fmt.Fprintf(os.Stderr, "problem running forwarder: %v\n", err) + } + }() + + wg.Add(1) + go func() { + defer cancel() + defer wg.Done() + + go func() { + <-ctx.Done() + + srv.Shutdown(context.Background()) + }() + + if err := srv.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + fmt.Fprintf(os.Stderr, "problem running http server: %v\n", err) + } + } + }() + + go func() { + <-SetupSignalHandler().Done() + cancel() + }() + + wg.Wait() +} + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +var onlyOneSignalHandler = make(chan struct{}) + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} + +// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler() context.Context { + close(onlyOneSignalHandler) // panics when called twice + + ctx, cancel := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + cancel() + <-c + os.Exit(1) // second signal. Exit directly. + }() + + return ctx +} diff --git a/pkg/githubwebhookdeliveryforwarder/githubwebhookdelivery.go b/pkg/githubwebhookdeliveryforwarder/githubwebhookdelivery.go new file mode 100644 index 00000000..1e7abea8 --- /dev/null +++ b/pkg/githubwebhookdeliveryforwarder/githubwebhookdelivery.go @@ -0,0 +1,202 @@ +package githubwebhookdeliveryforwarder + +import ( + "bytes" + "context" + "fmt" + "net/http" + "os" + "sort" + "strings" + "time" + + "github.com/actions-runner-controller/actions-runner-controller/github" + gogithub "github.com/google/go-github/v36/github" +) + +type server struct { + target string + Repo string + client *github.Client +} + +func New(client *github.Client, target string) *server { + var srv server + + srv.target = target + srv.client = client + + return &srv +} + +func (s *server) Run(ctx context.Context) error { + segments := strings.Split(s.Repo, "/") + + if len(segments) != 2 { + return fmt.Errorf("repository must be in a form of OWNER/REPO: got %q", s.Repo) + } + + owner, repo := segments[0], segments[1] + + hooks, _, err := s.client.Repositories.ListHooks(ctx, owner, repo, nil) + if err != nil { + s.Errorf("Failed listing hooks: %v", err) + + return err + } + + var hook *gogithub.Hook + + for i := range hooks { + hook = hooks[i] + break + } + + cur := &cursor{} + + cur.deliveredAt = time.Now() + + for { + var ( + err error + payloads [][]byte + ) + + payloads, cur, err = s.getUnprocessedDeliveries(ctx, owner, repo, hook.GetID(), *cur) + if err != nil { + s.Errorf("failed getting unprocessed deliveries: %v", err) + } + + for _, p := range payloads { + if _, err := http.Post(s.target, "application/json", bytes.NewReader(p)); err != nil { + s.Errorf("failed forwarding delivery: %v", err) + } + } + + time.Sleep(10 * time.Second) + } +} + +type cursor struct { + deliveredAt time.Time + id int64 +} + +func (s *server) getUnprocessedDeliveries(ctx context.Context, owner, repo string, hookID int64, pos cursor) ([][]byte, *cursor, error) { + var ( + opts gogithub.ListCursorOptions + ) + + opts.PerPage = 2 + + var deliveries []*gogithub.HookDelivery + +OUTER: + for { + ds, resp, err := s.client.Repositories.ListHookDeliveries(ctx, owner, repo, hookID, &opts) + if err != nil { + return nil, nil, err + } + + opts.Cursor = resp.Cursor + + for _, d := range ds { + d, _, err := s.client.Repositories.GetHookDelivery(ctx, owner, repo, hookID, d.GetID()) + if err != nil { + return nil, nil, err + } + + payload, err := d.ParseRequestPayload() + if err != nil { + return nil, nil, err + } + + id := d.GetID() + deliveredAt := d.GetDeliveredAt() + + if !pos.deliveredAt.IsZero() && deliveredAt.Before(pos.deliveredAt) { + s.Logf("%s is before %s so skipping all the remaining deliveries", deliveredAt, pos.deliveredAt) + break OUTER + } + + if pos.id != 0 && id <= pos.id { + break OUTER + } + + s.Logf("Received %T at %s: %v", payload, deliveredAt, payload) + + if deliveredAt.After(pos.deliveredAt) { + pos.deliveredAt = deliveredAt + } + + if id > pos.id { + pos.id = id + } + } + + if opts.Cursor == "" { + break + } + + time.Sleep(1 * time.Second) + } + + sort.Slice(deliveries, func(a, b int) bool { + return deliveries[b].GetDeliveredAt().After(deliveries[a].GetDeliveredAt()) + }) + + var payloads [][]byte + + for _, d := range deliveries { + payloads = append(payloads, *d.Request.RawPayload) + } + + return payloads, &pos, nil +} + +func (s *server) HandleReadyz(w http.ResponseWriter, r *http.Request) { + var ( + ok bool + + err error + ) + + defer func() { + if !ok { + w.WriteHeader(http.StatusInternalServerError) + + if err != nil { + msg := err.Error() + if _, err := w.Write([]byte(msg)); err != nil { + s.Errorf("failed writing http error response: %v", err) + } + } + } + }() + + defer func() { + if r.Body != nil { + r.Body.Close() + } + }() + + // respond ok to GET / e.g. for health check + if r.Method == http.MethodGet { + fmt.Fprintln(w, "webhook server is running") + return + } + + w.WriteHeader(http.StatusOK) + + if _, err := w.Write([]byte("ok")); err != nil { + s.Errorf("failed writing http response: %v", err) + } +} + +func (s *server) Logf(format string, args ...interface{}) { + fmt.Fprintf(os.Stdout, format+"\n", args...) +} + +func (s *server) Errorf(format string, args ...interface{}) { + fmt.Fprintf(os.Stderr, format+"\n", args...) +}