Skip to content

Conversation

@gfyrag
Copy link
Contributor

@gfyrag gfyrag commented Nov 27, 2025

Summary by cubic

Fixes missing insertedAt in ledger logs by backfilling existing records and adding an index to speed up log pagination. Also improves replication stability.

  • Bug Fixes

    • Populate data.transaction.insertedAt for NEW_TRANSACTION and REVERTED_TRANSACTION logs.
    • Replication: ignore “already started” on restore, add clearer debug logs, and fix exporter stop handling.
  • Migration

    • Adds migrations: 42 (logs.id index) and 43 (populate insertedAt). They run on deploy and may take time on large datasets.

Written for commit 5ea84b7. Summary will update automatically on new commits.

@gfyrag gfyrag requested a review from a team as a code owner November 27, 2025 18:06
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 27, 2025

Walkthrough

This pull request adds comprehensive diagnostic logging throughout the replication pipeline, updates Docker provisioning infrastructure with multi-stage builds and Alpine final image, introduces database migrations for log data repairs and indexing, and parameterizes image registry configuration.

Changes

Cohort / File(s) Summary
Replication logging enhancements
internal/replication/manager.go, internal/replication/pipeline.go, internal/replication/drivers/clickhouse/driver.go
Enhanced debug logging for pipeline lifecycle (startup/shutdown), batch fetching, exporter operations, and ClickHouse acceptance; asynchronous push path with retry logic on push errors; graceful cancellation handling.
Docker provisioner build
tools/provisioner/Dockerfile
Multi-stage Dockerfile refactor: added build-stage COPY directives for sources, dedicated WORKDIR for provisioner, go mod cache, and new Alpine 3.21 final stage with entrypoint and default help command.
Build configuration
tools/provisioner/justfile
Added parameterized registry variable (default: ghcr.io) to push-image target for flexible image registry configuration.
Database migrations
internal/storage/bucket/migrations/42-add-missing-index/up.sql, internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
New migrations: creates index on logs table id column with conditional concurrency; repairs log data by batch-updating missing insertedAt timestamps with proper format.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

  • Database migrations — Verify SQL logic for batch update batching (1000-row chunks) and proper transaction handling in migration 43; confirm index creation strategy with transactional flag
  • Replication pipeline async push — Review error channel handling, cancellation context flow during push retry, and stop signal precedence
  • Docker multi-stage build — Confirm layer caching strategy and Alpine image compatibility with existing provisioner binary

Poem

🐰 Logs now speak with clarity bright,
Migrations mend what slipped from sight,
Docker stages trim the load,
Registry parameters pave the road,
Async whispers guide the way—
Provisioner hops to a brighter day!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main bug fix in this changeset: adding missing insertedAt fields to ledger logs through backfilling and migrations.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, covering bug fixes, migrations, and replication improvements that align with the actual code changes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/missing-inserted-at

Comment @coderabbitai help to get the list of available commands and usage tips.

@gfyrag gfyrag force-pushed the fix/missing-inserted-at branch 3 times, most recently from 059deb1 to 4277ffe Compare November 27, 2025 18:08
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
tools/provisioner/Dockerfile (1)

18-18: Remove duplicate syntax directive.

The syntax directive on line 18 appears to be a duplicate of line 1. Dockerfile syntax directives should typically appear once at the beginning of the file.

Apply this diff to remove the duplicate:

 RUN --mount=type=cache,target=$GOPATH go build -o provisioner
 
-# syntax=docker/dockerfile:1
-
 FROM alpine:3.21
internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql (1)

8-12: Consider performance implications of the batching strategy.

The current approach uses row_number() over (order by id) with offset-based pagination. For large tables, this can be inefficient because PostgreSQL must compute row numbers for all rows on each iteration. Consider using a cursor-based approach with id > last_processed_id instead.

However, given that this is a one-time migration and the PR description indicates awareness that "migrations run on deploy and may take time on large datasets," this approach is acceptable if simpler to reason about.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f8b2138 and 4277ffe.

⛔ Files ignored due to path filters (2)
  • internal/storage/bucket/migrations/41-add-missing-index/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/notes.yaml is excluded by !**/*.yaml
📒 Files selected for processing (6)
  • internal/replication/manager.go (1 hunks)
  • internal/replication/pipeline.go (3 hunks)
  • internal/storage/bucket/migrations/41-add-missing-index/up.sql (1 hunks)
  • internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql (1 hunks)
  • tools/provisioner/Dockerfile (2 hunks)
  • tools/provisioner/justfile (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-04-29T11:24:28.923Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.

Applied to files:

  • internal/replication/pipeline.go
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.

Applied to files:

  • internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql
📚 Learning: 2025-05-20T13:48:07.455Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 935
File: internal/controller/system/state_tracker.go:0-0
Timestamp: 2025-05-20T13:48:07.455Z
Learning: In the Formance ledger codebase, sequence reset queries with `select setval` don't require COALESCE around max(id) even for brand new ledgers, as the system handles this case properly.

Applied to files:

  • internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql
🧬 Code graph analysis (2)
internal/replication/manager.go (1)
internal/errors.go (1)
  • ErrAlreadyStarted (75-75)
internal/replication/pipeline.go (1)
deployments/pulumi/pkg/config/config.go (1)
  • Duration (265-265)
🔇 Additional comments (6)
tools/provisioner/Dockerfile (1)

1-23: LGTM - Multi-stage build improvements.

The Dockerfile properly sets up a multi-stage build with caching for Go dependencies and produces a minimal Alpine-based final image with appropriate entrypoint and default command.

tools/provisioner/justfile (1)

5-8: LGTM - Registry parameterization.

The addition of the registry parameter with a sensible default maintains backward compatibility while allowing flexibility for different container registries.

internal/replication/manager.go (2)

220-226: Improved resilience for pipeline restoration.

The change to handle ErrAlreadyStarted gracefully during synchronization makes the system more resilient to race conditions during pipeline restoration. This aligns well with the PR objective to improve replication stability.


220-226: Perfect! I've now gathered all the necessary information to verify the error comparison implementation. Let me analyze the findings:

Verification Results:

  1. ErrAlreadyStarted Definition (internal/errors.go:75-88):

    • Type: type ErrAlreadyStarted string
    • Implements Error() method ✓
    • Implements Is(err error) bool method ✓
    • Has NewErrAlreadyStarted() constructor ✓
  2. Is() Method Implementation (lines 81-83):

    func (e ErrAlreadyStarted) Is(err error) bool {
        _, ok := err.(ErrAlreadyStarted)
        return ok
    }
  3. Usage Pattern: The code correctly uses errors.Is(err, ledger.ErrAlreadyStarted("")) which works as follows:

    • errors.Is() will invoke the Is() method on the target (ErrAlreadyStarted(""))
    • The Is() method performs a type assertion to check if the error is of type ErrAlreadyStarted
    • The empty string "" is just a sentinel value; the actual comparison only checks the type
  4. Consistency: This exact pattern is used successfully in multiple places:

    • Line 160 in controller_grpc_server.go
    • Line 21 in controllers_pipeline_start.go
    • Line 46 in controllers_pipeline_start_test.go

Verify that error comparison works correctly for ErrAlreadyStarted.

The code is correct. ErrAlreadyStarted properly implements both the Error() method and the Is(err error) bool method required for errors.Is() to work correctly. The type assertion in the Is() method ensures the comparison succeeds only when the error is of type ErrAlreadyStarted, regardless of the string value. This pattern is consistently used throughout the codebase and functions as intended.

internal/replication/pipeline.go (1)

67-67: LGTM - Enhanced observability.

The debug logging additions provide valuable visibility into pipeline execution flow without changing any behavior. These logs will help diagnose replication issues and align with the PR objective to "add clearer debug logs."

Also applies to: 73-73, 77-77, 100-100, 126-126

internal/storage/bucket/migrations/41-add-missing-index/up.sql (1)

1-1: logs.id is not a primary key and the index is not redundant.

The logs table has seq as its primary key (bigserial), not id. The table includes a composite unique index on (ledger, id) but no standalone index on the id column alone. The migration correctly adds a new index on logs(id) which provides value for queries that filter or sort by the id column independently.

@codecov
Copy link

codecov bot commented Nov 27, 2025

Codecov Report

❌ Patch coverage is 40.90909% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.33%. Comparing base (f8b2138) to head (5ea84b7).

Files with missing lines Patch % Lines
internal/replication/pipeline.go 44.73% 15 Missing and 6 partials ⚠️
internal/replication/manager.go 0.00% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1167      +/-   ##
==========================================
- Coverage   82.09%   81.33%   -0.76%     
==========================================
  Files         190      190              
  Lines        9319     9356      +37     
==========================================
- Hits         7650     7610      -40     
- Misses       1216     1238      +22     
- Partials      453      508      +55     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 8 files

@gfyrag gfyrag force-pushed the fix/missing-inserted-at branch from 4277ffe to 11aee7c Compare November 28, 2025 09:57
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (2)

23-23: Simplify the JSONB timestamp formatting.

The double to_jsonb wrapping is unnecessary. The expression to_jsonb(to_jsonb(date)#>>'{}' || 'Z') converts to JSON, extracts as text, concatenates 'Z', then converts back to JSON—introducing redundant transformations that reduce clarity and efficiency.

Apply this diff to simplify:

-			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(to_jsonb(date)#>>'{}' || 'Z'))
+			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(date::text || 'Z'))

This directly casts the timestamp to text, appends 'Z', and converts to JSON in a single, clear operation.


28-28: Fix the unreliable loop exit condition.

The exit when not found; after UPDATE with a CTE may not work as intended. In PostgreSQL, the FOUND variable behavior with UPDATE statements in CTE context is unpredictable, and this can cause the loop to exit prematurely or run indefinitely. This is especially risky for a data backfill migration where completeness is critical.

To reliably detect when no rows are updated, use GET DIAGNOSTICS with ROW_COUNT instead.

Apply this diff:

 do $$
 	declare
 		_offset integer := 0;
 		_batch_size integer := 1000;
+		_updated_count integer;
 	begin
 		set search_path = '{{ .Schema }}';
 
 		create temp table logs_view as
 			select row_number() over (order by id) as row_number, id, ledger
 			from logs
 			where type = 'NEW_TRANSACTION' or type = 'REVERTED_TRANSACTION';
 		create index logs_view_row_numbers on logs_view(row_number);
 
 		perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from logs_view));
 
 		loop
 			with _rows as (
 				select id, ledger, row_number
 				from logs_view
 				where row_number >= _offset and row_number < _offset + _batch_size
 			)
 			update logs
 			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(date::text || 'Z'))
 			from _rows
 			where logs.id = _rows.id and
 			      logs.ledger = _rows.ledger;
 
-			exit when not found;
+			GET DIAGNOSTICS _updated_count = ROW_COUNT;
+			exit when _updated_count = 0;
 
 			_offset = _offset + _batch_size;
 
 			perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
 
 			commit;
 		end loop;
 
 		drop table if exists logs_view;
 	end
 $$;
🧹 Nitpick comments (3)
tools/provisioner/Dockerfile (1)

18-18: Remove duplicate syntax directive.

The syntax directive is already declared on line 1 and doesn't need to be repeated here.

Apply this diff:

-# syntax=docker/dockerfile:1
 FROM alpine:3.21
internal/replication/pipeline.go (2)

112-141: Async exporter push with retries preserves semantics; consider small refinements

The new async pattern—wrapping exporter.Accept in a goroutine with a derived exportContext, reporting via a buffered errChan, and retrying the same batch on error with PushRetryPeriod—looks sound:

  • Ordering is preserved because you don’t advance LastLogID until after this loop completes successfully.
  • stopChannel is honored even while a push is in flight, and cancel() is called in both success and stop paths.
  • The buffered errChan ensures the goroutine can exit even if the stop case wins the select; there’s no goroutine leak.

Two optional improvements you might consider:

  1. Precompute the mapped slice outside the retry loop to avoid repeated allocations on retries:
-            for {
-                p.logger.Debugf("Send data to exporter.")
-                errChan := make(chan error, 1)
-                exportContext, cancel := context.WithCancel(ctx)
-                go func() {
-                    _, err := p.exporter.Accept(exportContext, collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
-                        return drivers.LogWithLedger{
-                            Log:    log,
-                            Ledger: p.pipeline.Ledger,
-                        }
-                    })...)
-                    errChan <- err
-                }()
+            mapped := collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
+                return drivers.LogWithLedger{
+                    Log:    log,
+                    Ledger: p.pipeline.Ledger,
+                }
+            })
+            for {
+                p.logger.Debugf("Send data to exporter.")
+                errChan := make(chan error, 1)
+                exportContext, cancel := context.WithCancel(ctx)
+                go func() {
+                    _, err := p.exporter.Accept(exportContext, mapped...)
+                    errChan <- err
+                }()
  1. Optionally special‑case context cancellation so a context.Canceled/DeadlineExceeded from the exporter is treated as a terminal condition instead of something to retry forever (e.g., bail out if ctx.Err() != nil inside the err != nil branch).

These are non‑blocking suggestions; the current behavior is consistent and correct.


167-181: Shutdown handshake and logger enrichment are consistent and helpful

  • Shutdown now logs at info/debug, sends an error channel through p.stopChannel, and waits for it to be closed—this preserves the previous handshake semantics while giving better visibility into shutdown timing.
  • The logger enrichment with "component"="pipeline", "module"=pipeline.Ledger, and "driver"=pipeline.ExporterID will make the new debug messages much more actionable in multi‑pipeline setups.

One minor nit, if you care about log wording, is that "Shutdowning pipeline" could be tweaked to "Shutting down pipeline" for readability, but that’s purely cosmetic.

-    p.logger.Infof("Shutdowning pipeline")
+    p.logger.Infof("Shutting down pipeline")

Also applies to: 201-205

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4277ffe and 11aee7c.

⛔ Files ignored due to path filters (3)
  • internal/storage/bucket/migrations/41-add-missing-index/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/42-remove-useless-trigger/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml is excluded by !**/*.yaml
📒 Files selected for processing (8)
  • cmd/worker.go (1 hunks)
  • internal/replication/drivers/clickhouse/driver.go (1 hunks)
  • internal/replication/manager.go (1 hunks)
  • internal/replication/pipeline.go (3 hunks)
  • internal/storage/bucket/migrations/41-add-missing-index/up.sql (1 hunks)
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1 hunks)
  • tools/provisioner/Dockerfile (2 hunks)
  • tools/provisioner/justfile (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/storage/bucket/migrations/41-add-missing-index/up.sql
  • internal/replication/manager.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.

Applied to files:

  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Tests
  • GitHub Check: Dirty
🔇 Additional comments (9)
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)

16-35: Batch processing and progress tracking are well-implemented.

The loop structure with configurable batch size, indexed range queries, and pg_notify progress signals is solid. Separate commits per batch appropriately prevents transaction bloat on large datasets. Ensure the exit when not found; fix is applied (see separate comment) so the loop terminates reliably when all rows are processed.

tools/provisioner/justfile (1)

5-8: LGTM! Good parameterization.

The registry parameterization allows flexible image publishing while preserving the default behavior. The implementation is clean and follows best practices.

tools/provisioner/Dockerfile (3)

1-1: LGTM! Best practice for BuildKit.

The syntax directive enables modern Docker BuildKit features and is recommended for all Dockerfiles.


19-23: Alpine 3.21 is available on Docker Hub.

The verification shows that Alpine 3.21 is a released version with multiple patch levels available (3.21.5, 3.21.4, 3.21.3, 3.21.2, 3.21.1, 3.21.0). The review comment's concern that "Alpine 3.21 may not be released yet" is unfounded. The Dockerfile's use of FROM alpine:3.21 is valid and correctly references an available base image.


2-16: golang:1.25-alpine is available and stable; no version issues to address.

Verification confirms that golang:1.25-alpine exists on Docker Hub and has been stable since August 2025. The image is available in multiple patch versions (1.25.0, 1.25.1, 1.25.2, 1.25.3, 1.25.4), confirming the Dockerfile's base image reference is valid. The build process reorganization with go mod download cache optimization is sound.

cmd/worker.go (1)

82-82: Worker logs page size default bump to 1000 looks fine

Increasing the default page size to 1000 is consistent with the new pagination/indexing work and should just reduce round‑trips at the cost of somewhat larger batches; no correctness concerns here.

internal/replication/drivers/clickhouse/driver.go (1)

47-48: Accept debug logging is appropriate and side‑effect free

The new debug line cleanly exposes batch size without touching control flow or error handling; this should help correlate pipeline logs with ClickHouse inserts.

internal/replication/pipeline.go (2)

67-82: Lifecycle and fetch‑path logging changes look correct

The added “Pipeline started/terminated”, “Fetch next batch.”, and “Got %d items” logs, plus the factored stop helper, make the control flow around the main loop and stop handling clearer without changing behavior. Error handling on ListLogs still backs off using PullInterval and respects a stop signal via p.stopChannel, which is consistent with the existing design.

Also applies to: 95-106


146-160: LastLogID advancement and “has more” logging align with ordering guarantees

lastLogID := logs.Data[len(logs.Data)-1].ID (safe because of the prior len(logs.Data) == 0 guard) and updating p.pipeline.LastLogID only after a successful exporter push ensures log IDs move forward strictly after processing, matching the need to maintain correct log ordering and avoid gaps. The added debug messages (“Move last log id to…”, “Has more logs to fetch.”) will make any anomalies in log progression much easier to spot.
Based on learnings, this is in line with the importance of robust lastLogID tracking.

@gfyrag gfyrag force-pushed the fix/missing-inserted-at branch from 11aee7c to cbfe1c3 Compare November 28, 2025 10:05
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 11aee7c and cbfe1c3.

⛔ Files ignored due to path filters (3)
  • internal/storage/bucket/migrations/41-add-missing-index/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/42-remove-useless-trigger/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml is excluded by !**/*.yaml
📒 Files selected for processing (7)
  • internal/replication/drivers/clickhouse/driver.go (1 hunks)
  • internal/replication/manager.go (1 hunks)
  • internal/replication/pipeline.go (3 hunks)
  • internal/storage/bucket/migrations/41-add-missing-index/up.sql (1 hunks)
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1 hunks)
  • tools/provisioner/Dockerfile (2 hunks)
  • tools/provisioner/justfile (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • internal/replication/manager.go
  • tools/provisioner/Dockerfile
  • internal/replication/drivers/clickhouse/driver.go
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
🔇 Additional comments (4)
tools/provisioner/justfile (1)

5-8: Registry parameterization looks good and remains backwards compatible

Making registry a parameter with default 'ghcr.io' cleanly preserves existing behavior while allowing overrides, and the updated tag construction is consistent with that. No issues from this change alone.

internal/storage/bucket/migrations/41-add-missing-index/up.sql (1)

1-1: LGTM!

The migration correctly uses conditional CONCURRENTLY (only valid outside transactions) and the schema templating is properly applied. This index will help with log pagination queries as intended per the PR objectives.

internal/replication/pipeline.go (2)

111-144: Well-structured async export with graceful cancellation.

The pattern correctly uses a buffered channel (size 1) to prevent goroutine leaks, and the cancellable context enables responsive shutdown when stopChannel fires. The retry loop cleanly separates success from error handling.


66-73: Good centralization of termination logic.

The stop helper consolidates logging and channel closure, improving consistency and reducing duplication across all exit points.


func (p *PipelineHandler) Shutdown(ctx context.Context) error {
p.logger.Infof("shutdowning pipeline")
p.logger.Infof("Shutdowning pipeline")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Typo: "Shutdowning" → "Shutting down"

-	p.logger.Infof("Shutdowning pipeline")
+	p.logger.Infof("Shutting down pipeline")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
p.logger.Infof("Shutdowning pipeline")
p.logger.Infof("Shutting down pipeline")
🤖 Prompt for AI Agents
internal/replication/pipeline.go around line 167: the log message contains a
typo "Shutdowning pipeline"; update the string to "Shutting down pipeline"
(preserving the logger call and formatting) so the log reads: Shutting down
pipeline.

Also fixes stop issue of exporters.
@gfyrag gfyrag force-pushed the fix/missing-inserted-at branch from cbfe1c3 to 5ea84b7 Compare November 28, 2025 10:12
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
internal/replication/pipeline.go (1)

167-167: Typo: "Shutdowning" → "Shutting down"

This typo was already flagged in a previous review. Please update the log message to: "Shutting down pipeline"

🧹 Nitpick comments (1)
tools/provisioner/Dockerfile (1)

10-16: Standardize COPY path syntax for consistency.

The provisioner build stage mixes relative paths (COPY go.* . at line 11) and absolute paths (COPY cmd /src/tools/provisioner/cmd at lines 14–15). Since the WORKDIR is already set to /src/tools/provisioner, use consistent relative paths throughout.

Apply this diff to use consistent relative paths:

 WORKDIR /src/tools/provisioner
 COPY go.* .
 RUN --mount=type=cache,target=$GOPATH go mod download
 COPY main.go .
-COPY cmd /src/tools/provisioner/cmd
-COPY pkg /src/tools/provisioner/pkg
+COPY cmd ./cmd
+COPY pkg ./pkg
 RUN --mount=type=cache,target=$GOPATH go build -o provisioner
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between cbfe1c3 and 5ea84b7.

⛔ Files ignored due to path filters (2)
  • internal/storage/bucket/migrations/42-add-missing-index/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml is excluded by !**/*.yaml
📒 Files selected for processing (7)
  • internal/replication/drivers/clickhouse/driver.go (1 hunks)
  • internal/replication/manager.go (1 hunks)
  • internal/replication/pipeline.go (3 hunks)
  • internal/storage/bucket/migrations/42-add-missing-index/up.sql (1 hunks)
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1 hunks)
  • tools/provisioner/Dockerfile (2 hunks)
  • tools/provisioner/justfile (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/replication/manager.go
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
  • tools/provisioner/justfile
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
🧬 Code graph analysis (1)
internal/replication/pipeline.go (1)
deployments/pulumi/pkg/config/config.go (1)
  • Duration (265-265)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Dirty
  • GitHub Check: Tests
🔇 Additional comments (8)
internal/storage/bucket/migrations/42-add-missing-index/up.sql (1)

1-1: Clarify migration purpose and verify index necessity.

The filename and content suggest this migration adds an index on logs.id, but the PR objectives indicate migration #42 should remove a useless trigger. This mismatch needs clarification.

Additionally, assuming id is a primary key (typical for this column), a unique index already exists on it. Creating a second non-unique index on the same column may be redundant unless it serves a specific query optimization purpose distinct from the PK index.

Consider adding IF NOT EXISTS to avoid failures on re-runs:

-create index {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
+create index if not exists {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
tools/provisioner/Dockerfile (1)

4-8: I'll help you verify whether the "root" build stage exists in the Dockerfile. Let me search for it.
<function_calls>


#!/bin/bash

Verify the "root" build stage exists in the complete Dockerfile

grep -n "^FROM.*AS root|^FROM.*as root" tools/provisioner/Dockerfile


</function_calls>

Now let me also examine the full Dockerfile to understand its complete structure:
<function_calls>


#!/bin/bash

Read the complete Dockerfile to understand its structure

cat -n tools/provisioner/Dockerfile


</function_calls>

Please run these scripts to gather the verification results.

internal/replication/drivers/clickhouse/driver.go (1)

47-48: LGTM! Helpful diagnostic addition.

The debug log provides visibility into batch sizes being processed, which aligns with the PR's goal of improving observability in the replication pipeline.

internal/replication/pipeline.go (5)

67-67: LGTM! Good refactoring for termination handling.

The addition of startup logging and the stop helper function centralizes termination logic and improves observability. Clean implementation.

Also applies to: 70-73, 78-78


81-81: LGTM! Clear fetch diagnostics.

These debug logs provide good visibility into the fetch cycle and batch sizes, making it easier to diagnose pipeline behavior.

Also applies to: 105-105


147-147: LGTM! Critical tracking made visible.

This debug log provides visibility into lastLogID updates, which is essential for maintaining proper log ordering and preventing duplicate or out-of-order processing. Based on learnings, proper lastLogID tracking is critical for data integrity in the ledger.


159-159: LGTM! Useful pagination diagnostic.

This log helps distinguish between "waiting for new logs" vs "fetching next page immediately," making it easier to understand pipeline pacing.


112-141: Context cancellation is properly handled; infinite retry without error classification remains a valid concern.

The code review's concerns are substantiated:

  1. Context cancellation handling: ✓ Verified as sound. All driver implementations (ClickHouse, Elasticsearch, HTTP) properly accept and pass the context to their underlying libraries (PrepareBatch, bulk.Do, http.Client), which handle cancellation correctly. The exportContext with cancellation at line 115 is properly used and cleaned up.

  2. Infinite retry loop: ✓ Confirmed. The for loop at line 111 will retry indefinitely on any error from the exporter—there is no max retry count, no error classification, and no circuit breaker. The only exits are: success (line 142 break), receiving a stop signal (lines 137-140), or outer context cancellation. This means permanent errors (invalid credentials, schema mismatches, etc.) will cause unbounded retry loops with the configured delay.

The design appears intentional—retry until explicitly stopped—but without distinguishing transient from permanent errors, permanent failures will waste resources with repeated failed attempts.

COPY pkg /src/tools/provisioner/pkg
RUN --mount=type=cache,target=$GOPATH go build -o provisioner

# syntax=docker/dockerfile:1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove the duplicate syntax directive.

The # syntax=docker/dockerfile:1 directive must appear only at the very beginning of the Dockerfile (before any other lines, comments, or directives). The duplicate at line 18 is a syntax error. The directive at line 1 is correctly placed.

Apply this diff to remove the duplicate:

-# syntax=docker/dockerfile:1
 FROM alpine:3.21
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# syntax=docker/dockerfile:1
FROM alpine:3.21
🤖 Prompt for AI Agents
In tools/provisioner/Dockerfile around line 18, there's a duplicate docker
syntax directive ("# syntax=docker/dockerfile:1") which must only appear at the
very top of the file; remove the directive on line 18 so only the existing one
at line 1 remains, ensuring no other comments or lines precede the top
directive.

errChan <- err
}()
select {
case err := <-errChan:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wait also for the context ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants