## Description

If a job is enqueued while `pgflow.poll_for_tasks()` is inside the blocking `pgflow.read_with_poll()` loop, the inner function locks the message successfully, yet no task is returned to the worker until the next call. This adds an artificial delay equal to `max_poll_seconds`.
## Steps to Reproduce

- Start a worker that calls `poll_for_tasks(queue, batch=10, max_poll_seconds=2, poll_interval_ms=100)` (see the sketch after this list).
- After the SQL starts (sleep ~200 ms), enqueue one task for that queue.
- Observe the worker logs / timing.
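For concreteness, a rough two-session sketch of these steps. The `poll_for_tasks` argument names are copied from the call quoted above, and the enqueue step assumes `pgflow.start_flow`; both may need adapting to the signatures in the installed pgflow version.

```sql
-- Session A (worker): the statement blocks inside read_with_poll for up to 2 s.
-- Argument names mirror the call quoted in this report; the installed
-- signature may use different names or positions.
SELECT *
FROM pgflow.poll_for_tasks(
       'my_queue',
       batch            => 10,
       max_poll_seconds => 2,
       poll_interval_ms => 100
     );

-- Session B, ~200 ms after Session A starts: enqueue one task for that queue,
-- e.g. by starting a flow whose root step is served from 'my_queue'
-- ('my_flow' is a placeholder slug).
SELECT pgflow.start_flow('my_flow', '{}'::jsonb);
```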
## ✅ Expected Behaviour

The task should be delivered within ≈ 300 ms (the next 100 ms poll inside the same database call).
## ❌ Actual Behaviour

`poll_for_tasks` returns an empty set after the full 2 s timeout. The task is only returned by the next invocation, so the worst-case latency is `max_poll_seconds`.
## Why It Happens

`poll_for_tasks` is implemented as one SQL statement with CTEs:

```sql
WITH read_messages AS (SELECT * FROM read_with_poll(...)),
     tasks AS (SELECT ... FROM step_tasks JOIN read_messages ...)
SELECT ...
```

- The outer statement's snapshot is taken at start (t₀).
- `read_with_poll` executes dynamic SQL → fresh snapshots → it sees the new queue row and returns `msg_id = 42`.
- But the `tasks` CTE joins with `pgflow.step_tasks`, which was populated after t₀, so the join finds no rows in the original snapshot.

→ Empty result set.
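This asymmetry is standard PostgreSQL behaviour under the default READ COMMITTED isolation: the calling statement keeps the snapshot taken at its start, while each query inside a VOLATILE function takes a fresh one. A minimal standalone sketch (no pgflow objects involved) that reproduces the same effect:

```sql
-- Session A
CREATE TABLE snapshot_demo (id int);

CREATE FUNCTION count_after_sleep() RETURNS bigint
LANGUAGE plpgsql VOLATILE AS $$
BEGIN
  PERFORM pg_sleep(5);                          -- stands in for the read_with_poll wait loop
  RETURN (SELECT count(*) FROM snapshot_demo);  -- volatile function: fresh snapshot per query
END;
$$;

-- Single statement, analogous to poll_for_tasks:
SELECT count_after_sleep()                  AS inner_count,  -- sees the new row  -> 1
       (SELECT count(*) FROM snapshot_demo) AS outer_count;  -- t₀ snapshot       -> 0

-- Session B, while Session A is sleeping:
INSERT INTO snapshot_demo VALUES (1);
```

The inner (volatile) query sees the row committed during the wait; the outer statement never does, which is exactly why the `tasks` CTE comes back empty.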
## Proposed Solution

Split the logic into two separate statements / transactions:

- First call `pgflow.read_with_poll` (or `pgmq.read_with_poll`) to obtain `msg_id`s and set their `vt`.
- In a second statement, use those `msg_id`s to join `pgflow.step_tasks`, bump `attempts_count`, compute the payload, etc.

Each statement gets its own up-to-date snapshot, so tasks committed during the polling window become visible immediately.
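A minimal sketch of that shape, assuming a hypothetical `step_tasks.message_id` column and a simplified return type, and omitting the payload computation; the real pgflow schema and signatures will differ:

```sql
-- Sketch only: the function name, read_with_poll signature and step_tasks
-- columns used here are assumptions, not the actual pgflow definitions.
CREATE FUNCTION pgflow.poll_for_tasks_split(
  queue_name       text,
  vt               integer,
  qty              integer,
  max_poll_seconds integer DEFAULT 5,
  poll_interval_ms integer DEFAULT 100
) RETURNS SETOF pgflow.step_tasks
LANGUAGE plpgsql VOLATILE AS $$
DECLARE
  msg_ids bigint[];
BEGIN
  -- Statement 1: wait inside read_with_poll, lock the messages (set vt),
  -- and collect their msg_ids.
  SELECT coalesce(array_agg(r.msg_id), '{}')
    INTO msg_ids
    FROM pgflow.read_with_poll(queue_name, vt, qty,
                               max_poll_seconds, poll_interval_ms) AS r;

  -- Statement 2 runs with a fresh snapshot (volatile plpgsql, READ COMMITTED),
  -- so step_tasks rows committed while statement 1 was waiting are visible.
  RETURN QUERY
    UPDATE pgflow.step_tasks AS st
       SET attempts_count = st.attempts_count + 1
     WHERE st.message_id = ANY (msg_ids)
    RETURNING st.*;
END;
$$;
```

Equivalently, the worker could issue the two statements itself over two round-trips; what matters is that the join against `step_tasks` runs under a snapshot taken after `read_with_poll` has returned.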