Fix acquire jobs after session refresh ghalistener (#3307)
This commit is contained in:
parent
e06c7edc21
commit
7da2d7f96a
|
|
@ -384,9 +384,9 @@ func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*ac
|
||||||
|
|
||||||
l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids))
|
l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids))
|
||||||
|
|
||||||
ids, err := l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
|
idsAcquired, err := l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
|
||||||
if err == nil { // if NO errors
|
if err == nil { // if NO errors
|
||||||
return ids, nil
|
return idsAcquired, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
expiredError := &actions.MessageQueueTokenExpiredError{}
|
expiredError := &actions.MessageQueueTokenExpiredError{}
|
||||||
|
|
@ -398,12 +398,12 @@ func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*ac
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ids, err = l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
|
idsAcquired, err = l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to acquire jobs after session refresh: %w", err)
|
return nil, fmt.Errorf("failed to acquire jobs after session refresh: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ids, nil
|
return idsAcquired, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) refreshSession(ctx context.Context) error {
|
func (l *Listener) refreshSession(ctx context.Context) error {
|
||||||
|
|
|
||||||
|
|
@ -628,9 +628,6 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
|
||||||
}
|
}
|
||||||
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
|
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()
|
||||||
|
|
||||||
// First call to AcquireJobs will fail with a token expired error
|
|
||||||
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
|
|
||||||
|
|
||||||
// Second call to AcquireJobs will succeed
|
// Second call to AcquireJobs will succeed
|
||||||
want := []int64{1, 2, 3}
|
want := []int64{1, 2, 3}
|
||||||
availableJobs := []*actions.JobAvailable{
|
availableJobs := []*actions.JobAvailable{
|
||||||
|
|
@ -650,7 +647,24 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
|
|
||||||
|
// First call to AcquireJobs will fail with a token expired error
|
||||||
|
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Run(func(args mock.Arguments) {
|
||||||
|
ids := args.Get(3).([]int64)
|
||||||
|
assert.Equal(t, want, ids)
|
||||||
|
}).
|
||||||
|
Return(nil, &actions.MessageQueueTokenExpiredError{}).
|
||||||
|
Once()
|
||||||
|
|
||||||
|
// First call to AcquireJobs will fail with a token expired error
|
||||||
|
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Run(func(args mock.Arguments) {
|
||||||
|
ids := args.Get(3).([]int64)
|
||||||
|
assert.Equal(t, want, ids)
|
||||||
|
}).
|
||||||
|
Return(want, nil).
|
||||||
|
Once()
|
||||||
|
|
||||||
config.Client = client
|
config.Client = client
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue