feat(heartbeat): add HeartbeatWorker for read replica replication slots #549
…support - Add etl-config/src/shared/heartbeat.rs with HeartbeatConfig - Add ETL_HEARTBEAT_OPTIONS to connection.rs - Add primary_connection and heartbeat fields to PipelineConfig
…ig fields Add primary_connection and heartbeat fields to PipelineConfig in benchmark and example files to support read replica mode.
…tions Add PgReplicationClient::connect_regular() method that creates a regular PostgreSQL connection (without ReplicationMode::Logical) for operations like heartbeat emissions that don't require replication protocol.
@iambriccardo ready for review
Thanks! I have not yet looked in detail at how replication through read replicas is working. I recall we can't do streaming of the WAL from read replicas (besides newer Postgres versions) but I may be wrong. Also, why do you need these heartbeats being sent via logical messages? Can't you send status updates with the same LSNs via the same replication connection? |
From what I've read, status updates (StandbyStatusUpdate) only communicate "I've consumed up to LSN X" from consumer to server - they don't generate new WAL entries. When streaming from a read replica during idle periods (no writes on primary), there's no new WAL flowing through the replication chain. The heartbeat approach using pg_logical_emit_message(false, 'etl_heartbeat', '') on the primary creates minimal WAL entries that:
This follows the same pattern as Sequin's replica setup, https://sequinstream.com/docs/reference/databases#using-sequin-with-a-replica (link provided by @abhishiv in #535). Their docs state: "The only operation that Sequin will perform on the primary database is to call the function pg_logical_emit_message to ensure proper WAL checkpointing."
I was reading more about this, thanks. From my understanding of PostgreSQL internals, even when there is no user traffic, the primary still sends periodic keepalive messages to the replica. These are triggered every Separately, am I correct that this setup only works on PostgreSQL versions that support creating logical replication slots on read replicas? My understanding is that this capability was introduced relatively recently, and older versions require slots to exist exclusively on the primary, even if the logical decoding client connects through a replica. Did you try to see how the system behaves when you don't emit logical messages? |
Re: PostgreSQL version for replica slots

You're correct - logical decoding from standbys requires PostgreSQL 16+. My PR currently validates PG 15+ on the primary for pg_logical_emit_message, though that function has been available since PG 9.6. I should add documentation clarifying the PG 16+ requirement for the replica connection, or add a version check there as well.

Re: Testing without heartbeats

I haven't tested the behavior without emitting logical messages during extended idle periods. I based the implementation on Sequin's documented approach rather than empirical testing. I can set up a test scenario to observe the actual behavior if that would be helpful for the review.

Would you like me to:
- Resolved merge conflicts in connection.rs, pipeline.rs, and test_utils/pipeline.rs - Added thiserror dependency to etl crate for HeartbeatError - Fixed heartbeat.rs to use correct ShutdownRx API (has_changed/changed) - Replaced rand-based jitter with timestamp-based jitter
📝 Walkthrough
Adds heartbeat support for read-replica pipelines: new HeartbeatConfig and re-export, optional PipelineConfig fields
Changes
Sequence Diagram(s)
sequenceDiagram
participant Pipeline
participant HBW as HeartbeatWorker
participant Primary as PrimaryDB
participant Metrics
Pipeline->>HBW: start(primary_config, heartbeat_config)
activate HBW
HBW->>Primary: connect (ETL_HEARTBEAT_OPTIONS)
alt connected
Primary-->>HBW: connection established
HBW->>Primary: query_version()
alt version ok
loop every interval_secs
HBW->>Primary: execute_heartbeat()
alt success
HBW->>Metrics: inc(emissions), set(last_emission)
else failure
HBW->>Metrics: inc(failures), inc(consecutive_failures)
HBW->>HBW: backoff_with_jitter()
end
end
else unsupported version
HBW->>Metrics: inc(failures)
HBW->>HBW: backoff_with_jitter()
end
else connection failed
HBW->>Metrics: inc(connection_attempts)
HBW->>HBW: backoff_with_jitter()
end
deactivate HBW
Pipeline->>HBW: await completion (optional)
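The backoff_with_jitter() step in the diagram could be sketched roughly as follows. This is a minimal, standard-library-only illustration and not the PR's code: the name calculate_backoff appears in the review, but the signature, the BackoffConfig struct, the doubling policy, and seeding jitter from the clock's sub-second nanoseconds (the thread only says "timestamp-based jitter") are all assumptions.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical mirror of the backoff-related fields of `HeartbeatConfig`.
pub struct BackoffConfig {
    pub initial_backoff_secs: u64,
    pub max_backoff_secs: u64,
    pub jitter_percent: u8,
}

/// Computes the delay before retry number `attempt` (0-based).
///
/// `seed` is any value used to derive the jitter; passing it in keeps the
/// function deterministic and easy to test.
pub fn calculate_backoff(cfg: &BackoffConfig, attempt: u32, seed: u64) -> Duration {
    // Exponential growth: initial * 2^attempt, saturating, capped at the maximum.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let base_secs = cfg
        .initial_backoff_secs
        .saturating_mul(factor)
        .min(cfg.max_backoff_secs);
    let base_ms = base_secs.saturating_mul(1000);

    // The jitter window is +/- jitter_percent of the base delay.
    let pct = u64::from(cfg.jitter_percent.min(100));
    let span_ms = (u128::from(base_ms) * u128::from(pct) / 100) as u64;
    if span_ms == 0 {
        return Duration::from_millis(base_ms);
    }

    // Map the seed into [base - span, base + span], then re-apply the cap.
    let offset = seed % span_ms.saturating_mul(2).saturating_add(1);
    let jittered = (base_ms - span_ms).saturating_add(offset);
    let max_ms = cfg.max_backoff_secs.saturating_mul(1000);
    Duration::from_millis(jittered.min(max_ms))
}

/// Assumption: "timestamp-based" jitter means seeding from the wall clock.
pub fn clock_seed() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| u64::from(d.subsec_nanos()))
        .unwrap_or(0)
}
```

A caller would pass clock_seed() as the seed in production and a fixed value in tests. Re-applying the cap after jitter keeps the delay from ever exceeding max_backoff_secs, which matches the bound the review asks the unit tests to assert.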
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
🚥 Pre-merge checks: ✅ 5 passed
Merge upstream doc comments and structure with heartbeat additions: - connection.rs: Keep upstream docs, add APP_NAME_HEARTBEAT and ETL_HEARTBEAT_OPTIONS - pipeline.rs: Keep upstream docs, add primary_connection, heartbeat fields and methods
Keep upstream's extensive documentation while adding primary_connection and heartbeat fields for read replica support.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 43-70: Add validation to HeartbeatConfig to reject invalid values:
implement a HeartbeatConfig::validate(&self) -> Result<(), String> that checks
interval_secs > 0, initial_backoff_secs <= max_backoff_secs, and jitter_percent
<= 100, returning descriptive errors for each violation; call this validate()
after deserialization (or expose it for callers to use) so configs with
interval_secs == 0, jitter_percent > 100, or initial_backoff_secs >
max_backoff_secs are rejected early.
In @etl/src/replication/client.rs:
- Around line 204-228: Extract the duplicated TLS certificate loading and
ClientConfig construction into a shared function (e.g.,
build_tls_config(pg_connection_config: &PgConnectionConfig) ->
EtlResult<ClientConfig>) and have both connect_regular_tls and connect_tls call
it; move the CertificateDer::pem_slice_iter loop and creation of
rustls::RootCertStore +
ClientConfig::builder().with_root_certificates(...).with_no_client_auth() into
that helper, return the ClientConfig, and replace the inlined code in
connect_regular_tls and connect_tls with a call to the new helper (propagate
errors using EtlResult).
In @etl/src/test_utils/pipeline.rs:
- Around line 95-110: Add builder methods to PipelineBuilder to set the
previously added optional fields: implement with_primary_connection(self,
primary_connection: PgConnectionConfig) -> Self and with_heartbeat(self,
heartbeat: HeartbeatConfig) -> Self, add matching fields primary_connection:
Option<PgConnectionConfig> and heartbeat: Option<HeartbeatConfig> to the
PipelineBuilder struct, ensure you import HeartbeatConfig and
PgConnectionConfig, set the fields in those methods and return self so build()
will pass the configured values into PipelineConfig (which currently sets
primary_connection and heartbeat to None).
In @etl/src/workers/heartbeat.rs:
- Around line 276-291: Add unit tests covering calculate_backoff behavior,
HeartbeatError messages, and configuration edge cases: write tests for
calculate_backoff (call the function with varying attempts and a BackoffConfig
to assert base/backoff growth, max cap, and that jitterized results stay within
expected +/- jitter bounds and never exceed max_backoff), add tests that format
each HeartbeatError variant (e.g., HeartbeatError::ConnectionFailed,
HeartbeatError::InvalidConfig) using to_string() or Display and assert the
expected message substrings, and add tests for
HeartbeatConfig/parse_config/from_env edge cases (missing keys, invalid values,
boundary values) to ensure defaults and errors are handled as intended; locate
helpers by the function names calculate_backoff, BackoffConfig, HeartbeatError,
and HeartbeatConfig/parse_config to add these tests.
- Around line 25-35: The HeartbeatError enum and its variants are undocumented;
add doc comments for the enum and each variant (HeartbeatError,
ConnectionFailed, UnsupportedVersion, EmitFailed, VersionQueryFailed) describing
their purpose and when they are used, e.g., document that HeartbeatError
represents heartbeat-related failures, ConnectionFailed indicates inability to
connect to primary, UnsupportedVersion indicates PostgreSQL version below
required 15, EmitFailed indicates failure to send a heartbeat, and
VersionQueryFailed indicates failure to query the DB version; place /// doc
comments immediately above the enum and each variant.
- Around line 54-69: Public struct HeartbeatWorkerHandle and its async method
wait lack documentation; add doc comments for HeartbeatWorkerHandle (describe
its role as a handle to the heartbeat background task and its contained
join_handle) and for wait (describe behavior: awaits join_handle, returns
Result<(), HeartbeatError>, and maps task panic to HeartbeatError::EmitFailed
while logging). Place /// doc comments above the struct and above the wait
method, referencing join_handle, HeartbeatError, and the panic/log behavior so
consumers understand usage and error semantics.
- Around line 37-52: Add missing documentation comments for the private enum
ConnectionState and its helper method as_str: document what the enum represents
(the heartbeat connection lifecycle states: Disconnected, Connecting, Connected)
and what as_str returns/uses; add brief doc comments above the enum and above
the as_str impl (or its method) following project doc style so private items are
documented like public ones.
- Around line 71-96: The watch channel receiver created in HeartbeatWorker::new
is immediately dropped (let (state_tx, _) = watch::channel(...)), which is
either unnecessary or accidentally discards intended observers; fix by either
(A) removing the watch channel and replacing state_tx:
watch::Sender<ConnectionState> with a simple ConnectionState field (e.g., state)
and update update_state to mutate self.state directly, or (B) expose and store
the receiver (watch::Receiver<ConnectionState>) alongside state_tx so external
observers can subscribe; also add doc comments to the HeartbeatWorker struct and
its new method describing purpose, fields (pipeline_id, primary_config,
heartbeat_config, shutdown_rx, state_tx or state), and intended observation
behavior so code guidelines are met.
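As an illustration of the documentation and Display-message items above, here is a hedged sketch of a documented HeartbeatError. The variant names come from the review; the payload types and message wording are assumptions, and Display is written by hand here so the sketch needs only the standard library (the PR itself uses thiserror).

```rust
use std::fmt;

/// Errors raised by the heartbeat worker.
#[derive(Debug)]
pub enum HeartbeatError {
    /// The worker could not connect to the primary.
    ConnectionFailed(String),
    /// The primary runs a PostgreSQL version below the required minimum.
    UnsupportedVersion { found: u32, required: u32 },
    /// Emitting the heartbeat message failed.
    EmitFailed(String),
    /// Querying the server version failed.
    VersionQueryFailed(String),
}

impl fmt::Display for HeartbeatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Message texts are illustrative, not the PR's actual wording.
            Self::ConnectionFailed(cause) => {
                write!(f, "failed to connect to primary: {cause}")
            }
            Self::UnsupportedVersion { found, required } => write!(
                f,
                "unsupported PostgreSQL version {found}, need at least {required}"
            ),
            Self::EmitFailed(cause) => write!(f, "failed to emit heartbeat: {cause}"),
            Self::VersionQueryFailed(cause) => {
                write!(f, "failed to query server version: {cause}")
            }
        }
    }
}

impl std::error::Error for HeartbeatError {}
```

Tests along the lines the review suggests would then format each variant with to_string() and assert on a stable substring rather than the full message.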
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (14)
etl-benchmarks/benches/table_copies.rs
etl-config/src/shared/connection.rs
etl-config/src/shared/heartbeat.rs
etl-config/src/shared/mod.rs
etl-config/src/shared/pipeline.rs
etl-examples/src/main.rs
etl/Cargo.toml
etl/src/lib.rs
etl/src/metrics.rs
etl/src/pipeline.rs
etl/src/replication/client.rs
etl/src/test_utils/pipeline.rs
etl/src/workers/heartbeat.rs
etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
etl/src/workers/mod.rs
etl-config/src/shared/heartbeat.rs
etl/src/pipeline.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/connection.rs
etl-config/src/shared/mod.rs
etl-benchmarks/benches/table_copies.rs
etl/src/replication/client.rs
etl/src/lib.rs
etl-examples/src/main.rs
etl/src/test_utils/pipeline.rs
etl-config/src/shared/pipeline.rs
etl/src/metrics.rs
**/Cargo.toml
📄 CodeRabbit inference engine (AGENTS.md)
Use kebab-case for crate names
Files:
etl/Cargo.toml
🧠 Learnings (3)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl/src/workers/mod.rs
etl/src/pipeline.rs
etl/src/workers/heartbeat.rs
etl-config/src/shared/mod.rs
etl-benchmarks/benches/table_copies.rs
etl/src/replication/client.rs
etl/src/lib.rs
etl/Cargo.toml
etl/src/metrics.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`
Applied to files:
etl/src/lib.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Build with `cargo build --workspace --all-targets --all-features`
Applied to files:
etl/Cargo.toml
🧬 Code graph analysis (4)
etl/src/pipeline.rs (1)
  etl-config/src/shared/pipeline.rs (2)
    is_replica_mode (77-79)
    heartbeat_config (82-84)
etl/src/workers/heartbeat.rs (2)
  etl-config/src/shared/pipeline.rs (1)
    heartbeat_config (82-84)
  etl/src/replication/client.rs (1)
    connect_regular (183-188)
etl/src/test_utils/pipeline.rs (1)
  etl/src/pipeline.rs (1)
    new (49-59)
etl-config/src/shared/pipeline.rs (2)
  etl-config/src/shared/replicator.rs (1)
    validate (40-42)
  etl-api/src/configs/pipeline.rs (1)
    default_table_error_retry_max_attempts (13-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (39)
etl/Cargo.toml (1)
34-34: LGTM! The thiserror dependency is correctly added using the workspace pattern, consistent with other dependencies. This enables the #[derive(Error)] macro used by HeartbeatError in the heartbeat worker module.
etl-config/src/shared/heartbeat.rs (2)
72-81: LGTM! The Default implementation correctly delegates to the helper functions, ensuring consistency with serde defaults.
83-120: LGTM! Tests adequately cover default values, serde default behavior, and custom value parsing.
etl-config/src/shared/mod.rs (1)
5-5: LGTM! Module declaration and re-export follow the existing pattern established by other submodules.
Also applies to: 15-15
etl-config/src/shared/connection.rs (3)
21-21: LGTM! The constant follows the established naming convention for application names.
74-89: LGTM! The heartbeat connection options are well-documented and appropriately configured with short timeouts for fail-fast behavior. The 5-second statement/lock timeouts and 30-second idle timeout align with the lightweight, transient nature of heartbeat operations.
299-305: LGTM! Test validates the critical timeout and application name values for heartbeat options.
etl/src/workers/heartbeat.rs (5)
159-182: LGTM! The method correctly manages state transitions, connection metrics, and failure counter reset upon successful connection.
184-210: LGTM! Connection and version verification logic is correct. The version number parsing correctly handles PostgreSQL's server_version_num format. Per coding guidelines, consider adding brief doc comments to internal helpers.
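For readers unfamiliar with the server_version_num format mentioned above, a minimal sketch of such a check follows. The helper names (major_version, is_supported) and the constant are hypothetical and not taken from the PR; only the minimum of 15 comes from the thread.

```rust
/// Minimum major version accepted on the primary (the PR checks for 15).
const MIN_PG_MAJOR: u32 = 15;

/// Extracts the major version from `server_version_num`.
///
/// Since PostgreSQL 10 the value is `major * 10000 + minor`, so `160002`
/// is 16.2. Older releases used `major * 10000 + minor * 100 + patch`
/// (`90603` is 9.6.3); integer division yields 9 for those, which is
/// still below the minimum.
pub fn major_version(server_version_num: u32) -> u32 {
    server_version_num / 10_000
}

/// Parses the text value of `server_version_num` and checks the minimum.
pub fn is_supported(raw: &str) -> Result<bool, std::num::ParseIntError> {
    let num: u32 = raw.trim().parse()?;
    Ok(major_version(num) >= MIN_PG_MAJOR)
}
```

Comparing on the major version alone is enough here because the requirement in the thread is expressed as a major release.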
212-251: LGTM! The heartbeat loop correctly uses tokio::time::interval with shutdown awareness. The pg_logical_emit_message(false, 'etl_heartbeat', '') call is appropriate:
- false for non-transactional (immediate WAL write)
- Empty message body minimizes WAL overhead
- Prefix identifies the heartbeat source
Note: The first tick() fires immediately, which means a heartbeat is emitted right after connection, a sensible choice for promptly activating the slot.
112-116: The shutdown detection pattern is correct; no changes needed. The code properly handles shutdown signals through multiple layers:
- has_changed() at line 113 catches pre-startup shutdown before the first connection attempt
- changed() in heartbeat_loop() (line 220) catches shutdown during heartbeat emission
- changed() in the error backoff (line 150) catches shutdown during retry delays
Once heartbeat_loop() detects shutdown via changed(), it returns Ok(()), which propagates back through connect_and_heartbeat() to run(), exiting the loop. The receiver is never re-marked as seen because shutdown causes an early return; there are no subsequent loop iterations after shutdown is detected. The pattern is consistent with its usage in stream.rs and is sound.
23-23: Add documentation explaining why PostgreSQL 15 is required. pg_logical_emit_message has been available since PostgreSQL 9.6, so the MIN_PG_VERSION requirement must be based on a different feature. Add a documentation comment to the constant explaining which functionality actually requires PostgreSQL 15.
etl/src/workers/mod.rs (1)
5-5: LGTM! The heartbeat module export follows the established pattern for worker modules.
etl/src/metrics.rs (2)
18-22: LGTM! The heartbeat metric constants follow the established naming convention (ETL_*_TOTAL for counters, descriptive names for gauges).
45-49: LGTM! Heartbeat metrics are properly registered with appropriate types and units:
- Counters for cumulative totals (emissions, failures, connection attempts)
- Gauges for point-in-time values (last timestamp, consecutive failures)
etl-config/src/shared/pipeline.rs (9)
5-8: LGTM! The imports are well-organized, bringing in the new HeartbeatConfig and BatchConfig types needed for the expanded configuration.
14-30: LGTM! The TableSyncCopyConfig enum variants are well-designed with struct-like syntax for IncludeTables and SkipTables. The should_copy_table method correctly handles all variants.
43-48: LGTM! The new primary_connection and heartbeat fields are appropriately optional with serde defaults, enabling backward compatibility. The documentation is concise and clear.
66-84: LGTM! The associated constants and helper methods are well-documented and follow Rust conventions. The is_replica_mode() method correctly infers replica mode from the presence of a primary connection, and heartbeat_config() safely provides a default when not explicitly configured.
108-118: LGTM! The default functions properly delegate to the associated constants, maintaining a single source of truth for default values.
128-131: LGTM! Correct use of skip_serializing_if = "Option::is_none" for optional fields in the serializable variant, keeping JSON output clean when these fields are unset.
149-163: LGTM! The From<PipelineConfig> implementation correctly propagates the new primary_connection and heartbeat fields, with appropriate handling for the secrets-free connection variant.
178-184: LGTM! The test correctly verifies serialization/deserialization roundtrip for the updated IncludeTables struct-like variant.
86-105: HeartbeatConfig does not have a validate() method. HeartbeatConfig has no validate() method, so this suggestion is not applicable. If validation of heartbeat configuration is desired, a validate() method would need to be added to HeartbeatConfig first.
Likely an incorrect or invalid review comment.
etl-examples/src/main.rs (1)
184-185: LGTM! The example correctly initializes the new optional fields to None, demonstrating standard (non-replica) mode usage.
etl/src/replication/client.rs (3)
4-4: LGTM! Import correctly added for ETL_HEARTBEAT_OPTIONS to support the new heartbeat connection functionality.
179-188: LGTM! The connect_regular method is well-documented and correctly establishes a non-replication connection for heartbeat operations. Returning a raw Client rather than PgReplicationClient is appropriate since replication features aren't needed.
190-202: LGTM! The helper correctly establishes a non-TLS regular connection using ETL_HEARTBEAT_OPTIONS, with appropriate logging.
etl/src/pipeline.rs (4)
16-16: LGTM! Import correctly added for the new heartbeat worker types.
25-33: LGTM! The PipelineState::Started variant is correctly extended to track the optional heartbeat worker handle.
101-119: LGTM! The heartbeat worker initialization is correctly conditional on replica mode. The worker is started with the appropriate config and shutdown receiver for graceful termination.
148-153: Heartbeat worker errors are logged but not collected. Unlike apply_worker and table_sync_workers, heartbeat worker errors are only logged but not added to the errors vector. This means pipeline wait() can return Ok(()) even if heartbeat failed. If this is intentional (heartbeat is non-critical), consider adding a comment explaining the design decision. Otherwise, consider collecting the error.

    // Current behavior: log only
    if let Err(err) = heartbeat_handle.wait().await {
        error!(error = %err, "heartbeat worker failed");
    }

    // Alternative if heartbeat errors should propagate:
    // if let Err(err) = heartbeat_handle.wait().await {
    //     error!(error = %err, "heartbeat worker failed");
    //     errors.push(err);
    // }

etl-benchmarks/benches/table_copies.rs (2)
6-6: LGTM! Import path updated to reflect the module reorganization.
348-349: LGTM! The benchmark correctly initializes the new optional fields to None for non-replica mode testing.
etl/src/lib.rs (2)
5-14: LGTM! The documentation updates clearly describe the new feature ("Read replica support: Optional heartbeat mechanism for replica mode") and the wording is concise and consistent with the existing bullet points.
41-52: LGTM! The example correctly initializes the new primary_connection and heartbeat fields with None, maintaining the example as a minimal working setup while reflecting the updated PipelineConfig structure.
etl/src/test_utils/pipeline.rs (3)
24-24: LGTM! Using ignore for the doctest example is appropriate since test utilities often have dependencies that aren't available in the doctest environment.
112-124: LGTM! The helper function correctly delegates to the builder with default settings.
126-159: LGTM! Both helper functions correctly use the builder pattern and the reformatting improves readability.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub struct HeartbeatConfig {
        /// Interval in seconds between heartbeat emissions.
        ///
        /// Should be less than the `wal_sender_timeout` on the primary (default 60s)
        /// to ensure the slot remains active. Default: 30 seconds.
        #[serde(default = "default_interval_secs")]
        pub interval_secs: u64,

        /// Initial delay in seconds before retrying after a connection failure.
        ///
        /// Default: 1 second.
        #[serde(default = "default_initial_backoff_secs")]
        pub initial_backoff_secs: u64,

        /// Maximum delay in seconds between reconnection attempts.
        ///
        /// The backoff delay will not exceed this value. Default: 60 seconds.
        #[serde(default = "default_max_backoff_secs")]
        pub max_backoff_secs: u64,

        /// Percentage of jitter to add to backoff delays (0-100).
        ///
        /// Jitter helps prevent multiple workers from reconnecting simultaneously.
        /// A value of 25 means ±25% randomization. Default: 25%.
        #[serde(default = "default_jitter_percent")]
        pub jitter_percent: u8,
    }
🧹 Nitpick | 🔵 Trivial
Consider adding validation for configuration values.
The configuration lacks validation for edge cases:
- interval_secs = 0 would create a zero-duration interval in the heartbeat loop, causing a busy-loop.
- jitter_percent > 100 could produce unexpected backoff values (though mathematically it would still work).
- initial_backoff_secs > max_backoff_secs is contradictory.
Consider adding a validation method or implementing validation during deserialization.
♻️ Suggested validation method
    impl HeartbeatConfig {
        /// Validates the configuration values.
        ///
        /// Returns an error if any values are invalid.
        pub fn validate(&self) -> Result<(), String> {
            if self.interval_secs == 0 {
                return Err("interval_secs must be greater than 0".to_string());
            }
            if self.initial_backoff_secs > self.max_backoff_secs {
                return Err("initial_backoff_secs must not exceed max_backoff_secs".to_string());
            }
            if self.jitter_percent > 100 {
                return Err("jitter_percent must be between 0 and 100".to_string());
            }
            Ok(())
        }
    }

🤖 Prompt for AI Agents
In @etl-config/src/shared/heartbeat.rs around lines 43 - 70, Add validation to
HeartbeatConfig to reject invalid values: implement a
HeartbeatConfig::validate(&self) -> Result<(), String> that checks interval_secs
> 0, initial_backoff_secs <= max_backoff_secs, and jitter_percent <= 100,
returning descriptive errors for each violation; call this validate() after
deserialization (or expose it for callers to use) so configs with interval_secs
== 0, jitter_percent > 100, or initial_backoff_secs > max_backoff_secs are
rejected early.
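If callers need to distinguish the failure cases, one variation on the suggestion above is to return a typed error instead of a String. The sketch below is illustrative only: HeartbeatConfigError and its variants are hypothetical names, and the struct is a stand-in that omits the serde attributes of the real HeartbeatConfig.

```rust
use std::fmt;

/// Stand-in for `HeartbeatConfig` carrying only the fields under validation.
pub struct HeartbeatConfig {
    pub interval_secs: u64,
    pub initial_backoff_secs: u64,
    pub max_backoff_secs: u64,
    pub jitter_percent: u8,
}

/// Hypothetical typed error; the review's version returns `String`.
#[derive(Debug, PartialEq, Eq)]
pub enum HeartbeatConfigError {
    ZeroInterval,
    BackoffRangeInverted { initial: u64, max: u64 },
    JitterOutOfRange(u8),
}

impl fmt::Display for HeartbeatConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ZeroInterval => write!(f, "interval_secs must be greater than 0"),
            Self::BackoffRangeInverted { initial, max } => write!(
                f,
                "initial_backoff_secs ({initial}) must not exceed max_backoff_secs ({max})"
            ),
            Self::JitterOutOfRange(p) => {
                write!(f, "jitter_percent must be between 0 and 100, got {p}")
            }
        }
    }
}

impl std::error::Error for HeartbeatConfigError {}

impl HeartbeatConfig {
    /// Checks the same three conditions as the review's suggestion.
    pub fn validate(&self) -> Result<(), HeartbeatConfigError> {
        if self.interval_secs == 0 {
            return Err(HeartbeatConfigError::ZeroInterval);
        }
        if self.initial_backoff_secs > self.max_backoff_secs {
            return Err(HeartbeatConfigError::BackoffRangeInverted {
                initial: self.initial_backoff_secs,
                max: self.max_backoff_secs,
            });
        }
        if self.jitter_percent > 100 {
            return Err(HeartbeatConfigError::JitterOutOfRange(self.jitter_percent));
        }
        Ok(())
    }
}
```

The checks run in a fixed order, so a config with several problems reports only the first one; whether that is acceptable depends on how the config is surfaced to users.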
    /// Establishes a regular TLS-encrypted connection to Postgres.
    async fn connect_regular_tls(pg_connection_config: PgConnectionConfig) -> EtlResult<Client> {
        let config: Config = pg_connection_config
            .clone()
            .with_db(Some(&ETL_HEARTBEAT_OPTIONS));
        // Note: NO replication_mode() call - this is a regular connection

        let mut root_store = rustls::RootCertStore::empty();
        for cert in CertificateDer::pem_slice_iter(
            pg_connection_config.tls.trusted_root_certs.as_bytes(),
        ) {
            let cert = cert?;
            root_store.add(cert)?;
        }

        let tls_config = ClientConfig::builder()
            .with_root_certificates(root_store)
            .with_no_client_auth();

        let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?;
        spawn_postgres_connection::<MakeRustlsConnect>(connection);

        info!("connected to postgres (regular mode) with tls");
        Ok(client)
    }
🧹 Nitpick | 🔵 Trivial
Consider extracting TLS setup into a shared helper.
The TLS certificate loading and ClientConfig setup (lines 211-221) duplicates the logic in connect_tls (lines 264-276). Consider extracting a helper like build_tls_config(pg_connection_config: &PgConnectionConfig) -> EtlResult<ClientConfig> to reduce duplication.
♻️ Suggested helper extraction
    /// Builds a TLS client configuration from the connection config's trusted root certificates.
    fn build_tls_config(pg_connection_config: &PgConnectionConfig) -> EtlResult<ClientConfig> {
        let mut root_store = rustls::RootCertStore::empty();
        for cert in CertificateDer::pem_slice_iter(
            pg_connection_config.tls.trusted_root_certs.as_bytes(),
        ) {
            let cert = cert?;
            root_store.add(cert)?;
        }
        Ok(ClientConfig::builder()
            .with_root_certificates(root_store)
            .with_no_client_auth())
    }

Then both connect_regular_tls and connect_tls can call let tls_config = Self::build_tls_config(&pg_connection_config)?;.
🤖 Prompt for AI Agents
In @etl/src/replication/client.rs around lines 204 - 228, Extract the duplicated
TLS certificate loading and ClientConfig construction into a shared function
(e.g., build_tls_config(pg_connection_config: &PgConnectionConfig) ->
EtlResult<ClientConfig>) and have both connect_regular_tls and connect_tls call
it; move the CertificateDer::pem_slice_iter loop and creation of
rustls::RootCertStore +
ClientConfig::builder().with_root_certificates(...).with_no_client_auth() into
that helper, return the ClientConfig, and replace the inlined code in
connect_regular_tls and connect_tls with a call to the new helper (propagate
errors using EtlResult).
    pub fn build(self) -> Pipeline<S, D> {
        let config = PipelineConfig {
            id: self.pipeline_id,
            publication_name: self.publication_name,
            pg_connection: self.pg_connection_config,
            batch: self.batch.unwrap_or(BatchConfig {
                max_size: 1,
                max_fill_ms: 1000,
            }),
            primary_connection: None,
            heartbeat: None,
            batch: self.batch.unwrap_or(BatchConfig { max_size: 1, max_fill_ms: 1000 }),
            table_error_retry_delay_ms: self.table_error_retry_delay_ms,
            table_error_retry_max_attempts: self.table_error_retry_max_attempts,
            max_table_sync_workers: self.max_table_sync_workers,
            table_sync_copy: self.table_sync_copy.unwrap_or_default(),
        };

        Pipeline::new(config, self.store, self.destination)
    }
    }
🧹 Nitpick | 🔵 Trivial
Consider adding builder methods for primary_connection and heartbeat.
The new fields are correctly initialized to None. However, if heartbeat-related integration tests are planned (as mentioned in PR discussions about testing idle-period behavior), you may want to add builder methods like with_primary_connection() and with_heartbeat() for test flexibility.
♻️ Optional builder method additions
    pub fn with_primary_connection(mut self, primary_connection: PgConnectionConfig) -> Self {
        self.primary_connection = Some(primary_connection);
        self
    }

    pub fn with_heartbeat(mut self, heartbeat: HeartbeatConfig) -> Self {
        self.heartbeat = Some(heartbeat);
        self
    }

This would require adding corresponding fields to PipelineBuilder and importing HeartbeatConfig.
🤖 Prompt for AI Agents
In @etl/src/test_utils/pipeline.rs around lines 95 - 110, Add builder methods to
PipelineBuilder to set the previously added optional fields: implement
with_primary_connection(self, primary_connection: PgConnectionConfig) -> Self
and with_heartbeat(self, heartbeat: HeartbeatConfig) -> Self, add matching
fields primary_connection: Option<PgConnectionConfig> and heartbeat:
Option<HeartbeatConfig> to the PipelineBuilder struct, ensure you import
HeartbeatConfig and PgConnectionConfig, set the fields in those methods and
return self so build() will pass the configured values into PipelineConfig
(which currently sets primary_connection and heartbeat to None).
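To show how the builder fields and build() would fit together, here is a self-contained sketch. The types are simplified stand-ins, not the crate's real definitions: PgConnectionConfig and HeartbeatConfig are reduced to one field each, PipelineConfig keeps only the two new fields, and build() returns the config rather than a Pipeline<S, D>.

```rust
/// Stand-in for the real `PgConnectionConfig`; only a host is modeled.
#[derive(Clone, Debug, PartialEq)]
pub struct PgConnectionConfig {
    pub host: String,
}

/// Stand-in for the real `HeartbeatConfig`; only the interval is modeled.
#[derive(Clone, Debug, PartialEq)]
pub struct HeartbeatConfig {
    pub interval_secs: u64,
}

/// Reduced `PipelineConfig` containing just the two new optional fields.
#[derive(Debug, PartialEq)]
pub struct PipelineConfig {
    pub primary_connection: Option<PgConnectionConfig>,
    pub heartbeat: Option<HeartbeatConfig>,
}

/// Reduced builder; the real `PipelineBuilder` has many more fields.
#[derive(Default)]
pub struct PipelineBuilder {
    primary_connection: Option<PgConnectionConfig>,
    heartbeat: Option<HeartbeatConfig>,
}

impl PipelineBuilder {
    /// Sets the primary connection, which switches the pipeline to replica mode.
    pub fn with_primary_connection(mut self, primary_connection: PgConnectionConfig) -> Self {
        self.primary_connection = Some(primary_connection);
        self
    }

    /// Overrides the default heartbeat settings.
    pub fn with_heartbeat(mut self, heartbeat: HeartbeatConfig) -> Self {
        self.heartbeat = Some(heartbeat);
        self
    }

    /// Passes the stored values through instead of hard-coding `None`.
    pub fn build(self) -> PipelineConfig {
        PipelineConfig {
            primary_connection: self.primary_connection,
            heartbeat: self.heartbeat,
        }
    }
}
```

Because both fields default to None, existing tests that never call the new methods keep their current non-replica behavior.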
    pub struct HeartbeatWorker {
        pipeline_id: PipelineId,
        primary_config: PgConnectionConfig,
        heartbeat_config: HeartbeatConfig,
        shutdown_rx: ShutdownRx,
        state_tx: watch::Sender<ConnectionState>,
        consecutive_failures: u32,
    }

    impl HeartbeatWorker {
        pub fn new(
            pipeline_id: PipelineId,
            primary_config: PgConnectionConfig,
            heartbeat_config: HeartbeatConfig,
            shutdown_rx: ShutdownRx,
        ) -> Self {
            let (state_tx, _) = watch::channel(ConnectionState::Disconnected);
            Self {
                pipeline_id,
                primary_config,
                heartbeat_config,
                shutdown_rx,
                state_tx,
                consecutive_failures: 0,
            }
        }
🧹 Nitpick | 🔵 Trivial
Unused watch channel receiver and missing documentation.
The watch channel receiver created at line 87 is immediately dropped (let (state_tx, _) = ...). If external state observation is not needed, consider simplifying to just store the state directly. If it's intended for future use, consider exposing it.
Additionally, the struct and its methods need documentation per coding guidelines.
♻️ Simplification if external observation is not needed
pub struct HeartbeatWorker {
pipeline_id: PipelineId,
primary_config: PgConnectionConfig,
heartbeat_config: HeartbeatConfig,
shutdown_rx: ShutdownRx,
- state_tx: watch::Sender<ConnectionState>,
+ state: ConnectionState,
consecutive_failures: u32,
}
This would require updating update_state to modify self.state directly instead of sending through a channel.
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 71 - 96, The watch channel
receiver created in HeartbeatWorker::new is immediately dropped (let (state_tx,
_) = watch::channel(...)), which is either unnecessary or accidentally discards
intended observers; fix by either (A) removing the watch channel and replacing
state_tx: watch::Sender<ConnectionState> with a simple ConnectionState field
(e.g., state) and update update_state to mutate self.state directly, or (B)
expose and store the receiver (watch::Receiver<ConnectionState>) alongside
state_tx so external observers can subscribe; also add doc comments to the
HeartbeatWorker struct and its new method describing purpose, fields
(pipeline_id, primary_config, heartbeat_config, shutdown_rx, state_tx or state),
and intended observation behavior so code guidelines are met.
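Option (A) from the prompt above might look roughly like the sketch below. It is a reduced, hypothetical version of the worker: only the state field is kept, the other fields (`pipeline_id`, `primary_config`, and so on) are omitted, and the `state()` accessor is an addition for illustration that does not appear in the PR.

```rust
/// Connection state of the heartbeat worker (variants taken from the PR's tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Disconnected,
    Connecting,
    Connected,
}

impl ConnectionState {
    /// Returns the lowercase label used in logs.
    pub fn as_str(&self) -> &'static str {
        match self {
            ConnectionState::Disconnected => "disconnected",
            ConnectionState::Connecting => "connecting",
            ConnectionState::Connected => "connected",
        }
    }
}

/// Reduced worker that stores its state directly instead of in a watch channel.
pub struct HeartbeatWorker {
    state: ConnectionState,
}

impl HeartbeatWorker {
    /// Creates a worker in the disconnected state.
    pub fn new() -> Self {
        Self {
            state: ConnectionState::Disconnected,
        }
    }

    /// Records the new state; no channel send is involved.
    pub fn update_state(&mut self, state: ConnectionState) {
        self.state = state;
    }

    /// Hypothetical accessor, added here only so the state can be inspected.
    pub fn state(&self) -> ConnectionState {
        self.state
    }
}
```

The trade-off is that nothing outside the worker task can observe transitions any more; if that is wanted later, option (B) with a stored or exposed receiver is the better fit.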
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_state_as_str() {
        assert_eq!(ConnectionState::Disconnected.as_str(), "disconnected");
        assert_eq!(ConnectionState::Connecting.as_str(), "connecting");
        assert_eq!(ConnectionState::Connected.as_str(), "connected");
    }

    #[test]
    fn test_min_pg_version_constant() {
        assert_eq!(MIN_PG_VERSION, 150000);
    }
}
🧹 Nitpick | 🔵 Trivial
Consider adding more unit tests.
Current tests only cover ConnectionState::as_str() and the version constant. Consider adding tests for:
- calculate_backoff behavior (especially with the fix for the jitter bug)
- Error variant messages
- Configuration edge cases
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 276 - 291, Add unit tests covering
calculate_backoff behavior, HeartbeatError messages, and configuration edge
cases: write tests for calculate_backoff (call the function with varying
attempts and a BackoffConfig to assert base/backoff growth, max cap, and that
jitterized results stay within expected +/- jitter bounds and never exceed
max_backoff), add tests that format each HeartbeatError variant (e.g.,
HeartbeatError::ConnectionFailed, HeartbeatError::InvalidConfig) using
to_string() or Display and assert the expected message substrings, and add tests
for HeartbeatConfig/parse_config/from_env edge cases (missing keys, invalid
values, boundary values) to ensure defaults and errors are handled as intended;
locate helpers by the function names calculate_backoff, BackoffConfig,
HeartbeatError, and HeartbeatConfig/parse_config to add these tests.
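As a rough illustration of what bounded-jitter and capped-growth checks could assert, here is a self-contained sketch. The functions `apply_jitter` and `next_backoff` are hypothetical pure helpers written for this example; the PR's `calculate_backoff` is a method on the worker and may be structured differently. The jitter sample is passed in as a parameter so the behavior is deterministic under test.

```rust
use std::time::Duration;

/// Scales `base` by up to +/- `jitter_percent` percent.
///
/// `sample` is a jitter sample expected in -1.0..=1.0; values outside that
/// range are clamped. (Hypothetical helper, not the PR's implementation.)
pub fn apply_jitter(base: Duration, jitter_percent: u32, sample: f64) -> Duration {
    let sample = sample.clamp(-1.0, 1.0);
    let factor = 1.0 + sample * (jitter_percent as f64 / 100.0);
    // Guard against a negative factor when jitter_percent exceeds 100.
    base.mul_f64(factor.max(0.0))
}

/// Doubles the current backoff, never exceeding `max`.
pub fn next_backoff(current: Duration, max: Duration) -> Duration {
    current.saturating_mul(2).min(max)
}
```

Tests built on helpers like these can check three properties: a zero sample leaves the base unchanged, every sample keeps the result inside `base * (1 ± jitter_percent / 100)`, and repeated doubling stops at the configured maximum.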
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @etl/src/test_utils/pipeline.rs:
- Around line 152-153: Add builder methods to PipelineBuilder to support setting
the optional fields primary_connection and heartbeat for future replica-mode
tests: implement with_primary_connection(self, conn: PrimaryConnectionType) ->
Self and with_heartbeat(self, hb: HeartbeatType) -> Self that set
self.primary_connection and self.heartbeat respectively, and return self so
tests can opt-in to non-None values; update any PipelineBuilder::build() logic
to use those fields when constructing the Pipeline.
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
etl-config/src/shared/connection.rs
etl-config/src/shared/pipeline.rs
etl/src/test_utils/pipeline.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
etl/src/test_utils/pipeline.rs
etl-config/src/shared/pipeline.rs
etl-config/src/shared/connection.rs
🧬 Code graph analysis (1)
etl-config/src/shared/pipeline.rs (2)
etl-config/src/shared/replicator.rs (1)
validate(40-42)
etl-api/src/configs/pipeline.rs (1)
default_table_error_retry_max_attempts(13-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (11)
etl-config/src/shared/connection.rs (3)
36-38: LGTM! The constant follows the established naming pattern and is properly documented.
106-122: LGTM! The heartbeat connection options are well-documented and use appropriate timeout values (5s statement/lock, 30s idle) for lightweight operations that should fail fast. The configuration follows the same pattern as other connection option statics.
451-458: LGTM! Test coverage for the key heartbeat option values.
etl-config/src/shared/pipeline.rs (7)
5-8: LGTM! Imports are correctly organized to include the new HeartbeatConfig and BatchConfig types.
71-81: LGTM! The new replica-mode fields are well-documented and correctly use #[serde(default)] for optional configurations.
99-120: LGTM! The constants and methods are well-documented. is_replica_mode() provides a clean abstraction, and heartbeat_config() correctly falls back to defaults.
146-156: LGTM! Helper functions correctly delegate to the associated constants on PipelineConfig.
173-193: LGTM! PipelineConfigWithoutSecrets correctly mirrors the new fields with appropriate serde attributes including skip_serializing_if for optional fields.
196-211: LGTM! The From implementation correctly maps the new fields, with primary_connection properly converted to strip secrets.
122-143: HeartbeatConfig does not have a validate() method. The review comment suggests calling heartbeat.validate()? but HeartbeatConfig does not implement a validate() method. The struct contains only a Default implementation and no validation logic.
If validation for HeartbeatConfig is desired (e.g., ensuring jitter_percent is within 0-100, or enforcing initial_backoff_secs ≤ max_backoff_secs), it would need to be implemented first before being called in PipelineConfig::validate().
Likely an incorrect or invalid review comment.
etl/src/test_utils/pipeline.rs (1)
28-28: LGTM! Using ignore is appropriate when doc tests require external dependencies or setup that isn't available in the test environment.
primary_connection: None,
heartbeat: None,
🧹 Nitpick | 🔵 Trivial
LGTM, with a suggestion for future extensibility.
The new fields are correctly initialized to None for standard test scenarios. If replica-mode testing becomes necessary, consider adding builder methods like with_primary_connection() and with_heartbeat() to PipelineBuilder.
🤖 Prompt for AI Agents
In @etl/src/test_utils/pipeline.rs around lines 152 - 153, Add builder methods
to PipelineBuilder to support setting the optional fields primary_connection and
heartbeat for future replica-mode tests: implement with_primary_connection(self,
conn: PrimaryConnectionType) -> Self and with_heartbeat(self, hb: HeartbeatType)
-> Self that set self.primary_connection and self.heartbeat respectively, and
return self so tests can opt-in to non-None values; update any
PipelineBuilder::build() logic to use those fields when constructing the
Pipeline.
- Fix jitter bug: divide by 1_000_000_000.0 instead of u32::MAX (subsec_nanos returns 0..1B, not 0..4.3B)
- Add doc comments to HeartbeatError, ConnectionState, HeartbeatWorkerHandle, and HeartbeatWorker per CodeRabbit review
- Properly merge pipeline.rs with upstream doc comments
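To see why the divisor matters, the sketch below compares both normalizations. It assumes the sample is mapped to a signed range with `fraction * 2.0 - 1.0`, which matches the "-1.0..1.0" description in the review but is not confirmed by the PR source; the function names are hypothetical.

```rust
const NANOS_PER_SEC: f64 = 1_000_000_000.0;

/// Maps a subsec_nanos value (0..1_000_000_000) onto -1.0..1.0.
pub fn signed_sample_fixed(nanos: u32) -> f64 {
    (nanos as f64 / NANOS_PER_SEC) * 2.0 - 1.0
}

/// Same mapping with the old divisor: because nanos never exceeds
/// 999_999_999, the fraction stays below about 0.233 and the result
/// is always negative.
pub fn signed_sample_buggy(nanos: u32) -> f64 {
    (nanos as f64 / u32::MAX as f64) * 2.0 - 1.0
}
```

Under that assumption, the largest possible input (999_999_999) gives roughly -0.534 with the old divisor and just under 1.0 with the fixed one, so the old code could only ever shorten the backoff and never lengthen it.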
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In @etl/src/workers/heartbeat.rs:
- Around line 300-303: The private method update_state currently lacks
documentation; add a Rust doc comment describing its purpose and behavior, e.g.,
that update_state sends the provided ConnectionState over the state_tx channel
and logs the change for the given pipeline_id; reference the method name
update_state, the ConnectionState type, the state_tx sender, and the debug log
so reviewers can verify the doc matches behavior and any side effects
(non-blocking send, possible ignore on failure).
- Around line 213-239: The private methods connect_to_primary and
verify_pg_version lack documentation; add Rust doc comments (///) above each
private async fn describing purpose, inputs (e.g., that connect_to_primary uses
self.primary_config and returns Result<Client, HeartbeatError> and
verify_pg_version takes &Client), expected behavior, returned Result values, and
possible HeartbeatError variants (ConnectionFailed, VersionQueryFailed,
UnsupportedVersion) so readers know side-effects and error cases; keep comments
brief and follow existing crate style for private method docs.
- Around line 188-211: The private async method connect_and_heartbeat lacks
documentation; add a Rust doc comment (///) above the connect_and_heartbeat
method describing its purpose (connect to primary, verify PG version, update
connection state, reset failure counters, and run the heartbeat loop), outline
key behaviors and possible errors (returns Result<(), HeartbeatError>), and
mention side effects such as metric updates and state transitions so readers and
tools can understand its role; ensure the doc follows project style and includes
any important notes about concurrency or retry semantics.
- Around line 241-255: The private method heartbeat_loop lacks documentation;
add a doc comment above async fn heartbeat_loop(&mut self, client: &Client)
describing its purpose (periodically emits heartbeats using
self.emit_heartbeat), its shutdown behavior (listens on self.shutdown_rx and
returns Ok(()) when signaled), and explicitly note that tokio::time::interval
fires immediately on first tick so a heartbeat is emitted immediately on
entering the loop; keep the comment concise and mention the units/source of
interval_secs from self.heartbeat_config.
- Around line 257-280: Add a Rust doc comment for the private async method
emit_heartbeat to describe its purpose (emit a Postgres logical replication
message as an ETL heartbeat), its parameters (self context including pipeline_id
and the Client), its behavior (increments ETL_HEARTBEAT_EMISSIONS_TOTAL, updates
ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP, logs debug), and its return/errors
(Result<(), HeartbeatError> and when HeartbeatError::EmitFailed may be
returned). Place the /// comment immediately above the emit_heartbeat function
declaration so it follows project guideline for documenting private methods.
- Around line 282-298: Document the private method calculate_backoff with a
brief doc comment explaining its purpose, parameters and return value (it
applies jitter to a base Duration using heartbeat_config.jitter_percent), and
update inline comments inside the function to end with periods; ensure the doc
uses /// above the fn and mentions base_backoff and that jitter is derived from
SystemTime::now().subsec_nanos() normalized to -1.0..1.0 so reviewers can
understand the jitter computation.
- Around line 132-145: Add a doc comment for the private async fn run to explain
its responsibility and lifecycle, and stop silently ignoring a RecvError from
shutdown_rx.has_changed(); replace has_changed().unwrap_or(false) with proper
handling such that Ok(true) triggers shutdown, Ok(false) continues, and Err(_)
(sender dropped) is treated as shutdown (log and return Ok(())); reference the
async fn run and the shutdown_rx.has_changed() call to locate where to update
behavior and add the doc comment.
- Line 23: Add a doc comment above the MIN_PG_VERSION constant explaining the
PostgreSQL version requirement: note that pg_logical_emit_message has existed
since PG 9.6, that logical decoding from standbys is only supported starting in
PG 16, and state that this project explicitly requires a minimum of PG 15
(represented by MIN_PG_VERSION = 15_00_00) with a short rationale (compatibility
and tested behavior) and a link or reference to the upstream docs or PR
discussion for more details.
📜 Review details
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
etl/src/workers/heartbeat.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period
Files:
etl/src/workers/heartbeat.rs
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
Applied to files:
etl/src/workers/heartbeat.rs
📚 Learning: 2026-01-13T07:51:54.158Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.158Z
Learning: Applies to **/*.rs : Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Applied to files:
etl/src/workers/heartbeat.rs
🧬 Code graph analysis (1)
etl/src/workers/heartbeat.rs (4)
etl-api/src/routes/metrics.rs (1)
metrics(13-15)
etl/src/test_utils/pipeline.rs (1)
new(79-98)
etl-config/src/shared/pipeline.rs (1)
heartbeat_config(118-120)
etl/src/replication/client.rs (1)
connect_regular(183-188)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: depthfirst Bot
🔇 Additional comments (7)
etl/src/workers/heartbeat.rs (7)
1-22: LGTM! Module documentation and imports are well-organized and appropriate for the worker's responsibilities.
25-40: LGTM! Error enum is well-documented with clear, descriptive messages for each variant.
42-61: LGTM! ConnectionState enum is well-documented and provides a clean string representation for logging.
63-82: LGTM! The handle correctly propagates errors and handles task panics gracefully.
98-122: LGTM! Constructor is well-documented with clear argument descriptions.
124-130: LGTM! The start method cleanly spawns the background task and returns a handle.
306-321: LGTM! Unit tests appropriately validate the constant value and enum string representations.
use crate::types::PipelineId;
use etl_config::shared::{HeartbeatConfig, PgConnectionConfig};

const MIN_PG_VERSION: i32 = 15_00_00;
🛠️ Refactor suggestion | 🟠 Major
Add documentation explaining the version requirement.
Per coding guidelines, all items should be documented. This constant lacks a doc comment explaining why PG 15 is the minimum. The PR discussion notes that pg_logical_emit_message exists since PG 9.6, while logical decoding from standbys requires PG 16+. Please document the rationale for choosing PG 15.
📝 Suggested documentation
+/// Minimum PostgreSQL server version required for heartbeat functionality.
+///
+/// PostgreSQL 15+ is required because [explain reason here].
 const MIN_PG_VERSION: i32 = 15_00_00;
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs at line 23, Add a doc comment above the
MIN_PG_VERSION constant explaining the PostgreSQL version requirement: note that
pg_logical_emit_message has existed since PG 9.6, that logical decoding from
standbys is only supported starting in PG 16, and state that this project
explicitly requires a minimum of PG 15 (represented by MIN_PG_VERSION =
15_00_00) with a short rationale (compatibility and tested behavior) and a link
or reference to the upstream docs or PR discussion for more details.
pub struct HeartbeatWorker {
    pipeline_id: PipelineId,
    primary_config: PgConnectionConfig,
    heartbeat_config: HeartbeatConfig,
    shutdown_rx: ShutdownRx,
    state_tx: watch::Sender<ConnectionState>,
    consecutive_failures: u32,
}
🧹 Nitpick | 🔵 Trivial
Unused watch channel receiver is discarded immediately.
The state_tx watch channel is created but the receiver is immediately discarded with _ on line 113. The sender is used to broadcast state changes, but no code subscribes to observe them. This appears to be either dead code or an incomplete feature.
Consider either:
- Removing state_tx and update_state() if state observation isn't needed
- Exposing a subscribe() method if external state observation is intended
Also applies to: 113-113
    async fn run(mut self) -> Result<(), HeartbeatError> {
        info!(
            pipeline_id = %self.pipeline_id,
            interval_secs = %self.heartbeat_config.interval_secs,
            "starting heartbeat worker"
        );

        let mut current_backoff = Duration::from_secs(self.heartbeat_config.initial_backoff_secs);

        loop {
            if self.shutdown_rx.has_changed().unwrap_or(false) {
                info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down");
                return Ok(());
            }
🛠️ Refactor suggestion | 🟠 Major
Add doc comment and handle has_changed() error properly.
Per coding guidelines, private methods should be documented.
Additionally, has_changed().unwrap_or(false) on line 142 silently ignores RecvError (sender dropped). If the shutdown sender is dropped, the worker should shut down rather than continue indefinitely.
📝 Suggested fix
+ /// Runs the heartbeat worker's main loop.
+ ///
+ /// Continuously attempts to connect and emit heartbeats, with exponential
+ /// backoff on failures. Returns when shutdown is signaled or a fatal error occurs.
async fn run(mut self) -> Result<(), HeartbeatError> {
info!(
pipeline_id = %self.pipeline_id,
interval_secs = %self.heartbeat_config.interval_secs,
"starting heartbeat worker"
);
let mut current_backoff = Duration::from_secs(self.heartbeat_config.initial_backoff_secs);
loop {
- if self.shutdown_rx.has_changed().unwrap_or(false) {
+ // If sender is dropped (Err) or shutdown signaled (Ok(true)), exit.
+ if self.shutdown_rx.has_changed().unwrap_or(true) {
info!(pipeline_id = %self.pipeline_id, "heartbeat worker shutting down");
return Ok(());
 }
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 132 - 145, Add a doc comment for
the private async fn run to explain its responsibility and lifecycle, and stop
silently ignoring a RecvError from shutdown_rx.has_changed(); replace
has_changed().unwrap_or(false) with proper handling such that Ok(true) triggers
shutdown, Ok(false) continues, and Err(_) (sender dropped) is treated as
shutdown (log and return Ok(())); reference the async fn run and the
shutdown_rx.has_changed() call to locate where to update behavior and add the
doc comment.
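The three-way handling described above can be sketched without any async runtime by modelling the return value of `has_changed()` as a plain `Result<bool, E>` (tokio's `watch::Receiver::has_changed` returns `Result<bool, RecvError>`). The `ShutdownCheck` enum and both functions below are hypothetical names introduced for this example.

```rust
/// Outcome of polling the shutdown receiver once.
#[derive(Debug, PartialEq, Eq)]
pub enum ShutdownCheck {
    /// No new value was sent; keep running.
    Continue,
    /// A shutdown value was sent.
    Signaled,
    /// The sender was dropped, so no signal can ever arrive.
    SenderDropped,
}

/// Classifies the result of a `has_changed()`-style call.
pub fn classify_shutdown<E>(has_changed: Result<bool, E>) -> ShutdownCheck {
    match has_changed {
        Ok(false) => ShutdownCheck::Continue,
        Ok(true) => ShutdownCheck::Signaled,
        Err(_) => ShutdownCheck::SenderDropped,
    }
}

/// Both a signal and a dropped sender end the worker loop.
pub fn should_exit(check: &ShutdownCheck) -> bool {
    !matches!(check, ShutdownCheck::Continue)
}
```

Compared with `unwrap_or(true)`, keeping the dropped-sender case distinct lets the worker log a different message for it, which may help when diagnosing an unexpected shutdown.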
    async fn connect_and_heartbeat(&mut self) -> Result<(), HeartbeatError> {
        self.update_state(ConnectionState::Connecting);

        counter!(
            ETL_HEARTBEAT_CONNECTION_ATTEMPTS_TOTAL,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).increment(1);

        debug!(pipeline_id = %self.pipeline_id, "connecting to primary for heartbeat");

        let client = self.connect_to_primary().await?;
        self.verify_pg_version(&client).await?;

        self.update_state(ConnectionState::Connected);
        info!(pipeline_id = %self.pipeline_id, "connected to primary, starting heartbeat loop");

        self.consecutive_failures = 0;
        gauge!(
            ETL_HEARTBEAT_CONSECUTIVE_FAILURES,
            PIPELINE_ID_LABEL => self.pipeline_id.to_string()
        ).set(0.0);

        self.heartbeat_loop(&client).await
    }
🛠️ Refactor suggestion | 🟠 Major
Add documentation for private method.
Per coding guidelines, all items including private methods should be documented.
📝 Suggested documentation
+ /// Establishes a connection to the primary and runs the heartbeat loop.
+ ///
+ /// Verifies PostgreSQL version compatibility before starting heartbeats.
+ /// Resets the consecutive failure counter on successful connection.
 async fn connect_and_heartbeat(&mut self) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 188 - 211, The private async
method connect_and_heartbeat lacks documentation; add a Rust doc comment (///)
above the connect_and_heartbeat method describing its purpose (connect to
primary, verify PG version, update connection state, reset failure counters, and
run the heartbeat loop), outline key behaviors and possible errors (returns
Result<(), HeartbeatError>), and mention side effects such as metric updates and
state transitions so readers and tools can understand its role; ensure the doc
follows project style and includes any important notes about concurrency or
retry semantics.
    async fn connect_to_primary(&self) -> Result<Client, HeartbeatError> {
        PgReplicationClient::connect_regular(self.primary_config.clone())
            .await
            .map_err(|e| HeartbeatError::ConnectionFailed(e.to_string()))
    }

    async fn verify_pg_version(&self, client: &Client) -> Result<(), HeartbeatError> {
        let row = client
            .query_one("SELECT current_setting('server_version_num')::int", &[])
            .await
            .map_err(|e| HeartbeatError::VersionQueryFailed(e.to_string()))?;

        let version: i32 = row.get(0);

        if version < MIN_PG_VERSION {
            let version_str = format!(
                "{}.{}.{}",
                version / 10000,
                (version / 100) % 100,
                version % 100
            );
            return Err(HeartbeatError::UnsupportedVersion(version_str));
        }

        debug!(pipeline_id = %self.pipeline_id, pg_version = %version, "PostgreSQL version verified");
        Ok(())
    }
🛠️ Refactor suggestion | 🟠 Major
Add documentation for private methods.
Per coding guidelines, private methods should be documented.
📝 Suggested documentation
+ /// Connects to the primary database using the configured connection settings.
async fn connect_to_primary(&self) -> Result<Client, HeartbeatError> {
PgReplicationClient::connect_regular(self.primary_config.clone())
.await
.map_err(|e| HeartbeatError::ConnectionFailed(e.to_string()))
}
+ /// Verifies the PostgreSQL server version meets the minimum requirement.
+ ///
+ /// Returns an error if the server version is below [`MIN_PG_VERSION`].
 async fn verify_pg_version(&self, client: &Client) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 213 - 239, The private methods
connect_to_primary and verify_pg_version lack documentation; add Rust doc
comments (///) above each private async fn describing purpose, inputs (e.g.,
that connect_to_primary uses self.primary_config and returns Result<Client,
HeartbeatError> and verify_pg_version takes &Client), expected behavior,
returned Result values, and possible HeartbeatError variants (ConnectionFailed,
VersionQueryFailed, UnsupportedVersion) so readers know side-effects and error
cases; keep comments brief and follow existing crate style for private method
docs.
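A side note on the version string built in verify_pg_version: since PostgreSQL 10, server_version_num is encoded as major * 10000 + minor, so the three-part format above would render 15.4 (150004) as 15.0.4. A minimal sketch of a formatter that handles both numbering schemes follows; format_pg_version is a hypothetical helper name used for illustration and is not part of the PR.

```rust
/// Formats a PostgreSQL `server_version_num` value as a human-readable string.
///
/// Hypothetical helper for illustration only; the PR formats the value inline.
fn format_pg_version(version_num: i32) -> String {
    let major = version_num / 10000;
    if major >= 10 {
        // PostgreSQL 10 and later: two-part versions, major * 10000 + minor.
        format!("{}.{}", major, version_num % 10000)
    } else {
        // Before 10: three-part versions, a * 10000 + b * 100 + c.
        format!("{}.{}.{}", major, (version_num / 100) % 100, version_num % 100)
    }
}
```

If the minimum supported version is 10 or later, the pre-10 branch would only matter for error messages about very old servers, so it could arguably be dropped.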
async fn heartbeat_loop(&mut self, client: &Client) -> Result<(), HeartbeatError> {
    let mut heartbeat_interval = interval(Duration::from_secs(self.heartbeat_config.interval_secs));

    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                self.emit_heartbeat(client).await?;
            }
            _ = self.shutdown_rx.changed() => {
                info!(pipeline_id = %self.pipeline_id, "heartbeat loop received shutdown signal");
                return Ok(());
            }
        }
    }
}
🛠️ Refactor suggestion | 🟠 Major
Add documentation for private method.
Per coding guidelines, private methods should be documented. Note that tokio::time::interval fires immediately on the first tick, so a heartbeat is emitted immediately upon entering this loop.
📝 Suggested documentation
+ /// Runs the periodic heartbeat emission loop.
+ ///
+ /// Emits heartbeats at the configured interval until shutdown is signaled
+ /// or an emission fails.
async fn heartbeat_loop(&mut self, client: &Client) -> Result<(), HeartbeatError> {
📝 Committable suggestion
/// Runs the periodic heartbeat emission loop.
///
/// Emits heartbeats at the configured interval until shutdown is signaled
/// or an emission fails.
async fn heartbeat_loop(&mut self, client: &Client) -> Result<(), HeartbeatError> {
    let mut heartbeat_interval = interval(Duration::from_secs(self.heartbeat_config.interval_secs));
    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                self.emit_heartbeat(client).await?;
            }
            _ = self.shutdown_rx.changed() => {
                info!(pipeline_id = %self.pipeline_id, "heartbeat loop received shutdown signal");
                return Ok(());
            }
        }
    }
}
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 241 - 255, The private method
heartbeat_loop lacks documentation; add a doc comment above async fn
heartbeat_loop(&mut self, client: &Client) describing its purpose (periodically
emits heartbeats using self.emit_heartbeat), its shutdown behavior (listens on
self.shutdown_rx and returns Ok(()) when signaled), and explicitly note that
tokio::time::interval fires immediately on first tick so a heartbeat is emitted
immediately on entering the loop; keep the comment concise and mention the
units/source of interval_secs from self.heartbeat_config.
async fn emit_heartbeat(&self, client: &Client) -> Result<(), HeartbeatError> {
    client
        .execute("SELECT pg_logical_emit_message(false, 'etl_heartbeat', '')", &[])
        .await
        .map_err(|e| HeartbeatError::EmitFailed(e.to_string()))?;

    counter!(
        ETL_HEARTBEAT_EMISSIONS_TOTAL,
        PIPELINE_ID_LABEL => self.pipeline_id.to_string()
    ).increment(1);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64();

    gauge!(
        ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP,
        PIPELINE_ID_LABEL => self.pipeline_id.to_string()
    ).set(timestamp);

    debug!(pipeline_id = %self.pipeline_id, "heartbeat emitted successfully");
    Ok(())
}
🛠️ Refactor suggestion | 🟠 Major
Add documentation for private method.
Per coding guidelines, private methods should be documented.
📝 Suggested documentation
+ /// Emits a heartbeat message to the primary database's WAL.
+ ///
+ /// Uses `pg_logical_emit_message` with non-transactional mode to ensure
+ /// the message is written to WAL immediately without requiring a commit.
async fn emit_heartbeat(&self, client: &Client) -> Result<(), HeartbeatError> {
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 257 - 280, Add a Rust doc comment
for the private async method emit_heartbeat to describe its purpose (emit a
Postgres logical replication message as an ETL heartbeat), its parameters (self
context including pipeline_id and the Client), its behavior (increments
ETL_HEARTBEAT_EMISSIONS_TOTAL, updates ETL_HEARTBEAT_LAST_EMISSION_TIMESTAMP,
logs debug), and its return/errors (Result<(), HeartbeatError> and when
HeartbeatError::EmitFailed may be returned). Place the /// comment immediately
above the emit_heartbeat function declaration so it follows project guideline
for documenting private methods.
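One detail worth capturing in that doc comment is how the gauge value is derived. The sketch below isolates the timestamp computation from emit_heartbeat; unix_timestamp_secs is a hypothetical helper name, and taking the time as a parameter is an assumption made here so the behavior can be checked without reading the real clock.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Converts a `SystemTime` to fractional seconds since the Unix epoch.
///
/// Hypothetical helper for illustration; the PR computes this inline.
fn unix_timestamp_secs(now: SystemTime) -> f64 {
    now.duration_since(UNIX_EPOCH)
        // A clock set before the epoch yields an error; fall back to zero.
        .unwrap_or_default()
        .as_secs_f64()
}
```

The unwrap_or_default call means a misconfigured system clock would report a timestamp of 0 rather than fail the heartbeat, which may be worth stating in the documentation.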
fn calculate_backoff(&self, base_backoff: Duration) -> Duration {
    // Simple jitter using timestamp nanoseconds as pseudo-random source.
    let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
    let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

    // Use nanoseconds from current time as a simple source of variation.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0
    let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
    let jitter = normalized * jitter_range;

    let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
    Duration::from_secs_f64(jittered_secs)
}
🛠️ Refactor suggestion | 🟠 Major
Add documentation and fix comment punctuation.
Per coding guidelines: private methods should be documented, and comments should end with a period.
📝 Suggested fix
+ /// Calculates backoff duration with jitter.
+ ///
+ /// Applies a random jitter within the configured percentage to prevent
+ /// thundering herd effects. The result is clamped to a minimum of 0.1 seconds.
fn calculate_backoff(&self, base_backoff: Duration) -> Duration {
// Simple jitter using timestamp nanoseconds as pseudo-random source.
let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;
// Use nanoseconds from current time as a simple source of variation.
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos();
- // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0
+ // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
📝 Committable suggestion
/// Calculates backoff duration with jitter.
///
/// Applies a random jitter within the configured percentage to prevent
/// thundering herd effects. The result is clamped to a minimum of 0.1 seconds.
fn calculate_backoff(&self, base_backoff: Duration) -> Duration {
    // Simple jitter using timestamp nanoseconds as pseudo-random source.
    let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
    let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;
    // Use nanoseconds from current time as a simple source of variation.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
    let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
    let jitter = normalized * jitter_range;
    let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
    Duration::from_secs_f64(jittered_secs)
}
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 282 - 298, Document the private
method calculate_backoff with a brief doc comment explaining its purpose,
parameters and return value (it applies jitter to a base Duration using
heartbeat_config.jitter_percent), and update inline comments inside the function
to end with periods; ensure the doc uses /// above the fn and mentions
base_backoff and that jitter is derived from SystemTime::now().subsec_nanos()
normalized to -1.0..1.0 so reviewers can understand the jitter computation.
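To make the jitter computation easier to reason about, here is a minimal sketch of the same arithmetic as a free function with the clock sample passed in. The name jittered_backoff, the u8 type for the percentage, and the injected subsec_nanos parameter are all assumptions for illustration; the PR reads the nanoseconds from SystemTime::now() inside calculate_backoff.

```rust
use std::time::Duration;

/// Applies symmetric jitter to `base`, given a sample of sub-second nanoseconds.
///
/// Hypothetical variant of `calculate_backoff`: the clock sample is an argument
/// rather than read from `SystemTime::now()`, so results are deterministic.
fn jittered_backoff(base: Duration, jitter_percent: u8, subsec_nanos: u32) -> Duration {
    // Largest deviation from `base`, in seconds.
    let jitter_range = base.as_secs_f64() * (f64::from(jitter_percent) / 100.0);
    // Map 0..1_000_000_000 onto -1.0..1.0.
    let normalized = (f64::from(subsec_nanos) / 1_000_000_000.0) * 2.0 - 1.0;
    // Clamp to the same 0.1-second floor the PR uses.
    let secs = (base.as_secs_f64() + normalized * jitter_range).max(0.1);
    Duration::from_secs_f64(secs)
}
```

With a 10-second base and 20 percent jitter, a sample of 0 ns gives about 8 seconds, 500_000_000 ns gives about 10 seconds, and samples near 1_000_000_000 ns approach 12 seconds. Separating the sample from the arithmetic also makes it straightforward to swap in a real random number generator later.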
fn update_state(&self, state: ConnectionState) {
    let _ = self.state_tx.send(state);
    debug!(pipeline_id = %self.pipeline_id, state = %state.as_str(), "heartbeat worker state changed");
}
🛠️ Refactor suggestion | 🟠 Major
Add documentation for private method.
Per coding guidelines, private methods should be documented.
📝 Suggested documentation
+ /// Updates the connection state and logs the transition.
fn update_state(&self, state: ConnectionState) {
📝 Committable suggestion
/// Updates the connection state and logs the transition.
fn update_state(&self, state: ConnectionState) {
    let _ = self.state_tx.send(state);
    debug!(pipeline_id = %self.pipeline_id, state = %state.as_str(), "heartbeat worker state changed");
}
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 300 - 303, The private method
update_state currently lacks documentation; add a Rust doc comment describing
its purpose and behavior, e.g., that update_state sends the provided
ConnectionState over the state_tx channel and logs the change for the given
pipeline_id; reference the method name update_state, the ConnectionState type,
the state_tx sender, and the debug log so reviewers can verify the doc matches
behavior and any side effects (non-blocking send, possible ignore on failure).
- Merge upstream's doc comment improvements from PR supabase#541
- Keep heartbeat fields (primary_connection, heartbeat)
- Keep helper methods (is_replica_mode, heartbeat_config)
- Fix corrupted doc comment on TableSyncCopyConfig

- Keep upstream's removal of TlsConfig::validate()
- Add ETL_HEARTBEAT_OPTIONS and APP_NAME_HEARTBEAT constant
- Add heartbeat options test

Upstream changed ValidationError to use a generic InvalidFieldValue variant instead of specific error variants. Align with upstream to resolve conflict.

Keep upstream's verbose documentation while adding heartbeat functionality:
- lib.rs: Add primary_connection and heartbeat to example
- metrics.rs: Add heartbeat metrics constants with upstream's doc style
- pipeline.rs: Add heartbeat worker support with upstream's doc style

- Add primary_connection and heartbeat to PipelineConfig in test builder
- Add heartbeat module to workers/mod.rs
- Keep upstream's doc comments

…amples
Sync PipelineConfig usage in benchmark and example files with upstream + heartbeat fields

…ation
- Fix jitter calculation bug: divide by 1_000_000_000.0 instead of u32::MAX
- Add documentation to HeartbeatError, ConnectionState, HeartbeatWorkerHandle
- Resolve conflicts in connection.rs: keep doc comments, remove serde attributes
- Resolve conflicts in pipeline.rs: merge upstream structure with heartbeat additions
- Resolve conflicts in test_utils/pipeline.rs: keep upstream doc comments
Rebased onto upstream/main to resolve merge conflicts.

- Add HeartbeatWorker implementation with exponential backoff and jitter
- Add heartbeat metrics constants for monitoring
- Fix jitter calculation: use 1_000_000_000.0 for proper normalization

- Add primary_connection and heartbeat fields to PipelineConfig
- Add is_replica_mode() and heartbeat_config() helper methods
- Add ETL_HEARTBEAT_OPTIONS for heartbeat connections
- Update PipelineConfigWithoutSecrets with heartbeat fields

- Add APP_NAME_HEARTBEAT constant
- Add ETL_HEARTBEAT_OPTIONS with 5s statement/lock timeouts
- Add test for heartbeat options

- Add HeartbeatWorkerHandle to PipelineState::Started
- Start heartbeat worker when primary_connection is configured
- Wait for heartbeat worker during pipeline shutdown
- Update lib.rs docs with heartbeat fields

- Add HeartbeatWorkerHandle to PipelineState
- Start heartbeat worker in replica mode
- Wait for heartbeat worker in shutdown sequence
- Add heartbeat fields to PipelineBuilder

- Add connect_regular() method for non-replication connections
- Add connect_regular_tls() and connect_regular_no_tls() helpers
- Use ETL_HEARTBEAT_OPTIONS for heartbeat connections
|
Closing this PR in favor of #554 which has a clean branch history rebased on current The new PR includes:
|
Summary
Implements support for replication slots on read replicas by adding a HeartbeatWorker that emits periodic messages to the primary database.
Closes #535
Changes
- pg_logical_emit_message(false, 'etl_heartbeat', '') on the primary to keep replica replication slots active
- pg_logical_emit_message (requires v15+)

Files Modified

etl-config/src/shared/
- mod.rs - Export heartbeat module
- heartbeat.rs - NEW: HeartbeatConfig with serde defaults
- connection.rs - Add ETL_HEARTBEAT_OPTIONS
- pipeline.rs - Add primary_connection and heartbeat fields, is_replica_mode() helper

etl/src/
- workers/mod.rs - Export heartbeat module
- workers/heartbeat.rs - NEW: HeartbeatWorker implementation
- pipeline.rs - Integrate HeartbeatWorker lifecycle
- metrics.rs - Add heartbeat metrics constants
- lib.rs - Update doc example
- test_utils/pipeline.rs - Update PipelineBuilder

etl-benchmarks/benches/table_copies.rs - Add new PipelineConfig fields
etl-examples/src/main.rs - Add new PipelineConfig fields
Test plan
Unit tests included:
Summary by CodeRabbit
New Features
Configuration
Metrics
Public API
Documentation
Tests