add configurable polling interval for workflow runs#3397
add configurable polling interval for workflow runs#3397abelanger5 wants to merge 4 commits intomainfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Benchmark resultsCompared against |
|
📝 Documentation updates detected! New suggestion: Document workflow run polling interval configuration options Tip: See how your feedback shapes Promptless in Agent Knowledge Base 🧠 |
There was a problem hiding this comment.
Pull request overview
Adds configurability for the dispatcher’s workflow-run subscription polling cadence by introducing min/max polling interval settings that can be set via environment variables and passed into the dispatcher.
Changes:
- Add
WorkflowRunPollingMinInterval/WorkflowRunPollingMaxIntervalto server runtime config and bind them to new env vars. - Thread the configured intervals through engine startup into the dispatcher via new
WithMin/WithMaxWorkflowRunPollingIntervaloptions. - Switch workflow-run subscription polling from a fixed 1s ticker to a randomized ticker between the configured min/max.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pkg/config/server/server.go | Adds runtime config fields + env bindings for workflow-run polling intervals. |
| internal/services/dispatcher/server_v1.go | Uses a randomized ticker for workflow-run polling in subscription handling. |
| internal/services/dispatcher/dispatcher.go | Adds dispatcher options/fields/defaults for min/max polling intervals. |
| cmd/hatchet-engine/engine/run.go | Wires configured polling intervals into dispatcher initialization. |
Comments suppressed due to low confidence (1)
internal/services/dispatcher/server_v1.go:490
- The random ticker started here is never stopped when the subscription context is canceled.
randomticker.NewRandomTickerruns its own goroutine untilStop()is called, so returning onctx.Done()will leak a goroutine/timer per subscription. Callticker.Stop()on exit (e.g.,defer ticker.Stop()right after creation, or explicitly in thectx.Done()case).
go func() {
ticker := randomticker.NewRandomTicker(s.minWorkflowRunPollingInterval, s.maxWorkflowRunPollingInterval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
workflowRunIds := acks.getNonAckdWorkflowRuns()
| @@ -479,7 +480,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S | |||
|
|
|||
| // new goroutine to poll every second for finished workflow runs which are not ackd | |||
There was a problem hiding this comment.
The comment says this goroutine "poll[s] every second", but the implementation now uses a randomized interval between minWorkflowRunPollingInterval and maxWorkflowRunPollingInterval (and it’s configurable). Update the comment to match the new behavior so future readers don’t assume a fixed 1s poll cadence.
| // new goroutine to poll every second for finished workflow runs which are not ackd | |
| // new goroutine to poll at randomized intervals between minWorkflowRunPollingInterval and | |
| // maxWorkflowRunPollingInterval for finished workflow runs which are not ackd |
| func WithMinWorkflowRunPollingInterval(interval time.Duration) DispatcherOpt { | ||
| return func(opts *DispatcherOpts) { | ||
| opts.minWorkflowRunPollingInterval = interval | ||
| } | ||
| } | ||
|
|
||
| func WithMaxWorkflowRunPollingInterval(interval time.Duration) DispatcherOpt { | ||
| return func(opts *DispatcherOpts) { | ||
| opts.maxWorkflowRunPollingInterval = interval | ||
| } | ||
| } |
There was a problem hiding this comment.
These intervals are user-configurable (via config/env), but randomticker.NewRandomTicker will panic if max <= min (it calls rand.Int63n(max-min)). Consider validating/clamping in New (or in these option setters): require min > 0, max > 0, and max > min (or swap/clamp and log) to prevent a misconfiguration from crashing the dispatcher at runtime.
Description
Adds two environment variables to make polling intervals configurable for workflow run subscriptions.
Type of change