Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix StopAndTest after ongoing Stop #376

Merged
merged 2 commits into from
Jun 9, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor StopAndCancel tests
  • Loading branch information
bgentry committed Jun 9, 2024
commit 3f8df29cf05d8049f7de3a5ef67635faebb24dd4
174 changes: 53 additions & 121 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,88 +641,6 @@ func Test_Client(t *testing.T) {

startstoptest.StressErr(ctx, t, clientWithStop, rivercommon.ErrShutdown)
})

t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

type testBundle struct {
jobDoneChan chan struct{}
jobStartedChan chan int64
}

setupStopAndCancel := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))

startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
}
}

t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

client, _ := setupStopAndCancel(t)

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())
})

t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

client, bundle := setupStopAndCancel(t)

go func() {
require.NoError(t, client.Stop(ctx))
}()

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
case <-time.After(500 * time.Millisecond):
}

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())

select {
case <-bundle.jobDoneChan:
default:
t.Fatal("expected job to be have exited")
}
})
})
}

func Test_Client_Stop(t *testing.T) {
Expand Down Expand Up @@ -922,66 +840,80 @@ func Test_Client_StopAndCancel(t *testing.T) {

ctx := context.Background()

t.Run("jobs in progress, only completing when context is canceled", func(t *testing.T) {
t.Parallel()

jobDoneChan := make(chan struct{})
jobStartedChan := make(chan int64)
type testBundle struct {
jobDoneChan chan struct{}
jobStartedChan chan int64
}

callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
defer close(jobDoneChan)
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

// indicate the job has started, unless context is already done:
select {
case <-ctx.Done():
return ctx.Err()
case jobStartedChan <- job.ID:
}
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
jobStartedChan <- job.ID
t.Logf("Job waiting for context cancellation")
defer t.Logf("Job finished")

select {
case <-ctx.Done(): // don't stop running until context is canceled
case <-time.After(10 * time.Second):
require.FailNow(t, "Job should've been cancelled by now")
}

<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
t.Logf("Job context done, closing chan and returning")
close(jobDoneChan)
return nil
}
config := newTestConfig(t, callbackFunc)
})

client := runNewTestClient(ctx, t, config)

insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
insertRes, err := client.Insert(ctx, &callbackArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

t.Logf("Initiating hard stop, while jobs are still in progress")
select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
}
}

stopCtx, stopCancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(stopCancel)
t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

stopStartedAt := time.Now()
client, _ := setup(t)

err = client.StopAndCancel(stopCtx)
require.NoError(t, err)
require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())
})

t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

t.Logf("Waiting on job to be done")
client, bundle := setup(t)

go func() {
require.NoError(t, client.Stop(ctx))
}()

select {
case <-jobDoneChan:
default:
require.FailNow(t, "Expected job to be done before stop returns")
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
case <-time.After(500 * time.Millisecond):
}

// Stop should be ~immediate:
//
// TODO: client stop seems to take a widely variable amount of time,
// between 1ms and >50ms, due to the JobComplete query taking that long.
// Investigate and solve that if we can, or consider reworking this test.
require.WithinDuration(t, time.Now(), stopStartedAt, 200*time.Millisecond)
require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())

select {
case <-bundle.jobDoneChan:
default:
t.Fatal("expected job to have exited")
}
})
}

Expand Down
Loading