feat(etl): add HeartbeatWorker for read replica replication slot support #562
base: main
Adds HeartbeatWorker to maintain replication slot activity when replicating from read replicas (PostgreSQL 15+).
New components:
- HeartbeatWorker: Background worker connecting to primary in regular mode
- HeartbeatConfig: Configurable interval (30s), backoff (1-60s), jitter (25%)
- connect_regular(): Non-replication connection method
- connect_with_tls(): Shared TLS helper to reduce code duplication
Key features:
- Exponential backoff with jitter on connection failures
- Graceful shutdown via tokio watch channel
- Comprehensive metrics for monitoring
- Config validation (interval > 0, jitter <= 100, min <= max backoff)
Closes supabase#535
- Add primary_connection and heartbeat fields to PipelineConfig
- Add HeartbeatWorkerPanic error kind
- Add heartbeat metrics constants
📝 Walkthrough
Adds heartbeat-based read-replica support: new HeartbeatConfig, pipeline fields for replica mode, a HeartbeatWorker that emits pg_logical_emit_message over regular DB connections with backoff/jitter and metrics, replication client support for regular connections, and integration into pipeline lifecycle and metrics.
Sequence Diagram(s)
sequenceDiagram
participant Pipeline as PipelineState
participant HW as HeartbeatWorker
participant RC as PgReplicationClient
participant DB as PostgreSQL
Pipeline->>HW: start() (when primary_connection present)
HW->>RC: connect_regular(primary_config)
RC->>DB: establish connection (TLS/no-TLS)
DB-->>RC: connection established
RC-->>HW: client returned
HW->>HW: reset consecutive failures
loop every interval or after backoff
HW->>DB: emit_heartbeat() (pg_logical_emit_message)
DB-->>HW: success / error
alt Success
HW->>HW: record_emission(), update timestamp (metrics)
else Error
HW->>HW: record_failure(), increment consecutive failures
HW->>HW: calculate_backoff(), apply_jitter()
HW->>HW: wait_with_shutdown(backoff)
HW->>RC: reconnect (retry)
end
end
Pipeline->>HW: shutdown requested -> HW stops and Pipeline.wait() awaits handle
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
etl/src/workers/mod.rs (1)
1-9: Consider updating the module documentation to include heartbeat workers.
The module doc comment describes apply workers, table sync workers, and worker pools, but doesn't mention the newly added heartbeat worker. As per coding guidelines, documentation should cover all items.
📝 Suggested documentation update
 //! Worker implementations for concurrent replication tasks.
 //!
-//! Contains worker types for handling different aspects of replication: apply workers process
-//! replication streams, table sync workers handle initial data copying, and worker pools manage
-//! concurrent execution and lifecycle coordination.
+//! Contains worker types for handling different aspects of replication: apply workers process
+//! replication streams, table sync workers handle initial data copying, heartbeat workers maintain
+//! replication slot activity on primaries when replicating from read replicas, and worker pools
+//! manage concurrent execution and lifecycle coordination.
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 55-81: In the validate method of the Heartbeat config (fn
validate), add a check that rejects zero or negative backoff by returning
ValidationError::InvalidFieldValue for "min_backoff_ms" when self.min_backoff_ms
== 0 (or <= 0) with a constraint message like "must be greater than 0"; keep the
existing min_backoff_ms <= max_backoff_ms check so that this implicitly ensures
max_backoff_ms is also > 0.
In @etl-config/src/shared/pipeline.rs:
- Around line 71-79: The heartbeat validation is being run even when replica
mode is disabled; update the config validation code (the function that currently
validates `heartbeat` around lines 147-150) to only validate `heartbeat` when
`primary_connection.is_some()` (i.e., replica mode enabled). Concretely, wrap
the existing `heartbeat` validation logic in an if-check on
`primary_connection`, or alternatively return a clear error/warning if
`heartbeat` is set while `primary_connection` is None; reference
`primary_connection` and `heartbeat` in the change so the intent is explicit.
In @etl/src/metrics.rs:
- Around line 18-23: The comment above the heartbeat metric constants ("//
Heartbeat metrics") is missing a trailing period; update that comment to end
with a period (e.g., change it to "// Heartbeat metrics.") so it adheres to Rust
comment style; locate the block that defines ETL_HEARTBEAT_EMISSIONS_TOTAL,
ETL_HEARTBEAT_FAILURES_TOTAL, ETL_HEARTBEAT_CONSECUTIVE_FAILURES,
ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL, and
ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP and update the comment accordingly, then
run rustfmt to ensure formatting consistency.
In @etl/src/workers/heartbeat.rs:
- Around line 219-242: Change heartbeat_loop so it emits an initial heartbeat
immediately instead of waiting the full interval: call
self.emit_heartbeat(client).await? once before entering the interval wait, or
restructure the loop to emit first then call wait_with_shutdown(interval).await;
ensure you still check client.is_closed() (use client.is_closed() before
emitting) and handle shutdown (if wait_with_shutdown returns true after emitting
then return Ok(())), keeping all existing error handling and the use of
HeartbeatError unchanged.
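The emit-first ordering requested in the last item can be illustrated with a small synchronous sketch. It is not the PR's code: the real `heartbeat_loop` is async and works on a database client, whereas here `emit`, `wait_with_shutdown`, and `is_closed` are hypothetical closures standing in for `emit_heartbeat`, `wait_with_shutdown`, and `client.is_closed()`, and the `String` error is an assumed placeholder for `HeartbeatError`.

```rust
/// Synchronous stand-in for the async heartbeat loop (assumption: the real
/// worker awaits these operations on a tokio runtime).
///
/// Order per iteration: check the connection, emit, then wait. The first
/// heartbeat is therefore sent immediately rather than after one interval.
pub fn heartbeat_loop<E, W, C>(
    mut emit: E,
    mut wait_with_shutdown: W,
    is_closed: C,
) -> Result<(), String>
where
    E: FnMut() -> Result<(), String>,
    W: FnMut() -> bool,
    C: Fn() -> bool,
{
    loop {
        // A closed connection is reported as an error so the caller can reconnect.
        if is_closed() {
            return Err("connection closed".to_string());
        }
        // Emit before waiting; any emission error propagates to the caller.
        emit()?;
        // `true` means shutdown was requested while waiting for the interval.
        if wait_with_shutdown() {
            return Ok(());
        }
    }
}
```

With this shape, a shutdown signalled during the wait ends the loop cleanly, while emission failures and closed connections still surface as errors.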
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (8)
etl-config/src/shared/heartbeat.rs
etl-config/src/shared/mod.rs
etl-config/src/shared/pipeline.rs
etl/src/error.rs
etl/src/metrics.rs
etl/src/replication/client.rs
etl/src/workers/heartbeat.rs
etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use `snake_case` for files and modules
Use `CamelCase` for types and traits
Use `snake_case` for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
etl-config/src/shared/mod.rs
etl/src/workers/mod.rs
etl/src/replication/client.rs
etl-config/src/shared/pipeline.rs
etl/src/metrics.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/heartbeat.rs
etl/src/error.rs
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl-config/src/shared/mod.rs
etl/src/replication/client.rs
etl/src/metrics.rs
etl/src/workers/heartbeat.rs
etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Applied to files:
etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`
Applied to files:
etl/src/error.rs
🧬 Code graph analysis (1)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
heartbeat_config (119-125)
etl/src/replication/client.rs (1)
connect_regular(163-168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (26)
etl-config/src/shared/mod.rs (1)
5-5: LGTM! The heartbeat module declaration and public re-export follow the established pattern used by other modules in this file.
Also applies to: 15-15
etl/src/replication/client.rs (2)
46-58: LGTM on the DRY refactoring for TLS root certificate store construction. The helper correctly consolidates the TLS root store building logic, reducing duplication between replication and regular connection paths.
159-168: LGTM! The `connect_regular` method and its TLS/non-TLS variants are well-structured. Returning the raw `tokio_postgres::Client` instead of `PgReplicationClient` is appropriate since heartbeat connections only need to execute simple SQL commands without replication features. The connection lifecycle monitoring via `spawn_postgres_connection` ensures proper cleanup.
Also applies to: 220-252
etl/src/metrics.rs (1)
106-136: LGTM on the metric registrations. The metric types and units are correctly chosen:
- Counters for cumulative totals (emissions, failures, connection attempts)
- Gauges for current state (consecutive failures, last timestamp)
- `Unit::Seconds` appropriately used for the timestamp gauge
Minor: The comment on line 107 should also end with a period for consistency.
etl-config/src/shared/heartbeat.rs (3)
8-40: LGTM on the HeartbeatConfig structure. The struct is well-documented with clear field descriptions. The serde defaults ensure backward compatibility when fields are omitted from configuration.
122-173: LGTM on test coverage. The tests effectively cover default values, configuration validation edge cases, and the options string content. Good use of struct update syntax with `..Default::default()` for focused test cases.
95-109: Implementation is idiomatic. The serde default helper functions correctly delegate to the associated constants, maintaining a single source of truth for default values.
etl-config/src/shared/pipeline.rs (5)
5-8: LGTM! Import changes correctly bring in `HeartbeatConfig` for the new replica mode fields.
10-12: LGTM! Documentation wording is clear and concise.
107-125: LGTM! The replica mode detection and heartbeat config accessor methods are well-designed. The use of `unwrap_or_default()` ensures sensible defaults when heartbeat config is not explicitly provided.
183-186: LGTM! Field types correctly use the secrets-stripped variant for connection config while keeping `HeartbeatConfig` as-is since it contains no sensitive data.
210-211: LGTM! Conversion correctly strips secrets from `primary_connection` and passes through `heartbeat` unchanged.
etl/src/error.rs (1)
134-137: LGTM! The new `HeartbeatWorkerPanic` variant follows the established pattern for worker panic error kinds and is appropriately placed in the "State & Workflow Errors" section.
etl/src/workers/heartbeat.rs (13)
1-23: LGTM! Module documentation clearly explains the purpose and context (PostgreSQL 15+ read replicas). Imports are well-organized and appropriate for the functionality.
25-39: LGTM! Error variants appropriately cover the main failure modes. Using `String` for error details is acceptable here as these errors are primarily used for logging and metrics rather than programmatic error handling.
48-70: LGTM! The `WorkerHandle` implementation correctly handles the join operation and maps panics to the appropriate error kind. The `Option<JoinHandle>` pattern safely handles the case where `wait()` is called multiple times.
72-83: LGTM! Struct fields capture all necessary state for heartbeat operation. The `consecutive_failures` counter is appropriately typed as `u32` and the backoff calculation uses saturating arithmetic to handle edge cases.
85-107: LGTM! Constructor is straightforward and well-documented. Validation is appropriately delegated to the config layer.
109-120: LGTM! The start method follows the standard tokio spawn pattern, moving ownership into the background task and returning a handle for lifecycle management.
122-183: LGTM! The main loop is well-structured with comprehensive shutdown handling. The backoff-on-error pattern with metrics tracking provides good observability. Resetting the consecutive failures gauge on shutdown is a nice touch for clean metrics.
185-217: LGTM! Connection logic correctly uses regular (non-replication) mode via `connect_regular`. The consecutive failures counter is appropriately reset on successful connection, and metrics are updated correctly.
244-266: LGTM! The heartbeat emission correctly uses `pg_logical_emit_message(false, ...)` for non-transactional WAL records. The comment at line 262 appropriately explains why failure recording is delegated to the caller.
268-280: LGTM! The exponential backoff calculation is correct with appropriate use of saturating arithmetic to prevent overflow. The formula `min * 2^(failures-1)` ensures the first retry uses `min_backoff`.
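As an illustration of the formula `min * 2^(failures-1)` with saturating arithmetic, here is a minimal sketch. It is not taken from the PR: the free-function form, the parameter names, and the cap at `max_backoff` are assumptions made for the example (the cap is inferred from the configured 1-60s backoff range).

```rust
use std::time::Duration;

/// Computes `min_backoff * 2^(consecutive_failures - 1)`, capped at `max_backoff`.
///
/// Hypothetical signature; the worker in the PR reads these values from its
/// configuration and internal state instead of taking them as parameters.
pub fn calculate_backoff(
    consecutive_failures: u32,
    min_backoff: Duration,
    max_backoff: Duration,
) -> Duration {
    // The first failure (and the degenerate zero case) uses exponent 0.
    let exponent = consecutive_failures.saturating_sub(1);
    // 2^exponent, saturating at u32::MAX when the shift would overflow.
    let multiplier = 1u32.checked_shl(exponent).unwrap_or(u32::MAX);
    // Saturating multiplication avoids a panic on very large durations.
    min_backoff.saturating_mul(multiplier).min(max_backoff)
}
```

With a 1s minimum and 60s maximum, the delays grow as 1s, 2s, 4s, 8s and so on until they reach the 60s cap.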
282-299: Non-random jitter may reduce effectiveness. The jitter implementation uses `subsec_nanos()` which provides time-based determinism rather than randomness. If multiple workers start at similar times, they may compute similar jitter values. For heartbeat purposes this is likely acceptable, but consider using `rand` crate if stronger distribution is needed.
Also note the jitter is symmetric (±jitter_percent) rather than the more common additive-only pattern (0 to +jitter_percent), which means backoff can be shorter than the base.
301-317: LGTM! Shutdown handling is correctly implemented. The `tokio::select!` properly races the sleep against shutdown signals, and the logic correctly handles both channel closure and value changes.
319-357: LGTM! Metrics methods provide comprehensive observability with appropriate counter and gauge usage. The `unwrap_or_default()` for timestamp calculation safely handles edge cases.
- Add min_backoff_ms > 0 validation in HeartbeatConfig
- Only validate heartbeat config when replica mode is enabled
- Add period to heartbeat metrics comment
- Emit initial heartbeat immediately before waiting for interval
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 119-128: ETL_HEARTBEAT_OPTIONS currently relies on a trailing
space in the first string to separate options when using concat! which is
fragile; update the concat! invocation for ETL_HEARTBEAT_OPTIONS so each
concatenated literal includes an explicit leading or trailing space (e.g., add a
space before "statement_timeout", "lock_timeout", and
"idle_in_transaction_session_timeout") to ensure options are always separated
regardless of string ordering or edits.
In @etl/src/workers/heartbeat.rs:
- Around line 270-301: The apply_jitter function currently uses
SystemTime::subsec_nanos() for pseudo-randomness; replace that with the rand
crate by importing rand::Rng and using rand::thread_rng() to sample a f64 in the
range -1.0..1.0 (or directly sample a uniform value in
-jitter_range..jitter_range) and multiply by jitter_range to produce jitter;
update apply_jitter to compute jitter using thread_rng() and preserve the
existing clamping to a minimum duration, ensuring calculate_backoff continues to
call apply_jitter unchanged and that heartbeat_config.jitter_percent is used to
compute jitter_range.
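The symmetric jitter described in the second item can be sketched without external crates by passing the random sample in as a parameter. This is an assumption made to keep the example dependency-free: in the worker, the sample would come from the `rand` crate as the review suggests. The parameter names and the 100ms floor constant are likewise illustrative.

```rust
use std::time::Duration;

/// Lower bound on the jittered wait (illustrative value).
const MIN_WAIT: Duration = Duration::from_millis(100);

/// Applies symmetric jitter of up to `jitter_percent` percent around `base`.
///
/// `unit_sample` is expected to be uniform in [-1.0, 1.0]; it is a parameter
/// here (an assumption of this sketch) so the function stays deterministic
/// and testable.
pub fn apply_jitter(base: Duration, jitter_percent: u8, unit_sample: f64) -> Duration {
    // Guard against out-of-range samples.
    let sample = unit_sample.clamp(-1.0, 1.0);
    // Largest deviation from the base, in seconds.
    let jitter_range = base.as_secs_f64() * f64::from(jitter_percent) / 100.0;
    let jittered = base.as_secs_f64() + jitter_range * sample;
    // Clamp to the floor so the wait never becomes zero or negative.
    Duration::from_secs_f64(jittered.max(MIN_WAIT.as_secs_f64()))
}
```

For a 10s base and 25% jitter, the result ranges from 7.5s to 12.5s depending on the sample.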
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
etl-config/src/shared/heartbeat.rs
etl-config/src/shared/pipeline.rs
etl/src/metrics.rs
etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
Files:
etl/src/metrics.rs
etl-config/src/shared/pipeline.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/heartbeat.rs
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Normal comments in Rust should always finish with a period
Applied to files:
etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Applied to files:
etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl/src/metrics.rs
etl/src/workers/heartbeat.rs
🧬 Code graph analysis (2)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
heartbeat_config (119-125)
etl/src/replication/client.rs (1)
connect_regular(163-168)
etl-config/src/shared/heartbeat.rs (1)
etl-config/src/shared/pipeline.rs (1)
validate(130-155)
🔇 Additional comments (20)
etl-config/src/shared/heartbeat.rs (4)
1-40: LGTM! Well-structured configuration type with comprehensive documentation. The struct follows Rust conventions with proper derive macros for serialization/deserialization, optional utoipa integration via feature flag, and clear documentation explaining the purpose and defaults for each field.
42-90: Validation logic is thorough and covers all edge cases. The `validate()` method properly checks all constraints documented in the doc comment. The validation order is logical (interval → jitter → min_backoff → min vs max comparison).
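The validation order mentioned above (interval, jitter, minimum backoff, then minimum versus maximum) might look roughly like the following sketch. Only `min_backoff_ms`, `max_backoff_ms`, `jitter_percent`, and `ValidationError::InvalidFieldValue` are named in this review; the `interval_ms` field name, the error variant's fields, and the `invalid` helper are assumptions made for illustration.

```rust
/// Hypothetical stand-in for the crate's validation error type.
#[derive(Debug, PartialEq)]
pub enum ValidationError {
    InvalidFieldValue { field: &'static str, constraint: &'static str },
}

/// Simplified heartbeat configuration; `interval_ms` is an assumed field name.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    pub interval_ms: u64,
    pub min_backoff_ms: u64,
    pub max_backoff_ms: u64,
    pub jitter_percent: u8,
}

impl Default for HeartbeatConfig {
    fn default() -> Self {
        // Defaults from the PR description: 30s interval, 1-60s backoff, 25% jitter.
        Self { interval_ms: 30_000, min_backoff_ms: 1_000, max_backoff_ms: 60_000, jitter_percent: 25 }
    }
}

/// Small helper (hypothetical) that builds the error result.
fn invalid(field: &'static str, constraint: &'static str) -> Result<(), ValidationError> {
    Err(ValidationError::InvalidFieldValue { field, constraint })
}

impl HeartbeatConfig {
    /// Checks the constraints in the order described in the review.
    pub fn validate(&self) -> Result<(), ValidationError> {
        if self.interval_ms == 0 {
            return invalid("interval_ms", "must be greater than 0");
        }
        if self.jitter_percent > 100 {
            return invalid("jitter_percent", "must be at most 100");
        }
        if self.min_backoff_ms == 0 {
            return invalid("min_backoff_ms", "must be greater than 0");
        }
        // Together with the previous check, this implies max_backoff_ms > 0.
        if self.min_backoff_ms > self.max_backoff_ms {
            return invalid("min_backoff_ms", "must not exceed max_backoff_ms");
        }
        Ok(())
    }
}
```

Checking `min_backoff_ms > 0` before the min/max comparison means a zero maximum is rejected without needing its own check.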
92-117: LGTM! Default implementation and serde helpers are consistent. The `Default` implementation correctly uses the associated constants, and the serde default functions properly delegate to these constants, ensuring a single source of truth.
130-191: LGTM! Test coverage is comprehensive. Tests validate default values, the heartbeat options constant, and all validation error cases (zero interval, jitter > 100, min > max backoff, zero min backoff).
etl-config/src/shared/pipeline.rs (5)
5-8: LGTM! Import organization is correct. The `HeartbeatConfig` type is properly imported alongside other shared types.
71-79: LGTM! New replica mode fields are well-documented. The `primary_connection` and `heartbeat` fields are properly documented with clear explanations of when they're used and their relationship to each other.
107-125: LGTM! Helper methods provide clean API for replica mode handling. `is_replica_mode()` is a clear predicate, and `heartbeat_config()` provides sensible defaulting behavior when replica mode is enabled but no explicit heartbeat config is provided.
147-152: LGTM! Conditional validation is correct. Heartbeat config is only validated when replica mode is enabled (primary_connection is set), and only if a heartbeat config is actually provided. This aligns with the PR requirements.
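A minimal sketch of that conditional validation follows. The types are hypothetical, trimmed-down stand-ins (`PipelineSettings`, `HeartbeatSettings`), the connection is reduced to a string, and errors are plain `String`s; only `primary_connection`, `heartbeat`, and `is_replica_mode` mirror names from the PR.

```rust
/// Hypothetical, trimmed-down heartbeat settings.
#[derive(Debug, Clone)]
pub struct HeartbeatSettings {
    pub interval_ms: u64,
}

impl HeartbeatSettings {
    pub fn validate(&self) -> Result<(), String> {
        if self.interval_ms == 0 {
            return Err("interval_ms must be greater than 0".to_string());
        }
        Ok(())
    }
}

/// Hypothetical, trimmed-down pipeline configuration.
#[derive(Debug, Clone)]
pub struct PipelineSettings {
    /// Connection to the primary; `Some` enables replica mode.
    pub primary_connection: Option<String>,
    /// Optional heartbeat settings; only meaningful in replica mode.
    pub heartbeat: Option<HeartbeatSettings>,
}

impl PipelineSettings {
    /// Replica mode is enabled when a primary connection is configured.
    pub fn is_replica_mode(&self) -> bool {
        self.primary_connection.is_some()
    }

    /// Validates heartbeat settings only in replica mode, and only if provided.
    pub fn validate(&self) -> Result<(), String> {
        if self.is_replica_mode() {
            if let Some(heartbeat) = &self.heartbeat {
                heartbeat.validate()?;
            }
        }
        Ok(())
    }
}
```

One consequence of this design is that an invalid `heartbeat` block is silently ignored when `primary_connection` is absent, which is the trade-off the earlier review comment raised.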
185-188: LGTM! PipelineConfigWithoutSecrets properly mirrors the new fields. The secrets-free variant correctly includes both new fields, and the `From` implementation properly maps `primary_connection` through the secrets-stripping conversion while passing `heartbeat` directly (since it contains no secrets).
Also applies to: 212-213
etl/src/metrics.rs (2)
18-24: LGTM! Metric constants follow established naming conventions. The new heartbeat metrics use the `etl_heartbeat_` prefix consistently and are logically grouped with a clear comment. Based on learnings, the comment properly ends with a period.
106-136: LGTM! Metric registrations are appropriate for the metric semantics.
- Counters for monotonic values (emissions, failures, connection attempts) - correct.
- Gauge for consecutive failures (resets to 0 on success) - correct.
- Gauge with `Unit::Seconds` for timestamp - correct.
The descriptions are clear and consistent with other metrics in the file.
etl/src/workers/heartbeat.rs (9)
1-24: LGTM! Module documentation and imports are well-organized. The module-level documentation clearly explains the purpose (maintaining replication slot activity on the primary when replicating from a read replica) and the mechanism used (`pg_logical_emit_message()`).
25-39: LGTM! Error types are well-defined with clear descriptions. The `HeartbeatError` enum covers the expected failure modes: connection failures, emission failures, and graceful shutdown. Using `thiserror` for error derivation is idiomatic.
41-70: LGTM! WorkerHandle implementation is correct. The handle properly wraps the JoinHandle and maps panics to the appropriate error kind. The `state()` method returning unit type makes sense since the heartbeat worker has no external state to expose.
72-107: LGTM! Worker struct and constructor are straightforward. The struct holds all necessary state for heartbeat operation, and the constructor initializes consecutive failures to 0 as expected.
109-183: LGTM! Main loop handles all lifecycle states correctly. The `run()` method properly:
- Checks for shutdown before connection attempts.
- Handles graceful shutdown from `heartbeat_loop()`.
- Implements backoff with jitter on errors.
- Resets the consecutive failures gauge on shutdown.
The logging provides good observability into the worker's state.
185-217: LGTM! Connection handling is robust. The method records connection attempts, logs connection details, and properly resets consecutive failures on success. The connection is made in regular (non-replication) mode as appropriate for executing `pg_logical_emit_message()`.
219-244: Consider the behavior when emit fails after successful connection. The `heartbeat_loop()` emits immediately on entry then waits for the interval. However, if `emit_heartbeat()` fails (returns `EmitFailed`), the error propagates up to `connect_and_heartbeat()`, which returns it to `run()`. In `run()`, this increments `consecutive_failures` even though the connection was successful (consecutive_failures was reset to 0 on line 212).
This is actually correct behavior since consecutive failures should track consecutive heartbeat failures, not just connection failures. Just confirming the design is intentional.
303-319: LGTM! Shutdown handling is cooperative and correct. `is_shutdown_requested()` checks the current state, while `wait_with_shutdown()` uses `tokio::select!` to race the sleep against the shutdown signal. The return value semantics (true = shutdown, false = timeout) are clear from the documentation.
321-359: LGTM! Metrics recording is consistent with the registered metrics. The methods correctly use counters for emissions and failures, and gauges for consecutive failures and timestamp. The pipeline_id label ensures metrics are properly attributed.
- ETL_HEARTBEAT_OPTIONS: use explicit leading spaces for each option to avoid fragile trailing space dependencies
- apply_jitter: replace SystemTime::subsec_nanos() pseudo-randomness with rand::thread_rng().gen_range() for proper randomization

- Add HeartbeatWorker import and state tracking in pipeline
- Start heartbeat worker when replica mode enabled (primary_connection set)
- Wait for heartbeat worker during shutdown
- Move rand from dev-dependencies to dependencies for production use
- Add primary_connection and heartbeat fields to test utils PipelineBuilder

- Add primary_connection: None, heartbeat: None to benchmark PipelineConfig
- Add primary_connection: None, heartbeat: None to example PipelineConfig
- Update lib.rs documentation example with new fields
- Add read replica support mention to lib.rs features
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @etl/src/pipeline.rs:
- Around line 259-265: The heartbeat worker failure is logged but not appended
to the shared errors vector like apply and table sync workers; update the
section around heartbeat_worker and heartbeat_handle.wait().await to push the
error into the errors collection when Err(err) occurs (e.g.,
errors.push(err.into()) or map the error type to the same Error type used by
apply/table sync) so heartbeat failures are treated consistently, or
alternatively add a short comment above the heartbeat_worker block explaining
that heartbeat failures are intentionally non-fatal and will not be added to
errors.
- Around line 175-188: self.config.heartbeat_config() returns
Option<HeartbeatConfig> but HeartbeatWorker::new expects a HeartbeatConfig;
inside the existing if let Some(primary_config) { ... } block you should unwrap
the option before passing it to HeartbeatWorker::new (e.g., map to a local
heartbeat_config via matching or call .expect/.unwrap with a clear message), so
replace the direct call to self.config.heartbeat_config() with an unwrapped
HeartbeatConfig and pass that into HeartbeatWorker::new (refer to
heartbeat_config(), HeartbeatWorker::new, primary_config, and heartbeat_worker
to locate the code).
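For the first item above, collecting heartbeat failures alongside the other workers' failures could look like this sketch. The function, its parameters, and the `ApplyWorkerPanic` variant are hypothetical; only the `HeartbeatWorkerPanic` name and the idea of a shared errors vector come from this PR.

```rust
/// Hypothetical error kinds; only `HeartbeatWorkerPanic` is named in the PR.
#[derive(Debug, PartialEq)]
pub enum WorkerError {
    ApplyWorkerPanic,
    HeartbeatWorkerPanic,
}

/// Gathers the results of all workers into one error list.
///
/// `heartbeat` is `None` when replica mode is disabled and no heartbeat
/// worker was started.
pub fn wait_for_workers(
    apply: Result<(), WorkerError>,
    heartbeat: Option<Result<(), WorkerError>>,
) -> Result<(), Vec<WorkerError>> {
    let mut errors = Vec::new();
    if let Err(err) = apply {
        errors.push(err);
    }
    // Treat heartbeat failures like any other worker failure.
    if let Some(Err(err)) = heartbeat {
        errors.push(err);
    }
    if errors.is_empty() { Ok(()) } else { Err(errors) }
}
```

If heartbeat failures are meant to be non-fatal instead, the alternative in the review applies: leave them out of the vector and say so in a comment next to the heartbeat block.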
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (5)
etl-config/src/shared/heartbeat.rs
etl/Cargo.toml
etl/src/pipeline.rs
etl/src/test_utils/pipeline.rs
etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs
Files:
etl/src/test_utils/pipeline.rs
etl/src/pipeline.rs
etl-config/src/shared/heartbeat.rs
etl/src/workers/heartbeat.rs
**/Cargo.toml
📄 CodeRabbit inference engine (AGENTS.md)
Use `kebab-case` for crate names
Files:
etl/Cargo.toml
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl/Cargo.toml
etl/src/pipeline.rs
etl/src/workers/heartbeat.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Build with `cargo build --workspace --all-targets --all-features`
Applied to files:
etl/Cargo.toml
🧬 Code graph analysis (1)
etl/src/pipeline.rs (1)
etl-config/src/shared/pipeline.rs (2)
is_replica_mode (112-114)
heartbeat_config (119-125)
🔇 Additional comments (22)
etl/Cargo.toml (2)
35-35: LGTM! Adding `thiserror` as a runtime dependency is appropriate for the new heartbeat error handling introduced in this PR.
31-31: The `thread_rng` feature is valid and properly configured. The `rand` crate v0.9.2 includes an explicit `thread_rng` feature (which is a default feature), and since the workspace definition disables default features (`default-features = false`), explicitly enabling it in `etl/Cargo.toml` is correct and necessary.
etl-config/src/shared/heartbeat.rs (5)
1-40: LGTM! Well-structured configuration with good documentation. The struct is well-documented with clear field descriptions including default values. The use of serde defaults and conditional utoipa schema derivation is appropriate.
55-89: LGTM! Validation logic is comprehensive and correct. The validation covers all edge cases: zero interval, excessive jitter, zero minimum backoff, and inverted backoff bounds. The documentation accurately describes the checks.
92-117: LGTM! Default implementation and serde helper functions correctly use the associated constants, ensuring consistency.
119-128: LGTM! Well-documented connection options with appropriate timeouts. The short timeouts (5s statement/lock) are appropriate for heartbeat operations that should fail fast, while the 30s idle timeout provides reasonable cleanup for connection issues.
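A sketch of an options constant built with `concat!` and explicit leading spaces, as requested in the earlier review round, is shown below. The constant name, the `application_name` entry, and the exact `-c` formatting are assumptions; only the three timeout settings and their values (5s, 5s, 30s) come from the review.

```rust
/// Illustrative connection options string (hypothetical name and format).
///
/// Every literal after the first starts with its own leading space, so
/// reordering or removing a line cannot fuse two options together.
pub const HEARTBEAT_OPTIONS_SKETCH: &str = concat!(
    "-c application_name=etl_heartbeat",
    " -c statement_timeout=5000",
    " -c lock_timeout=5000",
    " -c idle_in_transaction_session_timeout=30000",
);
```

Because `concat!` joins literals with no separator, the spacing has to live in the literals themselves; putting it at the start of each continuation line keeps every line self-contained.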
130-195: LGTM! Comprehensive test coverage.Tests cover all validation scenarios including edge cases (zero interval, excessive jitter, inverted backoff bounds, zero minimum backoff) and verify default values and options string content.
etl/src/workers/heartbeat.rs (11)
1-24: LGTM! Clear module documentation and appropriate imports. The module documentation clearly explains the purpose (maintaining replication slot activity when replicating from read replicas) and mentions the PostgreSQL 15+ requirement.
26-71: LGTM! Error types and handle implementation are well-structured. The error variants cover the expected failure modes. The `wait()` implementation properly handles the case where the handle was already consumed.
73-108: LGTM! Worker structure and constructor are well-defined. The fields capture all necessary state for heartbeat operations, and the constructor properly initializes the worker.
110-184: LGTM! Worker lifecycle is well-implemented. The main loop properly handles shutdown checks before connection attempts, distinguishes between clean shutdown and errors, and resets metrics on termination. The backoff wait is also shutdown-aware.
186-218: LGTM! Connection and heartbeat flow is correct. The method properly records connection attempts, uses the regular (non-replication) connection mode, and resets consecutive failures only after a successful connection.
220-245: LGTM! Heartbeat loop is well-structured. The loop properly checks connection health before each emission and cleanly exits on shutdown. The immediate first heartbeat (before waiting) is a good pattern for quick feedback.
247-269: LGTM! Heartbeat emission uses the correct PostgreSQL function. Using `pg_logical_emit_message(false, ...)` with non-transactional mode ensures immediate WAL visibility. The empty payload minimizes overhead. The comment at line 265 helpfully explains the metric recording responsibility.
271-283: LGTM! Backoff calculation is correct with proper overflow protection. The exponential backoff formula `min * 2^(failures-1)` correctly starts at the minimum backoff for the first failure and doubles thereafter, with saturation arithmetic preventing overflow.
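The backoff formula in the comment above can be illustrated with a short sketch. The function name `backoff_for` and its signature are assumptions, as is the choice to treat zero failures like one failure. The formula itself, the cap at the maximum backoff, and the use of saturating arithmetic follow the review text.

```rust
use std::time::Duration;

/// Computes `min * 2^(failures - 1)`, capped at `max`.
/// Hypothetical helper; treating `failures == 0` like the first
/// failure is an assumption of this sketch.
pub fn backoff_for(failures: u32, min: Duration, max: Duration) -> Duration {
    // saturating_sub keeps the exponent at 0 for the first failure.
    let exponent = failures.saturating_sub(1);
    // checked_pow returns None once 2^exponent no longer fits in u32;
    // fall back to the largest factor instead of overflowing.
    let factor = 2u32.checked_pow(exponent).unwrap_or(u32::MAX);
    // saturating_mul clamps to Duration::MAX rather than panicking.
    min.saturating_mul(factor).min(max)
}
```

With the PR's defaults of 1s and 60s, the delays run 1s, 2s, 4s, 8s, 16s, 32s and then stay at 60s from the seventh consecutive failure onward.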
285-299: LGTM! Jitter implementation is correct. The symmetric jitter around the base backoff using `rand::thread_rng().gen_range()` properly randomizes retry timing to prevent thundering herd. The 0.1s minimum floor is a sensible safety bound.
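As a rough illustration of symmetric jitter with a floor, the sketch below takes the random sample as a parameter so that it stays free of external crates and is deterministic to test. The function name `apply_jitter`, the `unit_sample` parameter, and the mapping from the sample to an offset are assumptions. The symmetric range around the base and the 0.1s floor come from the review comment.

```rust
use std::time::Duration;

/// Applies symmetric jitter of up to `jitter_percent` around `base`.
/// `unit_sample` is a caller-supplied random value in [0, 1];
/// 0.5 means no change. Hypothetical helper for illustration only.
pub fn apply_jitter(base: Duration, jitter_percent: u8, unit_sample: f64) -> Duration {
    // Safety floor mentioned in the review (0.1s).
    const FLOOR: Duration = Duration::from_millis(100);

    let sample = unit_sample.clamp(0.0, 1.0);
    let base_secs = base.as_secs_f64();
    // Maximum deviation in seconds, e.g. 25% of 10s is 2.5s.
    let spread = base_secs * f64::from(jitter_percent.min(100)) / 100.0;
    // Map [0, 1] onto [-spread, +spread].
    let offset = (sample * 2.0 - 1.0) * spread;
    // Guard against a negative result before converting back.
    let jittered = (base_secs + offset).max(0.0);
    Duration::from_secs_f64(jittered).max(FLOOR)
}
```

In a worker, the sample would come from whichever random number generator the crate already depends on. Passing it in keeps the arithmetic separate from the source of randomness.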
301-317: LGTM! Shutdown coordination is properly implemented. The `tokio::select!` pattern correctly races the sleep against shutdown signals, and the result handling covers both channel closure and explicit shutdown requests.
319-357: LGTM! Metrics are properly recorded. The emission timestamp correctly uses Unix epoch seconds, and the failure tracking properly separates the counter (total failures) from the gauge (current consecutive failures).
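The counter-versus-gauge distinction in the last comment can be sketched with plain atomics. This does not use the project's metrics facade: the `HeartbeatStats` type and its method names are assumptions for illustration. The separation of total failures from consecutive failures, the reset after a successful connection, and the Unix-epoch timestamp follow the review comments.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical in-memory stand-in for the heartbeat metrics.
#[derive(Debug, Default)]
pub struct HeartbeatStats {
    emissions_total: AtomicU64,         // counter: only ever increases
    failures_total: AtomicU64,          // counter: only ever increases
    consecutive_failures: AtomicU64,    // gauge: reset on success
    last_emission_unix_secs: AtomicU64, // gauge: Unix epoch seconds
}

impl HeartbeatStats {
    /// Records a successful emission at the given time.
    pub fn record_emission(&self, at: SystemTime) {
        // Times before the epoch are recorded as 0 in this sketch.
        let secs = at
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        self.emissions_total.fetch_add(1, Ordering::Relaxed);
        self.last_emission_unix_secs.store(secs, Ordering::Relaxed);
    }

    /// Records a failure in both the counter and the gauge.
    pub fn record_failure(&self) {
        self.failures_total.fetch_add(1, Ordering::Relaxed);
        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Resets only the gauge after a successful connection.
    pub fn record_connected(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// Returns (emissions_total, failures_total, consecutive_failures,
    /// last_emission_unix_secs).
    pub fn snapshot(&self) -> (u64, u64, u64, u64) {
        (
            self.emissions_total.load(Ordering::Relaxed),
            self.failures_total.load(Ordering::Relaxed),
            self.consecutive_failures.load(Ordering::Relaxed),
            self.last_emission_unix_secs.load(Ordering::Relaxed),
        )
    }
}
```

Keeping the two failure values separate lets an alert fire on a sustained outage (the gauge) without losing the long-term failure rate (the counter).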
etl/src/pipeline.rs (3)
19-19: LGTM! Import for heartbeat worker types is correctly added.
37-44: LGTM! The optional `heartbeat_worker` field correctly represents that heartbeat is only active in replica mode.
58-61: LGTM! Documentation clearly explains the replica mode behavior with heartbeat worker.
etl/src/test_utils/pipeline.rs (1)
152-153: LGTM! Test utilities correctly default to non-replica mode. Setting `primary_connection: None` and `heartbeat: None` ensures test pipelines operate in the standard (non-replica) mode. Consider adding builder methods like `with_primary_connection()` and `with_heartbeat()` if replica mode testing becomes necessary in the future.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @etl-examples/src/main.rs:
- Around line 184-185: Add short inline comments for the new replica-mode fields
to clarify their purpose for example readers: annotate primary_connection to
indicate it holds the primary DB connection used for writes (or None when
running as a read replica) and annotate heartbeat to explain it stores the
optional heartbeat/health-check info used to determine replica liveness and
failover readiness; place these comments next to the primary_connection and
heartbeat fields in the struct/initializer where they appear (e.g.,
primary_connection: None, heartbeat: None).
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- etl-benchmarks/benches/table_copies.rs
- etl-examples/src/main.rs
- etl/src/lib.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use `snake_case` for files and modules
Use `CamelCase` for types and traits
Use `snake_case` for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
- etl/src/lib.rs
- etl-benchmarks/benches/table_copies.rs
- etl-examples/src/main.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (3)
etl/src/lib.rs (2)
19-19: LGTM! The feature description is concise and follows the existing documentation style.
81-82: LGTM! The example correctly demonstrates the new optional fields for replica mode configuration, maintaining consistency with the updated `PipelineConfig` API.
etl-benchmarks/benches/table_copies.rs (1)
348-349: LGTM! The new optional fields are correctly set to `None` for the benchmark, which doesn't require replica mode functionality.
- Unwrap heartbeat_config() with expect() since it's guaranteed to be Some when primary_connection is set
- Add comment explaining that heartbeat failures are intentionally non-fatal
Clarifies purpose of primary_connection and heartbeat fields for example readers.
Hi, regarding what we discussed yesterday, what I would like is to verify two things:
Thanks a lot for spending your time on this! It will be a nice feature to have if we manage to implement this properly.
No problem! Happy to help. I love Supabase, it's a great product! I have to head out, but when I return in a few hours I'll be happy to answer those questions.
Hi @iambriccardo, here are my answers:
1. How does the system work with no traffic? Is the heartbeat needed?
Yes, the heartbeat is needed. Here's why: When there's no write traffic, the replication slot on the primary can't advance because it has no WAL entries to confirm. The slot stagnates → WAL piles up → eventually hits
This is exactly what Sequin does: "The only operation Sequin performs on the primary is to call
2. Does it work from Postgres 16+? Logical or physical replication?
So the flow is: Both primary and standby need
Happy to add docs or tests if that would help!
Summary
Adds `HeartbeatWorker` to maintain replication slot activity when replicating from read replicas.
Closes #535
Problem
When using logical replication from a read replica (PostgreSQL 15+), the replication slot on the primary can become inactive during idle periods since WAL is generated on the primary but consumed from the replica. This can lead to slot invalidation.
Solution
A heartbeat mechanism that periodically emits `pg_logical_emit_message(false, 'etl_heartbeat', '')` to the primary, keeping the slot active.
Implementation
New Components:
- `HeartbeatWorker` - Background worker connecting to primary in regular mode
- `HeartbeatConfig` - Configurable interval (30s), backoff (1-60s), jitter (25%)
- `HeartbeatWorkerHandle` - Implements `WorkerHandle<()>` trait
Key Features:
Connection Handling:
- `connect_regular()` for non-replication connections
- `build_root_cert_store()` shared helper (reduces TLS code duplication)
- `ETL_HEARTBEAT_OPTIONS` with short timeouts (5s statement, 5s lock)
CodeRabbit Feedback Addressed (from #549)
build_root_cert_store()Configuration
Metrics
- `etl_heartbeat_emissions_total`
- `etl_heartbeat_failures_total`
- `etl_heartbeat_consecutive_failures`
- `etl_heartbeat_connection_attempts_total`
- `etl_heartbeat_last_emission_timestamp`
Test Plan
Related
Summary by CodeRabbit
New Features
Chores
Documentation