Fix scaling back to 0 after min runners were set to number > 0 (#2742)
This commit is contained in:
parent
dcb64f0b9e
commit
336e11a4e9
|
|
@ -114,7 +114,14 @@ func createSession(ctx context.Context, logger *logr.Logger, client actions.Acti
|
||||||
return runnerScaleSetSession, initialMessage, nil
|
return runnerScaleSetSession, initialMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return runnerScaleSetSession, nil, nil
|
initialMessage := &actions.RunnerScaleSetMessage{
|
||||||
|
MessageId: 0,
|
||||||
|
MessageType: "RunnerScaleSetJobMessages",
|
||||||
|
Statistics: runnerScaleSetSession.Statistics,
|
||||||
|
Body: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
return runnerScaleSetSession, initialMessage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *AutoScalerClient) Close() error {
|
func (m *AutoScalerClient) Close() error {
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ func TestCreateSession(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
assert.Equal(t, session, session, "Session is not correct")
|
assert.Equal(t, session, session, "Session is not correct")
|
||||||
assert.Nil(t, asClient.initialMessage, "Initial message should be nil")
|
assert.NotNil(t, asClient.initialMessage, "Initial message should not be nil")
|
||||||
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0")
|
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0")
|
||||||
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
||||||
}
|
}
|
||||||
|
|
@ -188,7 +188,7 @@ func TestCreateSession_RetrySessionConflict(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
assert.Equal(t, session, session, "Session is not correct")
|
assert.Equal(t, session, session, "Session is not correct")
|
||||||
assert.Nil(t, asClient.initialMessage, "Initial message should be nil")
|
assert.NotNil(t, asClient.initialMessage, "Initial message should not be nil")
|
||||||
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0")
|
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0")
|
||||||
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
||||||
}
|
}
|
||||||
|
|
@ -334,6 +334,14 @@ func TestGetRunnerScaleSetMessage(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
assert.NoError(t, err, "Error getting message")
|
||||||
|
assert.Equal(t, int64(0), asClient.lastMessageId, "Initial message")
|
||||||
|
|
||||||
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
assert.NoError(t, err, "Error getting message")
|
assert.NoError(t, err, "Error getting message")
|
||||||
assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated")
|
assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated")
|
||||||
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
||||||
|
|
@ -371,13 +379,21 @@ func TestGetRunnerScaleSetMessage_HandleFailed(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
|
|
||||||
|
// read initial message
|
||||||
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.NoError(t, err, "Error getting message")
|
||||||
|
|
||||||
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
return fmt.Errorf("error")
|
return fmt.Errorf("error")
|
||||||
})
|
})
|
||||||
|
|
||||||
assert.ErrorContains(t, err, "handle message failed. error", "Error getting message")
|
assert.ErrorContains(t, err, "handle message failed. error", "Error getting message")
|
||||||
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be updated")
|
assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should not be updated")
|
||||||
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met")
|
||||||
assert.True(t, mockSessionClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockSessionClient.AssertExpectations(t), "All expectations should be met")
|
||||||
}
|
}
|
||||||
|
|
@ -513,6 +529,12 @@ func TestGetRunnerScaleSetMessage_RetryUntilGetMessage(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
|
|
||||||
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err, "Error getting initial message")
|
||||||
|
|
||||||
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -550,6 +572,12 @@ func TestGetRunnerScaleSetMessage_ErrorOnGetMessage(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
|
|
||||||
|
// process initial message
|
||||||
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err, "Error getting initial message")
|
||||||
|
|
||||||
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
return fmt.Errorf("Should not be called")
|
return fmt.Errorf("Should not be called")
|
||||||
})
|
})
|
||||||
|
|
@ -592,6 +620,12 @@ func TestDeleteRunnerScaleSetMessage_Error(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "Error creating autoscaler client")
|
require.NoError(t, err, "Error creating autoscaler client")
|
||||||
|
|
||||||
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err, "Error getting initial message")
|
||||||
|
|
||||||
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error {
|
||||||
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func NewService(
|
||||||
rsClient: rsClient,
|
rsClient: rsClient,
|
||||||
kubeManager: manager,
|
kubeManager: manager,
|
||||||
settings: settings,
|
settings: settings,
|
||||||
currentRunnerCount: 0,
|
currentRunnerCount: -1, // force patch on startup
|
||||||
logger: logr.FromContextOrDiscard(ctx),
|
logger: logr.FromContextOrDiscard(ctx),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -51,14 +51,6 @@ func NewService(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Start() error {
|
func (s *Service) Start() error {
|
||||||
if s.settings.MinRunners > 0 {
|
|
||||||
s.logger.Info("scale to match minimal runners.")
|
|
||||||
err := s.scaleForAssignedJobCount(0)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not scale to match minimal runners. %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
s.logger.Info("waiting for message...")
|
s.logger.Info("waiting for message...")
|
||||||
select {
|
select {
|
||||||
|
|
@ -94,6 +86,10 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if message.MessageId == 0 && message.Body == "" { // initial message with statistics only
|
||||||
|
return s.scaleForAssignedJobCount(message.Statistics.TotalAssignedJobs)
|
||||||
|
}
|
||||||
|
|
||||||
var batchedMessages []json.RawMessage
|
var batchedMessages []json.RawMessage
|
||||||
if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil {
|
if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil {
|
||||||
return fmt.Errorf("could not decode job messages. %w", err)
|
return fmt.Errorf("could not decode job messages. %w", err)
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,7 @@ func TestStart(t *testing.T) {
|
||||||
|
|
||||||
func TestStart_ScaleToMinRunners(t *testing.T) {
|
func TestStart_ScaleToMinRunners(t *testing.T) {
|
||||||
mockRsClient := &MockRunnerScaleSetClient{}
|
mockRsClient := &MockRunnerScaleSetClient{}
|
||||||
|
|
||||||
mockKubeManager := &MockKubernetesManager{}
|
mockKubeManager := &MockKubernetesManager{}
|
||||||
logger, log_err := logging.NewLogger(logging.LogLevelDebug, logging.LogFormatText)
|
logger, log_err := logging.NewLogger(logging.LogLevelDebug, logging.LogFormatText)
|
||||||
logger = logger.WithName(t.Name())
|
logger = logger.WithName(t.Name())
|
||||||
|
|
@ -92,6 +93,11 @@ func TestStart_ScaleToMinRunners(t *testing.T) {
|
||||||
s.logger = logger
|
s.logger = logger
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mockRsClient.On("GetRunnerScaleSetMessage", ctx, mock.Anything).Run(func(args mock.Arguments) {
|
||||||
|
_ = service.scaleForAssignedJobCount(5)
|
||||||
|
}).Return(nil)
|
||||||
|
|
||||||
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once()
|
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once()
|
||||||
|
|
||||||
err := service.Start()
|
err := service.Start()
|
||||||
|
|
@ -124,11 +130,15 @@ func TestStart_ScaleToMinRunnersFailed(t *testing.T) {
|
||||||
s.logger = logger
|
s.logger = logger
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(fmt.Errorf("error")).Once()
|
|
||||||
|
c := mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(fmt.Errorf("error")).Once()
|
||||||
|
mockRsClient.On("GetRunnerScaleSetMessage", ctx, mock.Anything).Run(func(args mock.Arguments) {
|
||||||
|
_ = service.scaleForAssignedJobCount(5)
|
||||||
|
}).Return(c.ReturnArguments.Get(0))
|
||||||
|
|
||||||
err := service.Start()
|
err := service.Start()
|
||||||
|
|
||||||
assert.ErrorContains(t, err, "could not scale to match minimal runners", "Unexpected error")
|
assert.ErrorContains(t, err, "could not get and process message", "Unexpected error")
|
||||||
assert.True(t, mockRsClient.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockRsClient.AssertExpectations(t), "All expectations should be met")
|
||||||
assert.True(t, mockKubeManager.AssertExpectations(t), "All expectations should be met")
|
assert.True(t, mockKubeManager.AssertExpectations(t), "All expectations should be met")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue