Update add queue feature with more changes and concurrent safety #410

Merged: 4 commits, Jul 6, 2024

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351).
- Queues can be added after a client is initialized using `client.Queues().Add(queueName string, queueConfig QueueConfig)`. [PR #410](https://github.com/riverqueue/river/pull/410).

### Changed

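The new API is small enough to show end to end. A minimal sketch, not taken from this PR: it assumes the riverpgxv5 driver and a local Postgres, and the connection string, queue names, and worker counts are illustrative.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type NoOpArgs struct{}

func (NoOpArgs) Kind() string { return "no_op" }

type NoOpWorker struct {
	river.WorkerDefaults[NoOpArgs]
}

func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error {
	return nil
}

func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, "postgres://localhost:5432/river_example")
	if err != nil {
		log.Fatal(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &NoOpWorker{})

	// Start a client with one statically configured queue.
	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := client.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer client.Stop(ctx)

	// New in this PR: add a queue after the client has started. Its
	// producer starts immediately, inheriting the context given to Start.
	if err := client.Queues().Add("high_priority", river.QueueConfig{MaxWorkers: 25}); err != nil {
		log.Fatal(err)
	}
}
```
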
123 changes: 98 additions & 25 deletions client.go
@@ -9,6 +9,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/riverqueue/river/internal/baseservice"
@@ -263,10 +264,7 @@ func (c *Config) validate() error {
}

for queue, queueConfig := range c.Queues {
-		if queueConfig.MaxWorkers < 1 || queueConfig.MaxWorkers > QueueNumWorkersMax {
-			return fmt.Errorf("invalid number of workers for queue %q: %d", queue, queueConfig.MaxWorkers)
-		}
-		if err := validateQueueName(queue); err != nil {
+		if err := queueConfig.validate(queue); err != nil {
return err
}
}
@@ -300,6 +298,17 @@ type QueueConfig struct {
MaxWorkers int
}

func (c QueueConfig) validate(queueName string) error {
if c.MaxWorkers < 1 || c.MaxWorkers > QueueNumWorkersMax {
return fmt.Errorf("invalid number of workers for queue %q: %d", queueName, c.MaxWorkers)
}
if err := validateQueueName(queueName); err != nil {
return err
}

return nil
}
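
Both paths into the client now share this method: Config.validate calls it for every statically configured queue, and the new QueueBundle.Add (further down) calls it before starting a producer. A hedged fragment continuing the sketch above (dbPool, workers, and client come from it; the queue names are made up):

```go
// Rejected at construction time: MaxWorkers must be between 1 and
// QueueNumWorkersMax.
_, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	Queues: map[string]river.QueueConfig{
		"emails": {MaxWorkers: 0},
	},
	Workers: workers,
})
// err: invalid number of workers for queue "emails": 0

// Rejected at runtime by the same method, via the new API:
err = client.Queues().Add("Not A Valid Name!", river.QueueConfig{MaxWorkers: 10})
// err is non-nil: the name fails validateQueueName
```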

// Client is a single isolated instance of River. Your application may use
// multiple instances operating on different databases or Postgres schemas
// within a single database.
@@ -319,6 +328,7 @@ type Client[TTx any] struct {
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
services []startstop.Service
subscriptionManager *subscriptionManager
stopped <-chan struct{}
@@ -481,6 +491,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
}
client.queues = &QueueBundle{addProducer: client.addProducer}

baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
@@ -516,23 +527,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services, client.elector)

for queue, queueConfig := range config.Queues {
-		client.producersByQueueName[queue] = newProducer(archetype, driver.GetExecutor(), &producerConfig{
-			ClientID:           config.ID,
-			Completer:          client.completer,
-			ErrorHandler:       config.ErrorHandler,
-			FetchCooldown:      config.FetchCooldown,
-			FetchPollInterval:  config.FetchPollInterval,
-			JobTimeout:         config.JobTimeout,
-			MaxWorkers:         queueConfig.MaxWorkers,
-			Notifier:           client.notifier,
-			Queue:              queue,
-			QueueEventCallback: client.subscriptionManager.distributeQueueEvent,
-			RetryPolicy:        config.RetryPolicy,
-			SchedulerInterval:  config.schedulerInterval,
-			StatusFunc:         client.monitor.SetProducerStatus,
-			Workers:            config.Workers,
-		})
-		client.monitor.InitializeProducerStatus(queue)
+		client.addProducer(queue, queueConfig)
}

client.services = append(client.services,
@@ -642,6 +637,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return nil
}

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

c.stopped = c.baseStartStop.Stopped()

producersAsServices := func() []startstop.Service {
@@ -651,8 +649,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
)
}

-	var workCtx context.Context
-
// Startup code. Wrapped in a closure so it doesn't have to remember to
// close the stopped channel if returning with an error.
if err := func() error {
@@ -714,7 +710,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
-	workCtx, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
+	workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
@@ -733,6 +729,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
}
}

c.queues.fetchCtx = fetchCtx
c.queues.workCtx = workCtx
c.workCancel = workCancel
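
The comment above is the core of the graceful-stop design. A standalone sketch of the same two-context pattern, separate from River's actual shutdown code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	parent := context.Background()

	// Fetching and working get separate contexts, both derived from the
	// same parent, mirroring the fetchCtx/workCtx split above.
	fetchCtx, fetchCancel := context.WithCancel(parent)
	workCtx, workCancel := context.WithCancelCause(parent)

	// Graceful stop: stop fetching new jobs first...
	fetchCancel()
	<-fetchCtx.Done()

	// ...while the work context stays alive so in-flight jobs can finish.
	if workCtx.Err() == nil {
		fmt.Println("fetch stopped, work still running")
	}

	// Hard stop: cancel work too, with a recorded cause.
	workCancel(errors.New("hard stop requested"))
	fmt.Println(context.Cause(workCtx)) // hard stop requested
}
```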

return nil
}(); err != nil {
defer stopped()
@@ -742,6 +742,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// Generate producer services while c.queues.startStopMu.Lock() is still
// held. This is used for WaitAllStarted below, but don't use it elsewhere
// because new producers may have been added while the client is running.
producerServices := producersAsServices()

go func() {
// Wait for all subservices to start up before signaling our own start.
// This isn't strictly needed, but gives tests a way to fully confirm
@@ -752,7 +757,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// briefly start, but then immediately stop again.
startstop.WaitAllStarted(append(
c.services,
-			producersAsServices()...,
+			producerServices..., // see comment on this variable
)...)

started()
@@ -764,6 +769,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// The call to Stop cancels this context. Block here until shutdown.
<-fetchCtx.Done()

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices())
@@ -1535,6 +1543,28 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
return nil
}

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
})
c.monitor.InitializeProducerStatus(queueName)
c.producersByQueueName[queueName] = producer
return producer
}

var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)

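A quick demo of what this pattern accepts and rejects, with the expression copied from above; the sample names are made up:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as nameRegex above.
var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)

func main() {
	for _, name := range []string{
		"emails",        // valid
		"high_priority", // valid: single separators are fine
		"priority-2",    // valid
		"Emails",        // invalid: uppercase
		"_emails",       // invalid: can't start with a separator
		"a--b",          // invalid: doubled separators
	} {
		fmt.Printf("%-16q %v\n", name, nameRegex.MatchString(name))
	}
}
```
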
func validateQueueName(queueName string) error {
@@ -1627,6 +1657,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }

// QueueGet returns the queue with the given name. If the queue has not recently
// been active or does not exist, returns ErrNotFound.
//
@@ -1810,6 +1844,45 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
return nil
}

// QueueBundle is a bundle for adding additional queues. It's made accessible
// through Client.Queues.
type QueueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer
Contributor Author: Taking a function pointer here might seem a little weird, but it lets us not internalize a client instance, which lets us avoid putting a TTx generic parameter on queueBundle.


fetchCtx context.Context //nolint:containedctx

// Mutex that's acquired when client is starting and stopping and when a
// queue is being added so that we can be sure that a client is fully
// stopped or fully started when adding a new queue.
startStopMu sync.Mutex

workCtx context.Context //nolint:containedctx
}
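
A minimal, hypothetical illustration of the function-pointer trick described in the inline comment above: because the bundle stores only a closure, it needs no TTx type parameter of its own. The types here are stand-ins, not River's real ones.

```go
package main

import "fmt"

// Client is a stand-in for river's generic client type.
type Client[TTx any] struct {
	queues *QueueBundle
}

// QueueBundle carries no TTx parameter: it holds only a function value
// bound to the client that created it.
type QueueBundle struct {
	addProducer func(queueName string) string
}

func NewClient[TTx any]() *Client[TTx] {
	client := &Client[TTx]{}
	// The method value client.addProducer closes over the client, so the
	// bundle can call back into it without knowing about TTx.
	client.queues = &QueueBundle{addProducer: client.addProducer}
	return client
}

func (c *Client[TTx]) addProducer(queueName string) string {
	return "producer for " + queueName
}

func main() {
	client := NewClient[struct{}]()
	fmt.Println(client.queues.addProducer("emails")) // producer for emails
}
```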

// Add adds a new queue to the client. If the client is already started, a
// producer for the queue is started. Context is inherited from the one given to
// Client.Start.
func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

b.startStopMu.Lock()
defer b.startStopMu.Unlock()

producer := b.addProducer(queueName, queueConfig)

// Start the queue if the client is already started.
if b.fetchCtx != nil && b.fetchCtx.Err() == nil {
if err := producer.StartWorkContext(b.fetchCtx, b.workCtx); err != nil {
return err
}
}

return nil
}
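
Because Add takes startStopMu, concurrent calls are safe even while the client is starting or stopping. A hedged sketch of usage that relies on this; addQueuesConcurrently is a hypothetical helper, not code from this PR:

```go
package riverexample

import (
	"fmt"
	"sync"

	"github.com/riverqueue/river"
)

// addQueuesConcurrently hammers Queues().Add from several goroutines,
// which the bundle's startStopMu makes safe even mid-start or mid-stop.
func addQueuesConcurrently[TTx any](client *river.Client[TTx], n int) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			name := fmt.Sprintf("dynamic_queue_%02d", i)
			if err := client.Queues().Add(name, river.QueueConfig{MaxWorkers: 5}); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return firstErr
}
```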

// Generates a default client ID using the current hostname and time.
func defaultClientID(startedAt time.Time) string {
host, _ := os.Hostname()
2 changes: 2 additions & 0 deletions client_monitor.go
@@ -56,6 +56,8 @@ func (m *clientMonitor) Start(ctx context.Context) error {
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change
// and is only meant to be used during initial client startup.
func (m *clientMonitor) InitializeProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
Contributor Author: Found I needed to add this mutex when trying to run my new stress test. Most other client monitor functions already lock it.

Contributor: Makes sense. As this wasn't dynamic before it wasn't really needed.

m.currentSnapshot.Producers[queueName] = componentstatus.Uninitialized
}
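
The stress test the comments mention lives in River's own test suite. As a standalone illustration of the failure class this lock prevents, here is a minimal hypothetical program with concurrent writes to a shared status map:

```go
package main

import (
	"fmt"
	"sync"
)

type snapshot struct {
	Producers map[string]string
}

func main() {
	var (
		mu   sync.Mutex
		snap = snapshot{Producers: make(map[string]string)}
		wg   sync.WaitGroup
	)
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Without this lock, concurrent map writes panic at runtime
			// and `go run -race` reports a data race.
			mu.Lock()
			defer mu.Unlock()
			snap.Producers[fmt.Sprintf("queue_%d", i)] = "uninitialized"
		}(i)
	}
	wg.Wait()
	fmt.Println(len(snap.Producers)) // 100
}
```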
