feat(etl): add HeartbeatWorker for read replica replication slot support #560
Adds HeartbeatWorker to maintain replication slot activity when replicating from read replicas (PostgreSQL 15+). The heartbeat mechanism periodically emits pg_logical_emit_message() calls to the primary to keep the slot active.
Changes:
- New HeartbeatConfig in etl-config/src/shared/heartbeat.rs
- New HeartbeatWorker in etl/src/workers/heartbeat.rs
- Pipeline integration with primary_connection and heartbeat fields
- Regular (non-replication) connection methods in client.rs
- Heartbeat metrics constants in metrics.rs
- HeartbeatWorkerPanic error kind in error.rs

- HeartbeatWorker with WorkerHandle<()> trait implementation
- Heartbeat metrics constants
- connect_regular() methods for non-replication connections
- HeartbeatWorkerPanic error kind
📝 Walkthrough
Adds a configurable heartbeat subsystem and worker to emit WAL activity for primary DB replication slots, extends pipeline config for replica mode, simplifies the replication client connection surface, updates metrics to heartbeat-focused counters, and narrows public exports from the shared config module.
Sequence Diagram(s)
sequenceDiagram
participant Supervisor as Pipeline/Supervisor
participant HW as HeartbeatWorker
participant PRC as PgReplicationClient
participant DB as PostgreSQL (Primary)
participant Met as Metrics
Supervisor->>HW: new(...)+start()
activate HW
HW->>HW: run() loop
HW->>Met: increment connection attempts
HW->>PRC: connect_regular(primary_config)
PRC->>DB: establish connection (TLS/NoTLS)
DB-->>PRC: connection ready
PRC-->>HW: Client
loop every interval
HW->>HW: check shutdown
HW->>DB: SELECT pg_logical_emit_message(false,'etl_heartbeat','')
DB-->>HW: success / error
alt success
HW->>Met: record emission, update last_emission_timestamp, reset consecutive failures
else error
HW->>Met: record failure, increment consecutive failures
HW->>HW: calculate_backoff() -> wait_with_shutdown(backoff + jitter)
end
end
HW->>HW: shutdown tasks and zero consecutive failures metric
deactivate HW
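The error branch of the diagram calls calculate_backoff() before waiting. The PR's implementation is not shown on this page, so the following is only a minimal sketch of one common way to compute an exponential backoff bounded by the min_backoff_ms and max_backoff_ms settings mentioned in the review. The BackoffConfig struct, the function signature, and the doubling rule are assumptions made for illustration.

```rust
use std::time::Duration;

/// Hypothetical backoff settings. The field names mirror the
/// `min_backoff_ms` and `max_backoff_ms` fields named in the review;
/// the struct itself is an assumption.
#[derive(Debug, Clone, Copy)]
pub struct BackoffConfig {
    pub min_backoff_ms: u64,
    pub max_backoff_ms: u64,
}

/// Returns the delay to wait after `consecutive_failures` failed attempts.
///
/// The first failure waits `min_backoff_ms`, every further failure doubles
/// the delay, and the result never exceeds `max_backoff_ms`.
pub fn calculate_backoff(config: BackoffConfig, consecutive_failures: u32) -> Duration {
    // Cap the exponent so the shift below cannot overflow a u64.
    let exponent = consecutive_failures.saturating_sub(1).min(63);
    let factor = 1u64 << exponent;

    // Saturating multiplication keeps very large failure counts well defined.
    let millis = config
        .min_backoff_ms
        .saturating_mul(factor)
        .min(config.max_backoff_ms);

    Duration::from_millis(millis)
}
```

With a 1 s minimum and a 60 s maximum, this sketch yields 1 s, 2 s, 4 s and so on until the cap is reached. Jitter would then be applied on top of the returned value.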
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
etl-config/src/shared/pipeline.rs (1)
10-12: Fix corrupted documentation comment.
The doc comment appears to have a typo or corruption: "c copy should be performed.Selection rules" should likely read "Selection rules for tables participating in replication."
📝 Proposed fix
-/// c copy should be performed.Selection rules for tables participating in replication.
-///
-/// Controls which tables are eligible for initial table copy and streaming.
+/// Selection rules for tables participating in replication.
+///
+/// Controls which tables are eligible for initial table copy and streaming.
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 58-74: In validate() add a check that self.interval_ms > 0 and
return Err(ValidationError::InvalidFieldValue { field:
"interval_ms".to_string(), constraint: "must be > 0".to_string() }) when it's
zero (or less), so the heartbeat config rejects a zero interval and avoids
tight-loop behavior in the heartbeat worker; modify the validate method that
currently checks jitter_percent and min_backoff_ms/max_backoff_ms to include
this new interval_ms validation.
In @etl/src/replication/client.rs:
- Around line 107-129: connect_tls and connect_regular_tls duplicate TLS setup
and connection logic; extract the common code into a private helper (e.g.,
private async fn connect_with_tls(config: Config, pg_connection_config:
&PgConnectionConfig, mode: &str) -> EtlResult<Client>) that builds the
root_cert_store via Self::build_root_cert_store, constructs tls_config and
MakeRustlsConnect, calls config.connect(tls).await, spawns the connection task
that logs errors, and logs a message using the provided mode string; then have
connect_tls and connect_regular_tls call this helper passing distinct mode
values for their log messages.
In @etl/src/workers/heartbeat.rs:
- Around line 282-299: The apply_jitter function uses
SystemTime::now().subsec_nanos() as a pseudo-random source which can produce
correlated values; replace that with a proper RNG (e.g. rand::thread_rng()) to
sample a jitter factor in the range -1.0..1.0 and multiply by the computed
jitter_range; update apply_jitter to use rand::Rng (or another appropriate
randomness API) and ensure the jitter_percent logic
(heartbeat_config.jitter_percent and jitter_range calculation) remains the same;
add the rand dependency if not present and import the needed trait (rand::Rng)
so apply_jitter produces non-deterministic, uncorrelated jitter for the
heartbeat backoff.
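The interval_ms check requested for etl-config/src/shared/heartbeat.rs above could look roughly like the sketch below. Only the interval_ms rule and the ValidationError::InvalidFieldValue shape come from the review text. The struct layout, the simplified error enum, the invalid helper, and the exact jitter_percent and backoff constraints are assumptions, since the actual validate() body is not shown on this page.

```rust
/// Hypothetical stand-in for the crate's validation error type.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationError {
    InvalidFieldValue { field: String, constraint: String },
}

/// Hypothetical subset of `HeartbeatConfig`; the real struct may differ.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    pub interval_ms: u64,
    pub min_backoff_ms: u64,
    pub max_backoff_ms: u64,
    pub jitter_percent: u8,
}

/// Builds an `InvalidFieldValue` error (illustrative helper).
fn invalid(field: &str, constraint: &str) -> ValidationError {
    ValidationError::InvalidFieldValue {
        field: field.to_string(),
        constraint: constraint.to_string(),
    }
}

impl HeartbeatConfig {
    /// Validates the configuration, rejecting values that would make the
    /// heartbeat worker misbehave.
    pub fn validate(&self) -> Result<(), ValidationError> {
        // A zero interval would make the heartbeat loop spin without pause.
        if self.interval_ms == 0 {
            return Err(invalid("interval_ms", "must be > 0"));
        }
        // Assumed constraint: jitter is a percentage of the backoff.
        if self.jitter_percent > 100 {
            return Err(invalid("jitter_percent", "must be <= 100"));
        }
        // Assumed constraint: the backoff bounds must be ordered.
        if self.min_backoff_ms > self.max_backoff_ms {
            return Err(invalid("min_backoff_ms", "must be <= max_backoff_ms"));
        }
        Ok(())
    }
}
```

Placing the interval check first means a zero interval is reported even when other fields are also invalid, which matches the priority the review gives to the tight-loop risk.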
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (8)
etl-config/src/shared/heartbeat.rs
etl-config/src/shared/mod.rs
etl-config/src/shared/pipeline.rs
etl/src/error.rs
etl/src/metrics.rs
etl/src/replication/client.rs
etl/src/workers/heartbeat.rs
etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
etl/src/error.rs
etl-config/src/shared/heartbeat.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/pipeline.rs
etl-config/src/shared/mod.rs
etl/src/workers/mod.rs
etl/src/replication/client.rs
etl/src/metrics.rs
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`
Applied to files:
etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl-config/src/shared/heartbeat.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/mod.rs
etl/src/workers/mod.rs
etl/src/replication/client.rs
etl/src/metrics.rs
🧬 Code graph analysis (3)
etl-config/src/shared/heartbeat.rs (1)
etl-config/src/shared/pipeline.rs (2)
validate (115-137)
default (47-49)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
heartbeat_config (147-149)
etl/src/replication/client.rs (1)
connect_regular (45-55)
etl-config/src/shared/pipeline.rs (3)
etl-config/src/shared/heartbeat.rs (1)
validate (58-74)
etl-config/src/shared/batch.rs (1)
validate (32-41)
etl-config/src/shared/replicator.rs (1)
validate (38-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (30)
etl/src/error.rs (1)
136-136: LGTM!
The HeartbeatWorkerPanic variant follows the established naming convention and is correctly placed in the "State & Workflow Errors" section alongside the other worker panic variants (ApplyWorkerPanic, TableSyncWorkerPanic).
etl-config/src/shared/mod.rs (1)
1-15: LGTM!
The shift from wildcard re-exports to explicit item exports improves API surface clarity and makes the public contract more intentional. The new HeartbeatConfig and ETL_HEARTBEAT_OPTIONS exports align with the heartbeat worker feature.
etl-config/src/shared/heartbeat.rs (3)
77-102: LGTM!
The Default implementation correctly uses the associated constants, and the serde default functions properly delegate to them, ensuring a single source of truth for default values.
104-113: LGTM!
The ETL_HEARTBEAT_OPTIONS constant uses compile-time concatenation and defines appropriate PostgreSQL session parameters for lightweight health-check connections with fail-fast timeouts.
115-158: LGTM!
Good test coverage for defaults, options content, valid configuration, and validation edge cases. Consider adding a test for interval_ms = 0 if the validation is added per the earlier suggestion.
etl-config/src/shared/pipeline.rs (4)
86-98: LGTM!
The new primary_connection and heartbeat fields are well-documented and correctly use Option types with #[serde(default)] for optional configuration. The documentation clearly explains the replica mode workflow.
113-120: LGTM!
The validation correctly delegates to HeartbeatConfig::validate() when a heartbeat configuration is present. This integrates cleanly with the existing validation chain.
138-149: LGTM!
The helper methods provide clean accessors for replica mode detection and heartbeat configuration. The heartbeat_config() method correctly falls back to defaults when not explicitly configured.
194-199: LGTM!
The PipelineConfigWithoutSecrets struct correctly mirrors the new fields, using PgConnectionConfigWithoutSecrets for the primary connection (stripping secrets) while carrying HeartbeatConfig as-is since it contains no sensitive data.
Also applies to: 213-214
etl/src/workers/mod.rs (1)
1-9: LGTM!
The module reorganization is clean and follows Rust API design patterns. HeartbeatWorkerHandle is properly defined and publicly exported from the heartbeat module, making it conveniently accessible at the workers module level through the re-export.
etl/src/workers/heartbeat.rs (10)
1-24: LGTM!
Module documentation clearly explains the purpose and use case. Imports are well-organized.
25-39: LGTM!
Error enum covers the relevant failure modes with clear documentation.
41-70: LGTM!
The handle implementation correctly manages the join handle lifecycle and properly maps panics to the appropriate error kind.
72-107: LGTM!
The struct design cleanly separates concerns with proper dependency injection. Documentation follows the stdlib tone.
109-120: LGTM!
The start method correctly consumes self and spawns the worker in a background task.
122-183: LGTM!
The run loop correctly handles the lifecycle with proper shutdown checks, error handling with exponential backoff, and metric cleanup on exit.
185-217: LGTM!
The connection handling properly tracks metrics and resets failure counters on successful connection.
219-242: LGTM!
The heartbeat loop has proper interval handling and defensive connection state checks.
244-266: LGTM!
The heartbeat emission correctly uses non-transactional WAL messages with minimal payload. The documentation clearly explains the parameter choices.
301-358: LGTM!
Helper methods are well-implemented. The wait_with_shutdown correctly handles both explicit shutdown signals and channel closure scenarios.
etl/src/replication/client.rs (9)
1-13: LGTM!
Module documentation is concise. Imports are organized and all appear to be used.
14-20: LGTM!
The replication options constant provides sensible defaults with appropriate timeouts for long-running replication connections.
22-23: LGTM!
Using an empty struct as a namespace for related connection methods is a reasonable pattern.
25-39: LGTM!
The connect method properly handles both TLS and non-TLS paths and wraps the result in a ReplicationClient.
41-55: LGTM!
The regular connection method correctly uses shorter timeouts optimized for health checks and disables replication mode.
57-81: LGTM!
The config builder properly configures all connection parameters with conditional SSL and replication mode settings.
83-105: LGTM!
TLS connection setup is correctly implemented with proper certificate handling. The spawned connection task logs errors appropriately, and callers can detect disconnection via client.is_closed().
131-143: LGTM!
Non-TLS connection follows the same pattern consistently.
145-160: The code snippet shown does not match the actual codebase.
The review analyzes pg_connection_config.tls.ca_cert as an Option type, but the actual [TlsConfig] struct defines pub trusted_root_certs: String (not an optional field named ca_cert). The code as presented in the review would not compile due to the field name mismatch.
Likely an incorrect or invalid review comment.
etl/src/metrics.rs (1)
1-27: Module structure is good, but note that TABLE_NAME_LABEL and ERROR_KIND_LABEL are not used anywhere in the codebase.
The metrics module follows the coding guidelines well: the module-level documentation is present, all constants have appropriate doc comments following stdlib tone and precision, and naming conventions are correct. If these label constants are intentional for future use, consider adding a comment explaining their purpose. Otherwise, consider removing them to avoid dead code.
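The comment on wait_with_shutdown above notes that it treats both an explicit shutdown signal and a closed channel as reasons to stop. The worker itself is async and its code is not shown on this page, so the sketch below is only a synchronous stand-in built on std::sync::mpsc to illustrate that three-way outcome. The signature and the boolean return value are assumptions.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Waits for `duration` or until shutdown is requested.
///
/// Returns `true` when the caller should stop, either because a shutdown
/// message arrived or because every sender was dropped, and `false` when
/// the full duration elapsed without a signal.
pub fn wait_with_shutdown(shutdown_rx: &Receiver<()>, duration: Duration) -> bool {
    match shutdown_rx.recv_timeout(duration) {
        // Explicit shutdown signal.
        Ok(()) => true,
        // All senders are gone, so no signal can ever arrive; stop as well.
        Err(RecvTimeoutError::Disconnected) => true,
        // The wait completed normally; the caller may continue its loop.
        Err(RecvTimeoutError::Timeout) => false,
    }
}
```

Treating a disconnected channel as a shutdown avoids a worker that keeps running after its supervisor has gone away.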
/// Applies jitter to a duration.
fn apply_jitter(&self, base_backoff: Duration) -> Duration {
    // Simple jitter using timestamp nanoseconds as pseudo-random source.
    let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
    let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();

    // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
    let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
    let jitter = normalized * jitter_range;

    let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
    Duration::from_secs_f64(jittered_secs)
}
🧹 Nitpick | 🔵 Trivial
Consider using a proper random source for jitter.
Using SystemTime::now().subsec_nanos() as a pseudo-random source is deterministic and may produce correlated jitter values if multiple workers start simultaneously. For production-grade backoff, consider using rand::thread_rng() or similar.
That said, for a single heartbeat worker per pipeline, this approach is functional and avoids adding a dependency.
♻️ Optional: Use proper randomness if rand is available
+use rand::Rng;
+
 fn apply_jitter(&self, base_backoff: Duration) -> Duration {
     let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
     let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;
-    let nanos = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .unwrap_or_default()
-        .subsec_nanos();
-
-    // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
-    let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
+    let normalized = rand::thread_rng().gen_range(-1.0..1.0);
     let jitter = normalized * jitter_range;
     let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
     Duration::from_secs_f64(jittered_secs)
 }
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 282 - 299, The apply_jitter
function uses SystemTime::now().subsec_nanos() as a pseudo-random source which
can produce correlated values; replace that with a proper RNG (e.g.
rand::thread_rng()) to sample a jitter factor in the range -1.0..1.0 and
multiply by the computed jitter_range; update apply_jitter to use rand::Rng (or
another appropriate randomness API) and ensure the jitter_percent logic
(heartbeat_config.jitter_percent and jitter_range calculation) remains the same;
add the rand dependency if not present and import the needed trait (rand::Rng)
so apply_jitter produces non-deterministic, uncorrelated jitter for the
heartbeat backoff.
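One way to keep the jitter arithmetic easy to test, whichever randomness source is chosen, is to pass the random sample in as a parameter. The sketch below is not the PR's code: it restates the jitter_percent and 0.1-second floor logic from the snippet above as a free function, and the signature with an explicit sample argument is an assumption made for illustration.

```rust
use std::time::Duration;

/// Applies jitter to `base_backoff` using a caller-supplied `sample`.
///
/// `sample` is expected in `-1.0..=1.0`; values outside that range are
/// clamped. The result is never shorter than 0.1 seconds.
pub fn apply_jitter(base_backoff: Duration, jitter_percent: u8, sample: f64) -> Duration {
    let jitter_fraction = f64::from(jitter_percent) / 100.0;
    let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

    // Clamp so a misbehaving source cannot widen the configured range.
    let normalized = sample.clamp(-1.0, 1.0);
    let jitter = normalized * jitter_range;

    // Same floor as the reviewed code: never wait less than 100 ms.
    let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
    Duration::from_secs_f64(jittered_secs)
}
```

A caller could feed it the value from rand::thread_rng().gen_range(-1.0..1.0), as in the proposed diff, or the timestamp-derived value from the original code, while unit tests pass fixed samples such as -1.0, 0.0 and 1.0 to check the bounds.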
- Add interval_ms > 0 validation to HeartbeatConfig to prevent tight loops
- Extract common TLS connection logic into connect_with_tls helper
- Add test for zero interval validation
Closing this PR in favor of a cleaner single-commit version. The new PR will incorporate all CodeRabbit feedback from #549 and this PR into one atomic commit for a cleaner git history.
Summary
Adds HeartbeatWorker to maintain replication slot activity when replicating from read replicas.
When using logical replication from a read replica (PostgreSQL 15+), the replication slot on the primary can become inactive during idle periods since WAL is generated on the primary but consumed from the replica. This PR adds a heartbeat mechanism that periodically emits pg_logical_emit_message() calls to the primary, ensuring WAL flows through the replication chain.
Changes
New HeartbeatWorker (etl/src/workers/heartbeat.rs):
- pg_logical_emit_message(false, 'etl_heartbeat', '')
- WorkerHandle<()> trait properly
New HeartbeatConfig (etl-config/src/shared/heartbeat.rs):
Pipeline integration (etl-config/src/shared/pipeline.rs):
- primary_connection and heartbeat fields
- is_replica_mode() and heartbeat_config() helper methods
- PipelineConfig::validate()
New connection methods (etl/src/replication/client.rs):
- connect_regular() for non-replication mode connections
- build_root_cert_store() helper to reduce TLS code duplication
- ETL_HEARTBEAT_OPTIONS with short timeouts (5s statement, 5s lock, 30s idle)
Error handling (etl/src/error.rs):
- HeartbeatWorkerPanic error kind
Configuration
To enable read replica mode, configure primary_connection in PipelineConfig:
Metrics
New metrics exposed:
- etl_heartbeat_emissions_total - Total heartbeat messages emitted
- etl_heartbeat_failures_total - Total heartbeat failures
- etl_heartbeat_consecutive_failures - Current consecutive failure count
- etl_heartbeat_connection_attempts_total - Connection attempt count
- etl_heartbeat_last_emission_timestamp - Unix timestamp of last successful emission
Test plan
cargo test -p etl)
Summary by CodeRabbit
New Features
Refactor