Skip to content

Conversation

@JohnCari
Copy link

@JohnCari JohnCari commented Jan 13, 2026

Summary

Adds HeartbeatWorker to maintain replication slot activity when replicating from read replicas.

Closes #535

Problem

When using logical replication from a read replica (PostgreSQL 15+), the replication slot on the primary can become inactive during idle periods since WAL is generated on the primary but consumed from the replica. This can lead to slot invalidation.

Solution

A heartbeat mechanism that periodically emits pg_logical_emit_message(false, 'etl_heartbeat', '') to the primary, keeping the slot active.

Implementation

New Components:

  • HeartbeatWorker - Background worker connecting to primary in regular mode
  • HeartbeatConfig - Configurable interval (30s), backoff (1-60s), jitter (25%)
  • HeartbeatWorkerHandle - Implements WorkerHandle<()> trait

Key Features:

  • Exponential backoff with jitter on connection failures
  • Graceful shutdown via tokio watch channel
  • Comprehensive metrics (emissions, failures, connection attempts)
  • Config validation (interval > 0, jitter ≤ 100, min ≤ max backoff)

Connection Handling:

  • connect_regular() for non-replication connections
  • build_root_cert_store() shared helper (reduces TLS code duplication)
  • ETL_HEARTBEAT_OPTIONS with short timeouts (5s statement, 5s lock)

CodeRabbit Feedback Addressed (from #549)

Issue Resolution
Jitter bug (divided by u32::MAX) Fixed: divide by 1_000_000_000.0
Missing interval_ms > 0 validation Added with test
TLS code duplication Extracted build_root_cert_store()
Missing documentation Added doc comments

Configuration

let config = PipelineConfig {
    pg_connection: replica_config,
    primary_connection: Some(primary_config),
    heartbeat: Some(HeartbeatConfig::default()),
    // ...
};

Metrics

  • etl_heartbeat_emissions_total
  • etl_heartbeat_failures_total
  • etl_heartbeat_consecutive_failures
  • etl_heartbeat_connection_attempts_total
  • etl_heartbeat_last_emission_timestamp

Test Plan

  • HeartbeatConfig default values test
  • HeartbeatConfig validation tests (zero interval, jitter > 100, min > max)
  • ETL_HEARTBEAT_OPTIONS contains expected values

Related

Summary by CodeRabbit

  • New Features

    • Heartbeat support for read-replica mode with configurable interval, backoff and jitter; automatic activation when replica mode enabled.
    • New heartbeat worker integrated into pipeline lifecycle and five observability metrics for emissions, failures, connection attempts, consecutive failures, and last emission time.
  • Chores

    • Simplified error handling surface with removal/adjustment of several conversion/aggregation paths and related tests.
  • Documentation

    • Examples and docs updated to show new replica-related config fields.

✏️ Tip: You can customize this high-level summary in your review settings.

Adds HeartbeatWorker to maintain replication slot activity when replicating
from read replicas (PostgreSQL 15+).

New components:
- HeartbeatWorker: Background worker connecting to primary in regular mode
- HeartbeatConfig: Configurable interval (30s), backoff (1-60s), jitter (25%)
- connect_regular(): Non-replication connection method
- connect_with_tls(): Shared TLS helper to reduce code duplication

Key features:
- Exponential backoff with jitter on connection failures
- Graceful shutdown via tokio watch channel
- Comprehensive metrics for monitoring
- Config validation (interval > 0, jitter <= 100, min <= max backoff)

Closes supabase#535
- Add primary_connection and heartbeat fields to PipelineConfig
- Add HeartbeatWorkerPanic error kind
- Add heartbeat metrics constants
@JohnCari JohnCari requested a review from a team as a code owner January 13, 2026 21:31
@coderabbitai
Copy link

coderabbitai bot commented Jan 13, 2026

Warning

Rate limit exceeded

@JohnCari has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 16 minutes and 26 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 7fbad38 and ca5600c.

📒 Files selected for processing (2)
  • etl-examples/src/main.rs
  • etl/src/pipeline.rs
📝 Walkthrough

Walkthrough

Adds heartbeat-based read-replica support: new HeartbeatConfig, pipeline fields for replica mode, a HeartbeatWorker that emits pg_logical_emit_message over regular DB connections with backoff/jitter and metrics, replication client support for regular connections, and integration into pipeline lifecycle and metrics.

Changes

Cohort / File(s) Summary
Heartbeat configuration
etl-config/src/shared/heartbeat.rs, etl-config/src/shared/mod.rs
New HeartbeatConfig (interval_ms, min_backoff_ms, max_backoff_ms, jitter_percent), serde default helpers, constants, Default and validate(), ETL_HEARTBEAT_OPTIONS const, unit tests; module added and re-exported.
Pipeline config (replica mode)
etl-config/src/shared/pipeline.rs, etl/src/test_utils/pipeline.rs, etl-benchmarks/..., etl-examples/src/main.rs
Added primary_connection: Option<PgConnectionConfig> and heartbeat: Option<HeartbeatConfig> to configs and builders; is_replica_mode() and heartbeat_config() helpers; validation extended to check heartbeat when primary present; examples/benchmarks updated to set fields to None.
Heartbeat worker
etl/src/workers/heartbeat.rs, etl/src/workers/mod.rs
New HeartbeatWorker, HeartbeatWorkerHandle, HeartbeatError; lifecycle: start(), connect regular client, emit loop using pg_logical_emit_message, exponential backoff + jitter, cooperative shutdown, metrics & logging; exported via workers::heartbeat.
Replication client
etl/src/replication/client.rs
Added connect_regular(pg_connection_config: PgConnectionConfig) -> EtlResult<tokio_postgres::Client> and extracted TLS root cert store helper for reuse between replication and regular connections.
Pipeline lifecycle integration
etl/src/pipeline.rs
PipelineState gains optional heartbeat_worker; start() may spawn HeartbeatWorker when primary_connection configured; wait() coordinates worker shutdown and surfaces heartbeat failures.
Metrics
etl/src/metrics.rs
Added heartbeat metric constants and registration: emissions total, failures total, consecutive failures gauge, connection attempts, last emission timestamp (gauge, seconds).
Error surface
etl/src/error.rs
Added HeartbeatWorkerPanic variant to ErrorKind; several From<...> conversions and related tests removed (narrowed conversion surface).
Dependencies / Cargo
etl/Cargo.toml
Added rand and thiserror dependencies (adjusted from dev-deps).

Sequence Diagram(s)

sequenceDiagram
    participant Pipeline as PipelineState
    participant HW as HeartbeatWorker
    participant RC as PgReplicationClient
    participant DB as PostgreSQL
    Pipeline->>HW: start() (when primary_connection present)
    HW->>RC: connect_regular(primary_config)
    RC->>DB: establish connection (TLS/no-TLS)
    DB-->>RC: connection established
    RC-->>HW: client returned
    HW->>HW: reset consecutive failures
    loop every interval or after backoff
        HW->>DB: emit_heartbeat() (pg_logical_emit_message)
        DB-->>HW: success / error
        alt Success
            HW->>HW: record_emission(), update timestamp (metrics)
        else Error
            HW->>HW: record_failure(), increment consecutive failures
            HW->>HW: calculate_backoff(), apply_jitter()
            HW->>HW: wait_with_shutdown(backoff)
            HW->>RC: reconnect (retry)
        end
    end
    Pipeline->>HW: shutdown requested -> HW stops and Pipeline.wait() awaits handle
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: introduction of a HeartbeatWorker for read replica replication slot support.
Linked Issues check ✅ Passed The PR fully addresses issue #535 by implementing heartbeat mechanism to prevent replication slot invalidation on primary during idle periods when reading from read replicas.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing read replica replication slot support: heartbeat config, worker lifecycle, metrics, connection helpers, and integration with pipeline execution.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
etl/src/workers/mod.rs (1)

1-9: Consider updating the module documentation to include heartbeat workers.

The module doc comment describes apply workers, table sync workers, and worker pools, but doesn't mention the newly added heartbeat worker. As per coding guidelines, documentation should cover all items.

📝 Suggested documentation update
 //! Worker implementations for concurrent replication tasks.
 //!
-//! Contains worker types for handling different aspects of replication: apply workers process
-//! replication streams, table sync workers handle initial data copying, and worker pools manage
-//! concurrent execution and lifecycle coordination.
+//! Contains worker types for handling different aspects of replication: apply workers process
+//! replication streams, table sync workers handle initial data copying, heartbeat workers maintain
+//! replication slot activity on primaries when replicating from read replicas, and worker pools
+//! manage concurrent execution and lifecycle coordination.
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 55-81: In the validate method of the Heartbeat config (fn
validate), add a check that rejects zero or negative backoff by returning
ValidationError::InvalidFieldValue for "min_backoff_ms" when self.min_backoff_ms
== 0 (or <= 0) with a constraint message like "must be greater than 0"; keep the
existing min_backoff_ms <= max_backoff_ms check so that this implicitly ensures
max_backoff_ms is also > 0.

In @etl-config/src/shared/pipeline.rs:
- Around line 71-79: The heartbeat validation is being run even when replica
mode is disabled; update the config validation code (the function that currently
validates `heartbeat` around lines 147-150) to only validate `heartbeat` when
`primary_connection.is_some()` (i.e., replica mode enabled). Concretely, wrap
the existing `heartbeat` validation logic in an if-check on
`primary_connection`, or alternatively return a clear error/warning if
`heartbeat` is set while `primary_connection` is None; reference
`primary_connection` and `heartbeat` in the change so the intent is explicit.

In @etl/src/metrics.rs:
- Around line 18-23: The comment above the heartbeat metric constants ("//
Heartbeat metrics") is missing a trailing period; update that comment to end
with a period (e.g., change it to "// Heartbeat metrics.") so it adheres to Rust
comment style; locate the block that defines ETL_HEARTBEAT_EMISSIONS_TOTAL,
ETL_HEARTBEAT_FAILURES_TOTAL, ETL_HEARTBEAT_CONSECUTIVE_FAILURES,
ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL, and
ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP and update the comment accordingly, then
run rustfmt to ensure formatting consistency.

In @etl/src/workers/heartbeat.rs:
- Around line 219-242: Change heartbeat_loop so it emits an initial heartbeat
immediately instead of waiting the full interval: call
self.emit_heartbeat(client).await? once before entering the interval wait, or
restructure the loop to emit first then call wait_with_shutdown(interval).await;
ensure you still check client.is_closed() (use client.is_closed() before
emitting) and handle shutdown (if wait_with_shutdown returns true after emitting
then return Ok(())), keeping all existing error handling and the use of
HeartbeatError unchanged.
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f238781 and 0c67938.

📒 Files selected for processing (8)
  • etl-config/src/shared/heartbeat.rs
  • etl-config/src/shared/mod.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/error.rs
  • etl/src/metrics.rs
  • etl/src/replication/client.rs
  • etl/src/workers/heartbeat.rs
  • etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl-config/src/shared/mod.rs
  • etl/src/workers/mod.rs
  • etl/src/replication/client.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/metrics.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/heartbeat.rs
  • etl/src/error.rs
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl-config/src/shared/mod.rs
  • etl/src/replication/client.rs
  • etl/src/metrics.rs
  • etl/src/workers/heartbeat.rs
  • etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic

Applied to files:

  • etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`

Applied to files:

  • etl/src/error.rs
🧬 Code graph analysis (1)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
  • heartbeat_config (119-125)
etl/src/replication/client.rs (1)
  • connect_regular (163-168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (26)
etl-config/src/shared/mod.rs (1)

5-5: LGTM!

The heartbeat module declaration and public re-export follow the established pattern used by other modules in this file.

Also applies to: 15-15

etl/src/replication/client.rs (2)

46-58: LGTM on the DRY refactoring for TLS root certificate store construction.

The helper correctly consolidates the TLS root store building logic, reducing duplication between replication and regular connection paths.


159-168: LGTM!

The connect_regular method and its TLS/non-TLS variants are well-structured. Returning the raw tokio_postgres::Client instead of PgReplicationClient is appropriate since heartbeat connections only need to execute simple SQL commands without replication features. The connection lifecycle monitoring via spawn_postgres_connection ensures proper cleanup.

Also applies to: 220-252

etl/src/metrics.rs (1)

106-136: LGTM on the metric registrations.

The metric types and units are correctly chosen:

  • Counters for cumulative totals (emissions, failures, connection attempts)
  • Gauges for current state (consecutive failures, last timestamp)
  • Unit::Seconds appropriately used for the timestamp gauge

Minor: The comment on line 107 should also end with a period for consistency.

etl-config/src/shared/heartbeat.rs (3)

8-40: LGTM on the HeartbeatConfig structure.

The struct is well-documented with clear field descriptions. The serde defaults ensure backward compatibility when fields are omitted from configuration.


122-173: LGTM on test coverage.

The tests effectively cover default values, configuration validation edge cases, and the options string content. Good use of struct update syntax with ..Default::default() for focused test cases.


95-109: Implementation is idiomatic.

The serde default helper functions correctly delegate to the associated constants, maintaining a single source of truth for default values.

etl-config/src/shared/pipeline.rs (5)

5-8: LGTM!

Import changes correctly bring in HeartbeatConfig for the new replica mode fields.


10-12: LGTM!

Documentation wording is clear and concise.


107-125: LGTM!

The replica mode detection and heartbeat config accessor methods are well-designed. The use of unwrap_or_default() ensures sensible defaults when heartbeat config is not explicitly provided.


183-186: LGTM!

Field types correctly use the secrets-stripped variant for connection config while keeping HeartbeatConfig as-is since it contains no sensitive data.


210-211: LGTM!

Conversion correctly strips secrets from primary_connection and passes through heartbeat unchanged.

etl/src/error.rs (1)

134-137: LGTM!

The new HeartbeatWorkerPanic variant follows the established pattern for worker panic error kinds and is appropriately placed in the "State & Workflow Errors" section.

etl/src/workers/heartbeat.rs (13)

1-23: LGTM!

Module documentation clearly explains the purpose and context (PostgreSQL 15+ read replicas). Imports are well-organized and appropriate for the functionality.


25-39: LGTM!

Error variants appropriately cover the main failure modes. Using String for error details is acceptable here as these errors are primarily used for logging and metrics rather than programmatic error handling.


48-70: LGTM!

The WorkerHandle implementation correctly handles the join operation and maps panics to the appropriate error kind. The Option<JoinHandle> pattern safely handles the case where wait() is called multiple times.


72-83: LGTM!

Struct fields capture all necessary state for heartbeat operation. The consecutive_failures counter is appropriately typed as u32 and the backoff calculation uses saturating arithmetic to handle edge cases.


85-107: LGTM!

Constructor is straightforward and well-documented. Validation is appropriately delegated to the config layer.


109-120: LGTM!

The start method follows the standard tokio spawn pattern, moving ownership into the background task and returning a handle for lifecycle management.


122-183: LGTM!

The main loop is well-structured with comprehensive shutdown handling. The backoff-on-error pattern with metrics tracking provides good observability. Resetting the consecutive failures gauge on shutdown is a nice touch for clean metrics.


185-217: LGTM!

Connection logic correctly uses regular (non-replication) mode via connect_regular. The consecutive failures counter is appropriately reset on successful connection, and metrics are updated correctly.


244-266: LGTM!

The heartbeat emission correctly uses pg_logical_emit_message(false, ...) for non-transactional WAL records. The comment at line 262 appropriately explains why failure recording is delegated to the caller.


268-280: LGTM!

The exponential backoff calculation is correct with appropriate use of saturating arithmetic to prevent overflow. The formula min * 2^(failures-1) ensures the first retry uses min_backoff.


282-299: Non-random jitter may reduce effectiveness.

The jitter implementation uses subsec_nanos() which provides time-based determinism rather than randomness. If multiple workers start at similar times, they may compute similar jitter values. For heartbeat purposes this is likely acceptable, but consider using rand crate if stronger distribution is needed.

Also note the jitter is symmetric (±jitter_percent) rather than the more common additive-only pattern (0 to +jitter_percent), which means backoff can be shorter than the base.


301-317: LGTM!

Shutdown handling is correctly implemented. The tokio::select! properly races the sleep against shutdown signals, and the logic correctly handles both channel closure and value changes.


319-357: LGTM!

Metrics methods provide comprehensive observability with appropriate counter and gauge usage. The unwrap_or_default() for timestamp calculation safely handles edge cases.

- Add min_backoff_ms > 0 validation in HeartbeatConfig
- Only validate heartbeat config when replica mode is enabled
- Add period to heartbeat metrics comment
- Emit initial heartbeat immediately before waiting for interval
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 119-128: ETL_HEARTBEAT_OPTIONS currently relies on a trailing
space in the first string to separate options when using concat! which is
fragile; update the concat! invocation for ETL_HEARTBEAT_OPTIONS so each
concatenated literal includes an explicit leading or trailing space (e.g., add a
space before "statement_timeout", "lock_timeout", and
"idle_in_transaction_session_timeout") to ensure options are always separated
regardless of string ordering or edits.

In @etl/src/workers/heartbeat.rs:
- Around line 270-301: The apply_jitter function currently uses
SystemTime::subsec_nanos() for pseudo-randomness; replace that with the rand
crate by importing rand::Rng and using rand::thread_rng() to sample a f64 in the
range -1.0..1.0 (or directly sample a uniform value in
-jitter_range..jitter_range) and multiply by jitter_range to produce jitter;
update apply_jitter to compute jitter using thread_rng() and preserve the
existing clamping to a minimum duration, ensuring calculate_backoff continues to
call apply_jitter unchanged and that heartbeat_config.jitter_percent is used to
compute jitter_range.
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0c67938 and adb3828.

📒 Files selected for processing (4)
  • etl-config/src/shared/heartbeat.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/metrics.rs
  • etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/metrics.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/heartbeat.rs
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Normal comments in Rust should always finish with a period

Applied to files:

  • etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent

Applied to files:

  • etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl/src/metrics.rs
  • etl/src/workers/heartbeat.rs
🧬 Code graph analysis (2)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
  • heartbeat_config (119-125)
etl/src/replication/client.rs (1)
  • connect_regular (163-168)
etl-config/src/shared/heartbeat.rs (1)
etl-config/src/shared/pipeline.rs (1)
  • validate (130-155)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (20)
etl-config/src/shared/heartbeat.rs (4)

1-40: LGTM! Well-structured configuration type with comprehensive documentation.

The struct follows Rust conventions with proper derive macros for serialization/deserialization, optional utoipa integration via feature flag, and clear documentation explaining the purpose and defaults for each field.


42-90: Validation logic is thorough and covers all edge cases.

The validate() method properly checks all constraints documented in the doc comment. The validation order is logical (interval → jitter → min_backoff → min vs max comparison).


92-117: LGTM! Default implementation and serde helpers are consistent.

The Default implementation correctly uses the associated constants, and the serde default functions properly delegate to these constants, ensuring a single source of truth.


130-191: LGTM! Test coverage is comprehensive.

Tests validate default values, the heartbeat options constant, and all validation error cases (zero interval, jitter > 100, min > max backoff, zero min backoff).

etl-config/src/shared/pipeline.rs (5)

5-8: LGTM! Import organization is correct.

The HeartbeatConfig type is properly imported alongside other shared types.


71-79: LGTM! New replica mode fields are well-documented.

The primary_connection and heartbeat fields are properly documented with clear explanations of when they're used and their relationship to each other.


107-125: LGTM! Helper methods provide clean API for replica mode handling.

is_replica_mode() is a clear predicate, and heartbeat_config() provides sensible defaulting behavior when replica mode is enabled but no explicit heartbeat config is provided.


147-152: LGTM! Conditional validation is correct.

Heartbeat config is only validated when replica mode is enabled (primary_connection is set), and only if a heartbeat config is actually provided. This aligns with the PR requirements.


185-188: LGTM! PipelineConfigWithoutSecrets properly mirrors the new fields.

The secrets-free variant correctly includes both new fields, and the From implementation properly maps primary_connection through the secrets-stripping conversion while passing heartbeat directly (since it contains no secrets).

Also applies to: 212-213

etl/src/metrics.rs (2)

18-24: LGTM! Metric constants follow established naming conventions.

The new heartbeat metrics use the etl_heartbeat_ prefix consistently and are logically grouped with a clear comment. Based on learnings, the comment properly ends with a period.


106-136: LGTM! Metric registrations are appropriate for the metric semantics.

  • Counters for monotonic values (emissions, failures, connection attempts) - correct.
  • Gauge for consecutive failures (resets to 0 on success) - correct.
  • Gauge with Unit::Seconds for timestamp - correct.

The descriptions are clear and consistent with other metrics in the file.

etl/src/workers/heartbeat.rs (9)

1-24: LGTM! Module documentation and imports are well-organized.

The module-level documentation clearly explains the purpose (maintaining replication slot activity on the primary when replicating from a read replica) and the mechanism used (pg_logical_emit_message()).


25-39: LGTM! Error types are well-defined with clear descriptions.

The HeartbeatError enum covers the expected failure modes: connection failures, emission failures, and graceful shutdown. Using thiserror for error derivation is idiomatic.


41-70: LGTM! WorkerHandle implementation is correct.

The handle properly wraps the JoinHandle and maps panics to the appropriate error kind. The state() method returning unit type makes sense since the heartbeat worker has no external state to expose.


72-107: LGTM! Worker struct and constructor are straightforward.

The struct holds all necessary state for heartbeat operation, and the constructor initializes consecutive failures to 0 as expected.


109-183: LGTM! Main loop handles all lifecycle states correctly.

The run() method properly:

  • Checks for shutdown before connection attempts.
  • Handles graceful shutdown from heartbeat_loop().
  • Implements backoff with jitter on errors.
  • Resets the consecutive failures gauge on shutdown.

The logging provides good observability into the worker's state.


185-217: LGTM! Connection handling is robust.

The method records connection attempts, logs connection details, and properly resets consecutive failures on success. The connection is made in regular (non-replication) mode as appropriate for executing pg_logical_emit_message().


219-244: Consider the behavior when emit fails after successful connection.

The heartbeat_loop() emits immediately on entry then waits for the interval. However, if emit_heartbeat() fails (returns EmitFailed), the error propagates up to connect_and_heartbeat(), which returns it to run(). In run(), this increments consecutive_failures even though the connection was successful (consecutive_failures was reset to 0 on line 212).

This is actually correct behavior since consecutive failures should track consecutive heartbeat failures, not just connection failures. Just confirming the design is intentional.


303-319: LGTM! Shutdown handling is cooperative and correct.

is_shutdown_requested() checks the current state, while wait_with_shutdown() uses tokio::select! to race the sleep against the shutdown signal. The return value semantics (true = shutdown, false = timeout) are clear from the documentation.


321-359: LGTM! Metrics recording is consistent with the registered metrics.

The methods correctly use counters for emissions and failures, and gauges for consecutive failures and timestamp. The pipeline_id label ensures metrics are properly attributed.

- ETL_HEARTBEAT_OPTIONS: use explicit leading spaces for each option
  to avoid fragile trailing space dependencies
- apply_jitter: replace SystemTime::subsec_nanos() pseudo-randomness
  with rand::thread_rng().gen_range() for proper randomization
- Add HeartbeatWorker import and state tracking in pipeline
- Start heartbeat worker when replica mode enabled (primary_connection set)
- Wait for heartbeat worker during shutdown
- Move rand from dev-dependencies to dependencies for production use
- Add primary_connection and heartbeat fields to test utils PipelineBuilder
- Add primary_connection: None, heartbeat: None to benchmark PipelineConfig
- Add primary_connection: None, heartbeat: None to example PipelineConfig
- Update lib.rs documentation example with new fields
- Add read replica support mention to lib.rs features
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @etl/src/pipeline.rs:
- Around line 259-265: The heartbeat worker failure is logged but not appended
to the shared errors vector like apply and table sync workers; update the
section around heartbeat_worker and heartbeat_handle.wait().await to push the
error into the errors collection when Err(err) occurs (e.g.,
errors.push(err.into()) or map the error type to the same Error type used by
apply/table sync) so heartbeat failures are treated consistently, or
alternatively add a short comment above the heartbeat_worker block explaining
that heartbeat failures are intentionally non-fatal and will not be added to
errors.
- Around line 175-188: self.config.heartbeat_config() returns
Option<HeartbeatConfig> but HeartbeatWorker::new expects a HeartbeatConfig;
inside the existing if let Some(primary_config) { ... } block you should unwrap
the option before passing it to HeartbeatWorker::new (e.g., map to a local
heartbeat_config via matching or call .expect/.unwrap with a clear message), so
replace the direct call to self.config.heartbeat_config() with an unwrapped
HeartbeatConfig and pass that into HeartbeatWorker::new (refer to
heartbeat_config(), HeartbeatWorker::new, primary_config, and heartbeat_worker
to locate the code).
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between adb3828 and dd687e3.

📒 Files selected for processing (5)
  • etl-config/src/shared/heartbeat.rs
  • etl/Cargo.toml
  • etl/src/pipeline.rs
  • etl/src/test_utils/pipeline.rs
  • etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/test_utils/pipeline.rs
  • etl/src/pipeline.rs
  • etl-config/src/shared/heartbeat.rs
  • etl/src/workers/heartbeat.rs
**/Cargo.toml

📄 CodeRabbit inference engine (AGENTS.md)

Use kebab-case for crate names

Files:

  • etl/Cargo.toml
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl/Cargo.toml
  • etl/src/pipeline.rs
  • etl/src/workers/heartbeat.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Build with `cargo build --workspace --all-targets --all-features`

Applied to files:

  • etl/Cargo.toml
🧬 Code graph analysis (1)
etl/src/pipeline.rs (1)
etl-config/src/shared/pipeline.rs (2)
  • is_replica_mode (112-114)
  • heartbeat_config (119-125)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (22)
etl/Cargo.toml (2)

35-35: LGTM!

Adding thiserror as a runtime dependency is appropriate for the new heartbeat error handling introduced in this PR.


31-31: The thread_rng feature is valid and properly configured. The rand crate v0.9.2 includes an explicit thread_rng feature (which is a default feature), and since the workspace definition disables default features (default-features = false), explicitly enabling it in etl/Cargo.toml is correct and necessary.

etl-config/src/shared/heartbeat.rs (5)

1-40: LGTM! Well-structured configuration with good documentation.

The struct is well-documented with clear field descriptions including default values. The use of serde defaults and conditional utoipa schema derivation is appropriate.


55-89: LGTM! Validation logic is comprehensive and correct.

The validation covers all edge cases: zero interval, excessive jitter, zero minimum backoff, and inverted backoff bounds. The documentation accurately describes the checks.


92-117: LGTM!

Default implementation and serde helper functions correctly use the associated constants, ensuring consistency.


119-128: LGTM! Well-documented connection options with appropriate timeouts.

The short timeouts (5s statement/lock) are appropriate for heartbeat operations that should fail fast, while the 30s idle timeout provides reasonable cleanup for connection issues.


130-195: LGTM! Comprehensive test coverage.

Tests cover all validation scenarios including edge cases (zero interval, excessive jitter, inverted backoff bounds, zero minimum backoff) and verify default values and options string content.

etl/src/workers/heartbeat.rs (11)

1-24: LGTM! Clear module documentation and appropriate imports.

The module documentation clearly explains the purpose (maintaining replication slot activity when replicating from read replicas) and mentions the PostgreSQL 15+ requirement.


26-71: LGTM! Error types and handle implementation are well-structured.

The error variants cover the expected failure modes. The wait() implementation properly handles the case where the handle was already consumed.


73-108: LGTM! Worker structure and constructor are well-defined.

The fields capture all necessary state for heartbeat operations, and the constructor properly initializes the worker.


110-184: LGTM! Worker lifecycle is well-implemented.

The main loop properly handles shutdown checks before connection attempts, distinguishes between clean shutdown and errors, and resets metrics on termination. The backoff wait is also shutdown-aware.


186-218: LGTM! Connection and heartbeat flow is correct.

The method properly records connection attempts, uses the regular (non-replication) connection mode, and resets consecutive failures only after a successful connection.


220-245: LGTM! Heartbeat loop is well-structured.

The loop properly checks connection health before each emission and cleanly exits on shutdown. The immediate first heartbeat (before waiting) is a good pattern for quick feedback.


247-269: LGTM! Heartbeat emission uses the correct PostgreSQL function.

Using pg_logical_emit_message(false, ...) with non-transactional mode ensures immediate WAL visibility. The empty payload minimizes overhead. The comment at line 265 helpfully explains the metric recording responsibility.


271-283: LGTM! Backoff calculation is correct with proper overflow protection.

The exponential backoff formula min * 2^(failures-1) correctly starts at the minimum backoff for the first failure and doubles thereafter, with saturation arithmetic preventing overflow.


285-299: LGTM! Jitter implementation is correct.

The symmetric jitter around the base backoff using rand::thread_rng().gen_range() properly randomizes retry timing to prevent thundering herd. The 0.1s minimum floor is a sensible safety bound.


301-317: LGTM! Shutdown coordination is properly implemented.

The tokio::select! pattern correctly races the sleep against shutdown signals, and the result handling covers both channel closure and explicit shutdown requests.


319-357: LGTM! Metrics are properly recorded.

The emission timestamp correctly uses Unix epoch seconds, and the failure tracking properly separates the counter (total failures) from the gauge (current consecutive failures).

etl/src/pipeline.rs (3)

19-19: LGTM!

Import for heartbeat worker types is correctly added.


37-44: LGTM!

The optional heartbeat_worker field correctly represents that heartbeat is only active in replica mode.


58-61: LGTM!

Documentation clearly explains the replica mode behavior with heartbeat worker.

etl/src/test_utils/pipeline.rs (1)

152-153: LGTM! Test utilities correctly default to non-replica mode.

Setting primary_connection: None and heartbeat: None ensures test pipelines operate in the standard (non-replica) mode. Consider adding builder methods like with_primary_connection() and with_heartbeat() if replica mode testing becomes necessary in the future.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @etl-examples/src/main.rs:
- Around line 184-185: Add short inline comments for the new replica-mode fields
to clarify their purpose for example readers: annotate primary_connection to
indicate it holds the primary DB connection used for writes (or None when
running as a read replica) and annotate heartbeat to explain it stores the
optional heartbeat/health-check info used to determine replica liveness and
failover readiness; place these comments next to the primary_connection and
heartbeat fields in the struct/initializer where they appear (e.g.,
primary_connection: None, heartbeat: None).
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dd687e3 and 7fbad38.

📒 Files selected for processing (3)
  • etl-benchmarks/benches/table_copies.rs
  • etl-examples/src/main.rs
  • etl/src/lib.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/lib.rs
  • etl-benchmarks/benches/table_copies.rs
  • etl-examples/src/main.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (3)
etl/src/lib.rs (2)

19-19: LGTM!

The feature description is concise and follows the existing documentation style.


81-82: LGTM!

The example correctly demonstrates the new optional fields for replica mode configuration, maintaining consistency with the updated PipelineConfig API.

etl-benchmarks/benches/table_copies.rs (1)

348-349: LGTM!

The new optional fields are correctly set to None for the benchmark, which doesn't require replica mode functionality.

- Unwrap heartbeat_config() with expect() since it's guaranteed to be Some
  when primary_connection is set
- Add comment explaining that heartbeat failures are intentionally non-fatal
Clarifies purpose of primary_connection and heartbeat fields for example readers.
@iambriccardo
Copy link
Contributor

Hi, regarding what we discussed yesterday, what I would like is to verify two things:

  1. How does the system work when there is no traffic? Is the slot advancing in the replica? Is the heartbeat needed (I assume so since Sequin used it, but I would like to figure out the exact reason why before adding code for doing it).
  2. Does the system work from Postgres 16+? If yes, does it work on read replicas with logical or physical replication? Or both?

Thanks a lot for spending your time on this! It will be a nice feature to have if we manage to implement this properly.

@JohnCari
Copy link
Author

Hi, regarding what we discussed yesterday, what I would like is to verify two things:

1. How does the system work when there is no traffic? Is the slot advancing in the replica? Is the heartbeat needed (I assume so since Sequin used it, but I would like to figure out the exact reason why before adding code for doing it).

2. Does the system work from Postgres 16+? If yes, does it work on read replicas with logical or physical replication? Or both?

Thanks a lot for spending your time on this! It will be a nice feature to have if we manage to implement this properly.

No problem!, Happy to help I love supabase its a great product!

I have to head outside but when i return in a few hours I will be happy to answer those questions

@JohnCari
Copy link
Author

JohnCari commented Jan 14, 2026

Hi @iambriccardo, here are my answers:

1. How does the system work with no traffic? Is the heartbeat needed?

Yes, the heartbeat is needed. Here's why:

When there's no write traffic, the replication slot on the primary can't advance — it has no WAL entries to confirm. The slot stagnates → WAL piles up → eventually hits max_slot_wal_keep_size and gets invalidated.

pg_logical_emit_message solves this by writing directly to WAL without touching any table — keeps the slot advancing even when the DB is idle.

This is exactly what Sequin does: "The only operation Sequin performs on the primary is to call pg_logical_emit_message to ensure proper WAL checkpointing."

2. Does it work from Postgres 16+? Logical or physical replication?

  • PG 16+ required — logical replication from standbys was added in PG 16
  • Between primary and replica: Physical (streaming) replication
  • From replica to ETL consumer: Logical replication

So the flow is: Primary → (physical) → Replica → (logical) → ETL

Both primary and standby need wal_level = logical and hot_standby_feedback = on.

Happy to add docs or tests if that would help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support for replication slots on read replica

2 participants