poll_for_tasks skips jobs committed during read_with_poll because it runs as a single-statement CTE (stale snapshot) #129

Closed
@jumski

If a job is enqueued while pgflow.poll_for_tasks() is inside the blocking
pgflow.read_with_poll() loop, the inner function locks the message
successfully, yet no task is returned to the worker until the next call.
This adds an artificial delay of up to max_poll_seconds.


Steps to Reproduce

  1. Start a worker that calls
    poll_for_tasks(queue, batch=10, max_poll_seconds=2, poll_interval_ms=100).
  2. About 200 ms after the SQL call starts, enqueue one task for that queue.
  3. Observe the worker logs / timing.

✅ Expected Behaviour

The task should be delivered within ≈ 300 ms (next 100 ms poll inside the same
database call).


❌ Actual Behaviour

poll_for_tasks returns an empty set after the full 2 s timeout.
The task is only returned by the next invocation, so worst-case latency =
max_poll_seconds.


Why It Happens

poll_for_tasks is implemented as one SQL statement with CTEs:

WITH read_messages AS (SELECT * FROM read_with_poll(...)),
     tasks AS (SELECT ... FROM step_tasks JOIN read_messages ...)
SELECT ...

• The outer statement's snapshot is taken when it starts (t₀).
• read_with_poll executes dynamic SQL in a loop; each of those queries gets a
fresh snapshot, so it sees the new queue row and returns msg_id = 42.
• The tasks CTE, however, joins against pgflow.step_tasks using the outer
snapshot. The matching step_tasks row was committed after t₀, so it is not
visible and the join finds no rows.
→ Empty result set.
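
The effect can be reproduced without pgflow. The sketch below is a minimal two-session demo under the default READ COMMITTED isolation level; demo_queue, demo_tasks and demo_read_with_poll are hypothetical names invented for illustration and are not part of pgflow or pgmq.

```sql
-- Hypothetical demo objects (not part of pgflow/pgmq).
CREATE TABLE demo_queue (msg_id bigint PRIMARY KEY);
CREATE TABLE demo_tasks (msg_id bigint PRIMARY KEY, payload text);

-- VOLATILE PL/pgSQL function: under READ COMMITTED, every query it runs
-- takes a fresh snapshot, so it can see rows committed after the caller started.
CREATE FUNCTION demo_read_with_poll(max_seconds int)
RETURNS SETOF bigint
LANGUAGE plpgsql VOLATILE AS $$
DECLARE
  deadline timestamptz := clock_timestamp() + make_interval(secs => max_seconds);
BEGIN
  LOOP
    RETURN QUERY SELECT q.msg_id FROM demo_queue q;
    -- FOUND is true when RETURN QUERY produced at least one row.
    EXIT WHEN FOUND OR clock_timestamp() >= deadline;
    PERFORM pg_sleep(0.1);
  END LOOP;
END $$;

-- Session A: a single statement, so demo_tasks is read with the snapshot from t0.
WITH read_messages AS (
  SELECT r.msg_id FROM demo_read_with_poll(2) AS r(msg_id)
)
SELECT t.msg_id, t.payload
FROM demo_tasks t
JOIN read_messages USING (msg_id);

-- Session B: run this while session A is still executing.
BEGIN;
INSERT INTO demo_tasks VALUES (42, 'hello');
INSERT INTO demo_queue VALUES (42);
COMMIT;
```

Session A should come back with zero rows even though the row for msg_id 42 is committed in both tables. A follow-up `SELECT * FROM demo_tasks WHERE msg_id = 42;` in session A is a new statement with a new snapshot and should return the row.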


Proposed Solution

Split the logic into two separate statements / transactions:

  1. First call pgflow.read_with_poll (or pgmq.read_with_poll) to obtain
    msg_ids and set their vt.
  2. In a second statement use those msg_ids to join
    pgflow.step_tasks, bump attempts_count, compute payload, etc.

Each statement gets its own up-to-date snapshot (under the default READ
COMMITTED isolation level), so tasks committed during the polling window become
visible immediately.
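
A rough sketch of the two-statement split, written as a PL/pgSQL function so that each step runs as its own query. This is an illustration under assumptions, not the actual pgflow implementation: the function name poll_for_tasks_two_step is hypothetical, the step_tasks column message_id is assumed, and payload computation is left out. It relies on pgmq.read_with_poll(queue_name, vt, qty, max_poll_seconds, poll_interval_ms) returning rows with a msg_id column.

```sql
-- Hypothetical function name; step_tasks.message_id is an assumed column name.
CREATE FUNCTION poll_for_tasks_two_step(
  queue_name       text,
  vt_seconds       int,
  qty              int,
  max_poll_seconds int DEFAULT 5,
  poll_interval_ms int DEFAULT 100
)
RETURNS SETOF pgflow.step_tasks
LANGUAGE plpgsql VOLATILE AS $$
DECLARE
  msg_ids bigint[];
BEGIN
  -- Statement 1: block in the poll loop, collect msg_ids and set their vt.
  SELECT array_agg(m.msg_id)
    INTO msg_ids
    FROM pgmq.read_with_poll(queue_name, vt_seconds, qty,
                             max_poll_seconds, poll_interval_ms) AS m;

  -- Statement 2: a separate query, so under READ COMMITTED it takes a fresh
  -- snapshot and can see step_tasks rows committed during the poll.
  -- If nothing was read, msg_ids is NULL and no rows match.
  RETURN QUERY
  UPDATE pgflow.step_tasks AS t
     SET attempts_count = t.attempts_count + 1
   WHERE t.message_id = ANY (msg_ids)
  RETURNING t.*;
END $$;
```

The same split can be done from the worker by issuing the two steps as separate calls. Either way, it only helps when the caller is not inside a REPEATABLE READ or SERIALIZABLE transaction, since those keep one snapshot for the whole transaction.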
