Skip to content

Commit

Permalink
Add changes to safely add new Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
PumpkinSeed authored and brandur committed Jul 4, 2024
1 parent 1680f4c commit 16fdc3b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
23 changes: 15 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ type Client[TTx any] struct {

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelCauseFunc
workCancel context.CancelCauseFunc
workContext context.Context
}

// Test-only signals.
Expand Down Expand Up @@ -614,14 +615,22 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) error {
func (c *Client[TTx]) AddQueue(ctx context.Context, queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

producerInstance := c.addProducer(queueName, queueConfig)
c.producersByQueueNameMu.Lock()
defer c.producersByQueueNameMu.Unlock()
c.producersByQueueName[queueName] = c.addProducer(queueName, queueConfig)
c.producersByQueueName[queueName] = producerInstance
c.producersByQueueNameMu.Unlock()

fetchCtx, started := c.baseStartStop.IsStarted()
if started {
if err := producerInstance.StartWorkContext(fetchCtx, c.workContext); err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -653,8 +662,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
)
}

var workCtx context.Context

// Startup code. Wrapped in a closure so it doesn't have to remember to
// close the stopped channel if returning with an error.
if err := func() error {
Expand Down Expand Up @@ -716,7 +723,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
workCtx, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
c.workContext, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
Expand All @@ -728,7 +735,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
for _, producer := range c.producersByQueueName {
producer := producer

if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
if err := producer.StartWorkContext(fetchCtx, c.workContext); err != nil {
startstop.StopAllParallel(producersAsServices())
stopServicesOnError()
return err
Expand Down
66 changes: 66 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,72 @@ func Test_Client(t *testing.T) {
riverinternaltest.WaitOrTimeout(t, workedChan)
})

t.Run("StartInsertAndWorkAddQueue_BeforeStart", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

workedChan := make(chan struct{})

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
workedChan <- struct{}{}
return nil
}))

queueName := "new_queue"
err := client.AddQueue(ctx, queueName, QueueConfig{
MaxWorkers: 2,
})
if err != nil {
t.Fatal(err)
}
startClient(ctx, t, client)

_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)

riverinternaltest.WaitOrTimeout(t, workedChan)
})

t.Run("StartInsertAndWorkAddQueue_AfterStart", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

workedChan := make(chan struct{})

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
workedChan <- struct{}{}
return nil
}))

startClient(ctx, t, client)
queueName := "new_queue"
err := client.AddQueue(ctx, queueName, QueueConfig{
MaxWorkers: 2,
})
if err != nil {
t.Fatal(err)
}

_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)

riverinternaltest.WaitOrTimeout(t, workedChan)
})

t.Run("JobCancelErrorReturned", func(t *testing.T) {
t.Parallel()

Expand Down
7 changes: 6 additions & 1 deletion internal/startstop/start_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type serviceWithStopped interface {
// override it.
type BaseStartStop struct {
cancelFunc context.CancelCauseFunc
context context.Context
mu sync.Mutex
started chan struct{}
stopped chan struct{}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (s *BaseStartStop) StartInit(ctx context.Context) (context.Context, bool, f

s.started = make(chan struct{})
s.stopped = make(chan struct{})
ctx, s.cancelFunc = context.WithCancelCause(ctx)
s.context, s.cancelFunc = context.WithCancelCause(ctx)

closeStartedOnce := sync.OnceFunc(func() { close(s.started) })

Expand All @@ -136,6 +137,10 @@ func (s *BaseStartStop) Started() <-chan struct{} {
return s.started
}

func (s *BaseStartStop) IsStarted() (context.Context, bool) {
return s.context, s.started != nil
}

// Stop is an automatically provided implementation for the maintenance Service
// interface's Stop.
func (s *BaseStartStop) Stop() {
Expand Down

0 comments on commit 16fdc3b

Please sign in to comment.