Add pagination support for listing VM events (#386)

* Add pagination support for listing VM events

Introduced a paginated event listing API, added support for pagination parameters in the request, and included cursor-based navigation using headers. Relevant tests and Badger store implementations were updated to support the new logic.

* Remove support for ordering VM events

Dropped `ListOrder` type, `order` query parameter, and related logic for ordering VM events. Updated tests, API schema, and Badger store to reflect the removal.

* Remove invalid VM events ordering test

Deleted a test case for invalid VM events ordering since the `order` query parameter and related functionality have been removed.

* Add support for ordering VM events

Implemented `order` query parameter for specifying sort order (ascending/descending) of VM events. Updated API schema, Badger store, and added related tests.

* Add support for limiting and ordering VM logs

Introduced `--limit` and `--order` flags for controlling the number of log lines and their sort order (ascending/descending). Updated API client to handle new options.

* Update internal/controller/store/badger/badger_events.go

Co-authored-by: Nikolay Edigaryev <edigaryev@gmail.com>

* fix: address PR review feedback

- switch logs CLI to --tail with desc ordering
- reuse ParseLogsOrder in controller with helpful errors
- always use ListEventsPage and scope event cursors
- move events pagination coverage to integration test

🤖 Generated with [Codex](https://chatgpt.com/codex)

Co-Authored-By: Codex <codex@openai.com>

* refactor: simplify prefix trimming and improve error formatting

- Replaced manual prefix check with `bytes.TrimPrefix` in Badger store.
- Enhanced error message formatting in VM logs controller.

* fix: address PR review feedback

- use suggested reverse seek in badger events pagination
- add events pagination client helper and use it in integration test

🤖 Generated with [Codex](https://chatgpt.com/codex)

Co-Authored-By: Codex <codex@openai.com>

---------

Co-authored-by: Nikolay Edigaryev <edigaryev@gmail.com>
Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
Fedor Korotkov 2026-01-22 15:22:53 +01:00 committed by GitHub
parent ea89d01760
commit 6fe523ef69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 470 additions and 24 deletions

View File

@ -326,9 +326,34 @@ paths:
summary: "Retrieve events for a given VM"
tags:
- vms
parameters:
- in: query
name: limit
description: Maximum number of events to return.
schema:
type: integer
minimum: 1
- in: query
name: order
description: Sort order of events; asc (default) or desc.
schema:
type: string
enum:
- asc
- desc
- in: query
name: cursor
description: Opaque cursor from the X-Next-Cursor response header.
schema:
type: string
responses:
'200':
description: OK
headers:
X-Next-Cursor:
description: Opaque cursor for the next page of events, if any.
schema:
type: string
content:
application/json:
schema:

View File

@ -2,10 +2,13 @@ package logs
import (
"fmt"
"github.com/cirruslabs/orchard/pkg/client"
"github.com/spf13/cobra"
)
var logTail int
func newLogsVMCommand() *cobra.Command {
command := &cobra.Command{
Use: "vm NAME",
@ -14,18 +17,26 @@ func newLogsVMCommand() *cobra.Command {
Args: cobra.ExactArgs(1),
}
command.Flags().IntVar(&logTail, "tail", 0, "Number of log lines to show from the end (newest first)")
return command
}
func runLogsVM(cmd *cobra.Command, args []string) error {
name := args[0]
client, err := client.New()
apiClient, err := client.New()
if err != nil {
return err
}
lines, err := client.VMs().Logs(cmd.Context(), name)
options := client.LogsOptions{}
if logTail > 0 {
options.Limit = logTail
options.Order = client.LogsOrderDesc
}
lines, err := apiClient.VMs().LogsWithOptions(cmd.Context(), name, options)
if err != nil {
return err
}

View File

@ -1,9 +1,11 @@
package controller
import (
"encoding/base64"
"encoding/json"
"errors"
"net/http"
"strconv"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
@ -11,6 +13,7 @@ import (
"github.com/cirruslabs/orchard/internal/responder"
"github.com/cirruslabs/orchard/internal/simplename"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/pkg/client"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
@ -364,21 +367,87 @@ func (controller *Controller) listVMEvents(ctx *gin.Context) responder.Responder
}
name := ctx.Param("name")
options, parseResponder := parseListVMEventsOptions(ctx)
if parseResponder != nil {
return parseResponder
}
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
events, err := txn.ListEvents("vms", vm.UID)
page, err := txn.ListEventsPage(options, "vms", vm.UID)
if err != nil {
return responder.Error(err)
}
if len(page.NextCursor) != 0 {
ctx.Header("X-Next-Cursor", encodeEventCursor(page.NextCursor))
}
return responder.JSON(http.StatusOK, events)
return responder.JSON(http.StatusOK, page.Items)
})
}
func parseListVMEventsOptions(ctx *gin.Context) (storepkg.ListOptions, responder.Responder) {
var options storepkg.ListOptions
limitRaw := ctx.Query("limit")
orderRaw := ctx.Query("order")
cursorRaw := ctx.Query("cursor")
if limitRaw != "" {
limit, ok := parsePositiveInt(limitRaw)
if !ok {
return options, responder.JSON(http.StatusBadRequest,
NewErrorResponse("invalid limit %q: expected positive integer", limitRaw))
}
options.Limit = limit
}
if orderRaw != "" {
order, err := client.ParseLogsOrder(orderRaw)
if err != nil {
return options, responder.JSON(http.StatusBadRequest, NewErrorResponse("%s", err))
}
options.Order = storepkg.ListOrder(order)
}
if cursorRaw != "" {
cursor, err := decodeEventCursor(cursorRaw)
if err != nil {
return options, responder.JSON(http.StatusBadRequest,
NewErrorResponse("invalid cursor %q", cursorRaw))
}
options.Cursor = cursor
}
return options, nil
}
func parsePositiveInt(raw string) (int, bool) {
value, err := strconv.ParseInt(raw, 10, 0)
if err != nil || value <= 0 {
return 0, false
}
return int(value), true
}
func encodeEventCursor(cursor []byte) string {
return base64.RawURLEncoding.EncodeToString(cursor)
}
func decodeEventCursor(cursorRaw string) ([]byte, error) {
cursor, err := base64.RawURLEncoding.DecodeString(cursorRaw)
if err == nil {
return cursor, nil
}
return base64.URLEncoding.DecodeString(cursorRaw)
}
func (controller *Controller) validateHostDirs(hostDirs []v1.HostDir) responder.Responder {
if len(hostDirs) == 0 {
return nil

View File

@ -1,12 +1,15 @@
package badger
import (
"bytes"
"encoding/json"
"fmt"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/dgraph-io/badger/v3"
"path"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/dgraph-io/badger/v3"
)
const SpaceEvents = "/events"
@ -16,6 +19,7 @@ func scopePrefix(scope []string) []byte {
keyParts = append(keyParts, scope...)
return []byte(path.Join(keyParts...))
}
func (txn *Transaction) AppendEvents(events []v1.Event, scope ...string) (err error) {
defer func() {
err = mapErr(err)
@ -46,34 +50,81 @@ func (txn *Transaction) AppendEvents(events []v1.Event, scope ...string) (err er
}
func (txn *Transaction) ListEvents(scope ...string) (_ []v1.Event, err error) {
page, err := txn.ListEventsPage(storepkg.ListOptions{}, scope...)
if err != nil {
return nil, err
}
return page.Items, nil
}
func (txn *Transaction) ListEventsPage(options storepkg.ListOptions, scope ...string) (
result storepkg.Page[v1.Event],
err error,
) {
defer func() {
err = mapErr(err)
}()
// Declare an empty, non-nil slice to
// return [] when no events are found
result := []v1.Event{}
result.Items = []v1.Event{}
it := txn.badgerTxn.NewIterator(badger.IteratorOptions{
Prefix: scopePrefix(scope),
})
prefix := scopePrefix(scope)
itOptions := badger.DefaultIteratorOptions
itOptions.Prefix = prefix
if options.Order == storepkg.ListOrderDesc {
itOptions.Reverse = true
}
it := txn.badgerTxn.NewIterator(itOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
cursor := options.Cursor
if len(cursor) > 0 {
if !bytes.HasPrefix(cursor, prefix) {
seekKey := make([]byte, 0, len(prefix)+len(cursor))
seekKey = append(seekKey, prefix...)
seekKey = append(seekKey, cursor...)
cursor = seekKey
}
it.Seek(cursor)
if it.ValidForPrefix(prefix) && bytes.Equal(it.Item().Key(), cursor) {
it.Next()
}
} else if options.Order == storepkg.ListOrderDesc {
it.Seek(append(prefix, 0xFF))
} else {
it.Rewind()
}
for it.ValidForPrefix(prefix) {
item := it.Item()
eventBytes, err := item.ValueCopy(nil)
if err != nil {
return nil, err
return result, err
}
var event v1.Event
if err := json.Unmarshal(eventBytes, &event); err != nil {
return nil, err
return result, err
}
result = append(result, event)
result.Items = append(result.Items, event)
if options.Limit > 0 && len(result.Items) >= options.Limit {
lastKey := item.KeyCopy(nil)
it.Next()
if it.ValidForPrefix(prefix) {
lastKey = bytes.TrimPrefix(lastKey, prefix)
result.NextCursor = lastKey
}
break
}
it.Next()
}
return result, nil

View File

@ -0,0 +1,77 @@
package store_test
import (
"testing"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/controller/store/badger"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestListEventsPage(t *testing.T) {
logger := zap.NewNop().Sugar()
store, err := badger.NewBadgerStore(t.TempDir(), true, logger)
require.NoError(t, err)
events := []v1.Event{
{Kind: v1.EventKindLogLine, Timestamp: 1, Payload: "one"},
{Kind: v1.EventKindLogLine, Timestamp: 2, Payload: "two"},
{Kind: v1.EventKindLogLine, Timestamp: 3, Payload: "three"},
{Kind: v1.EventKindLogLine, Timestamp: 4, Payload: "four"},
}
err = store.Update(func(txn storepkg.Transaction) error {
return txn.AppendEvents(events, "vms", "vm-uid")
})
require.NoError(t, err)
var page storepkg.Page[v1.Event]
err = store.View(func(txn storepkg.Transaction) error {
page, err = txn.ListEventsPage(storepkg.ListOptions{
Limit: 2,
}, "vms", "vm-uid")
return err
})
require.NoError(t, err)
require.Equal(t, events[:2], page.Items)
require.NotEmpty(t, page.NextCursor)
var page2 storepkg.Page[v1.Event]
err = store.View(func(txn storepkg.Transaction) error {
page2, err = txn.ListEventsPage(storepkg.ListOptions{
Limit: 2,
Cursor: page.NextCursor,
}, "vms", "vm-uid")
return err
})
require.NoError(t, err)
require.Equal(t, events[2:], page2.Items)
require.Empty(t, page2.NextCursor)
var descPage storepkg.Page[v1.Event]
err = store.View(func(txn storepkg.Transaction) error {
descPage, err = txn.ListEventsPage(storepkg.ListOptions{
Limit: 2,
Order: storepkg.ListOrderDesc,
}, "vms", "vm-uid")
return err
})
require.NoError(t, err)
require.Equal(t, []v1.Event{events[3], events[2]}, descPage.Items)
require.NotEmpty(t, descPage.NextCursor)
var descPage2 storepkg.Page[v1.Event]
err = store.View(func(txn storepkg.Transaction) error {
descPage2, err = txn.ListEventsPage(storepkg.ListOptions{
Limit: 2,
Cursor: descPage.NextCursor,
Order: storepkg.ListOrderDesc,
}, "vms", "vm-uid")
return err
})
require.NoError(t, err)
require.Equal(t, []v1.Event{events[1], events[0]}, descPage2.Items)
require.Empty(t, descPage2.NextCursor)
}

View File

@ -43,8 +43,27 @@ type Transaction interface {
AppendEvents(event []v1.Event, scope ...string) (err error)
ListEvents(scope ...string) (result []v1.Event, err error)
ListEventsPage(options ListOptions, scope ...string) (result Page[v1.Event], err error)
DeleteEvents(scope ...string) (err error)
GetClusterSettings() (*v1.ClusterSettings, error)
SetClusterSettings(clusterSettings v1.ClusterSettings) error
}
type ListOptions struct {
Limit int
Cursor []byte
Order ListOrder
}
type Page[T any] struct {
Items []T
NextCursor []byte
}
type ListOrder string
const (
ListOrderAsc ListOrder = "asc"
ListOrderDesc ListOrder = "desc"
)

View File

@ -0,0 +1,109 @@
package tests_test
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"testing"
"github.com/cirruslabs/orchard/internal/imageconstant"
"github.com/cirruslabs/orchard/internal/tests/devcontroller"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/stretchr/testify/require"
)
func TestListVMEventsPagination(t *testing.T) {
devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t,
false, nil,
true, nil,
)
ctx := context.Background()
vm := v1.VM{
Meta: v1.Meta{Name: "test-vm"},
Image: imageconstant.DefaultMacosImage,
CPU: 1,
Memory: 1024,
Headless: true,
}
require.NoError(t, devClient.VMs().Create(ctx, &vm))
events := []v1.Event{
{Kind: v1.EventKindLogLine, Timestamp: 1, Payload: "one"},
{Kind: v1.EventKindLogLine, Timestamp: 2, Payload: "two"},
{Kind: v1.EventKindLogLine, Timestamp: 3, Payload: "three"},
{Kind: v1.EventKindLogLine, Timestamp: 4, Payload: "four"},
}
appendVMEvents(t, devController.Address(), vm.Name, events)
page, cursor := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{Limit: 2})
require.Equal(t, events[:2], page)
require.NotEmpty(t, cursor)
page2, cursor2 := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{
Limit: 2,
Cursor: cursor,
})
require.Equal(t, events[2:], page2)
require.Empty(t, cursor2)
descPage, descCursor := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{
Limit: 2,
Order: client.LogsOrderDesc,
})
require.Equal(t, []v1.Event{events[3], events[2]}, descPage)
require.NotEmpty(t, descCursor)
descPage2, descCursor2 := fetchVMEventsPage(t, ctx, devClient, vm.Name, client.EventsPageOptions{
Limit: 2,
Order: client.LogsOrderDesc,
Cursor: descCursor,
})
require.Equal(t, []v1.Event{events[1], events[0]}, descPage2)
require.Empty(t, descCursor2)
lines, err := devClient.VMs().LogsWithOptions(ctx, vm.Name, client.LogsOptions{
Limit: 2,
Order: client.LogsOrderDesc,
})
require.NoError(t, err)
require.Equal(t, []string{"four", "three"}, lines)
}
func appendVMEvents(t *testing.T, baseURL, name string, events []v1.Event) {
t.Helper()
endpoint, err := url.JoinPath(baseURL, "v1", "vms", name, "events")
require.NoError(t, err)
body, err := json.Marshal(events)
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
}
func fetchVMEventsPage(
t *testing.T,
ctx context.Context,
devClient *client.Client,
name string,
options client.EventsPageOptions,
) ([]v1.Event, string) {
t.Helper()
events, cursor, err := devClient.VMs().EventsPage(ctx, name, options)
require.NoError(t, err)
return events, cursor
}

View File

@ -168,20 +168,20 @@ func (client *Client) configureFromDefaultContext() error {
return nil
}
func (client *Client) request(
func (client *Client) requestWithHeaders(
ctx context.Context,
method string,
path string,
in interface{},
out interface{},
params map[string]string,
) error {
) (http.Header, error) {
var body io.Reader
if in != nil {
jsonBytes, err := json.Marshal(in)
if err != nil {
return fmt.Errorf("%w to marshal request body: %v", ErrFailed, err)
return nil, fmt.Errorf("%w to marshal request body: %v", ErrFailed, err)
}
body = bytes.NewBuffer(jsonBytes)
@ -197,21 +197,21 @@ func (client *Client) request(
request, err := http.NewRequestWithContext(ctx, method, endpointURL.String(), body)
if err != nil {
return fmt.Errorf("%w instantiate a request: %v", ErrFailed, err)
return nil, fmt.Errorf("%w instantiate a request: %v", ErrFailed, err)
}
client.modifyHeader(request.Header)
response, err := client.httpClient.Do(request)
if err != nil {
return fmt.Errorf("%w to make a request: %v", ErrFailed, err)
return nil, fmt.Errorf("%w to make a request: %v", ErrFailed, err)
}
defer func() {
_ = response.Body.Close()
}()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("%w to make a request: %d %s%s",
return nil, fmt.Errorf("%w to make a request: %d %s%s",
ErrAPI, response.StatusCode, http.StatusText(response.StatusCode),
detailsFromErrorResponseBody(response.Body))
}
@ -219,15 +219,27 @@ func (client *Client) request(
if out != nil {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("%w to read response body: %v", ErrAPI, err)
return nil, fmt.Errorf("%w to read response body: %v", ErrAPI, err)
}
if err := json.Unmarshal(bodyBytes, out); err != nil {
return fmt.Errorf("%w to unmarshal response body: %v", ErrAPI, err)
return nil, fmt.Errorf("%w to unmarshal response body: %v", ErrAPI, err)
}
}
return nil
return response.Header, nil
}
func (client *Client) request(
ctx context.Context,
method string,
path string,
in interface{},
out interface{},
params map[string]string,
) error {
_, err := client.requestWithHeaders(ctx, method, path, in, out, params)
return err
}
func detailsFromErrorResponseBody(body io.Reader) string {

View File

@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"github.com/cirruslabs/orchard/pkg/resource/v1"
)
@ -15,6 +16,36 @@ type VMsService struct {
client *Client
}
type LogsOrder string
const (
LogsOrderAsc LogsOrder = "asc"
LogsOrderDesc LogsOrder = "desc"
)
func ParseLogsOrder(raw string) (LogsOrder, error) {
order := strings.ToLower(raw)
switch order {
case string(LogsOrderAsc):
return LogsOrderAsc, nil
case string(LogsOrderDesc):
return LogsOrderDesc, nil
default:
return "", fmt.Errorf("invalid order %q: expected asc or desc", raw)
}
}
type LogsOptions struct {
Limit int
Order LogsOrder
}
type EventsPageOptions struct {
Limit int
Order LogsOrder
Cursor string
}
func (service *VMsService) Create(ctx context.Context, vm *v1.VM) error {
err := service.client.request(ctx, http.MethodPost, "vms",
vm, nil, nil)
@ -134,9 +165,23 @@ func (service *VMsService) StreamEvents(name string) *EventStreamer {
}
func (service *VMsService) Logs(ctx context.Context, name string) (lines []string, err error) {
return service.LogsWithOptions(ctx, name, LogsOptions{})
}
func (service *VMsService) LogsWithOptions(ctx context.Context, name string, options LogsOptions) (lines []string, err error) {
var events []v1.Event
params := map[string]string{}
if options.Limit > 0 {
params["limit"] = strconv.Itoa(options.Limit)
}
if options.Order != "" {
params["order"] = string(options.Order)
}
if len(params) == 0 {
params = nil
}
err = service.client.request(ctx, http.MethodGet, fmt.Sprintf("vms/%s/events", url.PathEscape(name)),
nil, &events, nil)
nil, &events, params)
if err != nil {
return
}
@ -147,3 +192,31 @@ func (service *VMsService) Logs(ctx context.Context, name string) (lines []strin
}
return
}
func (service *VMsService) EventsPage(
ctx context.Context,
name string,
options EventsPageOptions,
) (events []v1.Event, nextCursor string, err error) {
params := map[string]string{}
if options.Limit > 0 {
params["limit"] = strconv.Itoa(options.Limit)
}
if options.Order != "" {
params["order"] = string(options.Order)
}
if options.Cursor != "" {
params["cursor"] = options.Cursor
}
if len(params) == 0 {
params = nil
}
headers, err := service.client.requestWithHeaders(ctx, http.MethodGet, fmt.Sprintf("vms/%s/events", url.PathEscape(name)),
nil, &events, params)
if err != nil {
return nil, "", err
}
return events, headers.Get("X-Next-Cursor"), nil
}