Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update add queue feature with a more changes and concurrent safety #410

Merged
merged 4 commits into from
Jul 6, 2024

Conversation

brandur
Copy link
Contributor

@brandur brandur commented Jun 30, 2024

A feature to make it possible to add a new queue using an invocation like
client.Queues().Add(name, config), similar to the API for adding a new
periodic job. If the client is started already, the new producer is also
started. Fetch and work context are the same ones created for other
producers during Start.

For now we totally punt on the problem of removing queues, which is more
complicated because it's a fairly hard problem on how producer stop context
cancellation should work.

A problem I was having while writing a stress test for the feature is that
test signal channels were being filled which was difficult to solve because
test signals were always initialized by newTestClient and because the
init also initializes each maintenance service's test signals in turn, it's
not possible to deinit them again. I had to refactor tests such that test
signals are only initialized when they're used, which is probably better
anyway.

@brandur brandur force-pushed the brandur-add-queue branch 3 times, most recently from 1fdf021 to d45f42f Compare June 30, 2024 00:30
// through Client.Queues.
type queueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a function pointer here might seem a little weird, but it lets us not internalize a client instance, which lets us avoid putting a TTx generic parameter on queueBundle.

@@ -155,8 +155,6 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
client, err := NewClient(riverpgxv5.New(dbPool), config)
require.NoError(t, err)

client.testSignals.Init()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was having trouble with my stress test where the test signal channels were filling up, and unfortunately because init'ing test signals also inits test signals for all maintenance services, it means there's no way to deinit them again. The only decent answer is to not init them on every single client initialization, and move inits into test cases that need them.

@@ -57,6 +57,8 @@ func (m *clientMonitor) Start(ctx context.Context) error {
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change
// and is only meant to be used during initial client startup.
func (m *clientMonitor) InitializeProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found I needed to add this mutex when trying to run my new stress test. Most other client monitor functions already lock it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. As this wasn't dynamic before it wasn't really needed.

@brandur brandur requested a review from bgentry June 30, 2024 00:34
@brandur
Copy link
Contributor Author

brandur commented Jun 30, 2024

@bgentry Mind taking a look at this and letting me know what you think? I definitely found it a little on the tricky side to guarantee we make the right decision to start or not start the new producer based on whether the client is started because it's hard to know for sure whether a client is started or stopped because it may be in the process of stopping or stopping right at this instant (therefore the addition of the new mutex).

I almost think a better design would be to never start a newly added producer/queue, and then require that the caller stop and then start the client again to have it start. However, that would cause some user confusion/surprise I'm sure.

@brandur
Copy link
Contributor Author

brandur commented Jun 30, 2024

@PumpkinSeed BTW, I changed a fair bit of code, but I included all your original commits in this variant.

We're trying a new CLA process (similar to the CLA Assistant you may have seen on any projects) just to make sure we don't have any copyright issues on any of any contributed code. Would you mind helping us test it out by going to https://github.com/riverqueue/cla and following the steps there? It basically involves opening a PR through GitHub's UI and adding your name to a CSV file. Thanks!

@brandur
Copy link
Contributor Author

brandur commented Jul 4, 2024

@bgentry Just since I'm sending 0.9.0 out for a bug fix, I thought I'd ping this one once more. If you have concerns, no worries, but if it's just that you haven't reviewed yet, may be worth including it in the release.

Copy link
Contributor

@bgentry bgentry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM aside from these comments, particularly the Client.Queues() return type which I think is a blocker.

client.go Outdated
@@ -1647,6 +1644,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *queueBundle { return c.queues }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to have an exported method that returns an unexported type like this. Whether or not it works, it looks bad/undiscoverable in docs.

I think the type either needs to be exported with minimal methods exposed on it, or else we need to return an exported interface type here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, makes sense. I'd make it unexported initially because I had to add a TTx generic parameter so it could internalize a client, and it was just a really ugly type. Now that that's gone, totally okay to export this.

@@ -57,6 +57,8 @@ func (m *clientMonitor) Start(ctx context.Context) error {
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change
// and is only meant to be used during initial client startup.
func (m *clientMonitor) InitializeProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. As this wasn't dynamic before it wasn't really needed.

PumpkinSeed and others added 4 commits July 6, 2024 12:15
A feature to make it possible to add a new queue using an invocation
like `client.Queues().Add(name, config)`, similar to the API for adding
a new periodic job. If the client is started already, the new producer
is also started. Fetch and work context are the same ones created for
other producers during `Start`.

For now we totally punt on the problem of removing queues, which is more
complicated because it's a fairly hard problem on how producer stop
context cancellation should work.

A problem I was having while writing a stress test for the feature is
that test signal channels were being filled which was difficult to solve
because test signals were always initialized by `newTestClient` and
because the init also initializes each maintenance service's test
signals in turn, it's not possible to deinit them again. I had to
refactor tests such that test signals are only initialized when they're
used, which is probably better anyway.
@brandur
Copy link
Contributor Author

brandur commented Jul 6, 2024

@bgentry Updated. Mind taking another look?

@brandur
Copy link
Contributor Author

brandur commented Jul 6, 2024

thx.

@brandur brandur merged commit d4ffd83 into master Jul 6, 2024
10 checks passed
@brandur brandur deleted the brandur-add-queue branch July 6, 2024 20:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants