Skip to content

Conversation

@JohnCari
Copy link

@JohnCari JohnCari commented Jan 13, 2026

Summary

Adds HeartbeatWorker to maintain replication slot activity when replicating from read replicas.

When using logical replication from a read replica (PostgreSQL 15+), the replication slot on the primary can become inactive during idle periods since WAL is generated on the primary but consumed from the replica. This PR adds a heartbeat mechanism that periodically emits pg_logical_emit_message() calls to the primary, ensuring WAL flows through the replication chain.

Changes

  • New HeartbeatWorker (etl/src/workers/heartbeat.rs):

    • Connects to primary database in regular (non-replication) mode
    • Emits periodic heartbeat messages via pg_logical_emit_message(false, 'etl_heartbeat', '')
    • Exponential backoff with jitter on connection failures
    • Graceful shutdown integration
    • Comprehensive metrics for monitoring
    • Implements WorkerHandle<()> trait properly
  • New HeartbeatConfig (etl-config/src/shared/heartbeat.rs):

    • Configurable interval, backoff, and jitter settings
    • Sensible defaults (30s interval, 1-60s backoff, 25% jitter)
    • Validation for jitter_percent (<=100) and backoff ordering
  • Pipeline integration (etl-config/src/shared/pipeline.rs):

    • Added primary_connection and heartbeat fields
    • is_replica_mode() and heartbeat_config() helper methods
    • Heartbeat validation in PipelineConfig::validate()
  • New connection methods (etl/src/replication/client.rs):

    • connect_regular() for non-replication mode connections
    • Extracted build_root_cert_store() helper to reduce TLS code duplication
    • Uses ETL_HEARTBEAT_OPTIONS with short timeouts (5s statement, 5s lock, 30s idle)
  • Error handling (etl/src/error.rs):

    • Added HeartbeatWorkerPanic error kind

Configuration

To enable read replica mode, configure primary_connection in PipelineConfig:

let config = PipelineConfig {
    // ... other fields ...
    primary_connection: Some(primary_pg_config),  // Connection to primary DB
    heartbeat: Some(HeartbeatConfig::default()),  // Optional custom settings
};

Metrics

New metrics exposed:

  • etl_heartbeat_emissions_total - Total heartbeat messages emitted
  • etl_heartbeat_failures_total - Total heartbeat failures
  • etl_heartbeat_consecutive_failures - Current consecutive failure count
  • etl_heartbeat_connection_attempts_total - Connection attempt count
  • etl_heartbeat_last_emission_timestamp - Unix timestamp of last successful emission

Test plan

  • Unit tests pass (cargo test -p etl)
  • Integration tests with read replica setup
  • Verify heartbeat emissions appear in primary's WAL
  • Verify graceful shutdown stops heartbeat worker
  • Verify metrics are properly exposed

Summary by CodeRabbit

  • New Features

    • Background heartbeat worker to keep primary DB replication activity alive, with configurable interval, backoff and jitter, graceful shutdown, and observability.
    • Replica-mode pipeline option to enable primary/replica connection configuration and heartbeat integration.
  • Refactor

    • Simplified replication client surface and narrowed public API exports for a cleaner integration surface.
    • Metrics and errors updated to surface heartbeat-related observability.

✏️ Tip: You can customize this high-level summary in your review settings.

Adds HeartbeatWorker to maintain replication slot activity when replicating
from read replicas (PostgreSQL 15+). The heartbeat mechanism periodically
emits pg_logical_emit_message() calls to the primary to keep the slot active.

Changes:
- New HeartbeatConfig in etl-config/src/shared/heartbeat.rs
- New HeartbeatWorker in etl/src/workers/heartbeat.rs
- Pipeline integration with primary_connection and heartbeat fields
- Regular (non-replication) connection methods in client.rs
- Heartbeat metrics constants in metrics.rs
- HeartbeatWorkerPanic error kind in error.rs
- HeartbeatWorker with WorkerHandle<()> trait implementation
- Heartbeat metrics constants
- connect_regular() methods for non-replication connections
- HeartbeatWorkerPanic error kind
@JohnCari JohnCari requested a review from a team as a code owner January 13, 2026 19:50
@coderabbitai
Copy link

coderabbitai bot commented Jan 13, 2026

📝 Walkthrough

Walkthrough

Adds a configurable heartbeat subsystem and worker to emit WAL activity for primary DB replication slots, extends pipeline config for replica mode, simplifies the replication client connection surface, updates metrics to heartbeat-focused counters, and narrows public exports from the shared config module.

Changes

Cohort / File(s) Summary
Config: heartbeat & pipeline
etl-config/src/shared/heartbeat.rs, etl-config/src/shared/pipeline.rs, etl-config/src/shared/mod.rs
New HeartbeatConfig with defaults, serde helpers, and validate(); ETL_HEARTBEAT_OPTIONS constant; PipelineConfig gains primary_connection: Option<PgConnectionConfig> and heartbeat: Option<HeartbeatConfig> plus is_replica_mode() and heartbeat_config(); module exports narrowed to selective public types.
Worker: heartbeat runtime
etl/src/workers/heartbeat.rs, etl/src/workers/mod.rs
New HeartbeatWorker, HeartbeatWorkerHandle, and HeartbeatError; worker lifecycle spawns background task, connects to primary, emits pg_logical_emit_message(...) on interval, handles graceful shutdown, exponential backoff with jitter, and updates heartbeat metrics.
Replication client refactor
etl/src/replication/client.rs
Simplified public surface: PgReplicationClient with explicit connect paths (connect, connect_regular, TLS/no-TLS helpers), centralized TLS root store builder; previous slot/publication/transaction-heavy types removed or privatized.
Metrics & Errors
etl/src/metrics.rs, etl/src/error.rs
Metrics reduced to heartbeat-focused metrics and a few labels (e.g., ETL_HEARTBEAT_EMISSIONS_TOTAL, ETL_HEARTBEAT_FAILURES_TOTAL, TABLE_NAME_LABEL, ERROR_KIND_LABEL); added ErrorKind::HeartbeatWorkerPanic.

Sequence Diagram(s)

sequenceDiagram
    participant Supervisor as Pipeline/Supervisor
    participant HW as HeartbeatWorker
    participant PRC as PgReplicationClient
    participant DB as PostgreSQL (Primary)
    participant Met as Metrics

    Supervisor->>HW: new(...)+start()
    activate HW
    HW->>HW: run() loop
    HW->>Met: increment connection attempts
    HW->>PRC: connect_regular(primary_config)
    PRC->>DB: establish connection (TLS/NoTLS)
    DB-->>PRC: connection ready
    PRC-->>HW: Client
    loop every interval
        HW->>HW: check shutdown
        HW->>DB: SELECT pg_logical_emit_message(false,'etl_heartbeat','')
        DB-->>HW: success / error
        alt success
            HW->>Met: record emission, update last_emission_timestamp, reset consecutive failures
        else error
            HW->>Met: record failure, increment consecutive failures
            HW->>HW: calculate_backoff() -> wait_with_shutdown(backoff + jitter)
        end
    end
    HW->>HW: shutdown tasks and zero consecutive failures metric
    deactivate HW
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(etl): add HeartbeatWorker for read replica replication slot support' accurately and clearly summarizes the main change—introducing a HeartbeatWorker component to enable heartbeat emissions for read replica replication slots. It is concise, specific, and directly reflects the primary objective of the PR.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b51106 and 9a6c0c0.

📒 Files selected for processing (2)
  • etl-config/src/shared/heartbeat.rs
  • etl/src/replication/client.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl-config/src/shared/heartbeat.rs
  • etl/src/replication/client.rs
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl-config/src/shared/heartbeat.rs
  • etl/src/replication/client.rs
🧬 Code graph analysis (1)
etl-config/src/shared/heartbeat.rs (1)
etl-config/src/shared/pipeline.rs (1)
  • validate (115-137)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (15)
etl-config/src/shared/heartbeat.rs (7)

1-7: LGTM!

Module documentation is concise and imports are appropriately scoped with correct conditional compilation for the utoipa feature.


8-40: LGTM!

Struct documentation is clear, field types are appropriate, and serde defaults are correctly configured. Using u8 for jitter_percent with validation is a reasonable choice.


42-82: LGTM!

Validation logic is correct and covers all constraints mentioned in the documentation. The method signature aligns with the PipelineConfig.validate() integration shown in the relevant code snippets.


84-93: LGTM!

Default implementation correctly uses the associated constants, ensuring consistency with documented defaults.


95-109: LGTM!

The helper functions are necessary for serde's default attribute and correctly delegate to the associated constants.


111-120: LGTM!

Connection options are well-suited for heartbeat health checks with appropriate short timeouts. The application_name aids debugging via pg_stat_activity.


122-174: LGTM!

Test coverage is comprehensive, covering all validation branches and default value assertions.

etl/src/replication/client.rs (8)

1-21: LGTM!

Imports are well-organized. The replication options appropriately use longer timeouts (60s) compared to heartbeat options (5s) for more involved replication operations.


22-23: LGTM!

Unit struct as a namespace for connection methods is a clean design pattern.


25-39: LGTM!

Clean implementation with explicit TLS branching and proper replication mode configuration.


41-55: LGTM!

Appropriate for non-replication queries with shorter timeouts. Clear separation from replication mode.


57-81: LGTM!

Config builder correctly sets all connection parameters with appropriate conditional logic for replication mode and TLS.


83-106: LGTM!

TLS connection flow is correct. The spawned connection monitor appropriately logs errors without blocking the caller. Connection errors will surface to the caller during actual usage.


108-120: LGTM!

Clean implementation mirroring the TLS variant with appropriate logging.


122-137: The code snippet in this review comment does not match the actual codebase.

The review references pg_connection_config.tls.ca_cert, but TlsConfig has a field trusted_root_certs: String, not ca_cert. The field is a non-optional String, not Option<String>. This means the entire concern about empty root certificate stores when ca_cert is None is based on code that doesn't exist in the repository and would not compile.

Verify that the correct code snippet is being reviewed.

Likely an incorrect or invalid review comment.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
etl-config/src/shared/pipeline.rs (1)

10-12: Fix corrupted documentation comment.

The doc comment appears to have a typo or corruption: "c copy should be performed.Selection rules" should likely read "Selection rules for tables participating in replication."

📝 Proposed fix
-/// c copy should be performed.Selection rules for tables participating in replication.
-///
-/// Controls which tables are eligible for initial table copy and streaming.
+/// Selection rules for tables participating in replication.
+///
+/// Controls which tables are eligible for initial table copy and streaming.
🤖 Fix all issues with AI agents
In @etl-config/src/shared/heartbeat.rs:
- Around line 58-74: In validate() add a check that self.interval_ms > 0 and
return Err(ValidationError::InvalidFieldValue { field:
"interval_ms".to_string(), constraint: "must be > 0".to_string() }) when it's
zero (or less), so the heartbeat config rejects a zero interval and avoids
tight-loop behavior in the heartbeat worker; modify the validate method that
currently checks jitter_percent and min_backoff_ms/max_backoff_ms to include
this new interval_ms validation.

In @etl/src/replication/client.rs:
- Around line 107-129: connect_tls and connect_regular_tls duplicate TLS setup
and connection logic; extract the common code into a private helper (e.g.,
private async fn connect_with_tls(config: Config, pg_connection_config:
&PgConnectionConfig, mode: &str) -> EtlResult<Client>) that builds the
root_cert_store via Self::build_root_cert_store, constructs tls_config and
MakeRustlsConnect, calls config.connect(tls).await, spawns the connection task
that logs errors, and logs a message using the provided mode string; then have
connect_tls and connect_regular_tls call this helper passing distinct mode
values for their log messages.

In @etl/src/workers/heartbeat.rs:
- Around line 282-299: The apply_jitter function uses
SystemTime::now().subsec_nanos() as a pseudo-random source which can produce
correlated values; replace that with a proper RNG (e.g. rand::thread_rng()) to
sample a jitter factor in the range -1.0..1.0 and multiply by the computed
jitter_range; update apply_jitter to use rand::Rng (or another appropriate
randomness API) and ensure the jitter_percent logic
(heartbeat_config.jitter_percent and jitter_range calculation) remains the same;
add the rand dependency if not present and import the needed trait (rand::Rng)
so apply_jitter produces non-deterministic, uncorrelated jitter for the
heartbeat backoff.
📜 Review details

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f238781 and 3b51106.

📒 Files selected for processing (8)
  • etl-config/src/shared/heartbeat.rs
  • etl-config/src/shared/mod.rs
  • etl-config/src/shared/pipeline.rs
  • etl/src/error.rs
  • etl/src/metrics.rs
  • etl/src/replication/client.rs
  • etl/src/workers/heartbeat.rs
  • etl/src/workers/mod.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use snake_case for files and modules
Use CamelCase for types and traits
Use snake_case for functions and variables
Do not leave comments when you remove things
Document all items, public and private, using stdlib tone and precision; only use 'Panics' section when a function can panic
Link types and methods as [Type], [Type::method] in Rust documentation comments
Keep Rust documentation wording concise, correct, and punctuated; reword for clarity while preserving intent
Do not include code examples in Rust documentation; include private helpers for maintainers; apply documentation to modules, types, traits, impls, and functions
Normal comments in Rust should always finish with a period

Files:

  • etl/src/error.rs
  • etl-config/src/shared/heartbeat.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/pipeline.rs
  • etl-config/src/shared/mod.rs
  • etl/src/workers/mod.rs
  • etl/src/replication/client.rs
  • etl/src/metrics.rs
🧠 Learnings (2)
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`

Applied to files:

  • etl/src/error.rs
📚 Learning: 2026-01-13T07:51:54.159Z
Learnt from: CR
Repo: supabase/etl PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-13T07:51:54.159Z
Learning: Rust workspace with crates: `etl/` (core), `etl-api/` (HTTP API), `etl-postgres/`, `etl-destinations/`, `etl-replicator/`, `etl-config/`, `etl-telemetry/`, `etl-examples/`, `etl-benchmarks/`

Applied to files:

  • etl-config/src/shared/heartbeat.rs
  • etl/src/workers/heartbeat.rs
  • etl-config/src/shared/mod.rs
  • etl/src/workers/mod.rs
  • etl/src/replication/client.rs
  • etl/src/metrics.rs
🧬 Code graph analysis (3)
etl-config/src/shared/heartbeat.rs (1)
etl-config/src/shared/pipeline.rs (2)
  • validate (115-137)
  • default (47-49)
etl/src/workers/heartbeat.rs (2)
etl-config/src/shared/pipeline.rs (1)
  • heartbeat_config (147-149)
etl/src/replication/client.rs (1)
  • connect_regular (45-55)
etl-config/src/shared/pipeline.rs (3)
etl-config/src/shared/heartbeat.rs (1)
  • validate (58-74)
etl-config/src/shared/batch.rs (1)
  • validate (32-41)
etl-config/src/shared/replicator.rs (1)
  • validate (38-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: depthfirst Bot
🔇 Additional comments (30)
etl/src/error.rs (1)

136-136: LGTM!

The HeartbeatWorkerPanic variant follows the established naming convention and is correctly placed in the "State & Workflow Errors" section alongside the other worker panic variants (ApplyWorkerPanic, TableSyncWorkerPanic).

etl-config/src/shared/mod.rs (1)

1-15: LGTM!

The shift from wildcard re-exports to explicit item exports improves API surface clarity and makes the public contract more intentional. The new HeartbeatConfig and ETL_HEARTBEAT_OPTIONS exports align with the heartbeat worker feature.

etl-config/src/shared/heartbeat.rs (3)

77-102: LGTM!

The Default implementation correctly uses the associated constants, and the serde default functions properly delegate to them, ensuring a single source of truth for default values.


104-113: LGTM!

The ETL_HEARTBEAT_OPTIONS constant uses compile-time concatenation and defines appropriate PostgreSQL session parameters for lightweight health-check connections with fail-fast timeouts.


115-158: LGTM!

Good test coverage for defaults, options content, valid configuration, and validation edge cases. Consider adding a test for interval_ms = 0 if the validation is added per the earlier suggestion.

etl-config/src/shared/pipeline.rs (4)

86-98: LGTM!

The new primary_connection and heartbeat fields are well-documented and correctly use Option types with #[serde(default)] for optional configuration. The documentation clearly explains the replica mode workflow.


113-120: LGTM!

The validation correctly delegates to HeartbeatConfig::validate() when a heartbeat configuration is present. This integrates cleanly with the existing validation chain.


138-149: LGTM!

The helper methods provide clean accessors for replica mode detection and heartbeat configuration. The heartbeat_config() method correctly falls back to defaults when not explicitly configured.


194-199: LGTM!

The PipelineConfigWithoutSecrets struct correctly mirrors the new fields, using PgConnectionConfigWithoutSecrets for the primary connection (stripping secrets) while carrying HeartbeatConfig as-is since it contains no sensitive data.

Also applies to: 213-214

etl/src/workers/mod.rs (1)

1-9: LGTM!

The module reorganization is clean and follows Rust API design patterns. HeartbeatWorkerHandle is properly defined and publicly exported from the heartbeat module, making it conveniently accessible at the workers module level through the re-export.

etl/src/workers/heartbeat.rs (10)

1-24: LGTM!

Module documentation clearly explains the purpose and use case. Imports are well-organized.


25-39: LGTM!

Error enum covers the relevant failure modes with clear documentation.


41-70: LGTM!

The handle implementation correctly manages the join handle lifecycle and properly maps panics to the appropriate error kind.


72-107: LGTM!

The struct design cleanly separates concerns with proper dependency injection. Documentation follows the stdlib tone.


109-120: LGTM!

The start method correctly consumes self and spawns the worker in a background task.


122-183: LGTM!

The run loop correctly handles the lifecycle with proper shutdown checks, error handling with exponential backoff, and metric cleanup on exit.


185-217: LGTM!

The connection handling properly tracks metrics and resets failure counters on successful connection.


219-242: LGTM!

The heartbeat loop has proper interval handling and defensive connection state checks.


244-266: LGTM!

The heartbeat emission correctly uses non-transactional WAL messages with minimal payload. The documentation clearly explains the parameter choices.


301-358: LGTM!

Helper methods are well-implemented. The wait_with_shutdown correctly handles both explicit shutdown signals and channel closure scenarios.

etl/src/replication/client.rs (9)

1-13: LGTM!

Module documentation is concise. Imports are organized and all appear to be used.


14-20: LGTM!

The replication options constant provides sensible defaults with appropriate timeouts for long-running replication connections.


22-23: LGTM!

Using an empty struct as a namespace for related connection methods is a reasonable pattern.


25-39: LGTM!

The connect method properly handles both TLS and non-TLS paths and wraps the result in a ReplicationClient.


41-55: LGTM!

The regular connection method correctly uses shorter timeouts optimized for health checks and disables replication mode.


57-81: LGTM!

The config builder properly configures all connection parameters with conditional SSL and replication mode settings.


83-105: LGTM!

TLS connection setup is correctly implemented with proper certificate handling. The spawned connection task logs errors appropriately, and callers can detect disconnection via client.is_closed().


131-143: LGTM!

Non-TLS connection follows the same pattern consistently.


145-160: The code snippet shown does not match the actual codebase.

The review analyzes pg_connection_config.tls.ca_cert as an Option type, but the actual [TlsConfig] struct defines pub trusted_root_certs: String (not an optional field named ca_cert). The code as presented in the review would not compile due to the field name mismatch.

Likely an incorrect or invalid review comment.

etl/src/metrics.rs (1)

1-27: Module structure is good, but note that TABLE_NAME_LABEL and ERROR_KIND_LABEL are not used anywhere in the codebase.

The metrics module follows the coding guidelines well: the module-level documentation is present, all constants have appropriate doc comments following stdlib tone and precision, and naming conventions are correct. If these label constants are intentional for future use, consider adding a comment explaining their purpose. Otherwise, consider removing them to avoid dead code.

Comment on lines +282 to +299
/// Applies jitter to a duration.
fn apply_jitter(&self, base_backoff: Duration) -> Duration {
// Simple jitter using timestamp nanoseconds as pseudo-random source.
let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos();

// subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
let jitter = normalized * jitter_range;

let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
Duration::from_secs_f64(jittered_secs)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider using a proper random source for jitter.

Using SystemTime::now().subsec_nanos() as a pseudo-random source is deterministic and may produce correlated jitter values if multiple workers start simultaneously. For production-grade backoff, consider using rand::thread_rng() or similar.

That said, for a single heartbeat worker per pipeline, this approach is functional and avoids adding a dependency.

♻️ Optional: Use proper randomness if rand is available
+use rand::Rng;
+
 fn apply_jitter(&self, base_backoff: Duration) -> Duration {
     let jitter_fraction = self.heartbeat_config.jitter_percent as f64 / 100.0;
     let jitter_range = base_backoff.as_secs_f64() * jitter_fraction;

-    let nanos = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .unwrap_or_default()
-        .subsec_nanos();
-
-    // subsec_nanos() returns 0..1_000_000_000, normalize to -1.0..1.0.
-    let normalized = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
+    let normalized = rand::thread_rng().gen_range(-1.0..1.0);
     let jitter = normalized * jitter_range;

     let jittered_secs = (base_backoff.as_secs_f64() + jitter).max(0.1);
     Duration::from_secs_f64(jittered_secs)
 }
🤖 Prompt for AI Agents
In @etl/src/workers/heartbeat.rs around lines 282 - 299, The apply_jitter
function uses SystemTime::now().subsec_nanos() as a pseudo-random source which
can produce correlated values; replace that with a proper RNG (e.g.
rand::thread_rng()) to sample a jitter factor in the range -1.0..1.0 and
multiply by the computed jitter_range; update apply_jitter to use rand::Rng (or
another appropriate randomness API) and ensure the jitter_percent logic
(heartbeat_config.jitter_percent and jitter_range calculation) remains the same;
add the rand dependency if not present and import the needed trait (rand::Rng)
so apply_jitter produces non-deterministic, uncorrelated jitter for the
heartbeat backoff.

- Add interval_ms > 0 validation to HeartbeatConfig to prevent tight loops
- Extract common TLS connection logic into connect_with_tls helper
- Add test for zero interval validation
@JohnCari JohnCari closed this Jan 13, 2026
@JohnCari
Copy link
Author

Closing this PR in favor of a cleaner single-commit version. The new PR will incorporate all CodeRabbit feedback from #549 and this PR into one atomic commit for a cleaner git history.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant