@JohnCari JohnCari commented Jan 13, 2026

Summary

Implements support for replication slots on read replicas by adding a HeartbeatWorker that emits periodic messages to the primary database.

Closes #535

Changes

  • HeartbeatWorker: New worker that periodically calls pg_logical_emit_message(false, 'etl_heartbeat', '') on the primary to keep replica replication slots active
  • Dual-connection architecture: Pipeline now supports separate connections for replica (WAL streaming) and primary (heartbeats)
  • HeartbeatConfig: Configurable intervals and exponential backoff with jitter (1s-60s, 25% jitter)
  • PostgreSQL version detection: Validates that the primary runs PostgreSQL 15+ before emitting heartbeats via pg_logical_emit_message
  • ETL_HEARTBEAT_OPTIONS: Connection pool settings optimized for heartbeat operations (5s statement/lock timeout, 30s idle timeout)
  • Metrics: Added Prometheus-style metrics for heartbeat emissions, failures, connection attempts, and consecutive failures
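
The backoff described in the HeartbeatConfig bullet (1s-60s, 25% jitter) can be sketched as below. This is a minimal illustration and not the PR's actual code: `BackoffSettings`, `backoff_delay`, and the `jitter_unit` parameter are hypothetical names, and the jitter sample is passed in by the caller so the function stays deterministic.

```rust
use std::time::Duration;

/// Backoff settings. Field names are hypothetical; the defaults mirror the
/// "1s-60s, 25% jitter" values quoted in the PR description.
#[derive(Debug, Clone, Copy)]
pub struct BackoffSettings {
    pub initial: Duration,
    pub max: Duration,
    pub jitter_percent: u32,
}

impl Default for BackoffSettings {
    fn default() -> Self {
        Self {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(60),
            jitter_percent: 25,
        }
    }
}

/// Returns the delay before retry number `attempt` (0-based).
///
/// `jitter_unit` is a caller-supplied sample in `[0.0, 1.0]`. It is mapped to
/// a signed fraction in `[-jitter_percent, +jitter_percent]` percent.
pub fn backoff_delay(settings: &BackoffSettings, attempt: u32, jitter_unit: f64) -> Duration {
    // Double the delay on every attempt, saturating instead of overflowing,
    // then cap at the configured maximum.
    let factor = 2u32.saturating_pow(attempt);
    let base = settings.initial.saturating_mul(factor).min(settings.max);

    // Treat non-finite samples as "no jitter" and clamp the rest.
    let unit = if jitter_unit.is_finite() {
        jitter_unit.clamp(0.0, 1.0)
    } else {
        0.5
    };
    let percent = f64::from(settings.jitter_percent.min(100));
    // 0.0 maps to -percent, 0.5 to zero, 1.0 to +percent.
    let fraction = (unit * 2.0 - 1.0) * percent / 100.0;

    // Apply the jitter, then cap again so the result never exceeds the maximum.
    base.mul_f64(1.0 + fraction).min(settings.max)
}
```

Passing the sample in, rather than calling a random number generator inside the function, keeps the growth and capping behaviour easy to unit test.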

Files Modified

etl-config/src/shared/

  • mod.rs - Export heartbeat module
  • heartbeat.rs - NEW: HeartbeatConfig with serde defaults
  • connection.rs - Add ETL_HEARTBEAT_OPTIONS
  • pipeline.rs - Add primary_connection and heartbeat fields, is_replica_mode() helper

etl/src/

  • workers/mod.rs - Export heartbeat module
  • workers/heartbeat.rs - NEW: HeartbeatWorker implementation
  • pipeline.rs - Integrate HeartbeatWorker lifecycle
  • metrics.rs - Add heartbeat metrics constants
  • lib.rs - Update doc example
  • test_utils/pipeline.rs - Update PipelineBuilder

etl-benchmarks/benches/table_copies.rs - Add new PipelineConfig fields
etl-examples/src/main.rs - Add new PipelineConfig fields

Test plan

Unit tests included:

  • HeartbeatConfig default values and validation
  • Backoff calculation with exponential growth and capping
  • Connection state transitions
  • Serde serialization

Summary by CodeRabbit

  • New Features

    • Added a heartbeat worker to maintain replication-slot activity and manage primary connections.
  • Configuration

    • PipelineConfig now accepts optional primary_connection and heartbeat settings with sensible defaults.
  • Metrics

    • Introduced heartbeat-specific metrics (emissions, failures, connection attempts, last emission timestamp, consecutive failures).
  • Public API

    • Exposed heartbeat configuration and worker types to enable integration and lifecycle management.
  • Documentation

    • Simplified docs and updated examples to reflect new optional fields.
  • Tests

    • Added tests for heartbeat defaults, options, metrics, and related behavior.

@JohnCari JohnCari requested a review from a team as a code owner January 13, 2026 03:28
…tions

Add PgReplicationClient::connect_regular() method that creates a regular
PostgreSQL connection (without ReplicationMode::Logical) for operations
like heartbeat emissions that don't require replication protocol.
@JohnCari
Author

@iambriccardo ready for review

@iambriccardo
Contributor

Thanks!

I have not yet looked in detail at how replication through read replicas works. I recall that we can't stream the WAL from read replicas (except on newer Postgres versions), but I may be wrong.

Also, why do you need these heartbeats being sent via logical messages? Can't you send status updates with the same LSNs via the same replication connection?

@JohnCari
Author

Hi @iambriccardo

From what I've read, status updates (StandbyStatusUpdate) only communicate "I've consumed up to LSN X" from consumer to server - they don't generate new WAL entries.

When streaming from a read replica during idle periods (no writes on primary), there's no new WAL flowing through the replication chain. The heartbeat approach using pg_logical_emit_message(false, 'etl_heartbeat', '') on the primary creates minimal WAL entries that:

  1. Flow from primary → replica via physical replication
  2. Keep the replica's logical replication slot active
  3. Prevent slot staleness during prolonged idle periods

This follows the same pattern as Sequin's replica setup, https://sequinstream.com/docs/reference/databases#using-sequin-with-a-replica (link provided by @abhishiv in #535). Their docs state: "The only operation that Sequin will perform on the primary database is to call the function pg_logical_emit_message to ensure proper WAL checkpointing."

@iambriccardo
Contributor

I was reading more about this, thanks.

From my understanding of PostgreSQL internals, even when there is no user traffic, the primary still sends periodic keepalive messages to the replica. These are triggered every wal_sender_timeout / 2 and are used to request feedback about replication progress. However, the end_lsn reported in these messages does not advance on its own. It only moves forward when new WAL is actually generated on the primary. If the primary is idle and no WAL is being produced, then both the WAL position and the reported end_lsn remain stable. What can happen is that keepalive messages continue to be exchanged, but they do so with the same LSN values, purely to maintain the connection and detect stalled replicas.

Separately, am I correct that this setup only works on PostgreSQL versions that support creating logical replication slots on read replicas? My understanding is that this capability was introduced relatively recently, and older versions require slots to exist exclusively on the primary, even if the logical decoding client connects through a replica.

Did you try to see how the system behaves when you don't emit logical messages?

@JohnCari
Author

Hi @iambriccardo

Re: PostgreSQL version for replica slots

You're correct - logical decoding from standbys requires PostgreSQL 16+. My PR currently validates PG 15+ on the primary for pg_logical_emit_message, though that function has been available since PG 9.6. I should add documentation clarifying the PG 16+ requirement for the replica connection, or add a version check there as well.

Re: Testing without heartbeats

I haven't tested the behavior without emitting logical messages during extended idle periods. I based the implementation on Sequin's documented approach rather than empirical testing. I can set up a test scenario to observe the actual behavior if that would be helpful for the review.

Would you like me to:

  1. Add PG 16+ version validation for the replica connection?
  2. Run some idle-period tests and document the findings?
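
The version validation proposed in item 1 could look roughly like the sketch below. It assumes both servers expose `server_version_num` (for example 160002 for 16.2, 90603 for 9.6.3). The constant names, `VersionCheckError`, and `check_versions` are hypothetical, and the primary floor here is 9.6 (where pg_logical_emit_message first appeared) rather than the 15 that the PR currently enforces.

```rust
/// Numeric floor for `pg_logical_emit_message`, available since PostgreSQL 9.6.
/// Hypothetical constant; the PR currently enforces version 15 instead.
const MIN_PRIMARY_VERSION_NUM: u32 = 90_600;

/// Numeric floor for logical decoding on a standby, available since PostgreSQL 16.
const MIN_REPLICA_VERSION_NUM: u32 = 160_000;

/// Hypothetical error type describing which side failed the check.
#[derive(Debug, PartialEq, Eq)]
pub enum VersionCheckError {
    /// The primary cannot emit logical messages.
    PrimaryTooOld { found: u32 },
    /// The replica cannot host a logical replication slot.
    ReplicaTooOld { found: u32 },
}

/// Validates the `server_version_num` values reported by the primary and the replica.
pub fn check_versions(
    primary_version_num: u32,
    replica_version_num: u32,
) -> Result<(), VersionCheckError> {
    if primary_version_num < MIN_PRIMARY_VERSION_NUM {
        return Err(VersionCheckError::PrimaryTooOld {
            found: primary_version_num,
        });
    }
    if replica_version_num < MIN_REPLICA_VERSION_NUM {
        return Err(VersionCheckError::ReplicaTooOld {
            found: replica_version_num,
        });
    }
    Ok(())
}
```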

- Resolved merge conflicts in connection.rs, pipeline.rs, and test_utils/pipeline.rs
- Added thiserror dependency to etl crate for HeartbeatError
- Fixed heartbeat.rs to use correct ShutdownRx API (has_changed/changed)
- Replaced rand-based jitter with timestamp-based jitter
@coderabbitai

coderabbitai bot commented Jan 13, 2026

📥 Commits

Reviewing files that changed from the base of the PR and between d52f472 and 1cb5a6d.

📒 Files selected for processing (6)
  • etl-config/src/shared/base.rs
  • etl-config/src/shared/connection.rs
  • etl/src/lib.rs
  • etl/src/metrics.rs
  • etl/src/pipeline.rs
  • etl/src/workers/mod.rs
📝 Walkthrough

Adds heartbeat support for read-replica pipelines: new HeartbeatConfig and re-export, optional PipelineConfig fields primary_connection and heartbeat, ETL heartbeat connection options, HeartbeatWorker with lifecycle/backoff/metrics, pipeline integration to start/await the worker, and related metrics and tests.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration Schema: etl-config/src/shared/heartbeat.rs, etl-config/src/shared/pipeline.rs, etl-config/src/shared/mod.rs | New public HeartbeatConfig with serde defaults; added primary_connection: Option<PgConnectionConfig> and heartbeat: Option<HeartbeatConfig> to PipelineConfig and PipelineConfigWithoutSecrets; new defaults/helpers and is_replica_mode()/heartbeat_config(); re-exported heartbeat module. |
| Connection Management: etl-config/src/shared/connection.rs, etl/src/replication/client.rs | Added APP_NAME_HEARTBEAT and ETL_HEARTBEAT_OPTIONS (short timeouts, heartbeat app name); removed some old replicator role constants; added PgReplicationClient::connect_regular() with TLS/non-TLS helpers using heartbeat options. |
| Heartbeat Worker Implementation: etl/src/workers/heartbeat.rs, etl/src/workers/mod.rs | New HeartbeatWorker, HeartbeatWorkerHandle, HeartbeatError, connection state and watch channel, PG version verification, heartbeat emit loop with exponential backoff + jitter, metrics updates, shutdown handling; exported via workers mod. |
| Pipeline Integration: etl/src/pipeline.rs, etl/src/test_utils/pipeline.rs | Pipeline state extended to hold optional heartbeat worker handle; pipeline starts heartbeat worker when primary_connection present and awaits it alongside other workers; test utils initialize primary_connection: None and heartbeat: None. |
| Metrics & Dependency: etl/src/metrics.rs, etl/Cargo.toml | Added heartbeat metric constants and registrations (etl_heartbeat_emissions_total, etl_heartbeat_failures_total, etl_heartbeat_connection_attempts_total, etl_heartbeat_last_emission_timestamp, etl_heartbeat_consecutive_failures); added thiserror dependency. |
| Examples, Docs & Tests: etl-benchmarks/benches/table_copies.rs, etl-examples/src/main.rs, etl/src/lib.rs, etl-config/... tests | Updated import path for NotifyingStore; examples/docs updated to set primary_connection: None and heartbeat: None; tests added/adjusted for heartbeat options, heartbeat defaults, and serialization expectations. |

Sequence Diagram(s)

sequenceDiagram
    participant Pipeline
    participant HBW as HeartbeatWorker
    participant Primary as PrimaryDB
    participant Metrics

    Pipeline->>HBW: start(primary_config, heartbeat_config)
    activate HBW
    HBW->>Primary: connect (ETL_HEARTBEAT_OPTIONS)
    alt connected
        Primary-->>HBW: connection established
        HBW->>Primary: query_version()
        alt version ok
            loop every interval_secs
                HBW->>Primary: execute_heartbeat()
                alt success
                    HBW->>Metrics: inc(emissions), set(last_emission)
                else failure
                    HBW->>Metrics: inc(failures), inc(consecutive_failures)
                    HBW->>HBW: backoff_with_jitter()
                end
            end
        else unsupported version
            HBW->>Metrics: inc(failures)
            HBW->>HBW: backoff_with_jitter()
        end
    else connection failed
        HBW->>Metrics: inc(connection_attempts)
        HBW->>HBW: backoff_with_jitter()
    end
    deactivate HBW
    Pipeline->>HBW: await completion (optional)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: adding a HeartbeatWorker for read replica replication slots, which is the primary objective. |
| Linked Issues check | ✅ Passed | The PR successfully implements support for replication slots on read replicas (#535), including dual-connection architecture, pg_logical_emit_message on primary, backoff/jitter configuration, and version checks. |
| Out of Scope Changes check | ✅ Passed | All changes are scoped to heartbeat feature implementation: HeartbeatWorker, HeartbeatConfig, connection/pipeline enhancements, metrics, and test utilities; no unrelated refactoring detected. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |

Merge upstream doc comments and structure with heartbeat additions:
- connection.rs: Keep upstream docs, add APP_NAME_HEARTBEAT and ETL_HEARTBEAT_OPTIONS
- pipeline.rs: Keep upstream docs, add primary_connection, heartbeat fields and methods
Keep upstream's extensive documentation while adding primary_connection
and heartbeat fields for read replica support.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 43-70: Add validation to HeartbeatConfig to reject invalid values:
implement a HeartbeatConfig::validate(&self) -> Result<(), String> that checks
interval_secs > 0, initial_backoff_secs <= max_backoff_secs, and jitter_percent
<= 100, returning descriptive errors for each violation; call this validate()
after deserialization (or expose it for callers to use) so configs with
interval_secs == 0, jitter_percent > 100, or initial_backoff_secs >
max_backoff_secs are rejected early.

In @etl/src/replication/client.rs:
- Around line 204-228: Extract the duplicated TLS certificate loading and
ClientConfig construction into a shared function (e.g.,
build_tls_config(pg_connection_config: &PgConnectionConfig) ->
EtlResult<ClientConfig>) and have both connect_regular_tls and connect_tls call
it; move the CertificateDer::pem_slice_iter loop and creation of
rustls::RootCertStore +
ClientConfig::builder().with_root_certificates(...).with_no_client_auth() into
that helper, return the ClientConfig, and replace the inlined code in
connect_regular_tls and connect_tls with a call to the new helper (propagate
errors using EtlResult).

In @etl/src/test_utils/pipeline.rs:
- Around line 95-110: Add builder methods to PipelineBuilder to set the
previously added optional fields: implement with_primary_connection(self,
primary_connection: PgConnectionConfig) -> Self and with_heartbeat(self,
heartbeat: HeartbeatConfig) -> Self, add matching fields primary_connection:
Option<PgConnectionConfig> and heartbeat: Option<HeartbeatConfig> to the
PipelineBuilder struct, ensure you import HeartbeatConfig and
PgConnectionConfig, set the fields in those methods and return self so build()
will pass the configured values into PipelineConfig (which currently sets
primary_connection and heartbeat to None).

In @etl/src/workers/heartbeat.rs:
- Around line 276-291: Add unit tests covering calculate_backoff behavior,
HeartbeatError messages, and configuration edge cases: write tests for
calculate_backoff (call the function with varying attempts and a BackoffConfig
to assert base/backoff growth, max cap, and that jitterized results stay within
expected +/- jitter bounds and never exceed max_backoff), add tests that format
each HeartbeatError variant (e.g., HeartbeatError::ConnectionFailed,
HeartbeatError::InvalidConfig) using to_string() or Display and assert the
expected message substrings, and add tests for
HeartbeatConfig/parse_config/from_env edge cases (missing keys, invalid values,
boundary values) to ensure defaults and errors are handled as intended; locate
helpers by the function names calculate_backoff, BackoffConfig, HeartbeatError,
and HeartbeatConfig/parse_config to add these tests.
- Around line 25-35: The HeartbeatError enum and its variants are undocumented;
add doc comments for the enum and each variant (HeartbeatError,
ConnectionFailed, UnsupportedVersion, EmitFailed, VersionQueryFailed) describing
their purpose and when they are used, e.g., document that HeartbeatError
represents heartbeat-related failures, ConnectionFailed indicates inability to
connect to primary, UnsupportedVersion indicates PostgreSQL version below
required 15, EmitFailed indicates failure to send a heartbeat, and
VersionQueryFailed indicates failure to query the DB version; place /// doc
comments immediately above the enum and each variant.
- Around line 54-69: Public struct HeartbeatWorkerHandle and its async method
wait lack documentation; add doc comments for HeartbeatWorkerHandle (describe
its role as a handle to the heartbeat background task and its contained
join_handle) and for wait (describe behavior: awaits join_handle, returns
Result<(), HeartbeatError>, and maps task panic to HeartbeatError::EmitFailed
while logging). Place /// doc comments above the struct and above the wait
method, referencing join_handle, HeartbeatError, and the panic/log behavior so
consumers understand usage and error semantics.
- Around line 37-52: Add missing documentation comments for the private enum
ConnectionState and its helper method as_str: document what the enum represents
(the heartbeat connection lifecycle states: Disconnected, Connecting, Connected)
and what as_str returns/uses; add brief doc comments above the enum and above
the as_str impl (or its method) following project doc style so private items are
documented like public ones.
- Around line 71-96: The watch channel receiver created in HeartbeatWorker::new
is immediately dropped (let (state_tx, _) = watch::channel(...)), which is
either unnecessary or accidentally discards intended observers; fix by either
(A) removing the watch channel and replacing state_tx:
watch::Sender<ConnectionState> with a simple ConnectionState field (e.g., state)
and update update_state to mutate self.state directly, or (B) expose and store
the receiver (watch::Receiver<ConnectionState>) alongside state_tx so external
observers can subscribe; also add doc comments to the HeartbeatWorker struct and
its new method describing purpose, fields (pipeline_id, primary_config,
heartbeat_config, shutdown_rx, state_tx or state), and intended observation
behavior so code guidelines are met.
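
As an illustration of the HeartbeatError documentation item above, a documented enum might look like the following sketch. The variant names come from the review prompt. The payload types and message wording are assumptions, and `Display` is implemented by hand here only to keep the example free of external crates (the PR itself derives it with thiserror).

```rust
use std::fmt;

/// Errors produced by the heartbeat worker.
#[derive(Debug)]
pub enum HeartbeatError {
    /// The worker could not connect to the primary.
    ConnectionFailed(String),
    /// The primary reports a major version below the required minimum.
    UnsupportedVersion { found: u32, required: u32 },
    /// Emitting the heartbeat message failed.
    EmitFailed(String),
    /// Querying the server version failed.
    VersionQueryFailed(String),
}

impl fmt::Display for HeartbeatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConnectionFailed(reason) => {
                write!(f, "failed to connect to primary: {reason}")
            }
            Self::UnsupportedVersion { found, required } => write!(
                f,
                "unsupported PostgreSQL version {found}, version {required} or newer is required"
            ),
            Self::EmitFailed(reason) => write!(f, "failed to emit heartbeat: {reason}"),
            Self::VersionQueryFailed(reason) => {
                write!(f, "failed to query server version: {reason}")
            }
        }
    }
}

impl std::error::Error for HeartbeatError {}
```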
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 731a06f and d3863cd.

📒 Files selected for processing (14)
  • etl-benchmarks/benches/table_copies.rs
  • etl-config/src/shared/connection.rs
  • etl-config/src/shared/heartbeat.rs
  • etl-config/src/shared/mod.rs
  • etl-config/src/shared/pipeline.rs
  • etl-examples/src/main.rs
  • etl/Cargo.toml
  • etl/src/lib.rs
  • etl/src/metrics.rs
  • etl/src/pipeline.rs
  • etl/src/replication/client.rs
  • etl/src/test_utils/pipeline.rs
  • etl/src/workers/heartbeat.rs
  • etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/workers/mod.rs
  • etl-config/src/shared/heartbeat.rs
  • etl/src/pipeline.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/connection.rs
  • etl-config/src/shared/mod.rs
  • etl-benchmarks/benches/table_copies.rs
  • etl/src/replication/client.rs
  • etl/src/lib.rs
  • etl-examples/src/main.rs
  • etl/src/test_utils/pipeline.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/metrics.rs
**/Cargo.toml

📄 CodeRabbit inference engine (AGENTS.md)

Use kebab-case for crate names

Files:

  • etl/Cargo.toml
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl/src/workers/mod.rs
  • etl/src/pipeline.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/mod.rs
  • etl-benchmarks/benches/table_copies.rs
  • etl/src/replication/client.rs
  • etl/src/lib.rs
  • etl/Cargo.toml
  • etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`

Applied to files:

  • etl/src/lib.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Build with `cargo build --workspace --all-targets --all-features`

Applied to files:

  • etl/Cargo.toml
🧬 Code graph analysis (4)
etl/src/pipeline.rs (1)
etl-config/src/shared/pipeline.rs (2)
  • is_replica_mode (77-79)
  • heartbeat_config (82-84)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
  • heartbeat_config (82-84)
etl/src/replication/client.rs (1)
  • connect_regular (183-188)
etl/src/test_utils/pipeline.rs (1)
etl/src/pipeline.rs (1)
  • new (49-59)
etl-config/src/shared/pipeline.rs (2)
etl-config/src/shared/replicator.rs (1)
  • validate (40-42)
etl-api/src/configs/pipeline.rs (1)
  • default_table_error_retry_max_attempts (13-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (39)
etl/Cargo.toml (1)

34-34: LGTM!

The thiserror dependency is correctly added using the workspace pattern, consistent with other dependencies. This enables the #[derive(Error)] macro used by HeartbeatError in the heartbeat worker module.

etl-config/src/shared/heartbeat.rs (2)

72-81: LGTM!

The Default implementation correctly delegates to the helper functions, ensuring consistency with serde defaults.


83-120: LGTM!

Tests adequately cover default values, serde default behavior, and custom value parsing.

etl-config/src/shared/mod.rs (1)

5-5: LGTM!

Module declaration and re-export follow the existing pattern established by other submodules.

Also applies to: 15-15

etl-config/src/shared/connection.rs (3)

21-21: LGTM!

The constant follows the established naming convention for application names.


74-89: LGTM!

The heartbeat connection options are well-documented and appropriately configured with short timeouts for fail-fast behavior. The 5-second statement/lock timeouts and 30-second idle timeout align with the lightweight, transient nature of heartbeat operations.


299-305: LGTM!

Test validates the critical timeout and application name values for heartbeat options.

etl/src/workers/heartbeat.rs (5)

159-182: LGTM!

The method correctly manages state transitions, connection metrics, and failure counter reset upon successful connection.


184-210: LGTM!

Connection and version verification logic is correct. The version number parsing correctly handles PostgreSQL's server_version_num format. Per coding guidelines, consider adding brief doc comments to internal helpers.


212-251: LGTM!

The heartbeat loop correctly uses tokio::time::interval with shutdown awareness. The pg_logical_emit_message(false, 'etl_heartbeat', '') call is appropriate:

  • false for non-transactional (immediate WAL write)
  • Empty message body minimizes WAL overhead
  • Prefix identifies the heartbeat source

Note: The first tick() fires immediately, which means a heartbeat is emitted right after connection—a sensible choice for promptly activating the slot.


112-116: The shutdown detection pattern is correct; no changes needed.

The code properly handles shutdown signals through multiple layers:

  1. has_changed() at line 113 catches pre-startup shutdown before the first connection attempt
  2. changed() in heartbeat_loop() (line 220) catches shutdown during heartbeat emission
  3. changed() in the error backoff (line 150) catches shutdown during retry delays

Once heartbeat_loop() detects shutdown via changed(), it returns Ok(()), which propagates back through connect_and_heartbeat() to run(), exiting the loop. The receiver is never re-marked as seen because shutdown causes an early return—there are no subsequent loop iterations after shutdown is detected. The pattern is consistent with its usage in stream.rs and is sound.


23-23: Add documentation explaining why PostgreSQL 15 is required.

pg_logical_emit_message has been available since PostgreSQL 9.6, so the MIN_PG_VERSION requirement must be based on a different feature. Add a documentation comment to the constant explaining which functionality actually requires PostgreSQL 15.

etl/src/workers/mod.rs (1)

5-5: LGTM!

The heartbeat module export follows the established pattern for worker modules.

etl/src/metrics.rs (2)

18-22: LGTM!

The heartbeat metric constants follow the established naming convention (ETL_*_TOTAL for counters, descriptive names for gauges).


45-49: LGTM!

Heartbeat metrics are properly registered with appropriate types and units:

  • Counters for cumulative totals (emissions, failures, connection attempts)
  • Gauges for point-in-time values (last timestamp, consecutive failures)
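
The counter/gauge split can be illustrated with plain atomics. This is only a sketch under stated assumptions: the PR registers its metrics through the crate's metrics layer, which is not reproduced here, and `HeartbeatMetrics` with its methods is a hypothetical stand-in. The update rules follow the walkthrough's sequence diagram and the note that consecutive failures reset on a successful connection.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical in-memory stand-in for the heartbeat metrics.
#[derive(Debug, Default)]
pub struct HeartbeatMetrics {
    // Counters: only ever increase.
    emissions_total: AtomicU64,
    failures_total: AtomicU64,
    connection_attempts_total: AtomicU64,
    // Gauges: hold the latest point-in-time value.
    last_emission_timestamp: AtomicU64,
    consecutive_failures: AtomicU64,
}

impl HeartbeatMetrics {
    /// Counts one attempt to connect to the primary.
    pub fn record_connection_attempt(&self) {
        self.connection_attempts_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Resets the consecutive-failure gauge after a successful connection.
    pub fn record_connected(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// Counts one emitted heartbeat and stores its Unix timestamp in seconds.
    pub fn record_emission(&self, unix_secs: u64) {
        self.emissions_total.fetch_add(1, Ordering::Relaxed);
        self.last_emission_timestamp.store(unix_secs, Ordering::Relaxed);
    }

    /// Counts one failure and bumps the consecutive-failure gauge.
    pub fn record_failure(&self) {
        self.failures_total.fetch_add(1, Ordering::Relaxed);
        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
    }

    pub fn emissions_total(&self) -> u64 {
        self.emissions_total.load(Ordering::Relaxed)
    }

    pub fn failures_total(&self) -> u64 {
        self.failures_total.load(Ordering::Relaxed)
    }

    pub fn connection_attempts_total(&self) -> u64 {
        self.connection_attempts_total.load(Ordering::Relaxed)
    }

    pub fn last_emission_timestamp(&self) -> u64 {
        self.last_emission_timestamp.load(Ordering::Relaxed)
    }

    pub fn consecutive_failures(&self) -> u64 {
        self.consecutive_failures.load(Ordering::Relaxed)
    }
}
```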
etl-config/src/shared/pipeline.rs (9)

5-8: LGTM!

The imports are well-organized, bringing in the new HeartbeatConfig and BatchConfig types needed for the expanded configuration.


14-30: LGTM!

The TableSyncCopyConfig enum variants are well-designed with struct-like syntax for IncludeTables and SkipTables. The should_copy_table method correctly handles all variants.


43-48: LGTM!

The new primary_connection and heartbeat fields are appropriately optional with serde defaults, enabling backward compatibility. The documentation is concise and clear.


66-84: LGTM!

The associated constants and helper methods are well-documented and follow Rust conventions. The is_replica_mode() method correctly infers replica mode from the presence of a primary connection, and heartbeat_config() safely provides a default when not explicitly configured.
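
A reduced sketch of the two helpers follows. The structs are trimmed to the fields relevant here, `PgConnectionConfig` is a placeholder with hypothetical fields, and the `interval_secs` default of 30 is an assumption (the thread does not state it). The backoff and jitter defaults follow the PR description.

```rust
/// Placeholder for the real connection config; fields are hypothetical.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PgConnectionConfig {
    pub host: String,
    pub port: u16,
}

/// Heartbeat settings, reduced to the fields named in the review.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeartbeatConfig {
    pub interval_secs: u64,
    pub initial_backoff_secs: u64,
    pub max_backoff_secs: u64,
    pub jitter_percent: u8,
}

impl Default for HeartbeatConfig {
    fn default() -> Self {
        Self {
            // Assumed value; the actual default is not stated in the thread.
            interval_secs: 30,
            // 1s-60s backoff with 25% jitter, per the PR description.
            initial_backoff_secs: 1,
            max_backoff_secs: 60,
            jitter_percent: 25,
        }
    }
}

/// Pipeline config, reduced to the two new optional fields.
#[derive(Debug, Clone, Default)]
pub struct PipelineConfig {
    pub primary_connection: Option<PgConnectionConfig>,
    pub heartbeat: Option<HeartbeatConfig>,
}

impl PipelineConfig {
    /// Replica mode is inferred from the presence of a primary connection.
    pub fn is_replica_mode(&self) -> bool {
        self.primary_connection.is_some()
    }

    /// Returns the configured heartbeat settings, or the defaults when unset.
    pub fn heartbeat_config(&self) -> HeartbeatConfig {
        self.heartbeat.clone().unwrap_or_default()
    }
}
```

Because both fields default to `None`, existing configurations that omit them keep their standard (non-replica) behaviour.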


108-118: LGTM!

The default functions properly delegate to the associated constants, maintaining a single source of truth for default values.


128-131: LGTM!

Correct use of skip_serializing_if = "Option::is_none" for optional fields in the serializable variant, keeping JSON output clean when these fields are unset.


149-163: LGTM!

The From<PipelineConfig> implementation correctly propagates the new primary_connection and heartbeat fields, with appropriate handling for the secrets-free connection variant.


178-184: LGTM!

The test correctly verifies serialization/deserialization roundtrip for the updated IncludeTables struct-like variant.


86-105: HeartbeatConfig does not have a validate() method.

HeartbeatConfig has no validate() method, so this suggestion is not applicable. If validation of heartbeat configuration is desired, a validate() method would need to be added to HeartbeatConfig first.

Likely an incorrect or invalid review comment.

etl-examples/src/main.rs (1)

184-185: LGTM!

The example correctly initializes the new optional fields to None, demonstrating standard (non-replica) mode usage.

etl/src/replication/client.rs (3)

4-4: LGTM!

Import correctly added for ETL_HEARTBEAT_OPTIONS to support the new heartbeat connection functionality.


179-188: LGTM!

The connect_regular method is well-documented and correctly establishes a non-replication connection for heartbeat operations. Returning a raw Client rather than PgReplicationClient is appropriate since replication features aren't needed.


190-202: LGTM!

The helper correctly establishes a non-TLS regular connection using ETL_HEARTBEAT_OPTIONS, with appropriate logging.

etl/src/pipeline.rs (4)

16-16: LGTM!

Import correctly added for the new heartbeat worker types.


25-33: LGTM!

The PipelineState::Started variant is correctly extended to track the optional heartbeat worker handle.


101-119: LGTM!

The heartbeat worker initialization is correctly conditional on replica mode. The worker is started with the appropriate config and shutdown receiver for graceful termination.


148-153: Heartbeat worker errors are logged but not collected.

Unlike apply_worker and table_sync_workers, heartbeat worker errors are only logged but not added to the errors vector. This means pipeline wait() can return Ok(()) even if heartbeat failed. If this is intentional (heartbeat is non-critical), consider adding a comment explaining the design decision. Otherwise, consider collecting the error.

// Current behavior: log only
if let Err(err) = heartbeat_handle.wait().await {
    error!(error = %err, "heartbeat worker failed");
}

// Alternative if heartbeat errors should propagate:
// if let Err(err) = heartbeat_handle.wait().await {
//     error!(error = %err, "heartbeat worker failed");
//     errors.push(err);
// }
etl-benchmarks/benches/table_copies.rs (2)

6-6: LGTM!

Import path updated to reflect the module reorganization.


348-349: LGTM!

The benchmark correctly initializes the new optional fields to None for non-replica mode testing.

etl/src/lib.rs (2)

5-14: LGTM!

The documentation updates clearly describe the new feature ("Read replica support: Optional heartbeat mechanism for replica mode") and the wording is concise and consistent with the existing bullet points.


41-52: LGTM!

The example correctly initializes the new primary_connection and heartbeat fields with None, maintaining the example as a minimal working setup while reflecting the updated PipelineConfig structure.

etl/src/test_utils/pipeline.rs (3)

24-24: LGTM!

Using ignore for the doctest example is appropriate since test utilities often have dependencies that aren't available in the doctest environment.


112-124: LGTM!

The helper function correctly delegates to the builder with default settings.


126-159: LGTM!

Both helper functions correctly use the builder pattern and the reformatting improves readability.

Comment on lines +43 to +70
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeartbeatConfig {
    /// Interval in seconds between heartbeat emissions.
    ///
    /// Should be less than the `wal_sender_timeout` on the primary (default 60s)
    /// to ensure the slot remains active. Default: 30 seconds.
    #[serde(default = "default_interval_secs")]
    pub interval_secs: u64,

    /// Initial delay in seconds before retrying after a connection failure.
    ///
    /// Default: 1 second.
    #[serde(default = "default_initial_backoff_secs")]
    pub initial_backoff_secs: u64,

    /// Maximum delay in seconds between reconnection attempts.
    ///
    /// The backoff delay will not exceed this value. Default: 60 seconds.
    #[serde(default = "default_max_backoff_secs")]
    pub max_backoff_secs: u64,

    /// Percentage of jitter to add to backoff delays (0-100).
    ///
    /// Jitter helps prevent multiple workers from reconnecting simultaneously.
    /// A value of 25 means ±25% randomization. Default: 25%.
    #[serde(default = "default_jitter_percent")]
    pub jitter_percent: u8,
}

🧹 Nitpick | 🔵 Trivial

Consider adding validation for configuration values.

The configuration lacks validation for edge cases:

  1. interval_secs = 0 would create a zero-duration interval in the heartbeat loop: tokio::time::interval panics on a zero period, and any other timer would busy-loop.
  2. jitter_percent > 100 could produce unexpected backoff values (though mathematically it would still work).
  3. initial_backoff_secs > max_backoff_secs is contradictory.

Consider adding a validation method or implementing validation during deserialization.

♻️ Suggested validation method
impl HeartbeatConfig {
    /// Validates the configuration values.
    ///
    /// Returns an error if any values are invalid.
    pub fn validate(&self) -> Result<(), String> {
        if self.interval_secs == 0 {
            return Err("interval_secs must be greater than 0".to_string());
        }
        if self.initial_backoff_secs > self.max_backoff_secs {
            return Err("initial_backoff_secs must not exceed max_backoff_secs".to_string());
        }
        if self.jitter_percent > 100 {
            return Err("jitter_percent must be between 0 and 100".to_string());
        }
        Ok(())
    }
}
🤖 Prompt for AI Agents
In @etl-config/src/shared/heartbeat.rs around lines 43 - 70, Add validation to
HeartbeatConfig to reject invalid values: implement a
HeartbeatConfig::validate(&self) -> Result<(), String> that checks interval_secs
> 0, initial_backoff_secs <= max_backoff_secs, and jitter_percent <= 100,
returning descriptive errors for each violation; call this validate() after
deserialization (or expose it for callers to use) so configs with interval_secs
== 0, jitter_percent > 100, or initial_backoff_secs > max_backoff_secs are
rejected early.
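
As a variation on the suggestion above, the same three checks can return a typed error instead of a `String`, which lets callers match on the failure. This is a sketch under assumptions: `HeartbeatConfigError` and its variants are hypothetical names, and the struct is a stand-in without the serde attributes.

```rust
use std::fmt;

/// Hypothetical error type; the PR itself does not define it.
#[derive(Debug, PartialEq, Eq)]
pub enum HeartbeatConfigError {
    ZeroInterval,
    BackoffRangeInverted { initial: u64, max: u64 },
    JitterOutOfRange(u8),
}

impl fmt::Display for HeartbeatConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ZeroInterval => write!(f, "interval_secs must be greater than 0"),
            Self::BackoffRangeInverted { initial, max } => write!(
                f,
                "initial_backoff_secs ({initial}) must not exceed max_backoff_secs ({max})"
            ),
            Self::JitterOutOfRange(p) => {
                write!(f, "jitter_percent must be between 0 and 100, got {p}")
            }
        }
    }
}

impl std::error::Error for HeartbeatConfigError {}

/// Stand-in for the real config struct.
#[derive(Clone, Debug)]
pub struct HeartbeatConfig {
    pub interval_secs: u64,
    pub initial_backoff_secs: u64,
    pub max_backoff_secs: u64,
    pub jitter_percent: u8,
}

impl HeartbeatConfig {
    /// Rejects the three edge cases listed in the review comment.
    pub fn validate(&self) -> Result<(), HeartbeatConfigError> {
        if self.interval_secs == 0 {
            return Err(HeartbeatConfigError::ZeroInterval);
        }
        if self.initial_backoff_secs > self.max_backoff_secs {
            return Err(HeartbeatConfigError::BackoffRangeInverted {
                initial: self.initial_backoff_secs,
                max: self.max_backoff_secs,
            });
        }
        if self.jitter_percent > 100 {
            return Err(HeartbeatConfigError::JitterOutOfRange(self.jitter_percent));
        }
        Ok(())
    }
}
```

Whether a typed error is worth the extra code depends on how the surrounding `validate()` methods in `etl-config` report failures; matching their existing error type would likely be the better fit.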

Comment on lines +204 to +228
/// Establishes a regular TLS-encrypted connection to Postgres.
async fn connect_regular_tls(pg_connection_config: PgConnectionConfig) -> EtlResult<Client> {
    let config: Config = pg_connection_config
        .clone()
        .with_db(Some(&ETL_HEARTBEAT_OPTIONS));
    // Note: NO replication_mode() call - this is a regular connection

    let mut root_store = rustls::RootCertStore::empty();
    for cert in CertificateDer::pem_slice_iter(
        pg_connection_config.tls.trusted_root_certs.as_bytes(),
    ) {
        let cert = cert?;
        root_store.add(cert)?;
    }

    let tls_config = ClientConfig::builder()
        .with_root_certificates(root_store)
        .with_no_client_auth();

    let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?;
    spawn_postgres_connection::<MakeRustlsConnect>(connection);

    info!("connected to postgres (regular mode) with tls");
    Ok(client)
}

🧹 Nitpick | 🔵 Trivial

Consider extracting TLS setup into a shared helper.

The TLS certificate loading and ClientConfig setup (lines 211-221) duplicates the logic in connect_tls (lines 264-276). Consider extracting a helper like build_tls_config(pg_connection_config: &PgConnectionConfig) -> EtlResult<ClientConfig> to reduce duplication.

♻️ Suggested helper extraction
/// Builds a TLS client configuration from the connection config's trusted root certificates.
fn build_tls_config(pg_connection_config: &PgConnectionConfig) -> EtlResult<ClientConfig> {
    let mut root_store = rustls::RootCertStore::empty();
    for cert in CertificateDer::pem_slice_iter(
        pg_connection_config.tls.trusted_root_certs.as_bytes(),
    ) {
        let cert = cert?;
        root_store.add(cert)?;
    }

    Ok(ClientConfig::builder()
        .with_root_certificates(root_store)
        .with_no_client_auth())
}

Then both connect_regular_tls and connect_tls can call let tls_config = Self::build_tls_config(&pg_connection_config)?;.

🤖 Prompt for AI Agents
In @etl/src/replication/client.rs around lines 204 - 228, Extract the duplicated
TLS certificate loading and ClientConfig construction into a shared function
(e.g., build_tls_config(pg_connection_config: &PgConnectionConfig) ->
EtlResult<ClientConfig>) and have both connect_regular_tls and connect_tls call
it; move the CertificateDer::pem_slice_iter loop and creation of
rustls::RootCertStore +
ClientConfig::builder().with_root_certificates(...).with_no_client_auth() into
that helper, return the ClientConfig, and replace the inlined code in
connect_regular_tls and connect_tls with a call to the new helper (propagate
errors using EtlResult).

Comment on lines 95 to 110
    pub fn build(self) -> Pipeline<S, D> {
        let config = PipelineConfig {
            id: self.pipeline_id,
            publication_name: self.publication_name,
            pg_connection: self.pg_connection_config,
            primary_connection: None,
            heartbeat: None,
            batch: self.batch.unwrap_or(BatchConfig { max_size: 1, max_fill_ms: 1000 }),
            table_error_retry_delay_ms: self.table_error_retry_delay_ms,
            table_error_retry_max_attempts: self.table_error_retry_max_attempts,
            max_table_sync_workers: self.max_table_sync_workers,
            table_sync_copy: self.table_sync_copy.unwrap_or_default(),
        };

        Pipeline::new(config, self.store, self.destination)
    }
}

🧹 Nitpick | 🔵 Trivial

Consider adding builder methods for primary_connection and heartbeat.

The new fields are correctly initialized to None. However, if heartbeat-related integration tests are planned (as mentioned in PR discussions about testing idle-period behavior), you may want to add builder methods like with_primary_connection() and with_heartbeat() for test flexibility.

♻️ Optional builder method additions
pub fn with_primary_connection(mut self, primary_connection: PgConnectionConfig) -> Self {
    self.primary_connection = Some(primary_connection);
    self
}

pub fn with_heartbeat(mut self, heartbeat: HeartbeatConfig) -> Self {
    self.heartbeat = Some(heartbeat);
    self
}

This would require adding corresponding fields to PipelineBuilder and importing HeartbeatConfig.
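
To show how the extra fields and `build()` wiring would fit together, here is a self-contained sketch. All types are simplified stand-ins (the real builder is generic over a store and destination and returns a `Pipeline<S, D>`), and the `new` constructor shown here is an assumption for illustration.

```rust
/// Stand-in connection config with a single illustrative field.
#[derive(Clone, Debug, PartialEq)]
pub struct PgConnectionConfig {
    pub host: String,
}

/// Stand-in heartbeat config with a single illustrative field.
#[derive(Clone, Debug, PartialEq)]
pub struct HeartbeatConfig {
    pub interval_secs: u64,
}

/// Trimmed-down config holding only the fields relevant here.
#[derive(Debug, PartialEq)]
pub struct PipelineConfig {
    pub pg_connection: PgConnectionConfig,
    pub primary_connection: Option<PgConnectionConfig>,
    pub heartbeat: Option<HeartbeatConfig>,
}

/// Hypothetical simplified builder; both new fields default to `None`.
pub struct PipelineBuilder {
    pg_connection: PgConnectionConfig,
    primary_connection: Option<PgConnectionConfig>,
    heartbeat: Option<HeartbeatConfig>,
}

impl PipelineBuilder {
    pub fn new(pg_connection: PgConnectionConfig) -> Self {
        Self { pg_connection, primary_connection: None, heartbeat: None }
    }

    /// Opts the pipeline into replica mode.
    pub fn with_primary_connection(mut self, primary_connection: PgConnectionConfig) -> Self {
        self.primary_connection = Some(primary_connection);
        self
    }

    /// Overrides the default heartbeat settings.
    pub fn with_heartbeat(mut self, heartbeat: HeartbeatConfig) -> Self {
        self.heartbeat = Some(heartbeat);
        self
    }

    /// Passes the builder's values through instead of hard-coding `None`.
    pub fn build(self) -> PipelineConfig {
        PipelineConfig {
            pg_connection: self.pg_connection,
            primary_connection: self.primary_connection,
            heartbeat: self.heartbeat,
        }
    }
}
```

Tests that never call the two new methods get the same `None` values as before, so existing test setups would not need to change.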

🤖 Prompt for AI Agents
In @etl/src/test_utils/pipeline.rs around lines 95 - 110, Add builder methods to
PipelineBuilder to set the previously added optional fields: implement
with_primary_connection(self, primary_connection: PgConnectionConfig) -> Self
and with_heartbeat(self, heartbeat: HeartbeatConfig) -> Self, add matching
fields primary_connection: Option<PgConnectionConfig> and heartbeat:
Option<HeartbeatConfig> to the PipelineBuilder struct, ensure you import
HeartbeatConfig and PgConnectionConfig, set the fields in those methods and
return self so build() will pass the configured values into PipelineConfig
(which currently sets primary_connection and heartbeat to None).

Comment on lines 71 to 96
pub struct HeartbeatWorker {
    pipeline_id: PipelineId,
    primary_config: PgConnectionConfig,
    heartbeat_config: HeartbeatConfig,
    shutdown_rx: ShutdownRx,
    state_tx: watch::Sender<ConnectionState>,
    consecutive_failures: u32,
}

impl HeartbeatWorker {
    pub fn new(
        pipeline_id: PipelineId,
        primary_config: PgConnectionConfig,
        heartbeat_config: HeartbeatConfig,
        shutdown_rx: ShutdownRx,
    ) -> Self {
        let (state_tx, _) = watch::channel(ConnectionState::Disconnected);
        Self {
            pipeline_id,
            primary_config,
            heartbeat_config,
            shutdown_rx,
            state_tx,
            consecutive_failures: 0,
        }
    }

🧹 Nitpick | 🔵 Trivial

Unused watch channel receiver and missing documentation.

The watch channel receiver created at line 87 is immediately dropped (let (state_tx, _) = ...). If external state observation is not needed, consider simplifying to just store the state directly. If it's intended for future use, consider exposing it.

Additionally, the struct and its methods need documentation per coding guidelines.

♻️ Simplification if external observation is not needed
 pub struct HeartbeatWorker {
     pipeline_id: PipelineId,
     primary_config: PgConnectionConfig,
     heartbeat_config: HeartbeatConfig,
     shutdown_rx: ShutdownRx,
-    state_tx: watch::Sender<ConnectionState>,
+    state: ConnectionState,
     consecutive_failures: u32,
 }

This would require updating update_state to modify self.state directly instead of sending through a channel.

🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 71 - 96, The watch channel
receiver created in HeartbeatWorker::new is immediately dropped (let (state_tx,
_) = watch::channel(...)), which is either unnecessary or accidentally discards
intended observers; fix by either (A) removing the watch channel and replacing
state_tx: watch::Sender<ConnectionState> with a simple ConnectionState field
(e.g., state) and update update_state to mutate self.state directly, or (B)
expose and store the receiver (watch::Receiver<ConnectionState>) alongside
state_tx so external observers can subscribe; also add doc comments to the
HeartbeatWorker struct and its new method describing purpose, fields
(pipeline_id, primary_config, heartbeat_config, shutdown_rx, state_tx or state),
and intended observation behavior so code guidelines are met.

Comment on lines +276 to +291
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_state_as_str() {
        assert_eq!(ConnectionState::Disconnected.as_str(), "disconnected");
        assert_eq!(ConnectionState::Connecting.as_str(), "connecting");
        assert_eq!(ConnectionState::Connected.as_str(), "connected");
    }

    #[test]
    fn test_min_pg_version_constant() {
        assert_eq!(MIN_PG_VERSION, 150000);
    }
}

🧹 Nitpick | 🔵 Trivial

Consider adding more unit tests.

Current tests only cover ConnectionState::as_str() and the version constant. Consider adding tests for:

  • calculate_backoff behavior (especially with the fix for the jitter bug)
  • Error variant messages
  • Configuration edge cases
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 276 - 291, Add unit tests covering
calculate_backoff behavior, HeartbeatError messages, and configuration edge
cases: write tests for calculate_backoff (call the function with varying
attempts and a BackoffConfig to assert base/backoff growth, max cap, and that
jitterized results stay within expected +/- jitter bounds and never exceed
max_backoff), add tests that format each HeartbeatError variant (e.g.,
HeartbeatError::ConnectionFailed, HeartbeatError::InvalidConfig) using
to_string() or Display and assert the expected message substrings, and add tests
for HeartbeatConfig/parse_config/from_env edge cases (missing keys, invalid
values, boundary values) to ensure defaults and errors are handled as intended;
locate helpers by the function names calculate_backoff, BackoffConfig,
HeartbeatError, and HeartbeatConfig/parse_config to add these tests.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @etl/src/test_utils/pipeline.rs:
- Around line 152-153: Add builder methods to PipelineBuilder to support setting
the optional fields primary_connection and heartbeat for future replica-mode
tests: implement with_primary_connection(self, conn: PrimaryConnectionType) ->
Self and with_heartbeat(self, hb: HeartbeatType) -> Self that set
self.primary_connection and self.heartbeat respectively, and return self so
tests can opt-in to non-None values; update any PipelineBuilder::build() logic
to use those fields when constructing the Pipeline.
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d3863cd and b3b8999.

📒 Files selected for processing (3)
  • etl-config/src/shared/connection.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/test_utils/pipeline.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/test_utils/pipeline.rs
  • etl-config/src/shared/pipeline.rs
  • etl-config/src/shared/connection.rs
🧬 Code graph analysis (1)
etl-config/src/shared/pipeline.rs (2)
etl-config/src/shared/replicator.rs (1)
  • validate (40-42)
etl-api/src/configs/pipeline.rs (1)
  • default_table_error_retry_max_attempts (13-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (11)
etl-config/src/shared/connection.rs (3)

36-38: LGTM!

The constant follows the established naming pattern and is properly documented.


106-122: LGTM!

The heartbeat connection options are well-documented and use appropriate timeout values (5s statement/lock, 30s idle) for lightweight operations that should fail fast. The configuration follows the same pattern as other connection option statics.


451-458: LGTM!

Test coverage for the key heartbeat option values.

etl-config/src/shared/pipeline.rs (7)

5-8: LGTM!

Imports are correctly organized to include the new HeartbeatConfig and BatchConfig types.


71-81: LGTM!

The new replica-mode fields are well-documented and correctly use #[serde(default)] for optional configurations.


99-120: LGTM!

The constants and methods are well-documented. is_replica_mode() provides a clean abstraction, and heartbeat_config() correctly falls back to defaults.


146-156: LGTM!

Helper functions correctly delegate to the associated constants on PipelineConfig.


173-193: LGTM!

PipelineConfigWithoutSecrets correctly mirrors the new fields with appropriate serde attributes including skip_serializing_if for optional fields.


196-211: LGTM!

The From implementation correctly maps the new fields, with primary_connection properly converted to strip secrets.


122-143: HeartbeatConfig does not have a validate() method.

The review comment suggests calling heartbeat.validate()? but HeartbeatConfig does not implement a validate() method. The struct contains only a Default implementation and no validation logic.

If validation for HeartbeatConfig is desired (e.g., ensuring jitter_percent is within 0-100, or enforcing initial_backoff_secs ≤ max_backoff_secs), it would need to be implemented first before being called in PipelineConfig::validate().

Likely an incorrect or invalid review comment.

etl/src/test_utils/pipeline.rs (1)

28-28: LGTM!

Using ignore is appropriate when doc tests require external dependencies or setup that isn't available in the test environment.

Comment on lines +152 to +153
primary_connection: None,
heartbeat: None,

🧹 Nitpick | 🔵 Trivial

LGTM, with a suggestion for future extensibility.

The new fields are correctly initialized to None for standard test scenarios. If replica-mode testing becomes necessary, consider adding builder methods like with_primary_connection() and with_heartbeat() to PipelineBuilder.

🤖 Prompt for AI Agents
In @etl/src/test_utils/pipeline.rs around lines 152 - 153, Add builder methods
to PipelineBuilder to support setting the optional fields primary_connection and
heartbeat for future replica-mode tests: implement with_primary_connection(self,
conn: PrimaryConnectionType) -> Self and with_heartbeat(self, hb: HeartbeatType)
-> Self that set self.primary_connection and self.heartbeat respectively, and
return self so tests can opt-in to non-None values; update any
PipelineBuilder::build() logic to use those fields when constructing the
Pipeline.

- Fix jitter bug: divide by 1_000_000_000.0 instead of u32::MAX
  (subsec_nanos returns 0..1B, not 0..4.3B)
- Add doc comments to HeartbeatError, ConnectionState,
  HeartbeatWorkerHandle, and HeartbeatWorker per CodeRabbit review
- Properly merge pipeline.rs with upstream doc comments
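
The jitter fix in the first bullet can be illustrated with a small sketch. It assumes the factor is obtained by mapping the normalized nanosecond value onto -1.0..1.0, as the review describes; the function names and the exact capping behavior are illustrative, not the PR's code. With `u32::MAX` as the divisor, the normalized value would stay below about 0.233, so the factor would always be negative and every backoff would be shortened.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Maps sub-second nanoseconds (0..1_000_000_000) onto a factor in [-1.0, 1.0).
fn jitter_factor(subsec_nanos: u32) -> f64 {
    (f64::from(subsec_nanos) / 1_000_000_000.0) * 2.0 - 1.0
}

/// Applies plus or minus `jitter_percent` to `base`, clamped to the range 0..=max.
fn apply_jitter(base: Duration, max: Duration, jitter_percent: u8, subsec_nanos: u32) -> Duration {
    let base_secs = base.as_secs_f64();
    let spread = base_secs * f64::from(jitter_percent) / 100.0;
    let jittered = (base_secs + spread * jitter_factor(subsec_nanos)).clamp(0.0, max.as_secs_f64());
    Duration::from_secs_f64(jittered)
}

/// Doubles the backoff after a failure, capped at `max`.
fn next_backoff(current: Duration, max: Duration) -> Duration {
    current.saturating_mul(2).min(max)
}

/// Cheap entropy source: the sub-second part of the wall clock.
fn clock_nanos() -> u32 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0)
}
```

Passing the nanosecond value in as a parameter, rather than reading the clock inside `apply_jitter`, keeps the function deterministic and makes the bounds easy to unit-test; a caller would supply `clock_nanos()` at runtime.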
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

🤖 Fix all issues with AI agents
In @etl/src/workers/heartbeat.rs:
- Around line 300-303: The private method update_state currently lacks
documentation; add a Rust doc comment describing its purpose and behavior, e.g.,
that update_state sends the provided ConnectionState over the state_tx channel
and logs the change for the given pipeline_id; reference the method name
update_state, the ConnectionState type, the state_tx sender, and the debug log
so reviewers can verify the doc matches behavior and any side effects
(non-blocking send, possible ignore on failure).
- Around line 213-239: The private methods connect_to_primary and
verify_pg_version lack documentation; add Rust doc comments (///) above each
private async fn describing purpose, inputs (e.g., that connect_to_primary uses
self.primary_config and returns Result<Client, HeartbeatError> and
verify_pg_version takes &Client), expected behavior, returned Result values, and
possible HeartbeatError variants (ConnectionFailed, VersionQueryFailed,
UnsupportedVersion) so readers know side-effects and error cases; keep comments
brief and follow existing crate style for private method docs.
- Around line 188-211: The private async method connect_and_heartbeat lacks
documentation; add a Rust doc comment (///) above the connect_and_heartbeat
method describing its purpose (connect to primary, verify PG version, update
connection state, reset failure counters, and run the heartbeat loop), outline
key behaviors and possible errors (returns Result<(), HeartbeatError>), and
mention side effects such as metric updates and state transitions so readers and
tools can understand its role; ensure the doc follows project style and includes
any important notes about concurrency or retry semantics.
- Around line 241-255: The private method heartbeat_loop lacks documentation;
add a doc comment above async fn heartbeat_loop(&mut self, client: &Client)
describing its purpose (periodically emits heartbeats using
self.emit_heartbeat), its shutdown behavior (listens on self.shutdown_rx and
returns Ok(()) when signaled), and explicitly note that tokio::time::interval
fires immediately on first tick so a heartbeat is emitted immediately on
entering the loop; keep the comment concise and mention the units/source of
interval_secs from self.heartbeat_config.
- Around line 257-280: Add a Rust doc comment for the private async method
emit_heartbeat to describe its purpose (emit a Postgres logical replication
message as an ETL heartbeat), its parameters (self context including pipeline_id
and the Client), its behavior (increments ETL_HEARTBEAT_EMISSIONS_TOTAL, updates
ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP, logs debug), and its return/errors
(Result<(), HeartbeatError> and when HeartbeatError::EmitFailed may be
returned). Place the /// comment immediately above the emit_heartbeat function
declaration so it follows project guideline for documenting private methods.
- Around line 282-298: Document the private method calculate_backoff with a
brief doc comment explaining its purpose, parameters and return value (it
applies jitter to a base Duration using heartbeat_config.jitter_percent), and
update inline comments inside the function to end with periods; ensure the doc
uses /// above the fn and mentions base_backoff and that jitter is derived from
the sub-second nanoseconds of SystemTime::now() normalized to -1.0..1.0 so reviewers can
understand the jitter computation.
- Around line 132-145: Add a doc comment for the private async fn run to explain
its responsibility and lifecycle, and stop silently ignoring a RecvError from
shutdown_rx.has_changed(); replace has_changed().unwrap_or(false) with proper
handling such that Ok(true) triggers shutdown, Ok(false) continues, and Err(_)
(sender dropped) is treated as shutdown (log and return Ok(())); reference the
async fn run and the shutdown_rx.has_changed() call to locate where to update
behavior and add the doc comment.
- Line 23: Add a doc comment above the MIN_PG_VERSION constant explaining the
PostgreSQL version requirement: note that pg_logical_emit_message has existed
since PG 9.6, that logical decoding from standbys is only supported starting in
PG 16, and state that this project explicitly requires a minimum of PG 15
(represented by MIN_PG_VERSION = 15_00_00) with a short rationale (compatibility
and tested behavior) and a link or reference to the upstream docs or PR
discussion for more details.
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b3b8999 and d52f472.

📒 Files selected for processing (1)
  • etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/workers/heartbeat.rs
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl/src/workers/heartbeat.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic

Applied to files:

  • etl/src/workers/heartbeat.rs
🧬 Code graph analysis (1)
etl/src/workers/heartbeat.rs (4)
etl-api/src/routes/metrics.rs (1)
  • metrics (13-15)
etl/src/test_utils/pipeline.rs (1)
  • new (79-98)
etl-config/src/shared/pipeline.rs (1)
  • heartbeat_config (118-120)
etl/src/replication/client.rs (1)
  • connect_regular (183-188)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (7)
etl/src/workers/heartbeat.rs (7)

1-22: LGTM!

Module documentation and imports are well-organized and appropriate for the worker's responsibilities.


25-40: LGTM!

Error enum is well-documented with clear, descriptive messages for each variant.


42-61: LGTM!

ConnectionState enum is well-documented and provides a clean string representation for logging.


63-82: LGTM!

The handle correctly propagates errors and handles task panics gracefully.


98-122: LGTM!

Constructor is well-documented with clear argument descriptions.


124-130: LGTM!

The start method cleanly spawns the background task and returns a handle.


306-321: LGTM!

Unit tests appropriately validate the constant value and enum string representations.

use crate::types::PipelineId;
use etl_config::shared::{HeartbeatConfig, PgConnectionConfig};

const MIN_PG_VERSION: i32 = 15_00_00;

🛠️ Refactor suggestion | 🟠 Major

Add documentation explaining the version requirement.

Per coding guidelines, all items should be documented. This constant lacks a doc comment explaining why PG 15 is the minimum. The PR discussion notes that pg_logical_emit_message exists since PG 9.6, while logical decoding from standbys requires PG 16+. Please document the rationale for choosing PG 15.

📝 Suggested documentation
+/// Minimum PostgreSQL server version required for heartbeat functionality.
+///
+/// PostgreSQL 15+ is required because [explain reason here].
 const MIN_PG_VERSION: i32 = 15_00_00;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const MIN_PG_VERSION: i32 = 15_00_00;
/// Minimum PostgreSQL server version required for heartbeat functionality.
///
/// PostgreSQL 15+ is required because [explain reason here].
const MIN_PG_VERSION: i32 = 15_00_00;
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs at line 23, Add a doc comment above the
MIN_PG_VERSION constant explaining the PostgreSQL version requirement: note that
pg_logical_emit_message has existed since PG 9.6, that logical decoding from
standbys is only supported starting in PG 16, and state that this project
explicitly requires a minimum of PG 15 (represented by MIN_PG_VERSION =
15_00_00) with a short rationale (compatibility and tested behavior) and a link
or reference to the upstream docs or PR discussion for more details.
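
For readers unfamiliar with the numeric encoding, the sketch below shows how a value such as `15_00_00` relates to a server version. It assumes the worker compares against PostgreSQL's `server_version_num` setting, which for version 10 and later is `major * 10_000 + minor`; the helper names are hypothetical.

```rust
/// Mirrors the constant under review: PostgreSQL 15.0 in `server_version_num` form.
const MIN_PG_VERSION: i32 = 15_00_00;

/// Extracts the major version from a `server_version_num` value.
///
/// For PostgreSQL 10 and later the value is `major * 10_000 + minor`.
fn major_version(server_version_num: i32) -> i32 {
    server_version_num / 10_000
}

/// Returns whether the server meets the minimum version.
fn meets_minimum(server_version_num: i32) -> bool {
    server_version_num >= MIN_PG_VERSION
}
```

For example, PostgreSQL 15.4 reports `150004` and passes the check, while 14.10 reports `140010` and does not.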

Comment on lines +89 to +96
pub struct HeartbeatWorker {
    pipeline_id: PipelineId,
    primary_config: PgConnectionConfig,
    heartbeat_config: HeartbeatConfig,
    shutdown_rx: ShutdownRx,
    state_tx: watch::Sender<ConnectionState>,
    consecutive_failures: u32,
}

🧹 Nitpick | 🔵 Trivial

Unused watch channel receiver is discarded immediately.

The state_tx watch channel is created but the receiver is immediately discarded with _ on line 113. The sender is used to broadcast state changes, but no code subscribes to observe them. This appears to be either dead code or an incomplete feature.

Consider either:

  1. Removing state_tx and update_state() if state observation isn't needed
  2. Exposing a subscribe() method if external state observation is intended

Also applies to: 113-113

Comment on lines +132 to +145
    async fn run(mut self) -> Result<(), HeartbeatError> {
        info!(
            pipeline_id = %self.pipeline_id,
            interval_secs = %self.heartbeat_config.interval_secs,
            "starting heartbeat worker"
        );

        let mut current_backoff = Duration::from_secs(self.heartbeat_config.initial_backoff_secs);

        loop {
            if self.shutdown_rx.has_changed().unwrap_or(false) {
                info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down");
                return Ok(());
            }

🛠️ Refactor suggestion | 🟠 Major

Add doc comment and handle has_changed() error properly.

Per coding guidelines, private methods should be documented.

Additionally, has_changed().unwrap_or(false) on line 142 silently ignores RecvError (sender dropped). If the shutdown sender is dropped, the worker should shut down rather than continue indefinitely.

📝 Suggested fix
+    /// Runs the heartbeat worker's main loop.
+    ///
+    /// Continuously attempts to connect and emit heartbeats, with exponential
+    /// backoff on failures. Returns when shutdown is signaled or a fatal error occurs.
     async fn run(mut self) -> Result<(), HeartbeatError> {
         info!(
             pipeline_id = %self.pipeline_id,
             interval_secs = %self.heartbeat_config.interval_secs,
             "starting heartbeat worker"
         );

         let mut current_backoff = Duration::from_secs(self.heartbeat_config.initial_backoff_secs);

         loop {
-            if self.shutdown_rx.has_changed().unwrap_or(false) {
+            // If sender is dropped (Err) or shutdown signaled (Ok(true)), exit.
+            if self.shutdown_rx.has_changed().unwrap_or(true) {
                 info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down");
                 return Ok(());
             }
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 132 - 145, Add a doc comment for
the private async fn run to explain its responsibility and lifecycle, and stop
silently ignoring a RecvError from shutdown_rx.has_changed(); replace
has_changed().unwrap_or(false) with proper handling such that Ok(true) triggers
shutdown, Ok(false) continues, and Err(_) (sender dropped) is treated as
shutdown (log and return Ok(())); reference the async fn run and the
shutdown_rx.has_changed() call to locate where to update behavior and add the
doc comment.
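
The three-way handling described above can be sketched without tokio. This is only an illustration: the function name `should_shut_down` is hypothetical, and the generic `E` stands in for the channel's error type (tokio's `watch::error::RecvError` in the PR) so that the snippet compiles with the standard library alone.

```rust
/// Decides whether the worker should stop, given the result of a
/// `has_changed()`-style check on the shutdown channel.
///
/// `E` is a placeholder for the channel's error type.
fn should_shut_down<E>(has_changed: Result<bool, E>) -> bool {
    match has_changed {
        // A new value was sent on the shutdown channel.
        Ok(true) => true,
        // Nothing new: keep running.
        Ok(false) => false,
        // Sender dropped: nobody can signal shutdown later, so stop now.
        Err(_) => true,
    }
}
```

This returns the same result as `unwrap_or(true)`, but the explicit `Err` arm leaves room for a distinct log line when the sender has been dropped.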

Comment on lines +188 to +211
    async fn connect_and_heartbeat(&mut self) -> Result<(), HeartbeatError> {
        self.update_state(ConnectionState::Connecting);

        counter!(
            ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).increment(1);

        debug!(pipeline_id = %self.pipeline_id, "connecting to primary for heartbeat");

        let client = self.connect_to_primary().await?;
        self.verify_pg_version(&client).await?;

        self.update_state(ConnectionState::Connected);
        info!(pipeline_id = %self.pipeline_id, "connected to primary, starting heartbeat loop");

        self.consecutive_failures = 0;
        gauge!(
            ETL_HEARTBEAT_CONSECUTIVE_FAILURES,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).set(0.0);

        self.heartbeat_loop(&client).await
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation for private method.

Per coding guidelines, all items including private methods should be documented.

📝 Suggested documentation
+    /// Establishes a connection to the primary and runs the heartbeat loop.
+    ///
+    /// Verifies PostgreSQL version compatibility before starting heartbeats.
+    /// Resets the consecutive failure counter on successful connection.
     async fn connect_and_heartbeat(&mut self) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 188 - 211, The private async
method connect_and_heartbeat lacks documentation; add a Rust doc comment (///)
above the connect_and_heartbeat method describing its purpose (connect to
primary, verify PG version, update connection state, reset failure counters, and
run the heartbeat loop), outline key behaviors and possible errors (returns
Result<(), HeartbeatError>), and mention side effects such as metric updates and
state transitions so readers and tools can understand its role; ensure the doc
follows project style and includes any important notes about concurrency or
retry semantics.

Comment on lines +213 to +239
    async fn connect_to_primary(&self) -> Result<Client, HeartbeatError> {
        PgReplicationClient::connect_regular(self.primary_config.clone())
            .await
            .map_err(|e| HeartbeatError::ConnectionFailed(e.to_string()))
    }

    async fn verify_pg_version(&self, client: &Client) -> Result<(), HeartbeatError> {
        let row = client
            .query_one("SELECT current_setting('server_version_num')::int", &[])
            .await
            .map_err(|e| HeartbeatError::VersionQueryFailed(e.to_string()))?;

        let version: i32 = row.get(0);

        if version < MIN_PG_VERSION {
            let version_str = format!(
                "{}.{}.{}",
                version / 10000,
                (version / 100) % 100,
                version % 100
            );
            return Err(HeartbeatError::UnsupportedVersion(version_str));
        }

        debug!(pipeline_id = %self.pipeline_id, pg_version = %version, "PostgreSQL version verified");
        Ok(())
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation for private methods.

Per coding guidelines, private methods should be documented.

📝 Suggested documentation
+    /// Connects to the primary database using the configured connection settings.
     async fn connect_to_primary(&self) -> Result<Client, HeartbeatError> {
         PgReplicationClient::connect_regular(self.primary_config.clone())
             .await
             .map_err(|e| HeartbeatError::ConnectionFailed(e.to_string()))
     }

+    /// Verifies the PostgreSQL server version meets the minimum requirement.
+    ///
+    /// Returns an error if the server version is below [`MIN_PG_VERSION`].
     async fn verify_pg_version(&self, client: &Client) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 213 - 239, The private methods
connect_to_primary and verify_pg_version lack documentation; add Rust doc
comments (///) above each private async fn describing purpose, inputs (e.g.,
that connect_to_primary uses self.primary_config and returns Result<Client,
HeartbeatError> and verify_pg_version takes &Client), expected behavior,
returned Result values, and possible HeartbeatError variants (ConnectionFailed,
VersionQueryFailed, UnsupportedVersion) so readers know side-effects and error
cases; keep comments brief and follow existing crate style for private method
docs.
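
One detail worth checking when documenting `UnsupportedVersion`: since PostgreSQL 10, `server_version_num` is encoded as MAJOR * 10000 + MINOR, so the three-part formatting in verify_pg_version would render 140010 (PostgreSQL 14.10) as 14.0.10. A possible alternative is sketched below; the helper name `format_pg_version` is hypothetical and not part of the PR.

```rust
/// Formats a `server_version_num` value as a human-readable version string.
fn format_pg_version(version_num: i32) -> String {
    let major = version_num / 10_000;
    if major >= 10 {
        // PostgreSQL 10+: MAJOR * 10000 + MINOR, e.g. 150004 is 15.4.
        format!("{}.{}", major, version_num % 10_000)
    } else {
        // Before 10: three components, e.g. 90605 is 9.6.5.
        format!("{}.{}.{}", major, (version_num / 100) % 100, version_num % 100)
    }
}
```

Because the check only fails for versions below 15, the two-part branch matters mostly for servers running PostgreSQL 10 through 14.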

Comment on lines +241 to +255
    async fn heartbeat_loop(&mut self, client: &Client) -> Result<(), HeartbeatError> {
        let mut heartbeat_interval = interval(Duration::from_secs(self.heartbeat_config.interval_secs));

        loop {
            tokio::select! {
                _ = heartbeat_interval.tick() => {
                    self.emit_heartbeat(client).await?;
                }
                _ = self.shutdown_rx.changed() => {
                    info!(pipeline_id = %self.pipeline_id, "heartbeat loop received shutdown signal");
                    return Ok(());
                }
            }
        }
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation for private method.

Per coding guidelines, private methods should be documented. Note that tokio::time::interval fires immediately on the first tick, so a heartbeat is emitted immediately upon entering this loop.

📝 Suggested documentation
+    /// Runs the periodic heartbeat emission loop.
+    ///
+    /// Emits heartbeats at the configured interval until shutdown is signaled
+    /// or an emission fails.
     async fn heartbeat_loop(&mut self, client: &Client) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 241 - 255, The private method
heartbeat_loop lacks documentation; add a doc comment above async fn
heartbeat_loop(&mut self, client: &Client) describing its purpose (periodically
emits heartbeats using self.emit_heartbeat), its shutdown behavior (listens on
self.shutdown_rx and returns Ok(()) when signaled), and explicitly note that
tokio::time::interval fires immediately on first tick so a heartbeat is emitted
immediately on entering the loop; keep the comment concise and mention the
units/source of interval_secs from self.heartbeat_config.

Comment on lines +257 to +280
    async fn emit_heartbeat(&self, client: &Client) -> Result<(), HeartbeatError> {
        client
            .execute("SELECT pg_logical_emit_message(false, 'etl_heartbeat', '')", &[])
            .await
            .map_err(|e| HeartbeatError::EmitFailed(e.to_string()))?;

        counter!(
            ETL_HEARTBEAT_EMISSIONS_TOTAL,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).increment(1);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs_f64();

        gauge!(
            ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).set(timestamp);

        debug!(pipeline_id = %self.pipeline_id, "heartbeat emitted successfully");
        Ok(())
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation for private method.

Per coding guidelines, private methods should be documented.

📝 Suggested documentation
+    /// Emits a heartbeat message to the primary database's WAL.
+    ///
+    /// Uses `pg_logical_emit_message` with non-transactional mode to ensure
+    /// the message is written to WAL immediately without requiring a commit.
     async fn emit_heartbeat(&self, client: &Client) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 257 - 280, Add a Rust doc comment
for the private async method emit_heartbeat to describe its purpose (emit a
Postgres logical replication message as an ETL heartbeat), its parameters (self
context including pipeline_id and the Client), its behavior (increments
ETL_HEARTBEAT_EMISSIONS_TOTAL, updates ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP,
logs debug), and its return/errors (Result<(), HeartbeatError> and when
HeartbeatError::EmitFailed may be returned). Place the /// comment immediately
above the emit_heartbeat function declaration so it follows project guideline
for documenting private methods.
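
When documenting the timestamp gauge, it may help to spell out what value it receives. The sketch below isolates that computation; the function name `unix_seconds` and the `now` parameter are assumptions made so the behaviour can be checked deterministically (the PR reads `SystemTime::now()` inline).

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch as an `f64`.
///
/// Falls back to 0.0 when `now` is earlier than the epoch, because
/// `unwrap_or_default()` substitutes a zero `Duration` for the error.
fn unix_seconds(now: SystemTime) -> f64 {
    now.duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}
```

A doc comment on emit_heartbeat could mention this fallback, since a gauge value of 0 then means "clock before epoch" rather than "never emitted".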

Comment on lines +282 to +298
    fn calculate_backoff(&self, base_backoff: Duration) -> Duration {
        // Simple jitter using timestamp nanoseconds as pseudo-random source.
        let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
        let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

        // Use nanoseconds from current time as a simple source of variation.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos();
        // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0
        let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
        let jitter = normalized * jitter_range;

        let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
        Duration::from_secs_f64(jittered_secs)
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation and fix comment punctuation.

Per coding guidelines: private methods should be documented, and comments should end with a period.

📝 Suggested fix
+    /// Calculates backoff duration with jitter.
+    ///
+    /// Applies a random jitter within the configured percentage to prevent
+    /// thundering herd effects. The result is clamped to a minimum of 0.1 seconds.
     fn calculate_backoff(&self, base_backoff: Duration) -> Duration {
         // Simple jitter using timestamp nanoseconds as pseudo-random source.
         let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
         let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

         // Use nanoseconds from current time as a simple source of variation.
         let nanos = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap_or_default()
             .subsec_nanos();
-        // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0
+        // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
         let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 282 - 298, Document the private
method calculate_backoff with a brief doc comment explaining its purpose,
parameters and return value (it applies jitter to a base Duration using
heartbeat_config.jitter_percent), and update inline comments inside the function
to end with periods; ensure the doc uses /// above the fn and mentions
base_backoff and that jitter is derived from SystemTime::now().subsec_nanos()
normalized to -1.0..1.0 so reviewers can understand the jitter computation.
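
To make the jitter arithmetic easier to review, here is a minimal standalone sketch of the same computation. It is not the PR's method: the name `apply_jitter` is hypothetical, and `subsec_nanos` is passed in as a parameter (instead of being read from the clock) purely so the result is deterministic.

```rust
use std::time::Duration;

/// Applies +/- `jitter_percent` of jitter to `base`, with a 0.1 s floor.
///
/// `subsec_nanos` must be in 0..1_000_000_000; taking it as an argument is
/// an assumption of this sketch, made for testability.
fn apply_jitter(base: Duration, jitter_percent: u8, subsec_nanos: u32) -> Duration {
    let jitter_range = base.as_secs_f64() * f64::from(jitter_percent) / 100.0;
    // Map 0..1_000_000_000 onto -1.0..1.0.
    let normalized = (f64::from(subsec_nanos) / 1_000_000_000.0) * 2.0 - 1.0;
    // Clamp so the backoff never drops below 0.1 seconds.
    let jittered_secs = (base.as_secs_f64() + normalized * jitter_range).max(0.1);
    Duration::from_secs_f64(jittered_secs)
}
```

For a 4 s base and 25% jitter, an input of 0 ns yields 3 s and an input of 500_000_000 ns yields 4 s, so results span roughly 3 s to 5 s. Wall-clock nanoseconds are a cheap source of variation rather than a real random number generator, which a doc comment may want to state.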

Comment on lines +300 to +303
    fn update_state(&self, state: ConnectionState) {
        let _ = self.state_tx.send(state);
        debug!(pipeline_id = %self.pipeline_id, state = %state.as_str(), "heartbeat worker state changed");
    }

🛠️ Refactor suggestion | 🟠 Major

Add documentation for private method.

Per coding guidelines, private methods should be documented.

📝 Suggested documentation
+    /// Updates the connection state and logs the transition.
     fn update_state(&self, state: ConnectionState) {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 300 - 303, The private method
update_state currently lacks documentation; add a Rust doc comment describing
its purpose and behavior, e.g., that update_state sends the provided
ConnectionState over the state_tx channel and logs the change for the given
pipeline_id; reference the method name update_state, the ConnectionState type,
the state_tx sender, and the debug log so reviewers can verify the doc matches
behavior and any side effects (non-blocking send, possible ignore on failure).

- Merge upstream's doc comment improvements from PR supabase#541
- Keep heartbeat fields (primary_connection, heartbeat)
- Keep helper methods (is_replica_mode, heartbeat_config)
- Fix corrupted doc comment on TableSyncCopyConfig
- Keep upstream's removal of TlsConfig::validate()
- Add ETL_HEARTBEAT_OPTIONS and APP_NAME_HEARTBEAT constant
- Add heartbeat options test
Upstream changed ValidationError to use a generic InvalidFieldValue variant
instead of specific error variants. Align with upstream to resolve conflict.
Keep upstream's verbose documentation while adding heartbeat functionality:
- lib.rs: Add primary_connection and heartbeat to example
- metrics.rs: Add heartbeat metrics constants with upstream's doc style
- pipeline.rs: Add heartbeat worker support with upstream's doc style
- Add primary_connection and heartbeat to PipelineConfig in test builder
- Add heartbeat module to workers/mod.rs
- Keep upstream's doc comments
…amples

Sync PipelineConfig usage in benchmark and example files with upstream + heartbeat fields
…ation

- Fix jitter calculation bug: divide by 1_000_000_000.0 instead of u32::MAX
- Add documentation to HeartbeatError, ConnectionState, HeartbeatWorkerHandle
- Resolve conflicts in connection.rs: keep doc comments, remove serde attributes
- Resolve conflicts in pipeline.rs: merge upstream structure with heartbeat additions
- Resolve conflicts in test_utils/pipeline.rs: keep upstream doc comments

Rebased onto upstream/main to resolve merge conflicts.
- Add HeartbeatWorker implementation with exponential backoff and jitter
- Add heartbeat metrics constants for monitoring
- Fix jitter calculation: use 1_000_000_000.0 for proper normalization
- Add primary_connection and heartbeat fields to PipelineConfig
- Add is_replica_mode() and heartbeat_config() helper methods
- Add ETL_HEARTBEAT_OPTIONS for heartbeat connections
- Update PipelineConfigWithoutSecrets with heartbeat fields
- Add APP_NAME_HEARTBEAT constant
- Add ETL_HEARTBEAT_OPTIONS with 5s statement/lock timeouts
- Add test for heartbeat options
- Add HeartbeatWorkerHandle to PipelineState::Started
- Start heartbeat worker when primary_connection is configured
- Wait for heartbeat worker during pipeline shutdown
- Update lib.rs docs with heartbeat fields
- Add HeartbeatWorkerHandle to PipelineState
- Start heartbeat worker in replica mode
- Wait for heartbeat worker in shutdown sequence
- Add heartbeat fields to PipelineBuilder
- Add connect_regular() method for non-replication connections
- Add connect_regular_tls() and connect_regular_no_tls() helpers
- Use ETL_HEARTBEAT_OPTIONS for heartbeat connections
@JohnCari
Author

Closing this PR in favor of #554 which has a clean branch history rebased on current main.

The new PR includes:

  • Fixed jitter calculation bug (was dividing by u32::MAX instead of 1_000_000_000)
  • Added documentation to all public types as suggested by CodeRabbit
  • Clean merge with upstream changes
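
A short worked comparison shows why the divisor matters. `subsec_nanos()` never exceeds 999_999_999, while `u32::MAX` is 4_294_967_295, so dividing by `u32::MAX` keeps the ratio below about 0.233 and the normalized value below about -0.534: the jitter could only ever shorten the backoff. The sketch below assumes the earlier revision used the same `* 2.0 - 1.0` mapping with only the divisor differing; both function names are hypothetical.

```rust
/// Normalization as described for the earlier revision: divides by `u32::MAX`.
fn normalize_buggy(subsec_nanos: u32) -> f64 {
    (f64::from(subsec_nanos) / f64::from(u32::MAX)) * 2.0 - 1.0
}

/// Fixed normalization: divides by the number of nanoseconds in a second.
fn normalize_fixed(subsec_nanos: u32) -> f64 {
    (f64::from(subsec_nanos) / 1_000_000_000.0) * 2.0 - 1.0
}
```

At the largest possible input, `normalize_buggy(999_999_999)` is still negative, whereas `normalize_fixed(999_999_999)` is just under 1.0, giving the intended symmetric range.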

Development

Successfully merging this pull request may close these issues.

Support for replication slots on read replica

2 participants