Update add queue feature with more changes and concurrent safety
Adds a feature that makes it possible to add a new queue using an
invocation like `client.Queues().Add(name, config)`, similar to the API
for adding a new periodic job. If the client has already started, the
new producer is also started. Its fetch and work contexts are the same
ones created for the other producers during `Start`.
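
As a quick caller-side illustration of the new API, here's a minimal
sketch (not part of the diff below). The connection string, the no-op
worker, and the assumption that the River schema is already migrated are
placeholders for the example:

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// NoOpArgs/NoOpWorker are placeholder job types; any registered worker works.
type NoOpArgs struct{}

func (NoOpArgs) Kind() string { return "no_op" }

type NoOpWorker struct {
	river.WorkerDefaults[NoOpArgs]
}

func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error {
	return nil
}

func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, "postgres://localhost/river_dev")
	if err != nil {
		log.Fatal(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &NoOpWorker{})

	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
		Workers: workers,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := client.Start(ctx); err != nil {
		log.Fatal(err)
	}

	// New in this change: add a queue while the client is running. Because
	// the client has already started, a producer for "reports" starts
	// immediately, reusing the fetch and work contexts created during Start.
	if err := client.Queues().Add("reports", river.QueueConfig{MaxWorkers: 10}); err != nil {
		log.Fatal(err)
	}

	// ... insert and work jobs as usual, then shut down.
	if err := client.Stop(ctx); err != nil {
		log.Fatal(err)
	}
}
```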

For now we punt entirely on the problem of removing queues, which is
more complicated because it's genuinely hard to decide how producer stop
context cancellation should work in that case.

A problem I ran into while writing a stress test for the feature is that
test signal channels were filling up. This was difficult to solve
because test signals were always initialized by `newTestClient`, and
because that init also initializes each maintenance service's test
signals in turn, there was no way to deinitialize them again. I had to
refactor the tests so that test signals are only initialized when
they're used, which is probably better anyway.
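
For a rough sketch of what the test signal problem looks like and how
lazy initialization avoids it (the type and its methods here are
illustrative stand-ins, not River's actual internal test helpers):

```go
// Illustrative stand-in for an internal test signal. When uninitialized,
// signaling is a no-op, so code paths exercised heavily by stress tests
// can't fill up a channel nobody is reading.
type testSignal[T any] struct {
	c chan T
}

// Init arms the signal with a buffered channel; only tests that actually
// wait on the signal should call it.
func (s *testSignal[T]) Init() { s.c = make(chan T, 50) }

// Signal records a value for a waiting test.
func (s *testSignal[T]) Signal(val T) {
	if s.c == nil {
		return // never initialized: drop the signal
	}
	select {
	case s.c <- val:
	default:
		// A full channel is the failure mode described above.
		panic("test signal channel is full")
	}
}
```

The refactor moves the `Init` calls out of `newTestClient` and into only
the tests that actually wait on a signal.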
brandur committed Jul 4, 2024
1 parent 16fdc3b commit e937cb4
Showing 6 changed files with 170 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Queues can be added after a client is initialized using `client.Queues().Add(queueName string, queueConfig QueueConfig)`. [PR #410](https://github.com/riverqueue/river/pull/410).
- `Config.TestOnly` has been added. It disables various features in the River client like staggered maintenance service start that are useful in production, but may be somewhat harmful in tests because they make start/stop slower. [PR #414](https://github.com/riverqueue/river/pull/414).

### Fixed
154 changes: 97 additions & 57 deletions client.go
@@ -318,27 +318,26 @@ type Client[TTx any] struct {
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
producersByQueueNameMu sync.Mutex
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
subscriptionManager *subscriptionManager
stopped <-chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *queueBundle
services []startstop.Service
subscriptionManager *subscriptionManager
stopped <-chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelCauseFunc
workContext context.Context
workCancel context.CancelCauseFunc
}

// Test-only signals.
@@ -492,6 +491,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
}
client.queues = &queueBundle{addProducer: client.addProducer}

baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
@@ -523,7 +523,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services, client.elector)

for queue, queueConfig := range config.Queues {
client.producersByQueueName[queue] = client.addProducer(queue, queueConfig)
client.addProducer(queue, queueConfig)
}

client.services = append(client.services,
@@ -615,26 +615,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

func (c *Client[TTx]) AddQueue(ctx context.Context, queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

producerInstance := c.addProducer(queueName, queueConfig)
c.producersByQueueNameMu.Lock()
c.producersByQueueName[queueName] = producerInstance
c.producersByQueueNameMu.Unlock()

fetchCtx, started := c.baseStartStop.IsStarted()
if started {
if err := producerInstance.StartWorkContext(fetchCtx, c.workContext); err != nil {
return err
}
}

return nil
}

// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
@@ -653,6 +633,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return nil
}

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

c.stopped = c.baseStartStop.Stopped()

producersAsServices := func() []startstop.Service {
@@ -723,7 +706,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
c.workContext, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
@@ -735,13 +718,17 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
for _, producer := range c.producersByQueueName {
producer := producer

if err := producer.StartWorkContext(fetchCtx, c.workContext); err != nil {
if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
startstop.StopAllParallel(producersAsServices())
stopServicesOnError()
return err
}
}

c.queues.fetchCtx = fetchCtx
c.queues.workCtx = workCtx
c.workCancel = workCancel

return nil
}(); err != nil {
defer stopped()
@@ -751,6 +738,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// Generate producer services while c.queues.startStopMu.Lock() is still
// held. This is used for WaitAllStarted below, but don't use it elsewhere
// because new producers may have been added while the client is running.
producerServices := producersAsServices()

go func() {
// Wait for all subservices to start up before signaling our own start.
// This isn't strictly needed, but gives tests a way to fully confirm
@@ -761,7 +753,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// briefly start, but then immediately stop again.
startstop.WaitAllStarted(append(
c.services,
producersAsServices()...,
producerServices..., // see comment on this variable
)...)

started()
@@ -773,6 +765,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// The call to Stop cancels this context. Block here until shutdown.
<-fetchCtx.Done()

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices())
@@ -1536,23 +1531,25 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
}

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
producerInstance := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
})
c.monitor.InitializeProducerStatus(queueName)
return producerInstance
c.producersByQueueName[queueName] = producer
return producer
}

var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
@@ -1647,6 +1644,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *queueBundle { return c.queues }

// QueueGet returns the queue with the given name. If the queue has not recently
// been active or does not exist, returns ErrNotFound.
//
@@ -1830,6 +1831,45 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
return nil
}

// queueBundle is a bundle for adding additional queues. It's made accessible
// through Client.Queues.
type queueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer

fetchCtx context.Context //nolint:containedctx

// Mutex that's acquired when client is starting and stopping and when a
// queue is being added so that we can be sure that a client is fully
// stopped or fully started when adding a new queue.
startStopMu sync.Mutex

workCtx context.Context //nolint:containedctx
}

// Add adds a new queue to the client. If the client is already started, a
// producer for the queue is started. Context is inherited from the one given to
// Client.Start.
func (b *queueBundle) Add(queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

b.startStopMu.Lock()
defer b.startStopMu.Unlock()

producer := b.addProducer(queueName, queueConfig)

// Start the queue if the client is already started.
if b.fetchCtx != nil && b.fetchCtx.Err() == nil {
if err := producer.StartWorkContext(b.fetchCtx, b.workCtx); err != nil {
return err
}
}

return nil
}

// Generates a default client ID using the current hostname and time.
func defaultClientID(startedAt time.Time) string {
host, _ := os.Hostname()
2 changes: 2 additions & 0 deletions client_monitor.go
@@ -56,6 +56,8 @@ func (m *clientMonitor) Start(ctx context.Context) error {
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change
// and is only meant to be used during initial client startup.
func (m *clientMonitor) InitializeProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
m.currentSnapshot.Producers[queueName] = componentstatus.Uninitialized
}
