Refresh session if token expires during delete message (#3529)
This commit is contained in:
		
							parent
							
								
									ab92e4edc3
								
							
						
					
					
						commit
						3bda9bb240
					
				|  | @ -295,8 +295,23 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa | ||||||
| 
 | 
 | ||||||
| func (l *Listener) deleteLastMessage(ctx context.Context) error { | func (l *Listener) deleteLastMessage(ctx context.Context) error { | ||||||
| 	l.logger.Info("Deleting last message", "lastMessageID", l.lastMessageID) | 	l.logger.Info("Deleting last message", "lastMessageID", l.lastMessageID) | ||||||
| 	if err := l.client.DeleteMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID); err != nil { | 	err := l.client.DeleteMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID) | ||||||
| 		return fmt.Errorf("failed to delete message: %w", err) | 	if err == nil { // if NO error
 | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	expiredError := &actions.MessageQueueTokenExpiredError{} | ||||||
|  | 	if !errors.As(err, &expiredError) { | ||||||
|  | 		return fmt.Errorf("failed to delete last message: %w", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := l.refreshSession(ctx); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = l.client.DeleteMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("failed to delete last message after message session refresh: %w", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  |  | ||||||
|  | @ -377,6 +377,93 @@ func TestListener_deleteLastMessage(t *testing.T) { | ||||||
| 		err = l.deleteLastMessage(ctx) | 		err = l.deleteLastMessage(ctx) | ||||||
| 		assert.NotNil(t, err) | 		assert.NotNil(t, err) | ||||||
| 	}) | 	}) | ||||||
|  | 
 | ||||||
|  | 	t.Run("RefreshAndSucceeds", func(t *testing.T) { | ||||||
|  | 		t.Parallel() | ||||||
|  | 
 | ||||||
|  | 		ctx := context.Background() | ||||||
|  | 		config := Config{ | ||||||
|  | 			ScaleSetID: 1, | ||||||
|  | 			Metrics:    metrics.Discard, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		client := listenermocks.NewClient(t) | ||||||
|  | 
 | ||||||
|  | 		newUUID := uuid.New() | ||||||
|  | 		session := &actions.RunnerScaleSetSession{ | ||||||
|  | 			SessionId:               &newUUID, | ||||||
|  | 			OwnerName:               "example", | ||||||
|  | 			RunnerScaleSet:          &actions.RunnerScaleSet{}, | ||||||
|  | 			MessageQueueUrl:         "https://example.com", | ||||||
|  | 			MessageQueueAccessToken: "1234567890", | ||||||
|  | 			Statistics:              nil, | ||||||
|  | 		} | ||||||
|  | 		client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once() | ||||||
|  | 
 | ||||||
|  | 		client.On("DeleteMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(&actions.MessageQueueTokenExpiredError{}).Once() | ||||||
|  | 
 | ||||||
|  | 		client.On("DeleteMessage", ctx, mock.Anything, mock.Anything, mock.MatchedBy(func(lastMessageID any) bool { | ||||||
|  | 			return lastMessageID.(int64) == int64(5) | ||||||
|  | 		})).Return(nil).Once() | ||||||
|  | 		config.Client = client | ||||||
|  | 
 | ||||||
|  | 		l, err := New(config) | ||||||
|  | 		require.Nil(t, err) | ||||||
|  | 
 | ||||||
|  | 		oldUUID := uuid.New() | ||||||
|  | 		l.session = &actions.RunnerScaleSetSession{ | ||||||
|  | 			SessionId:      &oldUUID, | ||||||
|  | 			RunnerScaleSet: &actions.RunnerScaleSet{}, | ||||||
|  | 		} | ||||||
|  | 		l.lastMessageID = 5 | ||||||
|  | 
 | ||||||
|  | 		config.Client = client | ||||||
|  | 
 | ||||||
|  | 		err = l.deleteLastMessage(ctx) | ||||||
|  | 		assert.NoError(t, err) | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	t.Run("RefreshAndFails", func(t *testing.T) { | ||||||
|  | 		t.Parallel() | ||||||
|  | 
 | ||||||
|  | 		ctx := context.Background() | ||||||
|  | 		config := Config{ | ||||||
|  | 			ScaleSetID: 1, | ||||||
|  | 			Metrics:    metrics.Discard, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		client := listenermocks.NewClient(t) | ||||||
|  | 
 | ||||||
|  | 		newUUID := uuid.New() | ||||||
|  | 		session := &actions.RunnerScaleSetSession{ | ||||||
|  | 			SessionId:               &newUUID, | ||||||
|  | 			OwnerName:               "example", | ||||||
|  | 			RunnerScaleSet:          &actions.RunnerScaleSet{}, | ||||||
|  | 			MessageQueueUrl:         "https://example.com", | ||||||
|  | 			MessageQueueAccessToken: "1234567890", | ||||||
|  | 			Statistics:              nil, | ||||||
|  | 		} | ||||||
|  | 		client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once() | ||||||
|  | 
 | ||||||
|  | 		client.On("DeleteMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(&actions.MessageQueueTokenExpiredError{}).Twice() | ||||||
|  | 
 | ||||||
|  | 		config.Client = client | ||||||
|  | 
 | ||||||
|  | 		l, err := New(config) | ||||||
|  | 		require.Nil(t, err) | ||||||
|  | 
 | ||||||
|  | 		oldUUID := uuid.New() | ||||||
|  | 		l.session = &actions.RunnerScaleSetSession{ | ||||||
|  | 			SessionId:      &oldUUID, | ||||||
|  | 			RunnerScaleSet: &actions.RunnerScaleSet{}, | ||||||
|  | 		} | ||||||
|  | 		l.lastMessageID = 5 | ||||||
|  | 
 | ||||||
|  | 		config.Client = client | ||||||
|  | 
 | ||||||
|  | 		err = l.deleteLastMessage(ctx) | ||||||
|  | 		assert.Error(t, err) | ||||||
|  | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestListener_Listen(t *testing.T) { | func TestListener_Listen(t *testing.T) { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue