orchard/internal/controller/api_vms.go

546 lines
15 KiB
Go

package controller
import (
"encoding/base64"
"encoding/json"
"errors"
"net/http"
"strconv"
"strings"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
"github.com/cirruslabs/orchard/internal/simplename"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/pkg/client"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/samber/lo"
)
func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
return responder
}
var vm v1.VM
if err := ctx.ShouldBindJSON(&vm); err != nil {
return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
}
if vm.Name == "" {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("VM name is empty"))
} else if err := simplename.Validate(vm.Name); err != nil {
return responder.JSON(http.StatusPreconditionFailed,
NewErrorResponse("VM name %v", err))
}
if vm.Image == "" {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("VM image is empty"))
}
vm.Status = v1.VMStatusPending
vm.CreatedAt = time.Now()
vm.RestartedAt = time.Time{}
vm.RestartCount = 0
vm.UID = uuid.New().String()
vm.PowerState = v1.PowerStateRunning
vm.TartName = ondiskname.New(vm.Name, vm.UID, vm.RestartCount).String()
vm.Generation = 0
vm.ObservedGeneration = 0
vm.Conditions = []v1.Condition{
{
Type: v1.ConditionTypeScheduled,
State: v1.ConditionStateFalse,
},
}
// Softnet-specific logic: automatically enable Softnet when NetSoftnetAllow or NetSoftnetBlock are set
// and propagate deprecated and non-deprecated boolean fields into each other
if vm.NetSoftnetDeprecated || vm.NetSoftnet || len(vm.NetSoftnetAllow) != 0 || len(vm.NetSoftnetBlock) != 0 {
vm.NetSoftnetDeprecated = true
vm.NetSoftnet = true
}
// Provide resource defaults
if vm.Resources == nil {
vm.Resources = make(v1.Resources)
}
if _, ok := vm.Resources[v1.ResourceTartVMs]; !ok {
vm.Resources[v1.ResourceTartVMs] = 1
}
// Validate image pull policy and provide a default value if it's missing
if vm.ImagePullPolicy != "" {
if _, err := v1.NewImagePullPolicyFromString(string(vm.ImagePullPolicy)); err != nil {
return responder.JSON(http.StatusPreconditionFailed,
NewErrorResponse("unsupported image pull policy: %q", vm.ImagePullPolicy))
}
} else {
vm.ImagePullPolicy = v1.ImagePullPolicyIfNotPresent
}
// Validate restart policy and provide a default value if it's missing
if vm.RestartPolicy != "" {
if _, err := v1.NewRestartPolicyFromString(string(vm.RestartPolicy)); err != nil {
return responder.JSON(http.StatusPreconditionFailed,
NewErrorResponse("unsupported restart policy: %q", vm.RestartPolicy))
}
} else {
vm.RestartPolicy = v1.RestartPolicyNever
}
// Validate hostDirs
if responder := controller.validateHostDirs(vm.HostDirs); responder != nil {
return responder
}
response := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
// Does the VM resource with this name already exists?
_, err := txn.GetVM(vm.Name)
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
controller.logger.Errorf("failed to check if the VM exists in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
if err == nil {
return responder.JSON(http.StatusConflict, NewErrorResponse("VM with this name already exists"))
}
if err := txn.SetVM(vm); err != nil {
controller.logger.Errorf("failed to create VM in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
return responder.JSON(http.StatusOK, &vm)
})
// request immediate scheduling
controller.scheduler.RequestScheduling()
return response
}
func (controller *Controller) updateVMSpec(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
return responder
}
var userVM v1.VM
if err := ctx.ShouldBindJSON(&userVM); err != nil {
return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
}
name := ctx.Param("name")
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
dbVM, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
if dbVM.TerminalState() {
return responder.JSON(http.StatusPreconditionFailed,
NewErrorResponse("cannot update VM in a terminal state"))
}
// Softnet-specific logic: automatically enable Softnet when NetSoftnetAllow or NetSoftnetBlock are set
// and propagate deprecated and non-deprecated boolean fields into each other
if userVM.NetSoftnetDeprecated || userVM.NetSoftnet || len(userVM.NetSoftnetAllow) != 0 || len(userVM.NetSoftnetBlock) != 0 {
userVM.NetSoftnetDeprecated = true
userVM.NetSoftnet = true
}
// Suspendable-specific sanity checks
if dbVM.Suspendable && !userVM.Suspendable {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("\"suspendable\" cannot be "+
"toggled for suspendable VMs"))
}
if dbVM.Suspendable && dbVM.NetSoftnet != userVM.NetSoftnet {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("\"netSoftnet\" cannot be "+
"toggled for suspendable VMs"))
}
// Power state-specific sanity checks
if !userVM.PowerState.Valid() {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("invalid \"powerState\" "+
"value: %s", userVM.PowerState))
}
if dbVM.PowerState.TerminalState() {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("invalid \"powerState\" "+
"transition: cannot transition from a terminal power state"))
}
if !dbVM.Suspendable && userVM.PowerState == v1.PowerStateSuspended {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("invalid \"powerState\" "+
"transition: only suspendable VMs can be suspended"))
}
if cmp.Equal(dbVM.VMSpec, userVM.VMSpec) {
// Nothing was changed
return responder.JSON(http.StatusOK, dbVM)
}
// VM specification was changed
dbVM.VMSpec = userVM.VMSpec
dbVM.Generation++
if err := txn.SetVM(*dbVM); err != nil {
controller.logger.Errorf("failed to update VM in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
return responder.JSON(http.StatusOK, dbVM)
})
}
func (controller *Controller) updateVMState(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
return responder
}
var userVM v1.VM
if err := ctx.ShouldBindJSON(&userVM); err != nil {
return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
}
name := ctx.Param("name")
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
dbVM, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
if dbVM.TerminalState() && dbVM.Status != userVM.Status {
return responder.JSON(http.StatusPreconditionFailed,
NewErrorResponse("cannot update status for a VM in a terminal state"))
}
if userVM.Status == v1.VMStatusRunning && dbVM.StartedAt.IsZero() {
dbVM.StartedAt = time.Now()
}
dbVM.Status = userVM.Status
dbVM.StatusMessage = userVM.StatusMessage
dbVM.ImageFQN = userVM.ImageFQN
dbVM.VMState = userVM.VMState
if err := txn.SetVM(*dbVM); err != nil {
controller.logger.Errorf("failed to update VM in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
return responder.JSON(http.StatusOK, dbVM)
})
}
func (controller *Controller) getVM(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil {
return responder
}
name := ctx.Param("name")
if ctx.Query("watch") == "true" {
ctx.Header("Content-Type", "application/x-ndjson")
watchCh, errCh, err := controller.store.WatchVM(ctx, name)
if err != nil {
return responder.Error(err)
}
for {
select {
case watchMessage := <-watchCh:
jsonBytes, err := json.Marshal(watchMessage)
if err != nil {
controller.logger.Errorf("failed to marshal watch message "+
"for VM %q to JSON: %v", name, err)
return responder.Empty()
}
if _, err = ctx.Writer.Write(jsonBytes); err != nil {
return responder.Empty()
}
if _, err := ctx.Writer.WriteString("\n"); err != nil {
return responder.Empty()
}
ctx.Writer.Flush()
case err := <-errCh:
controller.logger.Errorf("failed to watch VM %q in the DB: %v", name, err)
return responder.Empty()
case <-ctx.Done():
return responder.Empty()
}
}
}
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
return responder.JSON(http.StatusOK, vm)
})
}
func (controller *Controller) listVMs(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil {
return responder
}
var filters []v1.Filter
if filterRaw := ctx.Query("filter"); filterRaw != "" {
for _, filterRaw := range strings.Split(filterRaw, ",") {
filter, err := v1.NewFilter(filterRaw)
if err != nil {
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("%v", err))
}
filters = append(filters, filter)
}
}
resultCh := controller.single.DoChan("list-vms", func() (interface{}, error) {
var vms []v1.VM
viewErr := controller.store.View(func(txn storepkg.Transaction) (err error) {
vms, err = txn.ListVMs()
return
})
return vms, viewErr
})
var computedVMs interface{}
var err error
select {
case <-ctx.Done():
return responder.Empty()
case result := <-resultCh:
computedVMs = result.Val
err = result.Err
}
if err != nil {
return responder.Error(err)
}
allVMs, ok := computedVMs.([]v1.VM)
if !ok {
controller.logger.Errorf("failed to compute vms: %T", computedVMs)
return responder.Code(http.StatusInternalServerError)
}
// Declare an empty, non-nil slice to
// return [] when no objects are found
vms := []v1.VM{}
Outer:
// Use index-based loop to avoid per-iteration copies of v1.VM
for i := range allVMs {
for _, filter := range filters {
if !allVMs[i].Match(filter) {
continue Outer
}
}
vms = append(vms, allVMs[i])
}
return responder.JSON(http.StatusOK, vms)
}
func (controller *Controller) deleteVM(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
return responder
}
name := ctx.Param("name")
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
err = txn.DeleteVM(name)
if err != nil {
return responder.Error(err)
}
err = txn.DeleteEvents("vms", vm.UID)
if err != nil {
return responder.Error(err)
}
lifecycle.Report(vm, "VM deleted", controller.logger)
return responder.Code(http.StatusOK)
})
}
func (controller *Controller) appendVMEvents(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
return responder
}
var events []v1.Event
if err := ctx.ShouldBindJSON(&events); err != nil {
return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
}
name := ctx.Param("name")
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
if err := txn.AppendEvents(events, "vms", vm.UID); err != nil {
return responder.Error(err)
}
return responder.Code(http.StatusOK)
})
}
func (controller *Controller) listVMEvents(ctx *gin.Context) responder.Responder {
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); responder != nil {
return responder
}
name := ctx.Param("name")
options, parseResponder := parseListVMEventsOptions(ctx)
if parseResponder != nil {
return parseResponder
}
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
return responder.Error(err)
}
page, err := txn.ListEventsPage(options, "vms", vm.UID)
if err != nil {
return responder.Error(err)
}
if len(page.NextCursor) != 0 {
ctx.Header("X-Next-Cursor", encodeEventCursor(page.NextCursor))
}
return responder.JSON(http.StatusOK, page.Items)
})
}
func parseListVMEventsOptions(ctx *gin.Context) (storepkg.ListOptions, responder.Responder) {
var options storepkg.ListOptions
limitRaw := ctx.Query("limit")
orderRaw := ctx.Query("order")
cursorRaw := ctx.Query("cursor")
if limitRaw != "" {
limit, ok := parsePositiveInt(limitRaw)
if !ok {
return options, responder.JSON(http.StatusBadRequest,
NewErrorResponse("invalid limit %q: expected positive integer", limitRaw))
}
options.Limit = limit
}
if orderRaw != "" {
order, err := client.ParseLogsOrder(orderRaw)
if err != nil {
return options, responder.JSON(http.StatusBadRequest, NewErrorResponse("%s", err))
}
options.Order = storepkg.ListOrder(order)
}
if cursorRaw != "" {
cursor, err := decodeEventCursor(cursorRaw)
if err != nil {
return options, responder.JSON(http.StatusBadRequest,
NewErrorResponse("invalid cursor %q", cursorRaw))
}
options.Cursor = cursor
}
return options, nil
}
func parsePositiveInt(raw string) (int, bool) {
value, err := strconv.ParseInt(raw, 10, 0)
if err != nil || value <= 0 {
return 0, false
}
return int(value), true
}
func encodeEventCursor(cursor []byte) string {
return base64.RawURLEncoding.EncodeToString(cursor)
}
func decodeEventCursor(cursorRaw string) ([]byte, error) {
cursor, err := base64.RawURLEncoding.DecodeString(cursorRaw)
if err == nil {
return cursor, nil
}
return base64.URLEncoding.DecodeString(cursorRaw)
}
func (controller *Controller) validateHostDirs(hostDirs []v1.HostDir) responder.Responder {
if len(hostDirs) == 0 {
return nil
}
// Retrieve cluster settings
var clusterSettings *v1.ClusterSettings
var err error
err = controller.store.View(func(txn storepkg.Transaction) error {
clusterSettings, err = txn.GetClusterSettings()
return err
})
if err != nil {
controller.logger.Errorf("failed to retrieve cluster settings from the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
for _, hostDir := range hostDirs {
if hostDir.Name == "" {
return responder.JSON(http.StatusBadRequest,
NewErrorResponse("hostDir volume's \"name\" field cannot be empty"))
}
if hostDir.Path == "" {
return responder.JSON(http.StatusBadRequest,
NewErrorResponse("hostDir volume's \"path\" field cannot be empty"))
}
if !lo.SomeBy(clusterSettings.HostDirPolicies, func(hostDirPolicy v1.HostDirPolicy) bool {
return hostDirPolicy.Validate(hostDir.Path, hostDir.ReadOnly)
}) {
return responder.JSON(http.StatusBadRequest, NewErrorResponse("host directory %q is disallowed "+
"by policy, check your cluster settings", hostDir.String()))
}
}
return nil
}