Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix StopAndTest after ongoing Stop #376

Merged
merged 2 commits
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383).

### Fixed

- Fix `StopAndCancel` to not hang if called in parallel to an ongoing `Stop` call. [PR #376](https://github.com/riverqueue/river/pull/376).

## [0.6.1] - 2024-05-21

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,14 +810,14 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
// no need to call this method if the context passed to Run is cancelled
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
return nil
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

select {
case <-ctx.Done(): // stop context cancelled
finalizeStop(false) // not stopped; allow Stop to be called again
Expand Down
130 changes: 53 additions & 77 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,44 +641,6 @@ func Test_Client(t *testing.T) {

startstoptest.StressErr(ctx, t, clientWithStop, rivercommon.ErrShutdown)
})

t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))

startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

require.NoError(t, client.StopAndCancel(ctx))

riverinternaltest.WaitOrTimeout(t, client.Stopped())
})
}

func Test_Client_Stop(t *testing.T) {
Expand Down Expand Up @@ -878,66 +840,80 @@ func Test_Client_StopAndCancel(t *testing.T) {

ctx := context.Background()

t.Run("jobs in progress, only completing when context is canceled", func(t *testing.T) {
t.Parallel()

jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)
type testBundle struct {
jobDoneChan chan struct{}
jobStartedChan chan int64
}

callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
defer close(jobDoneChan)
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- job.ID:
}
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
jobStartedChan <- job.ID
t.Logf("Job waiting for context cancellation")
defer t.Logf("Job finished")

select {
case <-ctx.Done(): // don't stop running until context is canceled
case <-time.After(10 * time.Second):
require.FailNow(t, "Job should've been cancelled by now")
}

<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
t.Logf("Job context done, closing chan and returning")
close(jobDoneChan)
return nil
}
config := newTestConfig(t, callbackFunc)
})

client := runNewTestClient(ctx, t, config)

insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
insertRes, err := client.Insert(ctx, &callbackArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

t.Logf("Initiating hard stop, while jobs are still in progress")
select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

stopCtx, stopCancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(stopCancel)
return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
}
}

stopStartedAt := time.Now()
t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

err = client.StopAndCancel(stopCtx)
require.NoError(t, err)
client, _ := setup(t)

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())
})

t.Logf("Waiting on job to be done")
t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

go func() {
require.NoError(t, client.Stop(ctx))
}()

select {
case <-jobDoneChan:
default:
require.FailNow(t, "Expected job to be done before stop returns")
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
case <-time.After(500 * time.Millisecond):
}

// Stop should be ~immediate:
//
// TODO: client stop seems to take a widely variable amount of time,
// between 1ms and >50ms, due to the JobComplete query taking that long.
// Investigate and solve that if we can, or consider reworking this test.
require.WithinDuration(t, time.Now(), stopStartedAt, 200*time.Millisecond)
require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())

select {
case <-bundle.jobDoneChan:
default:
t.Fatal("expected job to have exited")
}
})
}

Expand Down
Loading