fix: resolve race condition in compound trigger evaluation #20
Fixes two race conditions in compound trigger evaluation:

1. **Never-firing race** (transactional): When two child triggers fire concurrently in separate transactions, each only sees its own uncommitted insert due to READ COMMITTED isolation. Neither sees enough firings to trigger the parent. Fix: Use PostgreSQL advisory locks to serialize concurrent evaluations for the same compound trigger.
2. **Double-firing race** (autocommit): When both transactions see all firings, both delete and both fire the parent. Fix: Use DELETE ... RETURNING to make clearing a claim operation. Only the worker that successfully deletes the expected firings proceeds; others bail out.

Based on the fix in PrefectHQ/nebula#10716.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Walkthrough
The changes introduce advisory lock acquisition and race condition detection for composite triggers. A new advisory lock function serializes concurrent trigger evaluations, and the child firing clearing logic now returns the deleted IDs so a worker can detect whether another worker claimed the firings first. Corresponding tests validate concurrent firing behavior.
Sequence Diagram(s)
sequenceDiagram
participant Worker1 as Worker 1
participant Worker2 as Worker 2
participant Session as Database Session
participant Trigger as Composite Trigger
par Concurrent Execution
Worker1->>Session: acquire_composite_trigger_lock(trigger)
Worker2->>Session: acquire_composite_trigger_lock(trigger)
end
Worker1->>Session: clear_child_firings([id1, id2])
Session->>Session: DELETE...RETURNING firing_ids
Session-->>Worker1: {id1, id2}
Worker2->>Session: clear_child_firings([id1, id2])
Session->>Session: DELETE...RETURNING firing_ids (already deleted)
Session-->>Worker2: {}
Worker1->>Trigger: len(deleted_ids) == len(requested_ids)?
Trigger-->>Worker1: TRUE - proceed with firing
Worker1->>Trigger: fire()
Worker2->>Trigger: len(deleted_ids) == len(requested_ids)?
Trigger-->>Worker2: FALSE - skip firing (race lost)
Worker2->>Worker2: log & return early
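The claim step in the diagram can be sketched in plain Python. This is only an in-memory analogy, not the PR's code: `FiringStore` and `try_fire` are hypothetical names, and a mutex-guarded set stands in for the atomic `DELETE ... RETURNING` statement.

```python
import threading


class FiringStore:
    """Hypothetical in-memory stand-in for the child-firing table."""

    def __init__(self, firing_ids):
        self._rows = set(firing_ids)
        self._mutex = threading.Lock()

    def clear_child_firings(self, firing_ids):
        # Mimics DELETE ... RETURNING: atomically remove the matching rows
        # and report exactly which ones this caller removed.
        with self._mutex:
            deleted = self._rows & set(firing_ids)
            self._rows -= deleted
            return deleted


def try_fire(store, firing_ids, fired, worker):
    deleted = store.clear_child_firings(firing_ids)
    if len(deleted) != len(firing_ids):
        # Another worker already claimed some or all firings: skip.
        return False
    fired.append(worker)
    return True


store = FiringStore(["id1", "id2"])
fired = []
results = [
    try_fire(store, ["id1", "id2"], fired, worker)
    for worker in ("worker-1", "worker-2")
]
```

Only the first caller gets back every requested ID, so only it fires the parent; the second sees an empty result and returns early.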
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/prefect/server/events/models/composite_trigger_child_firing.py (1)
133-155: Return `child_firing_id` instead of `child_trigger_id`.
The function filters on `child_firing_id.in_(firing_ids)` (line 152) and its docstring promises to return deleted child firing IDs. However, it returns `child_trigger_id` instead. This breaks the contract and the caller's race-detection logic in `src/prefect/server/events/triggers.py`, which compares returned IDs against the input `firing_ids` to verify deletions.
🔧 Proposed fix
-        .returning(db.CompositeTriggerChildFiring.child_trigger_id)
+        .returning(db.CompositeTriggerChildFiring.child_firing_id)
🤖 Fix all issues with AI agents
In `@src/prefect/server/events/models/composite_trigger_child_firing.py`:
- Around line 17-45: The advisory lock key is currently computed with Python's
randomized hash(str(trigger.id)) so different processes get different keys;
update acquire_composite_trigger_lock to derive a deterministic 64-bit integer
from the trigger UUID by using trigger.id.int truncated/masked to fit into a
signed bigint (e.g., mod 2**63 or bitmask) and assign that to lock_key, then
pass that deterministic lock_key to the session.execute call for
pg_advisory_xact_lock; keep the rest of the logic (dialect check,
session.execute) unchanged.
In `@tests/events/server/triggers/test_composite_triggers.py`:
- Around line 1634-1639: The fixture function compound_automation_concurrent has
parameters used only for side effects (cleared_buckets, cleared_automations);
rename those parameters to underscore-prefixed names (e.g., _cleared_buckets,
_cleared_automations) or convert them to use `@pytest.mark.usefixtures` to silence
Ruff ARG002; update the async def signature for compound_automation_concurrent
accordingly and ensure any calls or references match the new names.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/prefect/server/events/models/composite_trigger_child_firing.py
src/prefect/server/events/triggers.py
tests/events/server/triggers/test_composite_triggers.py
🧰 Additional context used
🧬 Code graph analysis (2)
src/prefect/server/events/models/composite_trigger_child_firing.py (2)
src/prefect/server/utilities/database.py (1)
get_dialect (741-769)
src/prefect/server/database/interface.py (1)
dialect (141-142)
src/prefect/server/events/triggers.py (1)
src/prefect/server/events/models/composite_trigger_child_firing.py (2)
acquire_composite_trigger_lock (17-47)
clear_child_firings (134-157)
🪛 Ruff (0.14.13)
tests/events/server/triggers/test_composite_triggers.py
1637-1637: Unused method argument: cleared_buckets
(ARG002)
1638-1638: Unused method argument: cleared_automations
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (38)
- GitHub Check: Server Tests - python:3.12, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.10, sqlite
- GitHub Check: Database and Orchestration Tests - python:3.14, sqlite
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.14, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.13, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.13, sqlite
- GitHub Check: Server Tests - python:3.10, sqlite
- GitHub Check: Client Tests - python:3.12, postgres:14
- GitHub Check: Client Tests - python:3.11, postgres:14
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.11, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.11, postgres:14
- GitHub Check: Client Tests - python:3.13, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.12, sqlite
- GitHub Check: Client Tests - python:3.10, postgres:14
- GitHub Check: Server Tests - python:3.13, sqlite
- GitHub Check: Server Tests - python:3.10, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.12, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.10, postgres:14
- GitHub Check: Server Tests - python:3.14, postgres:14
- GitHub Check: Server Tests - python:3.11, sqlite
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.13, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.11, sqlite
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.10, postgres:14
- GitHub Check: Server Tests - python:3.14, sqlite
- GitHub Check: Server Tests - python:3.11, postgres:14
- GitHub Check: Client Tests - python:3.14, postgres:14
- GitHub Check: Server Tests - python:3.12, sqlite
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.12, postgres:14
- GitHub Check: Server Tests - python:3.13, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.14, postgres:14
- GitHub Check: Benchmark
- GitHub Check: Pre-commit checks
- GitHub Check: Test SQLite 3.31.1 Compatibility
- GitHub Check: typesafety
- GitHub Check: Test SQLite 3.24.0 Compatibility
- GitHub Check: Integration tests `@main`
- GitHub Check: Prefect Client Compatibility Tests
- GitHub Check: Benchmark
🔇 Additional comments (4)
src/prefect/server/events/triggers.py (3)
37-43: Good: composite evaluations are now serialized via advisory lock. This is the right spot to acquire the lock before reading/clearing child firings.
Also applies to: 349-352
71-73: No review needed.
390-410: Nice claim-style clear to prevent double-firing. The early return on partial delete is a solid race guard.
tests/events/server/triggers/test_composite_triggers.py (1)
1-1: Good concurrent-firing regression coverage. The gather-based tests exercise the race paths well.
Also applies to: 1674-1747
async def acquire_composite_trigger_lock(
    session: AsyncSession,
    trigger: CompositeTrigger,
) -> None:
    """
    Acquire a transaction-scoped advisory lock for the given composite trigger.

    This serializes concurrent child trigger evaluations for the same compound
    trigger, preventing a race condition where multiple transactions each see
    only their own child firing and neither fires the parent.

    The lock is automatically released when the transaction commits or rolls back.
    """
    bind = session.get_bind()
    if bind is None:
        return

    # Get the engine from either an Engine or Connection
    engine: sa.Engine = bind if isinstance(bind, sa.Engine) else bind.engine  # type: ignore[union-attr]
    dialect = get_dialect(engine)

    if dialect.name == "postgresql":
        # Use the trigger's UUID as the lock key
        # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
        # truncated to fit (collision is extremely unlikely and benign)
        lock_key = hash(str(trigger.id)) % (2**63)
        await session.execute(
            sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
        )
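As a rough in-process analogy for what the advisory lock is meant to achieve, the sketch below serializes evaluations per trigger key with `threading.Lock`. It does not use PostgreSQL at all; `lock_for`, `evaluate`, and the key and child names are hypothetical and only illustrate the serialization idea.

```python
import threading
from collections import defaultdict

# Hypothetical stand-in for pg_advisory_xact_lock: one lock per trigger key.
_locks = defaultdict(threading.Lock)
_locks_guard = threading.Lock()


def lock_for(key):
    # Guard the registry so two threads never create two locks for one key.
    with _locks_guard:
        return _locks[key]


def evaluate(trigger_key, firings, child_id, required, fired):
    # Record the child firing and evaluate the parent under the same lock,
    # so the second evaluator always sees the first one's firing.
    with lock_for(trigger_key):
        firings.add(child_id)
        if firings >= required:
            firings.clear()
            fired.append(trigger_key)


firings = set()
fired = []
required = {"child-a", "child-b"}
threads = [
    threading.Thread(
        target=evaluate, args=("trigger-1", firings, child, required, fired)
    )
    for child in ("child-a", "child-b")
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Whichever thread runs second sees both child firings and fires the parent exactly once. This only works if every evaluator resolves the same key to the same lock, which is the property the lock key computation has to guarantee across processes.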
🧩 Analysis chain
🏁 Script executed:
# Find the CompositeTrigger model to understand trigger.id type
fd -t f "composite_trigger" --extension py | head -20
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 274
🏁 Script executed:
# Look for CompositeTrigger class definition and trigger.id attribute
rg "class CompositeTrigger" -A 10 -t py
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 2591
🏁 Script executed:
# Check UUID usage and int attribute in Python
python3 << 'EOF'
import uuid
# Verify UUID has .int attribute
test_uuid = uuid.uuid4()
print(f"UUID: {test_uuid}")
print(f"UUID.int: {test_uuid.int}")
print(f"UUID.int & ((1 << 63) - 1): {test_uuid.int & ((1 << 63) - 1)}")
# Verify hash() randomization
import subprocess
result1 = subprocess.run(['python3', '-c', 'print(hash("test"))'], capture_output=True, text=True)
result2 = subprocess.run(['python3', '-c', 'print(hash("test"))'], capture_output=True, text=True)
print(f"\nhash('test') run 1: {result1.stdout.strip()}")
print(f"hash('test') run 2: {result2.stdout.strip()}")
print(f"Hashes equal across processes: {result1.stdout == result2.stdout}")
EOF
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 172
🏁 Script executed:
# Search for where CompositeTrigger is defined and check trigger.id
rg "trigger\.id" src/prefect/server/events/ -B 3 -A 3 -t py | head -50
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 3274
🏁 Script executed:
# Look at the models directory to find CompositeTrigger definition
find src/prefect/server/events/models -name "*.py" -type f -exec grep -l "class CompositeTrigger" {} \;
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 68
🏁 Script executed:
# Look at the full file to see imports and understand trigger type
cat -n src/prefect/server/events/models/composite_trigger_child_firing.py | head -60
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 2668
🏁 Script executed:
# Search for Trigger base class definition
rg "^class Trigger" src/prefect/ -t py -A 5 | head -40
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 1842
🏁 Script executed:
# Look for trigger.id definition in Trigger class
rg "class Trigger" src/prefect/server/events/schemas/automations.py -A 20
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 1574
🏁 Script executed:
# Verify UUID.int approach
python3 << 'EOF'
import uuid
# Create a test UUID
test_uuid = uuid.UUID("550e8400-e29b-41d4-a716-446655440000")
print(f"UUID: {test_uuid}")
print(f"UUID.int: {test_uuid.int}")
print(f"UUID.int value: {test_uuid.int}")
print(f"Max 63-bit value: {(2**63) - 1}")
print(f"UUID.int & ((1 << 63) - 1): {test_uuid.int & ((1 << 63) - 1)}")
print(f"Is result within 63-bit range: {test_uuid.int & ((1 << 63) - 1) < 2**63}")
# Verify determinism across multiple calls
uuid_str = "550e8400-e29b-41d4-a716-446655440000"
uuid_obj = uuid.UUID(uuid_str)
val1 = uuid_obj.int & ((1 << 63) - 1)
val2 = uuid.UUID(uuid_str).int & ((1 << 63) - 1)
print(f"\nDeterministic across calls: {val1 == val2}")
print(f"Values: {val1} == {val2}")
# Verify hash() is non-deterministic
import subprocess
hashes = []
for _ in range(3):
    result = subprocess.run(['python3', '-c', 'print(hash("550e8400-e29b-41d4-a716-446655440000"))'],
                            capture_output=True, text=True)
    hashes.append(result.stdout.strip())
print(f"\nhash() values across processes: {hashes}")
print(f"All hashes equal: {len(set(hashes)) == 1}")
EOF
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 569
🏁 Script executed:
# Search for UUID.int usage in codebase
rg "\.int\b" src/prefect/server/events/ -t py -B 2 -A 2 | grep -A 5 -B 5 "UUID\|uuid"
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 68
🏁 Script executed:
# Look for advisory lock patterns in codebase
rg "advisory.*lock|pg_advisory" src/prefect/ -t py -B 3 -A 3
Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 2870
Use deterministic trigger.id.int for advisory lock key (Python hash() is randomized per process).
hash(str(trigger.id)) produces different values in each worker process, so different workers acquire different lock keys and the race condition persists. Verified: three separate processes yielded different hash values for the same UUID string.
🔧 Proposed fix
-        lock_key = hash(str(trigger.id)) % (2**63)
+        # Deterministic, process-stable key derived from UUID's 128-bit integer
+        lock_key = trigger.id.int & ((1 << 63) - 1)
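A minimal, standalone sketch of the deterministic key derivation suggested in this comment. The helper name `advisory_lock_key` is hypothetical and the sample UUID is the one used in the analysis script; only the masking expression comes from the proposed fix.

```python
import uuid


def advisory_lock_key(trigger_id: uuid.UUID) -> int:
    # Keep the low 63 bits of the UUID's 128-bit integer so the result is a
    # non-negative value that fits PostgreSQL's signed bigint. Unlike
    # hash(str(...)), this does not depend on per-process hash randomization.
    return trigger_id.int & ((1 << 63) - 1)


trigger_id = uuid.UUID("550e8400-e29b-41d4-a716-446655440000")
key = advisory_lock_key(trigger_id)
same_key = advisory_lock_key(uuid.UUID(str(trigger_id)))
```

Because the key is a pure function of the UUID's bits, every worker process computes the same value for the same trigger. Distinct triggers can still collide on the low 63 bits, which would only cause two unrelated triggers to serialize against each other.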
    async def compound_automation_concurrent(
        self,
        automations_session: AsyncSession,
        cleared_buckets: None,
        cleared_automations: None,
    ) -> Automation:
Silence Ruff ARG002 for side-effect fixtures.
These fixtures are used for side effects only; rename to underscore-prefixed args (or use @pytest.mark.usefixtures) to keep lint clean.
🔧 Proposed fix
-    async def compound_automation_concurrent(
-        self,
-        automations_session: AsyncSession,
-        cleared_buckets: None,
-        cleared_automations: None,
-    ) -> Automation:
+    async def compound_automation_concurrent(
+        self,
+        automations_session: AsyncSession,
+        _cleared_buckets: None,
+        _cleared_automations: None,
+    ) -> Automation:
Benchmark PR from qodo-benchmark#563