Skip to content

Commit 2109122

Browse files
authored
Add QueueBundle.Remove() to stop and remove queues (#1235)
Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal. Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com>
1 parent 7f7d703 commit 2109122

3 files changed

Lines changed: 297 additions & 0 deletions

File tree

client.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
768768

769769
client.queues = &QueueBundle{
770770
addProducer: client.addProducer,
771+
removeProducer: client.removeProducer,
771772
clientFetchCooldown: config.FetchCooldown,
772773
clientFetchPollInterval: config.FetchPollInterval,
773774
clientWillExecuteJobs: config.willExecuteJobs(),
@@ -2209,6 +2210,21 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
22092210
return producer, nil
22102211
}
22112212

2213+
func (c *Client[TTx]) removeProducer(queueName string) error {
2214+
c.producersMu.Lock()
2215+
defer c.producersMu.Unlock()
2216+
2217+
producer, ok := c.producersByQueueName[queueName]
2218+
if !ok {
2219+
return &QueueNotFoundError{Name: queueName}
2220+
}
2221+
2222+
producer.Stop()
2223+
delete(c.producersByQueueName, queueName)
2224+
2225+
return nil
2226+
}
2227+
22122228
var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
22132229

22142230
func validateQueueName(queueName string) error {
@@ -2799,6 +2815,8 @@ type QueueBundle struct {
27992815
// Function that adds a producer to the associated client.
28002816
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)
28012817

2818+
removeProducer func(queueName string) error
2819+
28022820
clientFetchCooldown time.Duration
28032821
clientFetchPollInterval time.Duration
28042822

@@ -2844,6 +2862,24 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
28442862
return nil
28452863
}
28462864

2865+
// Remove removes a queue from the client, stopping the producer if the client
2866+
// is running. The function will block until all jobs currently being worked in
2867+
// the queue have completed. This blocking behavior may affect other operations,
2868+
// including shutdown timing.
2869+
//
2870+
// Returns an error if the client is not configured to execute jobs or if the
2871+
// specified queue does not exist.
2872+
func (b *QueueBundle) Remove(queueName string) error {
2873+
if !b.clientWillExecuteJobs {
2874+
return errors.New("client is not configured to execute jobs, cannot remove queue")
2875+
}
2876+
2877+
b.startStopMu.Lock()
2878+
defer b.startStopMu.Unlock()
2879+
2880+
return b.removeProducer(queueName)
2881+
}
2882+
28472883
// Generates a default client ID using the current hostname and time.
28482884
func defaultClientID(startedAt time.Time) string {
28492885
host, _ := os.Hostname()

client_test.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,252 @@ func Test_Client_Common(t *testing.T) {
406406
wg.Wait()
407407
})
408408

409+
t.Run("Queues_Remove_BeforeStart", func(t *testing.T) {
410+
t.Parallel()
411+
412+
client, _ := setup(t)
413+
414+
type JobArgs struct {
415+
testutil.JobArgsReflectKind[JobArgs]
416+
}
417+
418+
subscribeChan := subscribe(t, client)
419+
420+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
421+
return nil
422+
}))
423+
424+
queueName := "remove_before_start_queue"
425+
err := client.Queues().Add(queueName, QueueConfig{
426+
MaxWorkers: 2,
427+
})
428+
require.NoError(t, err)
429+
430+
err = client.Queues().Remove(queueName)
431+
require.NoError(t, err)
432+
433+
startClient(ctx, t, client)
434+
435+
insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
436+
Queue: queueName,
437+
})
438+
require.NoError(t, err)
439+
440+
// Verify job stays available by checking another queue's job completes
441+
_, err = client.Insert(ctx, &noOpArgs{}, nil)
442+
require.NoError(t, err)
443+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
444+
require.Equal(t, EventKindJobCompleted, event.Kind)
445+
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)
446+
447+
// Job on removed queue should still be available
448+
job, err := client.JobGet(ctx, insertRes.Job.ID)
449+
require.NoError(t, err)
450+
require.Equal(t, rivertype.JobStateAvailable, job.State)
451+
})
452+
453+
t.Run("Queues_Remove_AfterStart", func(t *testing.T) {
454+
t.Parallel()
455+
456+
client, _ := setup(t)
457+
458+
type JobArgs struct {
459+
testutil.JobArgsReflectKind[JobArgs]
460+
}
461+
462+
subscribeChan := subscribe(t, client)
463+
464+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
465+
return nil
466+
}))
467+
468+
queueName := "remove_after_start_queue"
469+
err := client.Queues().Add(queueName, QueueConfig{
470+
MaxWorkers: 2,
471+
})
472+
require.NoError(t, err)
473+
474+
startClient(ctx, t, client)
475+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
476+
477+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
478+
Queue: queueName,
479+
})
480+
require.NoError(t, err)
481+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
482+
require.Equal(t, EventKindJobCompleted, event.Kind)
483+
484+
err = client.Queues().Remove(queueName)
485+
require.NoError(t, err)
486+
487+
insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
488+
Queue: queueName,
489+
})
490+
require.NoError(t, err)
491+
492+
// Verify job stays available by checking another queue's job completes
493+
_, err = client.Insert(ctx, &noOpArgs{}, nil)
494+
require.NoError(t, err)
495+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
496+
require.Equal(t, EventKindJobCompleted, event.Kind)
497+
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)
498+
499+
// Job on removed queue should still be available
500+
job, err := client.JobGet(ctx, insertRes.Job.ID)
501+
require.NoError(t, err)
502+
require.Equal(t, rivertype.JobStateAvailable, job.State)
503+
})
504+
505+
t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) {
506+
t.Parallel()
507+
508+
client, _ := setup(t)
509+
510+
err := client.Queues().Remove("non_existent_queue")
511+
require.Error(t, err)
512+
var queueNotFoundErr *QueueNotFoundError
513+
require.ErrorAs(t, err, &queueNotFoundErr)
514+
require.Equal(t, "non_existent_queue", queueNotFoundErr.Name)
515+
})
516+
517+
t.Run("Queues_Remove_WhenClientWontExecuteJobs", func(t *testing.T) {
518+
t.Parallel()
519+
520+
config, bundle := setupConfig(t)
521+
config.Queues = nil
522+
config.Workers = nil
523+
client := newTestClient(t, bundle.dbPool, config)
524+
525+
err := client.Queues().Remove("any_queue")
526+
require.Error(t, err)
527+
require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue")
528+
})
529+
530+
t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) {
531+
t.Parallel()
532+
533+
config, bundle := setupConfig(t)
534+
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}
535+
client := newTestClient(t, bundle.dbPool, config)
536+
537+
type JobArgs struct {
538+
testutil.JobArgsReflectKind[JobArgs]
539+
}
540+
541+
subscribeChan := subscribe(t, client)
542+
543+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
544+
return nil
545+
}))
546+
547+
startClient(ctx, t, client)
548+
549+
_, err := client.Insert(ctx, &JobArgs{}, nil)
550+
require.NoError(t, err)
551+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
552+
require.Equal(t, EventKindJobCompleted, event.Kind)
553+
554+
err = client.Queues().Remove(QueueDefault)
555+
require.NoError(t, err)
556+
557+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
558+
require.NoError(t, err)
559+
560+
// Verify no more jobs complete by using a short timeout
561+
select {
562+
case <-subscribeChan:
563+
t.Fatal("expected job to not be worked after default queue removal")
564+
case <-time.After(500 * time.Millisecond):
565+
}
566+
567+
// Job should still be available
568+
job, err := client.JobGet(ctx, insertRes.Job.ID)
569+
require.NoError(t, err)
570+
require.Equal(t, rivertype.JobStateAvailable, job.State)
571+
})
572+
573+
t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) {
574+
t.Parallel()
575+
576+
client, _ := setup(t)
577+
578+
type JobArgs struct {
579+
testutil.JobArgsReflectKind[JobArgs]
580+
}
581+
582+
subscribeChan := subscribe(t, client)
583+
584+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
585+
return nil
586+
}))
587+
588+
queueName := "remove_then_add_queue"
589+
err := client.Queues().Add(queueName, QueueConfig{
590+
MaxWorkers: 2,
591+
})
592+
require.NoError(t, err)
593+
594+
startClient(ctx, t, client)
595+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
596+
597+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
598+
Queue: queueName,
599+
})
600+
require.NoError(t, err)
601+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
602+
require.Equal(t, EventKindJobCompleted, event.Kind)
603+
604+
err = client.Queues().Remove(queueName)
605+
require.NoError(t, err)
606+
607+
err = client.Queues().Add(queueName, QueueConfig{
608+
MaxWorkers: 2,
609+
})
610+
require.NoError(t, err)
611+
612+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
613+
Queue: queueName,
614+
})
615+
require.NoError(t, err)
616+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
617+
require.Equal(t, EventKindJobCompleted, event.Kind)
618+
})
619+
620+
t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) {
621+
t.Parallel()
622+
623+
config, bundle := setupConfig(t)
624+
config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}}
625+
client := newTestClient(t, bundle.dbPool, config)
626+
627+
subscribeChan := subscribe(t, client)
628+
startClient(ctx, t, client)
629+
630+
insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
631+
require.NoError(t, err)
632+
633+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
634+
require.Equal(t, EventKindJobCompleted, event.Kind)
635+
require.Equal(t, insertRes1.Job.ID, event.Job.ID)
636+
637+
require.NoError(t, client.Queues().Remove("test_queue"))
638+
639+
insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
640+
require.NoError(t, err)
641+
642+
select {
643+
case <-subscribeChan:
644+
t.Fatal("expected job 2 to not start on removed queue")
645+
case <-time.After(500 * time.Millisecond):
646+
}
647+
648+
require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10}))
649+
650+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
651+
require.Equal(t, EventKindJobCompleted, event.Kind)
652+
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
653+
})
654+
409655
t.Run("JobCancelErrorReturned", func(t *testing.T) {
410656
t.Parallel()
411657

error.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ func (e *QueueAlreadyAddedError) Is(target error) bool {
5959
return ok
6060
}
6161

62+
// QueueNotFoundError is returned when attempting to remove a queue that does
63+
// not exist on the Client.
64+
type QueueNotFoundError struct {
65+
Name string
66+
}
67+
68+
func (e *QueueNotFoundError) Error() string {
69+
return fmt.Sprintf("queue %q not found", e.Name)
70+
}
71+
72+
func (e *QueueNotFoundError) Is(target error) bool {
73+
_, ok := target.(*QueueNotFoundError)
74+
return ok
75+
}
76+
6277
// UnknownJobKindError is returned when a Client fetches and attempts to
6378
// work a job that has not been registered on the Client's Workers bundle (using AddWorker).
6479
type UnknownJobKindError = rivertype.UnknownJobKindError

0 commit comments

Comments
 (0)