fix: missing inserted at in logs #1168
base: release/v2.3
Conversation
Also fixes a stop issue in the exporters.
Walkthrough

This pull request enhances the replication system with improved debug logging and error handling, adds two database migrations for log indexing and timestamp population, and refactors the provisioner Docker build configuration with multi-stage improvements.

Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes. Areas requiring extra attention:
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 1
Caution

Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tools/provisioner/Dockerfile (1)

4-8: Invalid Docker COPY syntax: `--from=root` references an undefined stage.

The `COPY --from=root` directives reference a stage named `root`, but no such stage is defined in this Dockerfile. Standard Docker syntax requires either:

- Copying from the build context directly: `COPY pkg pkg` (without `--from=`)
- Copying from a named stage: `COPY --from=stage_name source dest`

The `--from=root` syntax is not valid and will cause the build to fail. Apply this diff to copy from the build context instead:

```diff
 FROM golang:1.24-alpine AS compiler
 WORKDIR /src
-COPY --from=root pkg pkg
-COPY --from=root internal internal
-COPY --from=root cmd cmd
-COPY --from=root go.* .
-COPY --from=root *.go .
+COPY pkg pkg
+COPY internal internal
+COPY cmd cmd
+COPY go.* .
+COPY *.go .
```
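For context, a corrected multi-stage layout might look like the following sketch. The build command, entrypoint package, and final runtime stage are assumptions for illustration, not taken from the actual Dockerfile:

```dockerfile
# Stage 1: copy sources from the build context (no undefined `root` stage).
FROM golang:1.24-alpine AS compiler
WORKDIR /src
COPY go.* ./
COPY pkg pkg
COPY internal internal
COPY cmd cmd
COPY *.go ./
# Hypothetical build target; the real entrypoint package may differ.
RUN go build -o /provisioner .

# Stage 2: a named stage such as `compiler` is a valid --from target.
FROM alpine:3.20
COPY --from=compiler /provisioner /usr/bin/provisioner
ENTRYPOINT ["/usr/bin/provisioner"]
```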
🧹 Nitpick comments (1)

internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)

30-30: Use the standard PL/pgSQL assignment operator `:=` instead of `=`.

Line 30 uses `=` for assignment, which is non-standard in PL/pgSQL contexts. Replace with `:=` to follow PostgreSQL conventions and ensure portability across versions.

```diff
- _offset = _offset + _batch_size;
+ _offset := _offset + _batch_size;
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
- `internal/storage/bucket/migrations/42-add-missing-index/notes.yaml` is excluded by `!**/*.yaml`
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/notes.yaml` is excluded by `!**/*.yaml`
📒 Files selected for processing (7)
- `internal/replication/drivers/clickhouse/driver.go` (1 hunks)
- `internal/replication/manager.go` (1 hunks)
- `internal/replication/pipeline.go` (3 hunks)
- `internal/storage/bucket/migrations/42-add-missing-index/up.sql` (1 hunks)
- `internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql` (1 hunks)
- `tools/provisioner/Dockerfile` (2 hunks)
- `tools/provisioner/justfile` (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
📚 Learning: 2025-11-28T10:06:01.363Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1167
File: internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql:23-23
Timestamp: 2025-11-28T10:06:01.363Z
Learning: In PostgreSQL migrations for the Formance ledger, to ensure ISO 8601/RFC3339 date formatting when setting JSONB fields, use the pattern `to_jsonb(date)#>>'{}'` to extract the timestamp as a properly formatted string. This leverages JSONB's built-in ISO 8601 encoding. Using `date::text` would depend on the server's DateStyle setting and doesn't guarantee ISO 8601 format.
Applied to files:
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
📚 Learning: 2025-07-19T18:35:34.260Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 1017
File: internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql:30-31
Timestamp: 2025-07-19T18:35:34.260Z
Learning: In Formance ledger migrations, SHARE ROW EXCLUSIVE locks on migration-specific temporary tables like moves_view are acceptable since these tables are dedicated for the migration. ACCESS SHARE locks on transactions table don't block writes, only schema changes.
Applied to files:
internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
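The first learning above can be demonstrated directly in psql. This is a standalone illustration of the pattern, not code from the migration:

```sql
-- JSONB always encodes timestamps in ISO 8601, so extracting the scalar
-- back out with #>>'{}' yields an RFC3339-style string regardless of the
-- server's DateStyle setting, whereas ::text depends on DateStyle.
SELECT
    to_jsonb(now()) #>> '{}' AS iso_8601_string,
    now()::text              AS datestyle_dependent;
```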
🧬 Code graph analysis (2)
internal/replication/manager.go (1)
- internal/errors.go (1): `ErrAlreadyStarted` (75-75)

internal/replication/pipeline.go (1)
- deployments/pulumi/pkg/config/config.go (1): `Duration` (262-262)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: Dirty
- GitHub Check: Tests
🔇 Additional comments (9)
tools/provisioner/justfile (1)

5-9: Parameterized registry in `push-image` looks good and stays backward compatible.

Exposing `registry` as a parameter with default `ghcr.io` and using it in the tag keeps existing behavior while allowing overrides. No issues spotted with `just` syntax or the `docker buildx` invocation.

tools/provisioner/Dockerfile (1)

22-23: Clarify the intent of the default CMD.

Setting `CMD ["--help"]` makes the container display help text when run without arguments. This is unusual; most provisioning tools default to performing their primary function. Verify this is intentional or consider using an empty CMD or a startup script instead.

internal/replication/drivers/clickhouse/driver.go (1)

42-45: Debug batch-size logging in `Accept` looks good.

The new debug line cleanly surfaces the batch size without altering control flow or behavior, and reuses the existing logger consistently with the rest of the driver.

internal/storage/bucket/migrations/42-add-missing-index/up.sql (1)

1-1: Verify that this index creation is necessary and follows naming conventions.

If `id` is already a primary key, it will have an implicit index. Confirm whether this explicit index is needed for performance reasons (e.g., to support the batch migration in migration 43).

The index name `logs_ids` is unconventional; consider renaming to `logs_id_idx` to follow PostgreSQL naming conventions.

internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql (1)
23-26: I'll verify the critical issue identified in the review comment by examining the full migration file to confirm whether the `date` reference is truly undefined.

```shell
find . -type f -name "up.sql" | grep "43-fix-missing-inserted-at-in-log-data" | head -5
```

Now let me read the full migration file:

```shell
cat -n internal/storage/bucket/migrations/43-fix-missing-inserted-at-in-log-data/up.sql
```

internal/replication/pipeline.go (3)
66-72: Good consolidation of stop logic.

The `stop` helper ensures consistent logging and channel cleanup across all shutdown paths. This reduces duplication and makes the shutdown behavior easier to maintain.

80-80: Debug logging additions improve observability.

The added debug statements provide useful visibility into the pipeline's fetch cycle, batch sizes, and progress tracking without cluttering logs at higher levels.

111-142: Cancellation handling for exporter push is correctly implemented.

The exporter drivers properly respect context cancellation. The HTTP driver uses `req.WithContext(ctx)` to ensure context cancellation propagates to the underlying HTTP request, and simpler drivers like noop return immediately. The goroutine spawned in the pipeline correctly receives the cancel signal through the cancellable `exportContext`, preventing goroutine leaks. The buffered error channel correctly prevents blocking if the stop signal arrives before the export completes.

internal/replication/manager.go (1)

218-224: Error comparison pattern correctly ignores pipeline ID values.

The `ErrAlreadyStarted.Is()` method (internal/errors.go:81-83) performs type-based matching via type assertion (`err.(ErrAlreadyStarted)`), not value comparison. This means `errors.Is(err, ledger.ErrAlreadyStarted(""))` will correctly match any `ErrAlreadyStarted` error regardless of the pipeline ID value it contains.

The graceful handling (logging and skipping rather than returning an error) is appropriate for a periodic synchronization operation. The pattern aligns with similar error handling elsewhere in the file.
```diff
 func (p *PipelineHandler) Shutdown(ctx context.Context) error {
-	p.logger.Infof("shutdowning pipeline")
+	p.logger.Infof("Shutdowning pipeline")
```
Typo in log message: "Shutdowning" should be "Shutting down".

```diff
-	p.logger.Infof("Shutdowning pipeline")
+	p.logger.Infof("Shutting down pipeline")
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In internal/replication/pipeline.go around line 166, the log message contains a
typo: replace the string "Shutdowning pipeline" with the corrected phrasing
"Shutting down pipeline" (preserve existing logger call and formatting).
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff              @@
##           release/v2.3    #1168   +/-  ##
================================================
- Coverage        82.03%   81.94%   -0.10%
================================================
  Files              187      187
  Lines             8989     9016      +27
================================================
+ Hits              7374     7388      +14
- Misses            1188     1199      +11
- Partials           427      429       +2
```

☔ View full report in Codecov by Sentry.
1 issue found across 9 files
Prompt for AI agents (1 issue)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="internal/replication/pipeline.go">
<violation number="1" location="internal/replication/pipeline.go:96">
Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.</violation>
</file>
Reply to cubic to teach it or ask questions. Re-run a review with @cubic-dev-ai review this PR
```diff
 	p.logger.Errorf("Error fetching logs: %s", err)
 	select {
-	case <-ctx.Done():
+	case ch := <-p.stopChannel:
```
Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At internal/replication/pipeline.go, line 96:
<comment>Context cancellation is no longer observed while retrying after a fetch error, so Run spins forever on a cancelled context instead of exiting.</comment>
<file context>
@@ -85,38 +93,57 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
p.logger.Errorf("Error fetching logs: %s", err)
select {
- case <-ctx.Done():
+ case ch := <-p.stopChannel:
+ stop(ch)
return
</file context>
```diff
 	p.logger.Errorf("Error fetching logs: %s", err)
 	select {
-	case <-ctx.Done():
+	case ch := <-p.stopChannel:
```
The bot is right here. Do you want to wait forever here?
Yes, the pipeline has a Stop(ctx) method.
The cancellation should not be done using the context, but through the dedicated method.
```diff
 		errChan <- err
 	}()
 	select {
 	case err := <-errChan:
```
Same here: do you want to also select on the context?
Same response ^^
Summary by cubic
Fixes missing `transaction.insertedAt` in log records and improves exporter stop handling to prevent stuck replication. Also adds an index on `logs.id` for faster fetches.
Bug Fixes
Migration
Written for commit e6ada90. Summary will update automatically on new commits.