From 6fe523ef69c9f4795269c2e62cd6c932fcf2c204 Mon Sep 17 00:00:00 2001 From: Fedor Korotkov Date: Thu, 22 Jan 2026 15:22:53 +0100 Subject: [PATCH] Add pagination support for listing VM events (#386) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add pagination support for listing VM events Introduced a paginated event listing API, added support for pagination parameters in the request, and included cursor-based navigation using headers. Relevant tests and Badger store implementations were updated to support the new logic. * Remove support for ordering VM events Dropped `ListOrder` type, `order` query parameter, and related logic for ordering VM events. Updated tests, API schema, and Badger store to reflect the removal. * Remove invalid VM events ordering test Deleted a test case for invalid VM events ordering since the `order` query parameter and related functionality have been removed. * Add support for ordering VM events Implemented `order` query parameter for specifying sort order (ascending/descending) of VM events. Updated API schema, Badger store, and added related tests. * Add support for limiting and ordering VM logs Introduced `--limit` and `--order` flags for controlling the number of log lines and their sort order (ascending/descending). Updated API client to handle new options. * Update internal/controller/store/badger/badger_events.go Co-authored-by: Nikolay Edigaryev * fix: address PR review feedback - switch logs CLI to --tail with desc ordering - reuse ParseLogsOrder in controller with helpful errors - always use ListEventsPage and scope event cursors - move events pagination coverage to integration test 🤖 Generated with [Codex](https://chatgpt.com/codex) Co-Authored-By: Codex * refactor: simplify prefix trimming and improve error formatting - Replaced manual prefix check with `bytes.TrimPrefix` in Badger store. - Enhanced error message formatting in VM logs controller. * fix: address PR review feedback - use suggested reverse seek in badger events pagination - add events pagination client helper and use it in integration test 🤖 Generated with [Codex](https://chatgpt.com/codex) Co-Authored-By: Codex --------- Co-authored-by: Nikolay Edigaryev Co-authored-by: Codex --- api/openapi.yaml | 25 ++++ internal/command/logs/vm.go | 15 ++- internal/controller/api_vms.go | 73 +++++++++++- .../controller/store/badger/badger_events.go | 71 ++++++++++-- internal/controller/store/events_test.go | 77 +++++++++++++ internal/controller/store/store.go | 19 +++ internal/tests/events_pagination_test.go | 109 ++++++++++++++++++ pkg/client/client.go | 30 +++-- pkg/client/vms.go | 75 +++++++++++- 9 files changed, 470 insertions(+), 24 deletions(-) create mode 100644 internal/controller/store/events_test.go create mode 100644 internal/tests/events_pagination_test.go diff --git a/api/openapi.yaml b/api/openapi.yaml index dc4b055..36921b1 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -326,9 +326,34 @@ paths: summary: "Retrieve events for a given VM" tags: - vms + parameters: + - in: query + name: limit + description: Maximum number of events to return. + schema: + type: integer + minimum: 1 + - in: query + name: order + description: Sort order of events; asc (default) or desc. + schema: + type: string + enum: + - asc + - desc + - in: query + name: cursor + description: Opaque cursor from the X-Next-Cursor response header. + schema: + type: string responses: '200': description: OK + headers: + X-Next-Cursor: + description: Opaque cursor for the next page of events, if any. + schema: + type: string content: application/json: schema: diff --git a/internal/command/logs/vm.go b/internal/command/logs/vm.go index 4eb666d..ba28f20 100644 --- a/internal/command/logs/vm.go +++ b/internal/command/logs/vm.go @@ -2,10 +2,13 @@ package logs import ( "fmt" + "github.com/cirruslabs/orchard/pkg/client" "github.com/spf13/cobra" ) +var logTail int + func newLogsVMCommand() *cobra.Command { command := &cobra.Command{ Use: "vm NAME", @@ -14,18 +17,26 @@ func newLogsVMCommand() *cobra.Command { Args: cobra.ExactArgs(1), } + command.Flags().IntVar(&logTail, "tail", 0, "Number of log lines to show from the end (newest first)") + return command } func runLogsVM(cmd *cobra.Command, args []string) error { name := args[0] - client, err := client.New() + apiClient, err := client.New() if err != nil { return err } - lines, err := client.VMs().Logs(cmd.Context(), name) + options := client.LogsOptions{} + if logTail > 0 { + options.Limit = logTail + options.Order = client.LogsOrderDesc + } + + lines, err := apiClient.VMs().LogsWithOptions(cmd.Context(), name, options) if err != nil { return err } diff --git a/internal/controller/api_vms.go b/internal/controller/api_vms.go index fbfde28..c13a6c6 100644 --- a/internal/controller/api_vms.go +++ b/internal/controller/api_vms.go @@ -1,9 +1,11 @@ package controller import ( + "encoding/base64" "encoding/json" "errors" "net/http" + "strconv" "time" "github.com/cirruslabs/orchard/internal/controller/lifecycle" @@ -11,6 +13,7 @@ import ( "github.com/cirruslabs/orchard/internal/responder" "github.com/cirruslabs/orchard/internal/simplename" "github.com/cirruslabs/orchard/internal/worker/ondiskname" + "github.com/cirruslabs/orchard/pkg/client" "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/gin-gonic/gin" "github.com/google/go-cmp/cmp" @@ -364,21 +367,87 @@ func (controller *Controller) listVMEvents(ctx *gin.Context) responder.Responder } name := ctx.Param("name") + options, parseResponder := parseListVMEventsOptions(ctx) + if parseResponder != nil { + return parseResponder + } return controller.storeView(func(txn storepkg.Transaction) responder.Responder { vm, err := txn.GetVM(name) if err != nil { return responder.Error(err) } - events, err := txn.ListEvents("vms", vm.UID) + + page, err := txn.ListEventsPage(options, "vms", vm.UID) if err != nil { return responder.Error(err) } + if len(page.NextCursor) != 0 { + ctx.Header("X-Next-Cursor", encodeEventCursor(page.NextCursor)) + } - return responder.JSON(http.StatusOK, events) + return responder.JSON(http.StatusOK, page.Items) }) } +func parseListVMEventsOptions(ctx *gin.Context) (storepkg.ListOptions, responder.Responder) { + var options storepkg.ListOptions + + limitRaw := ctx.Query("limit") + orderRaw := ctx.Query("order") + cursorRaw := ctx.Query("cursor") + + if limitRaw != "" { + limit, ok := parsePositiveInt(limitRaw) + if !ok { + return options, responder.JSON(http.StatusBadRequest, + NewErrorResponse("invalid limit %q: expected positive integer", limitRaw)) + } + options.Limit = limit + } + + if orderRaw != "" { + order, err := client.ParseLogsOrder(orderRaw) + if err != nil { + return options, responder.JSON(http.StatusBadRequest, NewErrorResponse("%s", err)) + } + options.Order = storepkg.ListOrder(order) + } + + if cursorRaw != "" { + cursor, err := decodeEventCursor(cursorRaw) + if err != nil { + return options, responder.JSON(http.StatusBadRequest, + NewErrorResponse("invalid cursor %q", cursorRaw)) + } + options.Cursor = cursor + } + + return options, nil +} + +func parsePositiveInt(raw string) (int, bool) { + value, err := strconv.ParseInt(raw, 10, 0) + if err != nil || value <= 0 { + return 0, false + } + + return int(value), true +} + +func encodeEventCursor(cursor []byte) string { + return base64.RawURLEncoding.EncodeToString(cursor) +} + +func decodeEventCursor(cursorRaw string) ([]byte, error) { + cursor, err := base64.RawURLEncoding.DecodeString(cursorRaw) + if err == nil { + return cursor, nil + } + + return base64.URLEncoding.DecodeString(cursorRaw) +} + func (controller *Controller) validateHostDirs(hostDirs []v1.HostDir) responder.Responder { if len(hostDirs) == 0 { return nil diff --git a/internal/controller/store/badger/badger_events.go b/internal/controller/store/badger/badger_events.go index e4f9e74..9d86717 100644 --- a/internal/controller/store/badger/badger_events.go +++ b/internal/controller/store/badger/badger_events.go @@ -1,12 +1,15 @@ package badger import ( + "bytes" "encoding/json" "fmt" - "github.com/cirruslabs/orchard/pkg/resource/v1" - "github.com/dgraph-io/badger/v3" "path" "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/dgraph-io/badger/v3" ) const SpaceEvents = "/events" @@ -16,6 +19,7 @@ func scopePrefix(scope []string) []byte { keyParts = append(keyParts, scope...) return []byte(path.Join(keyParts...)) } + func (txn *Transaction) AppendEvents(events []v1.Event, scope ...string) (err error) { defer func() { err = mapErr(err) @@ -46,34 +50,81 @@ func (txn *Transaction) AppendEvents(events []v1.Event, scope ...string) (err er } func (txn *Transaction) ListEvents(scope ...string) (_ []v1.Event, err error) { + page, err := txn.ListEventsPage(storepkg.ListOptions{}, scope...) + if err != nil { + return nil, err + } + + return page.Items, nil +} + +func (txn *Transaction) ListEventsPage(options storepkg.ListOptions, scope ...string) ( + result storepkg.Page[v1.Event], + err error, +) { defer func() { err = mapErr(err) }() // Declare an empty, non-nil slice to // return [] when no events are found - result := []v1.Event{} + result.Items = []v1.Event{} - it := txn.badgerTxn.NewIterator(badger.IteratorOptions{ - Prefix: scopePrefix(scope), - }) + prefix := scopePrefix(scope) + itOptions := badger.DefaultIteratorOptions + itOptions.Prefix = prefix + if options.Order == storepkg.ListOrderDesc { + itOptions.Reverse = true + } + + it := txn.badgerTxn.NewIterator(itOptions) defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { + cursor := options.Cursor + if len(cursor) > 0 { + if !bytes.HasPrefix(cursor, prefix) { + seekKey := make([]byte, 0, len(prefix)+len(cursor)) + seekKey = append(seekKey, prefix...) + seekKey = append(seekKey, cursor...) + cursor = seekKey + } + it.Seek(cursor) + if it.ValidForPrefix(prefix) && bytes.Equal(it.Item().Key(), cursor) { + it.Next() + } + } else if options.Order == storepkg.ListOrderDesc { + it.Seek(append(prefix, 0xFF)) + } else { + it.Rewind() + } + + for it.ValidForPrefix(prefix) { item := it.Item() eventBytes, err := item.ValueCopy(nil) if err != nil { - return nil, err + return result, err } var event v1.Event if err := json.Unmarshal(eventBytes, &event); err != nil { - return nil, err + return result, err } - result = append(result, event) + result.Items = append(result.Items, event) + + if options.Limit > 0 && len(result.Items) >= options.Limit { + lastKey := item.KeyCopy(nil) + it.Next() + if it.ValidForPrefix(prefix) { + lastKey = bytes.TrimPrefix(lastKey, prefix) + result.NextCursor = lastKey + } + break + } + + it.Next() } return result, nil diff --git a/internal/controller/store/events_test.go b/internal/controller/store/events_test.go new file mode 100644 index 0000000..feab1a4 --- /dev/null +++ b/internal/controller/store/events_test.go @@ -0,0 +1,77 @@ +package store_test + +import ( + "testing" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/internal/controller/store/badger" + "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestListEventsPage(t *testing.T) { + logger := zap.NewNop().Sugar() + store, err := badger.NewBadgerStore(t.TempDir(), true, logger) + require.NoError(t, err) + + events := []v1.Event{ + {Kind: v1.EventKindLogLine, Timestamp: 1, Payload: "one"}, + {Kind: v1.EventKindLogLine, Timestamp: 2, Payload: "two"}, + {Kind: v1.EventKindLogLine, Timestamp: 3, Payload: "three"}, + {Kind: v1.EventKindLogLine, Timestamp: 4, Payload: "four"}, + } + + err = store.Update(func(txn storepkg.Transaction) error { + return txn.AppendEvents(events, "vms", "vm-uid") + }) + require.NoError(t, err) + + var page storepkg.Page[v1.Event] + err = store.View(func(txn storepkg.Transaction) error { + page, err = txn.ListEventsPage(storepkg.ListOptions{ + Limit: 2, + }, "vms", "vm-uid") + return err + }) + require.NoError(t, err) + require.Equal(t, events[:2], page.Items) + require.NotEmpty(t, page.NextCursor) + + var page2 storepkg.Page[v1.Event] + err = store.View(func(txn storepkg.Transaction) error { + page2, err = txn.ListEventsPage(storepkg.ListOptions{ + Limit: 2, + Cursor: page.NextCursor, + }, "vms", "vm-uid") + return err + }) + require.NoError(t, err) + require.Equal(t, events[2:], page2.Items) + require.Empty(t, page2.NextCursor) + + var descPage storepkg.Page[v1.Event] + err = store.View(func(txn storepkg.Transaction) error { + descPage, err = txn.ListEventsPage(storepkg.ListOptions{ + Limit: 2, + Order: storepkg.ListOrderDesc, + }, "vms", "vm-uid") + return err + }) + require.NoError(t, err) + require.Equal(t, []v1.Event{events[3], events[2]}, descPage.Items) + require.NotEmpty(t, descPage.NextCursor) + + var descPage2 storepkg.Page[v1.Event] + err = store.View(func(txn storepkg.Transaction) error { + descPage2, err = txn.ListEventsPage(storepkg.ListOptions{ + Limit: 2, + Cursor: descPage.NextCursor, + Order: storepkg.ListOrderDesc, + }, "vms", "vm-uid") + return err + }) + require.NoError(t, err) + require.Equal(t, []v1.Event{events[1], events[0]}, descPage2.Items) + require.Empty(t, descPage2.NextCursor) +} diff --git a/internal/controller/store/store.go b/internal/controller/store/store.go index c62f6e2..2803164 100644 --- a/internal/controller/store/store.go +++ b/internal/controller/store/store.go @@ -43,8 +43,27 @@ type Transaction interface { AppendEvents(event []v1.Event, scope ...string) (err error) ListEvents(scope ...string) (result []v1.Event, err error) + ListEventsPage(options ListOptions, scope ...string) (result Page[v1.Event], err error) DeleteEvents(scope ...string) (err error) GetClusterSettings() (*v1.ClusterSettings, error) SetClusterSettings(clusterSettings v1.ClusterSettings) error } + +type ListOptions struct { + Limit int + Cursor []byte + Order ListOrder +} + +type Page[T any] struct { + Items []T + NextCursor []byte +} + +type ListOrder string + +const ( + ListOrderAsc ListOrder = "asc" + ListOrderDesc ListOrder = "desc" +) diff --git a/internal/tests/events_pagination_test.go b/internal/tests/events_pagination_test.go new file mode 100644 index 0000000..46cd8c4 --- /dev/null +++ b/internal/tests/events_pagination_test.go @@ -0,0 +1,109 @@ +package tests_test + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/url" + "testing" + + "github.com/cirruslabs/orchard/internal/imageconstant" + "github.com/cirruslabs/orchard/internal/tests/devcontroller" + "github.com/cirruslabs/orchard/pkg/client" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" +) + +func TestListVMEventsPagination(t *testing.T) { + devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, + false, nil, + true, nil, + ) + + ctx := context.Background() + vm := v1.VM{ + Meta: v1.Meta{Name: "test-vm"}, + Image: imageconstant.DefaultMacosImage, + CPU: 1, + Memory: 1024, + Headless: true, + } + require.NoError(t, devClient.VMs().Create(ctx, &vm)) + + events := []v1.Event{ + {Kind: v1.EventKindLogLine, Timestamp: 1, Payload: "one"}, + {Kind: v1.EventKindLogLine, Timestamp: 2, Payload: "two"}, + {Kind: v1.EventKindLogLine, Timestamp: 3, Payload: "three"}, + {Kind: v1.EventKindLogLine, Timestamp: 4, Payload: "four"}, + } + appendVMEvents(t, devController.Address(), vm.Name, events) + + page, cursor := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{Limit: 2}) + require.Equal(t, events[:2], page) + require.NotEmpty(t, cursor) + + page2, cursor2 := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{ + Limit: 2, + Cursor: cursor, + }) + require.Equal(t, events[2:], page2) + require.Empty(t, cursor2) + + descPage, descCursor := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{ + Limit: 2, + Order: client.LogsOrderDesc, + }) + require.Equal(t, []v1.Event{events[3], events[2]}, descPage) + require.NotEmpty(t, descCursor) + + descPage2, descCursor2 := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{ + Limit: 2, + Order: client.LogsOrderDesc, + Cursor: descCursor, + }) + require.Equal(t, []v1.Event{events[1], events[0]}, descPage2) + require.Empty(t, descCursor2) + + lines, err := devClient.VMs().LogsWithOptions(ctx, vm.Name, client.LogsOptions{ + Limit: 2, + Order: client.LogsOrderDesc, + }) + require.NoError(t, err) + require.Equal(t, []string{"four", "three"}, lines) +} + +func appendVMEvents(t *testing.T, baseURL, name string, events []v1.Event) { + t.Helper() + + endpoint, err := url.JoinPath(baseURL, "v1", "vms", name, "events") + require.NoError(t, err) + + body, err := json.Marshal(events) + require.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) +} + +func fetchVMEventsPage( + t *testing.T, + ctx context.Context, + devClient *client.Client, + name string, + options client.EventsPageOptions, +) ([]v1.Event, string) { + t.Helper() + + events, cursor, err := devClient.VMs().EventsPage(ctx, name, options) + require.NoError(t, err) + + return events, cursor +} diff --git a/pkg/client/client.go b/pkg/client/client.go index f7f6315..a5f846b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -168,20 +168,20 @@ func (client *Client) configureFromDefaultContext() error { return nil } -func (client *Client) request( +func (client *Client) requestWithHeaders( ctx context.Context, method string, path string, in interface{}, out interface{}, params map[string]string, -) error { +) (http.Header, error) { var body io.Reader if in != nil { jsonBytes, err := json.Marshal(in) if err != nil { - return fmt.Errorf("%w to marshal request body: %v", ErrFailed, err) + return nil, fmt.Errorf("%w to marshal request body: %v", ErrFailed, err) } body = bytes.NewBuffer(jsonBytes) @@ -197,21 +197,21 @@ func (client *Client) request( request, err := http.NewRequestWithContext(ctx, method, endpointURL.String(), body) if err != nil { - return fmt.Errorf("%w instantiate a request: %v", ErrFailed, err) + return nil, fmt.Errorf("%w instantiate a request: %v", ErrFailed, err) } client.modifyHeader(request.Header) response, err := client.httpClient.Do(request) if err != nil { - return fmt.Errorf("%w to make a request: %v", ErrFailed, err) + return nil, fmt.Errorf("%w to make a request: %v", ErrFailed, err) } defer func() { _ = response.Body.Close() }() if response.StatusCode != http.StatusOK { - return fmt.Errorf("%w to make a request: %d %s%s", + return nil, fmt.Errorf("%w to make a request: %d %s%s", ErrAPI, response.StatusCode, http.StatusText(response.StatusCode), detailsFromErrorResponseBody(response.Body)) } @@ -219,15 +219,27 @@ func (client *Client) request( if out != nil { bodyBytes, err := io.ReadAll(response.Body) if err != nil { - return fmt.Errorf("%w to read response body: %v", ErrAPI, err) + return nil, fmt.Errorf("%w to read response body: %v", ErrAPI, err) } if err := json.Unmarshal(bodyBytes, out); err != nil { - return fmt.Errorf("%w to unmarshal response body: %v", ErrAPI, err) + return nil, fmt.Errorf("%w to unmarshal response body: %v", ErrAPI, err) } } - return nil + return response.Header, nil +} + +func (client *Client) request( + ctx context.Context, + method string, + path string, + in interface{}, + out interface{}, + params map[string]string, +) error { + _, err := client.requestWithHeaders(ctx, method, path, in, out, params) + return err } func detailsFromErrorResponseBody(body io.Reader) string { diff --git a/pkg/client/vms.go b/pkg/client/vms.go index 2acaa55..bc37d14 100644 --- a/pkg/client/vms.go +++ b/pkg/client/vms.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "strconv" + "strings" "github.com/cirruslabs/orchard/pkg/resource/v1" ) @@ -15,6 +16,36 @@ type VMsService struct { client *Client } +type LogsOrder string + +const ( + LogsOrderAsc LogsOrder = "asc" + LogsOrderDesc LogsOrder = "desc" +) + +func ParseLogsOrder(raw string) (LogsOrder, error) { + order := strings.ToLower(raw) + switch order { + case string(LogsOrderAsc): + return LogsOrderAsc, nil + case string(LogsOrderDesc): + return LogsOrderDesc, nil + default: + return "", fmt.Errorf("invalid order %q: expected asc or desc", raw) + } +} + +type LogsOptions struct { + Limit int + Order LogsOrder +} + +type EventsPageOptions struct { + Limit int + Order LogsOrder + Cursor string +} + func (service *VMsService) Create(ctx context.Context, vm *v1.VM) error { err := service.client.request(ctx, http.MethodPost, "vms", vm, nil, nil) @@ -134,9 +165,23 @@ func (service *VMsService) StreamEvents(name string) *EventStreamer { } func (service *VMsService) Logs(ctx context.Context, name string) (lines []string, err error) { + return service.LogsWithOptions(ctx, name, LogsOptions{}) +} + +func (service *VMsService) LogsWithOptions(ctx context.Context, name string, options LogsOptions) (lines []string, err error) { var events []v1.Event + params := map[string]string{} + if options.Limit > 0 { + params["limit"] = strconv.Itoa(options.Limit) + } + if options.Order != "" { + params["order"] = string(options.Order) + } + if len(params) == 0 { + params = nil + } err = service.client.request(ctx, http.MethodGet, fmt.Sprintf("vms/%s/events", url.PathEscape(name)), - nil, &events, nil) + nil, &events, params) if err != nil { return } @@ -147,3 +192,31 @@ func (service *VMsService) Logs(ctx context.Context, name string) (lines []strin } return } + +func (service *VMsService) EventsPage( + ctx context.Context, + name string, + options EventsPageOptions, +) (events []v1.Event, nextCursor string, err error) { + params := map[string]string{} + if options.Limit > 0 { + params["limit"] = strconv.Itoa(options.Limit) + } + if options.Order != "" { + params["order"] = string(options.Order) + } + if options.Cursor != "" { + params["cursor"] = options.Cursor + } + if len(params) == 0 { + params = nil + } + + headers, err := service.client.requestWithHeaders(ctx, http.MethodGet, fmt.Sprintf("vms/%s/events", url.PathEscape(name)), + nil, &events, params) + if err != nil { + return nil, "", err + } + + return events, headers.Get("X-Next-Cursor"), nil +}