
fix: resolve race condition in compound trigger evaluation #20

Open
tomerqodo wants to merge 2 commits into coderabbit_combined_20260121_augment_sentry_coderabbit_1_base_fix_resolve_race_condition_in_compound_trigger_evaluation_pr563 from coderabbit_combined_20260121_augment_sentry_coderabbit_1_head_fix_resolve_race_condition_in_compound_trigger_evaluation_pr563

Conversation

@tomerqodo tomerqodo commented Jan 22, 2026

Benchmark PR from qodo-benchmark#563

Summary by CodeRabbit

  • New Features

    • Improved concurrency handling for composite triggers to prevent race conditions and duplicate firings when multiple workers evaluate triggers simultaneously.
  • Tests

    • Added test coverage for concurrent composite trigger evaluation scenarios.

✏️ Tip: You can customize this high-level summary in your review settings.

desertaxle and others added 2 commits January 21, 2026 15:46
Fixes two race conditions in compound trigger evaluation:

1. **Never-firing race** (transactional): When two child triggers fire
   concurrently in separate transactions, each only sees its own
   uncommitted insert due to READ COMMITTED isolation. Neither sees
   enough firings to trigger the parent.

   Fix: Use PostgreSQL advisory locks to serialize concurrent evaluations
   for the same compound trigger.

2. **Double-firing race** (autocommit): When both transactions see all
   firings, both delete and both fire the parent.

   Fix: Use DELETE ... RETURNING to make clearing a claim operation.
   Only the worker that successfully deletes the expected firings
   proceeds; others bail out.
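
The never-firing race in item 1 can be illustrated with a small in-memory model. This is only a sketch: the `Database` and `Transaction` classes are hypothetical stand-ins that mimic READ COMMITTED visibility (committed rows plus the transaction's own writes), not Prefect or PostgreSQL code.

```python
class Database:
    """Hypothetical stand-in for the child-firing table."""

    def __init__(self) -> None:
        self.committed: set[str] = set()


class Transaction:
    """Sees committed rows plus its own uncommitted inserts."""

    def __init__(self, db: Database) -> None:
        self.db = db
        self.pending: set[str] = set()

    def insert(self, child: str) -> None:
        self.pending.add(child)

    def visible(self) -> set[str]:
        return self.db.committed | self.pending

    def commit(self) -> None:
        self.db.committed |= self.pending
        self.pending = set()


REQUIRED = {"child-a", "child-b"}  # the parent needs both children to fire


def run_interleaved(required: set[str]) -> list[bool]:
    """Both transactions insert and evaluate before either commits."""
    db = Database()
    t1, t2 = Transaction(db), Transaction(db)
    t1.insert("child-a")
    t2.insert("child-b")
    decisions = [t.visible() >= required for t in (t1, t2)]
    t1.commit()
    t2.commit()
    return decisions


def run_serialized(required: set[str]) -> list[bool]:
    """Each transaction runs to completion before the next one starts."""
    db = Database()
    decisions = []
    for child in ("child-a", "child-b"):
        txn = Transaction(db)
        txn.insert(child)
        decisions.append(txn.visible() >= required)
        txn.commit()
    return decisions


print(run_interleaved(REQUIRED))  # neither evaluation sees both firings
print(run_serialized(REQUIRED))   # the second evaluation sees both firings
```

Serializing the two evaluations, which is what the advisory lock is meant to achieve, lets the second transaction observe the first one's committed firing.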

Based on the fix in PrefectHQ/nebula#10716.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

coderabbitai bot commented Jan 22, 2026

Walkthrough

The changes introduce distributed lock acquisition and race condition detection for composite triggers. A new advisory lock function is added to serialize concurrent trigger evaluations, and the child firing clearing logic now returns deleted IDs to detect if other workers claimed firings first. Corresponding tests validate concurrent firing behavior.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Composite trigger lock and clearing logic (`src/prefect/server/events/models/composite_trigger_child_firing.py`) | Added `acquire_composite_trigger_lock()` function for PostgreSQL advisory locks to serialize trigger evaluations. Modified `clear_child_firings()` to return `set[UUID]` of deleted firing IDs instead of `None`, using DELETE...RETURNING to capture deleted records and enable race condition detection. |
| Trigger evaluation with race detection (`src/prefect/server/events/triggers.py`) | Integrated advisory lock acquisition before processing composite trigger firings. Replaced simple clearing with race-aware flow that compares deleted ID count against requested ID count to detect if other workers claimed firings. Changed logger initialization from `get_logger(__name__)` to `logging.getLogger(__name__)`. Added import for `acquire_composite_trigger_lock`. |
| Concurrent firing tests (`tests/events/server/triggers/test_composite_triggers.py`) | Added `TestCompoundTriggerConcurrency` class with fixture `compound_automation_concurrent` and two tests validating that compound triggers fire exactly once under concurrent child firing conditions. Added `asyncio` import. |

Sequence Diagram(s)

sequenceDiagram
    participant Worker1 as Worker 1
    participant Worker2 as Worker 2
    participant Session as Database Session
    participant Trigger as Composite Trigger
    
    par Concurrent Execution
        Worker1->>Session: acquire_composite_trigger_lock(trigger)
        Worker2->>Session: acquire_composite_trigger_lock(trigger)
    end
    
    Worker1->>Session: clear_child_firings([id1, id2])
    Session->>Session: DELETE...RETURNING firing_ids
    Session-->>Worker1: {id1, id2}
    
    Worker2->>Session: clear_child_firings([id1, id2])
    Session->>Session: DELETE...RETURNING firing_ids (already deleted)
    Session-->>Worker2: {}
    
    Worker1->>Trigger: len(deleted_ids) == len(requested_ids)?
    Trigger-->>Worker1: TRUE - proceed with firing
    Worker1->>Trigger: fire()
    
    Worker2->>Trigger: len(deleted_ids) == len(requested_ids)?
    Trigger-->>Worker2: FALSE - skip firing (race lost)
    Worker2->>Worker2: log & return early
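
The flow in the diagram can be approximated in plain asyncio. This is a rough sketch under stated assumptions: `FiringStore` is a hypothetical in-memory stand-in for the child-firing table, its `asyncio.Lock` plays the role of the advisory lock, and `clear` mimics DELETE ... RETURNING by returning only the IDs it actually removed.

```python
import asyncio
from uuid import UUID, uuid4


class FiringStore:
    """Hypothetical in-memory stand-in for the child-firing table."""

    def __init__(self, firing_ids: list[UUID]) -> None:
        self._rows = set(firing_ids)
        self.lock = asyncio.Lock()  # stand-in for the advisory lock

    async def clear(self, firing_ids: list[UUID]) -> set[UUID]:
        await asyncio.sleep(0)  # yield control, as a real query would
        deleted = self._rows & set(firing_ids)
        self._rows -= deleted
        return deleted  # only the rows this caller actually removed


async def evaluate(
    store: FiringStore, firing_ids: list[UUID], fired: list[str], name: str
) -> None:
    async with store.lock:
        deleted = await store.clear(firing_ids)
        if len(deleted) != len(firing_ids):
            return  # another worker claimed the firings first
        fired.append(name)  # stand-in for firing the parent trigger


async def main() -> list[str]:
    ids = [uuid4(), uuid4()]
    store = FiringStore(ids)
    fired: list[str] = []
    await asyncio.gather(
        evaluate(store, ids, fired, "worker-1"),
        evaluate(store, ids, fired, "worker-2"),
    )
    return fired


fired = asyncio.run(main())
print(len(fired))  # exactly one worker fires the parent
```

Only the worker whose clear removes every requested row proceeds, so the parent fires once regardless of which worker wins.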

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 Locks and races, oh what a sight!
Advisory guards keep triggers tight,
When workers compete, one must claim the prize,
With returning IDs, no double-fire surprise! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The PR description is minimal and does not follow the provided template, missing an overview of proposed changes and incomplete checklist items. | Provide a detailed overview of the changes addressing the race conditions, and complete the checklist by referencing the related issue and confirming test coverage. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 62.50% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: resolving a race condition in compound trigger evaluation, which is the primary purpose of all modifications. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/prefect/server/events/models/composite_trigger_child_firing.py (1)

133-155: Return child_firing_id instead of child_trigger_id.

The function filters on child_firing_id.in_(firing_ids) (line 152) and its docstring promises to return deleted child firing IDs. However, it returns child_trigger_id instead. This breaks the contract and the caller's race-detection logic in src/prefect/server/events/triggers.py, which compares returned IDs against the input firing_ids to verify deletions.

🔧 Proposed fix
-        .returning(db.CompositeTriggerChildFiring.child_trigger_id)
+        .returning(db.CompositeTriggerChildFiring.child_firing_id)
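
A minimal sketch of why the returned column matters. How triggers.py compares the result is not shown in this thread, so both a count check and a set check are included; the row layout and variable names below are hypothetical.

```python
from uuid import uuid4

# Hypothetical rows as (child_trigger_id, child_firing_id) pairs
rows = [(uuid4(), uuid4()), (uuid4(), uuid4())]
requested = {firing_id for _, firing_id in rows}

# What RETURNING would hand back for each choice of column
returned_trigger_ids = {trigger_id for trigger_id, _ in rows}
returned_firing_ids = {firing_id for _, firing_id in rows}

# A count-only check cannot tell the two columns apart
count_check_wrong_column = len(returned_trigger_ids) == len(requested)

# A set comparison only succeeds when firing IDs come back
set_check_wrong_column = returned_trigger_ids == requested
set_check_right_column = returned_firing_ids == requested

print(count_check_wrong_column, set_check_wrong_column, set_check_right_column)
```

Returning the firing IDs keeps the function consistent with its docstring and lets callers compare identities rather than only counts.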
🤖 Fix all issues with AI agents
In `@src/prefect/server/events/models/composite_trigger_child_firing.py`:
- Around line 17-45: The advisory lock key is currently computed with Python's
randomized hash(str(trigger.id)) so different processes get different keys;
update acquire_composite_trigger_lock to derive a deterministic 64-bit integer
from the trigger UUID by using trigger.id.int truncated/masked to fit into a
signed bigint (e.g., mod 2**63 or bitmask) and assign that to lock_key, then
pass that deterministic lock_key to the session.execute call for
pg_advisory_xact_lock; keep the rest of the logic (dialect check,
session.execute) unchanged.

In `@tests/events/server/triggers/test_composite_triggers.py`:
- Around line 1634-1639: The fixture function compound_automation_concurrent has
parameters used only for side effects (cleared_buckets, cleared_automations);
rename those parameters to underscore-prefixed names (e.g., _cleared_buckets,
_cleared_automations) or convert them to use `@pytest.mark.usefixtures` to silence
Ruff ARG002; update the async def signature for compound_automation_concurrent
accordingly and ensure any calls or references match the new names.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 89baeef and 305bde9.

📒 Files selected for processing (3)
  • src/prefect/server/events/models/composite_trigger_child_firing.py
  • src/prefect/server/events/triggers.py
  • tests/events/server/triggers/test_composite_triggers.py
🧰 Additional context used
🧬 Code graph analysis (2)
src/prefect/server/events/models/composite_trigger_child_firing.py (2)
src/prefect/server/utilities/database.py (1)
  • get_dialect (741-769)
src/prefect/server/database/interface.py (1)
  • dialect (141-142)
src/prefect/server/events/triggers.py (1)
src/prefect/server/events/models/composite_trigger_child_firing.py (2)
  • acquire_composite_trigger_lock (17-47)
  • clear_child_firings (134-157)
🪛 Ruff (0.14.13)
tests/events/server/triggers/test_composite_triggers.py

1637-1637: Unused method argument: cleared_buckets

(ARG002)


1638-1638: Unused method argument: cleared_automations

(ARG002)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (38)
  • GitHub Check: Server Tests - python:3.12, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.10, sqlite
  • GitHub Check: Database and Orchestration Tests - python:3.14, sqlite
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.14, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.13, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.13, sqlite
  • GitHub Check: Server Tests - python:3.10, sqlite
  • GitHub Check: Client Tests - python:3.12, postgres:14
  • GitHub Check: Client Tests - python:3.11, postgres:14
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.11, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.11, postgres:14
  • GitHub Check: Client Tests - python:3.13, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.12, sqlite
  • GitHub Check: Client Tests - python:3.10, postgres:14
  • GitHub Check: Server Tests - python:3.13, sqlite
  • GitHub Check: Server Tests - python:3.10, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.12, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.10, postgres:14
  • GitHub Check: Server Tests - python:3.14, postgres:14
  • GitHub Check: Server Tests - python:3.11, sqlite
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.13, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.11, sqlite
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.10, postgres:14
  • GitHub Check: Server Tests - python:3.14, sqlite
  • GitHub Check: Server Tests - python:3.11, postgres:14
  • GitHub Check: Client Tests - python:3.14, postgres:14
  • GitHub Check: Server Tests - python:3.12, sqlite
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.12, postgres:14
  • GitHub Check: Server Tests - python:3.13, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.14, postgres:14
  • GitHub Check: Benchmark
  • GitHub Check: Pre-commit checks
  • GitHub Check: Test SQLite 3.31.1 Compatibility
  • GitHub Check: typesafety
  • GitHub Check: Test SQLite 3.24.0 Compatibility
  • GitHub Check: Integration tests @main
  • GitHub Check: Prefect Client Compatibility Tests
  • GitHub Check: Benchmark
🔇 Additional comments (4)
src/prefect/server/events/triggers.py (3)

37-43: Good: composite evaluations are now serialized via advisory lock.

This is the right spot to acquire the lock before reading/clearing child firings.

Also applies to: 349-352


71-73: No review needed.


390-410: Nice claim-style clear to prevent double-firing.

The early return on partial delete is a solid race guard.

tests/events/server/triggers/test_composite_triggers.py (1)

1-1: Good concurrent-firing regression coverage.

The gather-based tests exercise the race paths well.

Also applies to: 1674-1747

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +17 to +45
async def acquire_composite_trigger_lock(
    session: AsyncSession,
    trigger: CompositeTrigger,
) -> None:
    """
    Acquire a transaction-scoped advisory lock for the given composite trigger.

    This serializes concurrent child trigger evaluations for the same compound
    trigger, preventing a race condition where multiple transactions each see
    only their own child firing and neither fires the parent.

    The lock is automatically released when the transaction commits or rolls back.
    """
    bind = session.get_bind()
    if bind is None:
        return

    # Get the engine from either an Engine or Connection
    engine: sa.Engine = bind if isinstance(bind, sa.Engine) else bind.engine  # type: ignore[union-attr]
    dialect = get_dialect(engine)

    if dialect.name == "postgresql":
        # Use the trigger's UUID as the lock key
        # pg_advisory_xact_lock takes a bigint, so we use the UUID's int representation
        # truncated to fit (collision is extremely unlikely and benign)
        lock_key = hash(str(trigger.id)) % (2**63)
        await session.execute(
            sa.text("SELECT pg_advisory_xact_lock(:key)"), {"key": lock_key}
        )

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find the CompositeTrigger model to understand trigger.id type
fd -t f "composite_trigger" --extension py | head -20

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 274


🏁 Script executed:

# Look for CompositeTrigger class definition and trigger.id attribute
rg "class CompositeTrigger" -A 10 -t py

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2591


🏁 Script executed:

# Check UUID usage and int attribute in Python
python3 << 'EOF'
import uuid
# Verify UUID has .int attribute
test_uuid = uuid.uuid4()
print(f"UUID: {test_uuid}")
print(f"UUID.int: {test_uuid.int}")
print(f"UUID.int & ((1 << 63) - 1): {test_uuid.int & ((1 << 63) - 1)}")

# Verify hash() randomization
import subprocess
result1 = subprocess.run(['python3', '-c', 'print(hash("test"))'], capture_output=True, text=True)
result2 = subprocess.run(['python3', '-c', 'print(hash("test"))'], capture_output=True, text=True)
print(f"\nhash('test') run 1: {result1.stdout.strip()}")
print(f"hash('test') run 2: {result2.stdout.strip()}")
print(f"Hashes equal across processes: {result1.stdout == result2.stdout}")
EOF

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 172


🏁 Script executed:

# Search for where CompositeTrigger is defined and check trigger.id
rg "trigger\.id" src/prefect/server/events/ -B 3 -A 3 -t py | head -50

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 3274


🏁 Script executed:

# Look at the models directory to find CompositeTrigger definition
find src/prefect/server/events/models -name "*.py" -type f -exec grep -l "class CompositeTrigger" {} \;

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 68


🏁 Script executed:

# Look at the full file to see imports and understand trigger type
cat -n src/prefect/server/events/models/composite_trigger_child_firing.py | head -60

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2668


🏁 Script executed:

# Search for Trigger base class definition
rg "^class Trigger" src/prefect/ -t py -A 5 | head -40

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1842


🏁 Script executed:

# Look for trigger.id definition in Trigger class
rg "class Trigger" src/prefect/server/events/schemas/automations.py -A 20

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1574


🏁 Script executed:

# Verify UUID.int approach
python3 << 'EOF'
import uuid

# Create a test UUID
test_uuid = uuid.UUID("550e8400-e29b-41d4-a716-446655440000")

print(f"UUID: {test_uuid}")
print(f"UUID.int: {test_uuid.int}")
print(f"UUID.int value: {test_uuid.int}")
print(f"Max 63-bit value: {(2**63) - 1}")
print(f"UUID.int & ((1 << 63) - 1): {test_uuid.int & ((1 << 63) - 1)}")
print(f"Is result within 63-bit range: {test_uuid.int & ((1 << 63) - 1) < 2**63}")

# Verify determinism across multiple calls
uuid_str = "550e8400-e29b-41d4-a716-446655440000"
uuid_obj = uuid.UUID(uuid_str)
val1 = uuid_obj.int & ((1 << 63) - 1)
val2 = uuid.UUID(uuid_str).int & ((1 << 63) - 1)
print(f"\nDeterministic across calls: {val1 == val2}")
print(f"Values: {val1} == {val2}")

# Verify hash() is non-deterministic
import subprocess
hashes = []
for _ in range(3):
    result = subprocess.run(['python3', '-c', 'print(hash("550e8400-e29b-41d4-a716-446655440000"))'], 
                           capture_output=True, text=True)
    hashes.append(result.stdout.strip())

print(f"\nhash() values across processes: {hashes}")
print(f"All hashes equal: {len(set(hashes)) == 1}")
EOF

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 569


🏁 Script executed:

# Search for UUID.int usage in codebase
rg "\.int\b" src/prefect/server/events/ -t py -B 2 -A 2 | grep -A 5 -B 5 "UUID\|uuid"

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 68


🏁 Script executed:

# Look for advisory lock patterns in codebase
rg "advisory.*lock|pg_advisory" src/prefect/ -t py -B 3 -A 3

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2870


Use deterministic trigger.id.int for advisory lock key (Python hash() is randomized per process).

hash(str(trigger.id)) produces different values in each worker process, so different workers acquire different lock keys and the race condition persists. Verified: three separate processes yielded different hash values for the same UUID string.

🔧 Proposed fix
-        lock_key = hash(str(trigger.id)) % (2**63)
+        # Deterministic, process-stable key derived from UUID's 128-bit integer
+        lock_key = trigger.id.int & ((1 << 63) - 1)
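
A standalone sketch of the key derivation in the proposed fix. The helper name `advisory_lock_key` and the sample UUID are illustrative assumptions, not part of the PR.

```python
import uuid


def advisory_lock_key(trigger_id: uuid.UUID) -> int:
    """Map a UUID to a non-negative integer that fits in a signed 64-bit bigint."""
    # UUID.int is the full 128-bit value. Masking keeps the low 63 bits, so the
    # result depends only on the UUID and is the same in every process.
    return trigger_id.int & ((1 << 63) - 1)


example_id = uuid.UUID("550e8400-e29b-41d4-a716-446655440000")
key = advisory_lock_key(example_id)
print(hex(key))
```

String hashing in Python is salted per process unless PYTHONHASHSEED is fixed, which is why `hash(str(trigger.id))` cannot serve as a shared lock key. Two distinct UUIDs can still map to the same 63-bit key, but that only causes unrelated triggers to serialize occasionally.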
🤖 Prompt for AI Agents
In `@src/prefect/server/events/models/composite_trigger_child_firing.py` around
lines 17 - 45, The advisory lock key is currently computed with Python's
randomized hash(str(trigger.id)) so different processes get different keys;
update acquire_composite_trigger_lock to derive a deterministic 64-bit integer
from the trigger UUID by using trigger.id.int truncated/masked to fit into a
signed bigint (e.g., mod 2**63 or bitmask) and assign that to lock_key, then
pass that deterministic lock_key to the session.execute call for
pg_advisory_xact_lock; keep the rest of the logic (dialect check,
session.execute) unchanged.

Comment on lines +1634 to +1639
    async def compound_automation_concurrent(
        self,
        automations_session: AsyncSession,
        cleared_buckets: None,
        cleared_automations: None,
    ) -> Automation:

⚠️ Potential issue | 🟡 Minor

Silence Ruff ARG002 for side-effect fixtures.

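These fixtures are used for side effects only; rename to underscore-prefixed args (or use @pytest.mark.usefixtures) to keep lint clean. Two caveats apply: pytest resolves fixtures by parameter name, so the rename only works if the fixtures themselves are renamed to match, and the usefixtures mark has no effect when applied to a fixture function. If neither is practical, a "# noqa: ARG002" comment on the signature also silences the warning.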

🔧 Proposed fix
-    async def compound_automation_concurrent(
-        self,
-        automations_session: AsyncSession,
-        cleared_buckets: None,
-        cleared_automations: None,
-    ) -> Automation:
+    async def compound_automation_concurrent(
+        self,
+        automations_session: AsyncSession,
+        _cleared_buckets: None,
+        _cleared_automations: None,
+    ) -> Automation:
🧰 Tools
🪛 Ruff (0.14.13)

1637-1637: Unused method argument: cleared_buckets

(ARG002)


1638-1638: Unused method argument: cleared_automations

(ARG002)

🤖 Prompt for AI Agents
In `@tests/events/server/triggers/test_composite_triggers.py` around lines 1634 -
1639, The fixture function compound_automation_concurrent has parameters used
only for side effects (cleared_buckets, cleared_automations); rename those
parameters to underscore-prefixed names (e.g., _cleared_buckets,
_cleared_automations) or convert them to use `@pytest.mark.usefixtures` to silence
Ruff ARG002; update the async def signature for compound_automation_concurrent
accordingly and ensure any calls or references match the new names.
