@gfyrag
Contributor

@gfyrag gfyrag commented Nov 28, 2025

Also fixes the exporter stop issue.


Summary by cubic

Fixes missing transaction.insertedAt in log records and improves exporter stop handling to prevent stuck replication. Also adds an index on logs.id for faster fetches.

  • Bug Fixes

    • Properly stop exporters and pipelines via stopChannel and context cancellation.
    • Skip “already started” errors when restoring pipelines.
    • Add debug logs in ClickHouse driver and pipeline to trace batches and progress.
    • Provisioner: add Dockerfile syntax directive and make registry configurable in justfile.
  • Migration

    • Backfill transaction.insertedAt in logs with a batched update and pg_notify progress.
    • Create a concurrent index on logs.id to speed up queries.

Written for commit e6ada90. Summary will update automatically on new commits.

@gfyrag gfyrag requested a review from a team as a code owner November 28, 2025 10:14
@coderabbitai
Contributor

coderabbitai bot commented Nov 28, 2025

Walkthrough

This pull request enhances the replication system with improved debug logging and error handling, adds two database migrations for log indexing and timestamp population, and refactors the provisioner Docker build configuration with multi-stage improvements.

Changes

Cohort / File(s) | Summary
Replication system logging & error handling (internal/replication/drivers/clickhouse/driver.go, internal/replication/manager.go, internal/replication/pipeline.go) | Added debug logging throughout replication pipelines (batch preparation, fetch operations, progress tracking). Modified error handling in manager.go to gracefully skip already-started pipelines instead of failing. Refactored pipeline.go with improved concurrency: exporter data push now runs in separate goroutine with cancellable context, explicit error channel handling, and consistent stop/cleanup semantics.
Database migrations (internal/storage/bucket/migrations/42-add-missing-index/up.sql, internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql) | Added index on logs(id) table with conditional concurrent directive. Introduced DO block migration to process and populate missing insertedAt timestamps in log data using batched updates with progress notifications.
Provisioner tooling (tools/provisioner/Dockerfile, tools/provisioner/justfile) | Enhanced Dockerfile with syntax directives, expanded multi-stage build source copying, and added explicit ENTRYPOINT and CMD. Updated push-image recipe to accept parameterized registry (defaulting to ghcr.io) instead of hardcoded value.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Areas requiring extra attention:

  • internal/replication/pipeline.go: Goroutine lifecycle management in exporter push refactoring and cancellation semantics across error paths
  • internal/replication/manager.go: Error handling flow change for ledger.ErrAlreadyStarted and implications for pipeline restoration
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql: Correctness of batched UPDATE logic and JSONB timestamp formatting

Poem

🐰 Logs now indexed, timestamps aligned,
Pipelines skip ahead with grace in mind,
Goroutines coordinated, clean shutdown flows,
Docker builds refined, the provisioner grows!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title 'fix: missing inserted at in logs' directly aligns with the primary objective of the changeset - fixing missing transaction.insertedAt timestamps in log records through a migration and backfill logic.
Description check | ✅ Passed | The description is related to the changeset, covering the main fixes including missing insertedAt in logs, exporter stop handling improvements, and supporting infrastructure changes like migrations and index creation.
Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tools/provisioner/Dockerfile (1)

4-8: Invalid Docker COPY syntax: --from=root references undefined stage.

The COPY --from=root directives reference a stage named root, but no such stage is defined in this Dockerfile. Standard Docker syntax requires either:

  • Copying from the build context directly: COPY pkg pkg (without --from=)
  • Copying from a named stage: COPY --from=stage_name source dest

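As written, --from=root resolves only if a stage, image, or additional build context named root is available (for example, one passed with docker buildx build --build-context root=<path>); otherwise the build fails.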

Apply this diff to copy from the build context instead:

 FROM golang:1.24-alpine AS compiler
 WORKDIR /src
-COPY --from=root pkg pkg
-COPY --from=root internal internal
-COPY --from=root cmd cmd
-COPY --from=root go.* .
-COPY --from=root *.go .
+COPY pkg pkg
+COPY internal internal
+COPY cmd cmd
+COPY go.* .
+COPY *.go .
🧹 Nitpick comments (1)
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)

30-30: Prefer the := assignment operator over = for consistency.

Line 30 uses = for assignment. PL/pgSQL accepts both = and :=, so this is not a correctness issue, but := is the conventional form. Replace = with := to stay consistent with common PostgreSQL style.

-			_offset = _offset + _batch_size;
+			_offset := _offset + _batch_size;
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 20af1b7 and e6ada90.

⛔ Files ignored due to path filters (2)
  • internal/storage/bucket/migrations/42-add-missing-index/notes.yaml is excluded by !**/*.yaml
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml is excluded by !**/*.yaml
📒 Files selected for processing (7)
  • internal/replication/drivers/clickhouse/driver.go (1 hunks)
  • internal/replication/manager.go (1 hunks)
  • internal/replication/pipeline.go (3 hunks)
  • internal/storage/bucket/migrations/42-add-missing-index/up.sql (1 hunks)
  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1 hunks)
  • tools/provisioner/Dockerfile (2 hunks)
  • tools/provisioner/justfile (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-11-28T10:06:01.363Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1167
File: internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql:23-23
Timestamp: 2025-11-28T10:06:01.363Z
Learning: In PostgreSQL migrations for the Formance ledger, to ensure ISO 8601/RFC3339 date formatting when setting JSONB fields, use the pattern `to_jsonb(date)#>>'{}'` to extract the timestamp as a properly formatted string. This leverages JSONB's built-in ISO 8601 encoding. Using `date::text` would depend on the server's DateStyle setting and doesn't guarantee ISO 8601 format.

Applied to files:

  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.

Applied to files:

  • internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
🧬 Code graph analysis (2)
internal/replication/manager.go (1)
internal/errors.go (1)
  • ErrAlreadyStarted (75-75)
internal/replication/pipeline.go (1)
deployments/pulumi/pkg/config/config.go (1)
  • Duration (262-262)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Dirty
  • GitHub Check: Tests
🔇 Additional comments (9)
tools/provisioner/justfile (1)

5-9: Parameterized registry in push-image looks good and stays backward compatible

Exposing registry as a parameter with default ghcr.io and using it in the tag keeps existing behavior while allowing overrides. No issues spotted with just syntax or the docker buildx invocation.

tools/provisioner/Dockerfile (1)

22-23: Clarify the intent of the default CMD.

Setting CMD ["--help"] makes the container display help text when run without arguments. This is unusual—most provisioning tools default to performing their primary function. Verify this is intentional or consider using an empty CMD or startup script instead.

internal/replication/drivers/clickhouse/driver.go (1)

42-45: Debug batch-size logging in Accept looks good

The new debug line cleanly surfaces batch size without altering control flow or behavior and reuses the existing logger consistently with the rest of the driver.

internal/storage/bucket/migrations/42-add-missing-index/up.sql (1)

1-1: Verify that this index creation is necessary and follows naming conventions.

If id is already a primary key, it will have an implicit index. Confirm whether this explicit index is needed for performance reasons (e.g., to support the batch migration in migration 43).

The index name logs_ids is unconventional; consider renaming to logs_id_idx to follow PostgreSQL naming conventions.

internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)

23-26: I'll verify the critical issue identified in the review comment by examining the full migration file to confirm whether the date reference is truly undefined.

  find . -type f -name "up.sql" | grep "43-fix-missing-inserted-at-in-log-data" | head -5

Now let me read the full migration file:

  cat -n internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql

internal/replication/pipeline.go (3)

66-72: Good consolidation of stop logic.

The stop helper ensures consistent logging and channel cleanup across all shutdown paths. This reduces duplication and makes the shutdown behavior easier to maintain.


80-80: Debug logging additions improve observability.

The added debug statements provide useful visibility into the pipeline's fetch cycle, batch sizes, and progress tracking without cluttering logs at higher levels.


111-142: Cancellation handling for exporter push is correctly implemented.

The exporter drivers properly respect context cancellation. The HTTP driver uses req.WithContext(ctx) to ensure context cancellation propagates to the underlying HTTP request, and simpler drivers like noop return immediately. The goroutine spawned in the pipeline correctly receives the cancel signal through the cancellable exportContext, preventing goroutine leaks. The buffered error channel correctly prevents blocking if the stop signal arrives before the export completes.

internal/replication/manager.go (1)

218-224: Error comparison pattern correctly ignores pipeline ID values.

The ErrAlreadyStarted.Is() method (internal/errors.go:81-83) performs type-based matching via type assertion (err.(ErrAlreadyStarted)), not value comparison. This means errors.Is(err, ledger.ErrAlreadyStarted("")) will correctly match any ErrAlreadyStarted error regardless of the pipeline ID value it contains.

The graceful handling—logging and skipping rather than returning error—is appropriate for a periodic synchronization operation. The pattern aligns with similar error handling elsewhere in the file.


 func (p *PipelineHandler) Shutdown(ctx context.Context) error {
-	p.logger.Infof("shutdowning pipeline")
+	p.logger.Infof("Shutdowning pipeline")
Contributor

⚠️ Potential issue | 🟡 Minor

Typo in log message: "Shutdowning" should be "Shutting down".

-	p.logger.Infof("Shutdowning pipeline")
+	p.logger.Infof("Shutting down pipeline")
🤖 Prompt for AI Agents
In internal/replication/pipeline.go around line 166, the log message contains a
typo: replace the string "Shutdowning pipeline" with the corrected phrasing
"Shutting down pipeline" (preserve existing logger call and formatting).

@codecov

codecov bot commented Nov 28, 2025

Codecov Report

❌ Patch coverage is 60.46512% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.94%. Comparing base (20af1b7) to head (e6ada90).

Files with missing lines | Patch % | Lines
internal/replication/pipeline.go | 59.45% | 14 Missing and 1 partial ⚠️
internal/replication/manager.go | 60.00% | 2 Missing ⚠️
Additional details and impacted files
@@               Coverage Diff                @@
##           release/v2.3    #1168      +/-   ##
================================================
- Coverage         82.03%   81.94%   -0.10%     
================================================
  Files               187      187              
  Lines              8989     9016      +27     
================================================
+ Hits               7374     7388      +14     
- Misses             1188     1199      +11     
- Partials            427      429       +2     


@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 9 files

Prompt for AI agents (all 1 issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="internal/replication/pipeline.go">

<violation number="1" location="internal/replication/pipeline.go:96">
Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.</violation>
</file>


 	p.logger.Errorf("Error fetching logs: %s", err)
 	select {
-	case <-ctx.Done():
+	case ch := <-p.stopChannel:
@cubic-dev-ai cubic-dev-ai bot Nov 28, 2025

Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At internal/replication/pipeline.go, line 96:

<comment>Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.</comment>

<file context>
@@ -85,38 +93,57 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
 				p.logger.Errorf("Error fetching logs: %s", err)
 				select {
-				case <-ctx.Done():
+				case ch := <-p.stopChannel:
+					stop(ch)
 					return
</file context>
Contributor

The bot is right here. Do you want to wait forever here?

Contributor Author

Yes, the pipeline has a Stop(ctx) method.
Cancellation should go through that dedicated method, not through the context.

errChan <- err
}()
select {
case err := <-errChan:
Contributor

Same here: do you also want to select on the context?

Contributor Author

Same response ^^

3 participants