diff --git a/cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go b/cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go index 2c8b5301..0d7f5a2b 100644 --- a/cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go +++ b/cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go @@ -114,7 +114,14 @@ func createSession(ctx context.Context, logger *logr.Logger, client actions.Acti return runnerScaleSetSession, initialMessage, nil } - return runnerScaleSetSession, nil, nil + initialMessage := &actions.RunnerScaleSetMessage{ + MessageId: 0, + MessageType: "RunnerScaleSetJobMessages", + Statistics: runnerScaleSetSession.Statistics, + Body: "", + } + + return runnerScaleSetSession, initialMessage, nil } func (m *AutoScalerClient) Close() error { diff --git a/cmd/githubrunnerscalesetlistener/autoScalerMessageListener_test.go b/cmd/githubrunnerscalesetlistener/autoScalerMessageListener_test.go index d0615515..2d6ef711 100644 --- a/cmd/githubrunnerscalesetlistener/autoScalerMessageListener_test.go +++ b/cmd/githubrunnerscalesetlistener/autoScalerMessageListener_test.go @@ -37,7 +37,7 @@ func TestCreateSession(t *testing.T) { require.NoError(t, err, "Error creating autoscaler client") assert.Equal(t, session, session, "Session is not correct") - assert.Nil(t, asClient.initialMessage, "Initial message should be nil") + assert.NotNil(t, asClient.initialMessage, "Initial message should not be nil") assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0") assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met") } @@ -188,7 +188,7 @@ func TestCreateSession_RetrySessionConflict(t *testing.T) { require.NoError(t, err, "Error creating autoscaler client") assert.Equal(t, session, session, "Session is not correct") - assert.Nil(t, asClient.initialMessage, "Initial message should be nil") + assert.NotNil(t, asClient.initialMessage, "Initial message should not be nil") assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be 0") assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met") } @@ -334,6 +334,14 @@ func TestGetRunnerScaleSetMessage(t *testing.T) { return nil }) + assert.NoError(t, err, "Error getting message") + assert.Equal(t, int64(0), asClient.lastMessageId, "Initial message") + + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { + logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) + return nil + }) + assert.NoError(t, err, "Error getting message") assert.Equal(t, int64(1), asClient.lastMessageId, "Last message id should be updated") assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met") @@ -371,13 +379,21 @@ func TestGetRunnerScaleSetMessage_HandleFailed(t *testing.T) { }) require.NoError(t, err, "Error creating autoscaler client") + // read initial message + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { + logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) + return nil + }) + + assert.NoError(t, err, "Error getting message") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) return fmt.Errorf("error") }) assert.ErrorContains(t, err, "handle message failed. error", "Error getting message") - assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should be updated") + assert.Equal(t, int64(0), asClient.lastMessageId, "Last message id should not be updated") assert.True(t, mockActionsClient.AssertExpectations(t), "All expectations should be met") assert.True(t, mockSessionClient.AssertExpectations(t), "All expectations should be met") } @@ -513,6 +529,12 @@ func TestGetRunnerScaleSetMessage_RetryUntilGetMessage(t *testing.T) { }) require.NoError(t, err, "Error creating autoscaler client") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { + logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) + return nil + }) + assert.NoError(t, err, "Error getting initial message") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) return nil @@ -550,6 +572,12 @@ func TestGetRunnerScaleSetMessage_ErrorOnGetMessage(t *testing.T) { }) require.NoError(t, err, "Error creating autoscaler client") + // process initial message + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { + return nil + }) + assert.NoError(t, err, "Error getting initial message") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { return fmt.Errorf("Should not be called") }) @@ -592,6 +620,12 @@ func TestDeleteRunnerScaleSetMessage_Error(t *testing.T) { }) require.NoError(t, err, "Error creating autoscaler client") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { + logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) + return nil + }) + assert.NoError(t, err, "Error getting initial message") + err = asClient.GetRunnerScaleSetMessage(ctx, func(msg *actions.RunnerScaleSetMessage) error { logger.Info("Message received", "messageId", msg.MessageId, "messageType", msg.MessageType, "body", msg.Body) return nil diff --git a/cmd/githubrunnerscalesetlistener/autoScalerService.go b/cmd/githubrunnerscalesetlistener/autoScalerService.go index dd5ccaa3..57589db3 100644 --- a/cmd/githubrunnerscalesetlistener/autoScalerService.go +++ b/cmd/githubrunnerscalesetlistener/autoScalerService.go @@ -39,7 +39,7 @@ func NewService( rsClient: rsClient, kubeManager: manager, settings: settings, - currentRunnerCount: 0, + currentRunnerCount: -1, // force patch on startup logger: logr.FromContextOrDiscard(ctx), } @@ -51,14 +51,6 @@ func NewService( } func (s *Service) Start() error { - if s.settings.MinRunners > 0 { - s.logger.Info("scale to match minimal runners.") - err := s.scaleForAssignedJobCount(0) - if err != nil { - return fmt.Errorf("could not scale to match minimal runners. %w", err) - } - } - for { s.logger.Info("waiting for message...") select { @@ -94,6 +86,10 @@ func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error { return nil } + if message.MessageId == 0 && message.Body == "" { // initial message with statistics only + return s.scaleForAssignedJobCount(message.Statistics.TotalAssignedJobs) + } + var batchedMessages []json.RawMessage if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil { return fmt.Errorf("could not decode job messages. %w", err) diff --git a/cmd/githubrunnerscalesetlistener/autoScalerService_test.go b/cmd/githubrunnerscalesetlistener/autoScalerService_test.go index 6581859a..a1b0e213 100644 --- a/cmd/githubrunnerscalesetlistener/autoScalerService_test.go +++ b/cmd/githubrunnerscalesetlistener/autoScalerService_test.go @@ -72,6 +72,7 @@ func TestStart(t *testing.T) { func TestStart_ScaleToMinRunners(t *testing.T) { mockRsClient := &MockRunnerScaleSetClient{} + mockKubeManager := &MockKubernetesManager{} logger, log_err := logging.NewLogger(logging.LogLevelDebug, logging.LogFormatText) logger = logger.WithName(t.Name()) @@ -92,6 +93,11 @@ func TestStart_ScaleToMinRunners(t *testing.T) { s.logger = logger }, ) + + mockRsClient.On("GetRunnerScaleSetMessage", ctx, mock.Anything).Run(func(args mock.Arguments) { + _ = service.scaleForAssignedJobCount(5) + }).Return(nil) + mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Run(func(args mock.Arguments) { cancel() }).Return(nil).Once() err := service.Start() @@ -124,11 +130,15 @@ func TestStart_ScaleToMinRunnersFailed(t *testing.T) { s.logger = logger }, ) - mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(fmt.Errorf("error")).Once() + + c := mockKubeManager.On("ScaleEphemeralRunnerSet", ctx, service.settings.Namespace, service.settings.ResourceName, 5).Return(fmt.Errorf("error")).Once() + mockRsClient.On("GetRunnerScaleSetMessage", ctx, mock.Anything).Run(func(args mock.Arguments) { + _ = service.scaleForAssignedJobCount(5) + }).Return(c.ReturnArguments.Get(0)) err := service.Start() - assert.ErrorContains(t, err, "could not scale to match minimal runners", "Unexpected error") + assert.ErrorContains(t, err, "could not get and process message", "Unexpected error") assert.True(t, mockRsClient.AssertExpectations(t), "All expectations should be met") assert.True(t, mockKubeManager.AssertExpectations(t), "All expectations should be met") }