fix: missing inserted at in logs #1167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Conversation
Walkthrough

This pull request adds comprehensive diagnostic logging throughout the replication pipeline, updates the Docker provisioning infrastructure with a multi-stage build and an Alpine final image, introduces database migrations for log data repairs and indexing, and parameterizes the image registry configuration.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Force-pushed 059deb1 to 4277ffe (Compare)
Actionable comments posted: 2
🧹 Nitpick comments (2)
tools/provisioner/Dockerfile (1)
18-18: Remove duplicate syntax directive. The syntax directive on line 18 duplicates line 1; a Dockerfile syntax directive should appear only once, at the very beginning of the file.

Apply this diff to remove the duplicate:

```diff
 RUN --mount=type=cache,target=$GOPATH go build -o provisioner
-# syntax=docker/dockerfile:1
 FROM alpine:3.21
```

internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql (1)
8-12: Consider performance implications of the batching strategy. The current approach uses `row_number() over (order by id)` with offset-based pagination. For large tables this can be inefficient, because PostgreSQL must compute row numbers for all rows on each iteration. Consider a keyset (cursor-based) approach with `id > last_processed_id` instead. That said, since this is a one-time migration and the PR description acknowledges that "migrations run on deploy and may take time on large datasets," the current approach is acceptable if it is simpler to reason about.
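For reference, a keyset variant of such a batch loop could look roughly like this. This is a hedged sketch, not the migration's actual code: the `bigint` id type, the `_last_id` bookkeeping, and the data-modifying CTE are illustrative assumptions (in particular, it assumes ids are globally ordered, whereas the real table has a composite `(ledger, id)` key).

```sql
-- Sketch: keyset (cursor-based) batching. Each iteration resumes from the
-- largest id touched so far instead of recomputing row numbers for the
-- whole table. Types and column details are assumptions for illustration.
do $$
declare
    _last_id bigint := 0;   -- assumes a numeric, monotonically increasing id
    _batch_size integer := 1000;
begin
    loop
        with _batch as (
            select id, ledger
            from logs
            where (type = 'NEW_TRANSACTION' or type = 'REVERTED_TRANSACTION')
              and id > _last_id
            order by id
            limit _batch_size
        ), _updated as (
            update logs
            set data = jsonb_set(data, '{transaction, insertedAt}',
                                 to_jsonb(date::text || 'Z'))
            from _batch
            where logs.id = _batch.id and logs.ledger = _batch.ledger
            returning _batch.id
        )
        select max(id) into _last_id from _updated;

        exit when _last_id is null;  -- no rows left to process
        commit;                      -- keep per-batch commits, as in the PR
    end loop;
end
$$;
```

The advantage is that each iteration is an index range scan starting at `_last_id`, so cost stays proportional to the batch size rather than growing with the offset.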
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
- `internal/storage/bucket/migrations/41-add-missing-index/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/notes.yaml` is excluded by `!**/*.yaml`
📒 Files selected for processing (6)
- `internal/replication/manager.go` (1 hunks)
- `internal/replication/pipeline.go` (3 hunks)
- `internal/storage/bucket/migrations/41-add-missing-index/up.sql` (1 hunks)
- `internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql` (1 hunks)
- `tools/provisioner/Dockerfile` (2 hunks)
- `tools/provisioner/justfile` (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-04-29T11:24:28.923Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
Applied to files:
internal/replication/pipeline.go
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.
Applied to files:
internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql
📚 Learning: 2025-05-20T13:48:07.455Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 935
File: internal/controller/system/state_tracker.go:0-0
Timestamp: 2025-05-20T13:48:07.455Z
Learning: In the Formance ledger codebase, sequence reset queries with `select setval` don't require COALESCE around max(id) even for brand new ledgers, as the system handles this case properly.
Applied to files:
internal/storage/bucket/migrations/42-fix-missing-inserted-at-in-log-data/up.sql
🧬 Code graph analysis (2)
internal/replication/manager.go (1)
internal/errors.go (1)
ErrAlreadyStarted(75-75)
internal/replication/pipeline.go (1)
deployments/pulumi/pkg/config/config.go (1)
Duration(265-265)
🔇 Additional comments (6)
tools/provisioner/Dockerfile (1)
1-23: LGTM - Multi-stage build improvements. The Dockerfile properly sets up a multi-stage build with caching for Go dependencies and produces a minimal Alpine-based final image with an appropriate entrypoint and default command.
tools/provisioner/justfile (1)
5-8: LGTM - Registry parameterization. The addition of the `registry` parameter with a sensible default maintains backward compatibility while allowing flexibility for different container registries.

internal/replication/manager.go (2)
220-226: Improved resilience for pipeline restoration. The change to handle `ErrAlreadyStarted` gracefully during synchronization makes the system more resilient to race conditions during pipeline restoration. This aligns well with the PR objective to improve replication stability.
220-226: Verify that error comparison works correctly for ErrAlreadyStarted.

Verification results: the code is correct.

ErrAlreadyStarted definition (internal/errors.go:75-88):

- Type: `type ErrAlreadyStarted string`
- Implements the `Error()` method ✓
- Implements the `Is(err error) bool` method ✓
- Has a `NewErrAlreadyStarted()` constructor ✓

`Is()` method implementation (lines 81-83):

```go
func (e ErrAlreadyStarted) Is(err error) bool {
	_, ok := err.(ErrAlreadyStarted)
	return ok
}
```

Usage pattern: the code uses `errors.Is(err, ledger.ErrAlreadyStarted(""))`, which works as follows:

- `errors.Is()` walks the error chain and, for each error in the chain that defines an `Is` method, calls it with `ErrAlreadyStarted("")` as the target
- The `Is()` method performs a type assertion to check whether the target is of type `ErrAlreadyStarted`
- The empty string `""` is just a sentinel value; the comparison only checks the type

Consistency: this exact pattern is used successfully in multiple places:

- Line 160 in controller_grpc_server.go
- Line 21 in controllers_pipeline_start.go
- Line 46 in controllers_pipeline_start_test.go

`ErrAlreadyStarted` properly implements both the `Error()` method and the `Is(err error) bool` method required for `errors.Is()` to work correctly. The type assertion ensures the comparison succeeds only when the error is of type `ErrAlreadyStarted`, regardless of the string value. This pattern is consistently used throughout the codebase and functions as intended.
67-67: LGTM - Enhanced observability. The debug logging additions provide valuable visibility into pipeline execution flow without changing any behavior. These logs will help diagnose replication issues and align with the PR objective to "add clearer debug logs."
Also applies to: 73-73, 77-77, 100-100, 126-126
internal/storage/bucket/migrations/41-add-missing-index/up.sql (1)
1-1: logs.id is not a primary key and the index is not redundant. The logs table has `seq` as its primary key (bigserial), not `id`. The table includes a composite unique index on `(ledger, id)` but no standalone index on the `id` column alone. The migration correctly adds a new index on `logs(id)`, which provides value for queries that filter or sort by the `id` column independently.
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
Codecov Report: ❌ Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1167      +/-   ##
==========================================
- Coverage   82.09%   81.33%   -0.76%
==========================================
  Files         190      190
  Lines        9319     9356      +37
==========================================
- Hits         7650     7610      -40
- Misses       1216     1238      +22
- Partials      453      508      +55
==========================================
```

☔ View full report in Codecov by Sentry.
No issues found across 8 files
Force-pushed 4277ffe to 11aee7c (Compare)
Actionable comments posted: 0
♻️ Duplicate comments (2)
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (2)
23-23: Simplify the JSONB timestamp formatting. The double `to_jsonb` wrapping is unnecessary. The expression `to_jsonb(to_jsonb(date)#>>'{}' || 'Z')` converts to JSON, extracts as text, concatenates 'Z', then converts back to JSON, introducing redundant transformations that reduce clarity and efficiency.

Apply this diff to simplify:

```diff
-			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(to_jsonb(date)#>>'{}' || 'Z'))
+			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(date::text || 'Z'))
```

This directly casts the timestamp to text, appends 'Z', and converts to JSON in a single, clear operation.
28-28: Fix the unreliable loop exit condition. The `exit when not found;` after an UPDATE driven by a CTE may not work as intended: in PL/pgSQL, the `FOUND` variable's behavior for UPDATE statements in this context is unreliable, which can cause the loop to exit prematurely or run indefinitely. This is especially risky for a data backfill migration where completeness is critical.

To reliably detect when no rows were updated, use `GET DIAGNOSTICS` with `ROW_COUNT` instead.

Apply this diff:

```diff
 do $$
 	declare
 		_offset integer := 0;
 		_batch_size integer := 1000;
+		_updated_count integer;
 	begin
 		set search_path = '{{ .Schema }}';
 		create temp table logs_view as
 		select row_number() over (order by id) as row_number, id, ledger
 		from logs
 		where type = 'NEW_TRANSACTION' or type = 'REVERTED_TRANSACTION';
 		create index logs_view_row_numbers on logs_view(row_number);
 		perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from logs_view));
 		loop
 			with _rows as (
 				select id, ledger, row_number
 				from logs_view
 				where row_number >= _offset and row_number < _offset + _batch_size
 			)
 			update logs
 			set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(date::text || 'Z'))
 			from _rows
 			where logs.id = _rows.id and logs.ledger = _rows.ledger;
-			exit when not found;
+			GET DIAGNOSTICS _updated_count = ROW_COUNT;
+			exit when _updated_count = 0;
 			_offset = _offset + _batch_size;
 			perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
 			commit;
 		end loop;
 		drop table if exists logs_view;
 	end
 $$;
```
🧹 Nitpick comments (3)
tools/provisioner/Dockerfile (1)
18-18: Remove duplicate syntax directive. The syntax directive is already declared on line 1 and doesn't need to be repeated here.
Apply this diff:
```diff
-# syntax=docker/dockerfile:1
 FROM alpine:3.21
```

internal/replication/pipeline.go (2)
112-141: Async exporter push with retries preserves semantics; consider small refinements.

The new async pattern (wrapping `exporter.Accept` in a goroutine with a derived `exportContext`, reporting via a buffered `errChan`, and retrying the same batch on error with `PushRetryPeriod`) looks sound:

- Ordering is preserved because you don't advance `LastLogID` until after this loop completes successfully.
- `stopChannel` is honored even while a push is in flight, and `cancel()` is called in both the success and stop paths.
- The buffered `errChan` ensures the goroutine can exit even if the stop case wins the select; there is no goroutine leak.

Two optional improvements you might consider:

1. Precompute the mapped slice outside the retry loop to avoid repeated allocations on retries:

```diff
-			for {
-				p.logger.Debugf("Send data to exporter.")
-				errChan := make(chan error, 1)
-				exportContext, cancel := context.WithCancel(ctx)
-				go func() {
-					_, err := p.exporter.Accept(exportContext, collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
-						return drivers.LogWithLedger{
-							Log:    log,
-							Ledger: p.pipeline.Ledger,
-						}
-					})...)
-					errChan <- err
-				}()
+			mapped := collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
+				return drivers.LogWithLedger{
+					Log:    log,
+					Ledger: p.pipeline.Ledger,
+				}
+			})
+			for {
+				p.logger.Debugf("Send data to exporter.")
+				errChan := make(chan error, 1)
+				exportContext, cancel := context.WithCancel(ctx)
+				go func() {
+					_, err := p.exporter.Accept(exportContext, mapped...)
+					errChan <- err
+				}()
```

2. Optionally special-case context cancellation so a `context.Canceled`/`DeadlineExceeded` from the exporter is treated as a terminal condition instead of something to retry forever (e.g., bail out if `ctx.Err() != nil` inside the `err != nil` branch).

These are non-blocking suggestions; the current behavior is consistent and correct.
167-181: Shutdown handshake and logger enrichment are consistent and helpful.

- `Shutdown` now logs at info/debug, sends an error channel through `p.stopChannel`, and waits for it to be closed. This preserves the previous handshake semantics while giving better visibility into shutdown timing.
- The logger enrichment with `"component"="pipeline"`, `"module"=pipeline.Ledger`, and `"driver"=pipeline.ExporterID` will make the new debug messages much more actionable in multi-pipeline setups.

One minor nit, if you care about log wording: "Shutdowning pipeline" could be tweaked to "Shutting down pipeline" for readability, but that's purely cosmetic.

```diff
-	p.logger.Infof("Shutdowning pipeline")
+	p.logger.Infof("Shutting down pipeline")
```

Also applies to: 201-205
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (3)
- `internal/storage/bucket/migrations/41-add-missing-index/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/42-remove-useless-trigger/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml` is excluded by `!**/*.yaml`
📒 Files selected for processing (8)
- `cmd/worker.go` (1 hunks)
- `internal/replication/drivers/clickhouse/driver.go` (1 hunks)
- `internal/replication/manager.go` (1 hunks)
- `internal/replication/pipeline.go` (3 hunks)
- `internal/storage/bucket/migrations/41-add-missing-index/up.sql` (1 hunks)
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql` (1 hunks)
- `tools/provisioner/Dockerfile` (2 hunks)
- `tools/provisioner/justfile` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- internal/storage/bucket/migrations/41-add-missing-index/up.sql
- internal/replication/manager.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.
Applied to files:
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Tests
- GitHub Check: Dirty
🔇 Additional comments (9)
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)
16-35: Batch processing and progress tracking are well-implemented. The loop structure with a configurable batch size, indexed range queries, and pg_notify progress signals is solid. Committing each batch separately appropriately prevents transaction bloat on large datasets. Ensure the `exit when not found;` fix is applied (see the separate comment) so the loop terminates reliably when all rows are processed.

tools/provisioner/justfile (1)
5-8: LGTM! Good parameterization. The registry parameterization allows flexible image publishing while preserving the default behavior. The implementation is clean and follows best practices.
tools/provisioner/Dockerfile (3)
1-1: LGTM! Best practice for BuildKit. The syntax directive enables modern Docker BuildKit features and is recommended for all Dockerfiles.
19-23: Alpine 3.21 is available on Docker Hub. Verification shows that Alpine 3.21 is a released version with multiple patch levels available (3.21.0 through 3.21.5). The earlier concern that "Alpine 3.21 may not be released yet" is unfounded. The Dockerfile's use of `FROM alpine:3.21` is valid and correctly references an available base image.
2-16: golang:1.25-alpine is available and stable; no version issues to address. Verification confirms that golang:1.25-alpine exists on Docker Hub and has been stable since August 2025. The image is available in multiple patch versions (1.25.0 through 1.25.4), confirming the Dockerfile's base image reference is valid. The build process reorganization with `go mod download` cache optimization is sound.
cmd/worker.go (1)
82-82: Worker logs page size default bump to 1000 looks fine. Increasing the default page size to 1000 is consistent with the new pagination/indexing work and should just reduce round-trips at the cost of somewhat larger batches; no correctness concerns here.
internal/replication/drivers/clickhouse/driver.go (1)
47-48: Accept debug logging is appropriate and side-effect free. The new debug line cleanly exposes batch size without touching control flow or error handling; this should help correlate pipeline logs with ClickHouse inserts.
internal/replication/pipeline.go (2)
67-82: Lifecycle and fetch-path logging changes look correct. The added "Pipeline started/terminated", "Fetch next batch.", and "Got %d items" logs, plus the factored `stop` helper, make the control flow around the main loop and stop handling clearer without changing behavior. Error handling on `ListLogs` still backs off using `PullInterval` and respects a stop signal via `p.stopChannel`, which is consistent with the existing design.

Also applies to: 95-106
146-160: LastLogID advancement and "has more" logging align with ordering guarantees.

`lastLogID := logs.Data[len(logs.Data)-1].ID` (safe because of the prior `len(logs.Data) == 0` guard) and updating `p.pipeline.LastLogID` only after a successful exporter push ensure log IDs move forward strictly after processing, matching the need to maintain correct log ordering and avoid gaps. The added debug messages ("Move last log id to…", "Has more logs to fetch.") will make any anomalies in log progression much easier to spot.

Based on learnings, this is in line with the importance of robust lastLogID tracking.
Force-pushed 11aee7c to cbfe1c3 (Compare)
Actionable comments posted: 1
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (3)
- `internal/storage/bucket/migrations/41-add-missing-index/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/42-remove-useless-trigger/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml` is excluded by `!**/*.yaml`
📒 Files selected for processing (7)
- `internal/replication/drivers/clickhouse/driver.go` (1 hunks)
- `internal/replication/manager.go` (1 hunks)
- `internal/replication/pipeline.go` (3 hunks)
- `internal/storage/bucket/migrations/41-add-missing-index/up.sql` (1 hunks)
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql` (1 hunks)
- `tools/provisioner/Dockerfile` (2 hunks)
- `tools/provisioner/justfile` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- internal/replication/manager.go
- tools/provisioner/Dockerfile
- internal/replication/drivers/clickhouse/driver.go
- internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
🔇 Additional comments (4)
tools/provisioner/justfile (1)
5-8: Registry parameterization looks good and remains backwards compatible. Making `registry` a parameter with default `'ghcr.io'` cleanly preserves existing behavior while allowing overrides, and the updated tag construction is consistent with that. No issues from this change alone.

internal/storage/bucket/migrations/41-add-missing-index/up.sql (1)
1-1: LGTM! The migration correctly uses conditional `CONCURRENTLY` (only valid outside transactions) and the schema templating is properly applied. This index will help with log pagination queries as intended per the PR objectives.

internal/replication/pipeline.go (2)
111-144: Well-structured async export with graceful cancellation. The pattern correctly uses a buffered channel (size 1) to prevent goroutine leaks, and the cancellable context enables responsive shutdown when `stopChannel` fires. The retry loop cleanly separates success from error handling.
66-73: Good centralization of termination logic. The `stop` helper consolidates logging and channel closure, improving consistency and reducing duplication across all exit points.
```diff
 func (p *PipelineHandler) Shutdown(ctx context.Context) error {
-	p.logger.Infof("shutdowning pipeline")
+	p.logger.Infof("Shutdowning pipeline")
```
Typo: "Shutdowning" → "Shutting down"
```diff
-	p.logger.Infof("Shutdowning pipeline")
+	p.logger.Infof("Shutting down pipeline")
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	p.logger.Infof("Shutting down pipeline")
```
🤖 Prompt for AI Agents
internal/replication/pipeline.go around line 167: the log message contains a
typo "Shutdowning pipeline"; update the string to "Shutting down pipeline"
(preserving the logger call and formatting) so the log reads: Shutting down
pipeline.
Also fixes stop issue of exporters.
Force-pushed cbfe1c3 to 5ea84b7 (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (1)
internal/replication/pipeline.go (1)
167-167: Typo: "Shutdowning" → "Shutting down". This typo was already flagged in a previous review. Please update the log message to: "Shutting down pipeline"
🧹 Nitpick comments (1)
tools/provisioner/Dockerfile (1)
10-16: Standardize COPY path syntax for consistency. The provisioner build stage mixes relative paths (`COPY go.* .` at line 11) and absolute paths (`COPY cmd /src/tools/provisioner/cmd` at lines 14-15). Since the WORKDIR is already set to `/src/tools/provisioner`, use consistent relative paths throughout.

Apply this diff to use consistent relative paths:

```diff
 WORKDIR /src/tools/provisioner
 COPY go.* .
 RUN --mount=type=cache,target=$GOPATH go mod download
 COPY main.go .
-COPY cmd /src/tools/provisioner/cmd
-COPY pkg /src/tools/provisioner/pkg
+COPY cmd ./cmd
+COPY pkg ./pkg
 RUN --mount=type=cache,target=$GOPATH go build -o provisioner
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
- `internal/storage/bucket/migrations/42-add-missing-index/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml` is excluded by `!**/*.yaml`
📒 Files selected for processing (7)
- `internal/replication/drivers/clickhouse/driver.go` (1 hunks)
- `internal/replication/manager.go` (1 hunks)
- `internal/replication/pipeline.go` (3 hunks)
- `internal/storage/bucket/migrations/42-add-missing-index/up.sql` (1 hunks)
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql` (1 hunks)
- `tools/provisioner/Dockerfile` (2 hunks)
- `tools/provisioner/justfile` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/replication/manager.go
- internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
- tools/provisioner/justfile
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
🧬 Code graph analysis (1)
internal/replication/pipeline.go (1)
deployments/pulumi/pkg/config/config.go (1)
Duration(265-265)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Dirty
- GitHub Check: Tests
🔇 Additional comments (8)
internal/storage/bucket/migrations/42-add-missing-index/up.sql (1)
1-1: Clarify migration purpose and verify index necessity. The filename and content suggest this migration adds an index on `logs.id`, but the PR objectives indicate migration #42 should remove a useless trigger. This mismatch needs clarification.

Additionally, assuming `id` is a primary key (typical for this column), a unique index already exists on it. Creating a second non-unique index on the same column may be redundant unless it serves a specific query optimization purpose distinct from the PK index.

Consider adding `IF NOT EXISTS` to avoid failures on re-runs:

```diff
-create index {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
+create index if not exists {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
```

tools/provisioner/Dockerfile (1)
4-8: Verify that the "root" build stage exists in the Dockerfile. The verification scripts below were generated but not executed:

```shell
# Verify the "root" build stage exists in the complete Dockerfile
grep -n "^FROM.*AS root|^FROM.*as root" tools/provisioner/Dockerfile

# Read the complete Dockerfile to understand its structure
cat -n tools/provisioner/Dockerfile
```

Run these scripts to gather the verification results.
internal/replication/drivers/clickhouse/driver.go (1)
47-48: LGTM! Helpful diagnostic addition. The debug log provides visibility into batch sizes being processed, which aligns with the PR's goal of improving observability in the replication pipeline.
internal/replication/pipeline.go (5)
67-67: LGTM! Good refactoring for termination handling. The addition of startup logging and the `stop` helper function centralizes termination logic and improves observability. Clean implementation.

Also applies to: 70-73, 78-78
81-81: LGTM! Clear fetch diagnostics. These debug logs provide good visibility into the fetch cycle and batch sizes, making it easier to diagnose pipeline behavior.
Also applies to: 105-105
147-147: LGTM! Critical tracking made visible. This debug log provides visibility into lastLogID updates, which is essential for maintaining proper log ordering and preventing duplicate or out-of-order processing. Based on learnings, proper lastLogID tracking is critical for data integrity in the ledger.
159-159: LGTM! Useful pagination diagnostic. This log helps distinguish between "waiting for new logs" and "fetching next page immediately," making it easier to understand pipeline pacing.
112-141: Context cancellation is properly handled; infinite retry without error classification remains a valid concern.

The review's concerns are substantiated:

Context cancellation handling: ✓ verified as sound. All driver implementations (ClickHouse, Elasticsearch, HTTP) properly accept and pass the context to their underlying libraries (PrepareBatch, bulk.Do, http.Client), which handle cancellation correctly. The exportContext with cancellation at line 115 is properly used and cleaned up.

Infinite retry loop: ✓ confirmed. The `for` loop at line 111 will retry indefinitely on any error from the exporter: there is no max retry count, no error classification, and no circuit breaker. The only exits are success (line 142 break), receiving a stop signal (lines 137-140), or outer context cancellation. This means permanent errors (invalid credentials, schema mismatches, etc.) will cause unbounded retry loops with the configured delay.

The design appears intentional (retry until explicitly stopped), but without distinguishing transient from permanent errors, permanent failures will waste resources on repeated failed attempts.
```dockerfile
COPY pkg /src/tools/provisioner/pkg
RUN --mount=type=cache,target=$GOPATH go build -o provisioner

# syntax=docker/dockerfile:1
```
Remove the duplicate syntax directive.
The `# syntax=docker/dockerfile:1` directive must appear at the very beginning of the Dockerfile (before any other lines, comments, or directives); anywhere else it is parsed as an ordinary comment and has no effect. The duplicate at line 18 should be removed. The directive at line 1 is correctly placed.
Apply this diff to remove the duplicate:
```diff
-# syntax=docker/dockerfile:1
 FROM alpine:3.21
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```dockerfile
FROM alpine:3.21
```
🤖 Prompt for AI Agents
In tools/provisioner/Dockerfile around line 18, there's a duplicate docker
syntax directive ("# syntax=docker/dockerfile:1") which must only appear at the
very top of the file; remove the directive on line 18 so only the existing one
at line 1 remains, ensuring no other comments or lines precede the top
directive.
```go
				errChan <- err
			}()
			select {
			case err := <-errChan:
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also wait on the context here?
Summary by cubic
Fixes missing insertedAt in ledger logs by backfilling existing records and adding an index to speed up log pagination. Also improves replication stability.
Bug Fixes
Migration
Written for commit 5ea84b7. Summary will update automatically on new commits.