-
Notifications
You must be signed in to change notification settings - Fork 131
feat: add update of exporters config #1110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds an "Update exporter" capability end-to-end: new HTTP v2 endpoint, gRPC RPC, controller and manager logic, storage update, client SDK and docs, mocks, unit and e2e tests, and related runtime/locking and driver lifecycle adjustments to apply exporter configuration changes at runtime. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant HTTP as "HTTP V2 API"
participant Ctrl as "System Controller"
participant Mgr as "Replication Manager"
participant Store as "Storage"
participant Driver as "Exporter Driver"
Client->>HTTP: PUT /v2/_/exporters/{id} (V2ExporterConfiguration)
HTTP->>HTTP: parse ID & body
HTTP->>Ctrl: UpdateExporter(ctx, id, config)
Ctrl->>Mgr: UpdateExporter(ctx, id, config)
rect rgb(235,245,255)
Note over Mgr: validate config, stop exporter/pipelines, persist update, resynchronize
Mgr->>Driver: stopExporter(id)
Mgr->>Store: UpdateExporter(exporter)
Store-->>Mgr: ok / error
Mgr->>Mgr: synchronizePipelines()
end
alt success
Mgr-->>Ctrl: ok
Ctrl-->>HTTP: ok
HTTP-->>Client: 204 No Content
else validation error
Mgr-->>HTTP: ErrInvalidDriverConfiguration
HTTP-->>Client: 400 Bad Request
else not found
Store-->>Mgr: ErrExporterNotFound
HTTP-->>Client: 404 Not Found
else other error
HTTP-->>Client: 500 Internal Server Error
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Disabled knowledge base sources:
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
Comment |
Also, fix a blocking stop of the app when the exporter config is erroned.
61e7c32 to
ca7f5c9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (1)
docs/api/README.md (1)
2409-2409: V2UpdateExporterRequest schema defined but not referenced.Line 2409 references
V2ExporterConfigurationas the body parameter, but lines 5542-5559 define a separateV2UpdateExporterRequestschema that is never used. Either referenceV2UpdateExporterRequestat line 2409, or remove the unused schema definition if both Create and Update endpoints intentionally use the same schema.Also applies to: 5542-5559
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (7)
internal/replication/grpc/replication_service.pb.gois excluded by!**/*.pb.go,!**/*.pb.gointernal/replication/grpc/replication_service_grpc.pb.gois excluded by!**/*.pb.go,!**/*.pb.goopenapi.yamlis excluded by!**/*.yamlopenapi/v2.yamlis excluded by!**/*.yamlpkg/client/.speakeasy/gen.lockis excluded by!**/*.lock,!**/*.lockpkg/client/.speakeasy/logs/naming.logis excluded by!**/*.log,!**/*.logpkg/client/speakeasyusagegen/.speakeasy/logs/naming.logis excluded by!**/*.log,!**/*.log
📒 Files selected for processing (29)
docs/api/README.md(2 hunks)internal/api/common/mocks_system_controller_test.go(2 hunks)internal/api/v1/mocks_system_controller_test.go(2 hunks)internal/api/v2/controllers_exporters_update.go(1 hunks)internal/api/v2/controllers_exporters_update_test.go(1 hunks)internal/api/v2/mocks_system_controller_test.go(2 hunks)internal/api/v2/routes.go(2 hunks)internal/controller/system/controller.go(2 hunks)internal/replication/controller_grpc_client.go(1 hunks)internal/replication/controller_grpc_server.go(1 hunks)internal/replication/driver_facade.go(2 hunks)internal/replication/drivers/registry.go(2 hunks)internal/replication/exporters.go(1 hunks)internal/replication/exporters_generated.go(1 hunks)internal/replication/grpc/replication_service.proto(2 hunks)internal/replication/manager.go(9 hunks)internal/replication/manager_test.go(5 hunks)internal/replication/store.go(2 hunks)internal/replication/store_generated_test.go(1 hunks)internal/storage/common/errors.go(0 hunks)internal/storage/system/store.go(2 hunks)pkg/client/README.md(1 hunks)pkg/client/docs/models/operations/v2updateexporterrequest.md(1 hunks)pkg/client/docs/models/operations/v2updateexporterresponse.md(1 hunks)pkg/client/docs/sdks/v2/README.md(2 hunks)pkg/client/models/operations/v2updateexporter.go(1 hunks)pkg/client/v2.go(1 hunks)pkg/testserver/server.go(1 hunks)test/e2e/api_exporters_update_test.go(1 hunks)
💤 Files with no reviewable changes (1)
- internal/storage/common/errors.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-04-29T11:24:28.923Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.
Applied to files:
internal/replication/manager.go
🧬 Code graph analysis (17)
internal/replication/drivers/registry.go (1)
internal/controller/ledger/errors.go (1)
ErrNotFound(14-14)
pkg/testserver/server.go (2)
deployments/pulumi/pkg/config/config.go (1)
Duration(258-258)cmd/worker.go (1)
WorkerPipelinesSyncPeriod(25-25)
internal/api/v2/controllers_exporters_update_test.go (2)
internal/controller/system/errors.go (2)
NewErrInvalidDriverConfiguration(69-74)NewErrExporterNotFound(47-49)internal/api/v2/routes.go (2)
NewRouter(21-122)WithExporters(158-162)
internal/replication/controller_grpc_server.go (2)
internal/replication/grpc/replication_service.pb.go (9)
UpdateExporterRequest(549-555)UpdateExporterRequest(568-568)UpdateExporterRequest(583-585)UpdateExporterResponse(601-605)UpdateExporterResponse(618-618)UpdateExporterResponse(633-635)ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)internal/controller/system/errors.go (2)
ErrInvalidDriverConfiguration(51-54)ErrExporterNotFound(36-36)
internal/controller/system/controller.go (1)
internal/replication/grpc/replication_service.pb.go (3)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)
internal/api/common/mocks_system_controller_test.go (2)
internal/api/v1/mocks_system_controller_test.go (4)
MockReplicationBackend(23-27)MockReplicationBackendMockRecorder(30-32)SystemController(221-225)SystemControllerMockRecorder(228-230)internal/replication/grpc/replication_service.pb.go (3)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)
internal/api/v2/mocks_system_controller_test.go (2)
internal/api/common/mocks_system_controller_test.go (4)
MockReplicationBackend(23-27)MockReplicationBackendMockRecorder(30-32)SystemController(221-225)SystemControllerMockRecorder(228-230)internal/replication/grpc/replication_service.pb.go (3)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)
test/e2e/api_exporters_update_test.go (8)
test/e2e/suite_test.go (1)
UseTemplatedDatabase(202-204)pkg/testserver/server.go (6)
ExperimentalFeaturesInstrumentation(30-35)ExperimentalExportersInstrumentation(37-42)ExperimentalEnableWorker(44-49)ExperimentalPipelinesPullIntervalInstrumentation(79-84)ExperimentalPipelinesPushRetryPeriodInstrumentation(72-77)ExperimentalPipelinesSyncPeriodInstrumentation(86-91)pkg/client/models/operations/v2updateexporter.go (1)
V2UpdateExporterRequest(9-13)pkg/client/models/components/v2errorsenum.go (2)
V2ErrorsEnumValidation(15-15)V2ErrorsEnumNotFound(19-19)pkg/client/models/operations/v2createledger.go (1)
V2CreateLedgerRequest(9-13)pkg/client/models/operations/v2createtransaction.go (1)
V2CreateTransactionRequest(9-25)pkg/client/models/components/v2posttransaction.go (1)
V2PostTransaction(58-68)pkg/client/models/components/v2posting.go (1)
V2Posting(10-15)
internal/storage/system/store.go (1)
internal/replication/grpc/replication_service.pb.go (3)
Exporter(181-188)Exporter(201-201)Exporter(216-218)
internal/replication/store.go (1)
internal/storage/system/store.go (1)
DefaultStore(41-44)
internal/replication/manager_test.go (5)
internal/replication/manager.go (2)
NewManager(475-498)WithSyncPeriod(508-512)internal/replication/store_generated_test.go (2)
NewMockStorage(72-76)NewMockLogFetcher(33-37)internal/replication/exporters_generated.go (1)
NewMockConfigValidator(32-36)internal/storage/bucket/bucket_generated_test.go (1)
NewMockFactory(165-169)internal/storage/common/pagination.go (2)
InitialPaginatedQuery(9-14)PaginatedQuery(25-28)
internal/api/v2/controllers_exporters_update.go (2)
internal/replication/grpc/replication_service.pb.go (3)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)internal/controller/system/errors.go (2)
ErrInvalidDriverConfiguration(51-54)ErrExporterNotFound(36-36)
internal/replication/controller_grpc_client.go (2)
internal/replication/grpc/replication_service.pb.go (6)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)UpdateExporterRequest(549-555)UpdateExporterRequest(568-568)UpdateExporterRequest(583-585)internal/controller/system/errors.go (2)
NewErrInvalidDriverConfiguration(69-74)NewErrExporterNotFound(47-49)
internal/replication/manager.go (2)
internal/controller/ledger/errors.go (1)
ErrNotFound(14-14)internal/controller/system/errors.go (2)
NewErrInvalidDriverConfiguration(69-74)NewErrExporterNotFound(47-49)
internal/replication/store_generated_test.go (1)
internal/replication/grpc/replication_service.pb.go (3)
Exporter(181-188)Exporter(201-201)Exporter(216-218)
internal/api/v1/mocks_system_controller_test.go (2)
internal/api/common/mocks_system_controller_test.go (4)
MockReplicationBackend(23-27)MockReplicationBackendMockRecorder(30-32)SystemController(221-225)SystemControllerMockRecorder(228-230)internal/replication/grpc/replication_service.pb.go (3)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)
pkg/client/v2.go (10)
pkg/client/models/operations/v2updateexporter.go (2)
V2UpdateExporterRequest(9-13)V2UpdateExporterResponse(29-31)pkg/client/models/operations/options.go (4)
Option(41-41)Options(32-39)SupportedOptionRetries(15-15)SupportedOptionTimeout(16-16)pkg/client/internal/utils/utils.go (4)
ReplaceParameters(51-60)MatchStatusCodes(71-92)ConsumeRawBody(350-360)UnmarshalJsonFromResponseBody(39-49)pkg/client/internal/utils/pathparams.go (1)
GenerateURL(19-38)pkg/client/internal/hooks/hooks.go (4)
HookContext(27-35)BeforeRequestContext(37-39)AfterErrorContext(45-47)AfterSuccessContext(41-43)pkg/client/internal/config/sdkconfiguration.go (1)
SDKConfiguration(16-25)pkg/client/internal/utils/requestbody.go (1)
SerializeRequestBody(31-42)pkg/client/internal/utils/security.go (1)
PopulateSecurity(27-92)pkg/client/internal/utils/retries.go (3)
RetryConfig(23-23)Retries(25-28)Retry(30-104)pkg/client/models/components/v2errorresponse.go (1)
V2ErrorResponse(5-9)
🪛 GitHub Actions: Default
test/e2e/api_exporters_update_test.go
[error] 306-306: Exporters update API tests failed: race detected during execution with running pipelines; timeline shows repeated pipeline start/synchronization errors.
internal/replication/manager_test.go
[error] 260-341: Race detected during execution of TestManagerStop (and related tests) in replication package.
internal/replication/manager.go
[error] 301-301: Data race detected: Stop() and Run() goroutines racing in Manager.Stop/Manager.Run during tests.
🪛 markdownlint-cli2 (0.18.1)
pkg/client/docs/sdks/v2/README.md
1835-1835: Hard tabs
Column: 1
(MD010, no-hard-tabs)
1836-1836: Hard tabs
Column: 1
(MD010, no-hard-tabs)
1837-1837: Hard tabs
Column: 1
(MD010, no-hard-tabs)
1838-1838: Hard tabs
Column: 1
(MD010, no-hard-tabs)
1839-1839: Hard tabs
Column: 1
(MD010, no-hard-tabs)
1840-1840: Hard tabs
Column: 1
(MD010, no-hard-tabs)
docs/api/README.md
2413-2413: Blank line inside blockquote
(MD028, no-blanks-blockquote)
5557-5557: Multiple headings with the same content
(MD024, no-duplicate-heading)
5559-5559: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
🔇 Additional comments (28)
internal/replication/exporters_generated.go (1)
6-6: Generated file correctly updated with -typed flag.The mockgen command has been properly updated to include the
-typedflag, which improves type safety and enables better IDE support for mock assertions. The mock implementation forConfigValidatoris correct and follows standard mockgen patterns.internal/replication/drivers/registry.go (2)
7-7: LGTM: Import aligns with error standardization.The addition of the postgres package import supports the error handling refactoring at line 71, standardizing on
postgres.ErrNotFoundas shown in the relevant code snippet frominternal/controller/ledger/errors.go.
71-71: LGTM: Error comparison correctly standardized.The change from
common.ErrNotFoundtopostgres.ErrNotFoundis correct and aligns with the codebase standard. This ensures proper error handling whenGetExporterreturns a not-found error.pkg/client/README.md (1)
158-158: LGTM!The documentation correctly lists the new UpdateExporter operation in the available operations section, consistent with the other exporter-related operations.
pkg/client/docs/sdks/v2/README.md (2)
36-36: LGTM!The UpdateExporter operation is correctly positioned in the navigation between GetExporterState and DeleteExporter, maintaining logical ordering of exporter operations.
1825-1891: LGTM!The UpdateExporter documentation section is comprehensive and follows the established pattern used for other operations. It includes example usage, parameter descriptions, response type, and error cases.
Note: The static analysis tool flagged hard tabs in the code example, but since this is auto-generated SDK documentation (created by Speakeasy), these formatting issues should be addressed in the generator configuration rather than manually.
pkg/client/docs/models/operations/v2updateexporterresponse.md (1)
1-8: LGTM!The V2UpdateExporterResponse documentation is minimal and appropriate for an operation that returns 204 No Content on success (only HTTP metadata is relevant).
pkg/client/docs/models/operations/v2updateexporterrequest.md (1)
1-9: LGTM!The V2UpdateExporterRequest documentation clearly defines the required fields: ExporterID for identifying which exporter to update, and V2ExporterConfiguration for the new configuration.
internal/storage/system/store.go (1)
199-199: LGTM!Good consistency improvement. Wrapping the error with
postgres.ResolveErrorensures proper error classification (e.g., sql.ErrNoRows → postgres.ErrNotFound) consistent with other methods in this file.internal/replication/driver_facade.go (2)
40-40: Verify the context cancellation semantics change.The retry loop now observes
c.startContext.Done()instead ofctx.Done(). This means the loop will only exit on internal cancellation (viaStop()) rather than on the outer context cancellation. This changes when the retry loop terminates after a Start error.Was this intentional to enforce explicit lifecycle management through the
Stop()method rather than relying on context cancellation?
55-55: LGTM!The comment clarification accurately reflects the logic: if
startingChanis closed, the facade has progressed past the starting phase into the running phase.internal/replication/store.go (2)
34-34: LGTM!The Storage interface correctly extends to include UpdateExporter, maintaining consistency with other exporter-related operations.
49-51: LGTM!The storageAdapter properly delegates UpdateExporter to the underlying DefaultStore implementation, consistent with the delegation pattern used for other storage operations.
internal/replication/grpc/replication_service.proto (2)
12-12: LGTM!The UpdateExporter RPC is correctly positioned in the service definition, maintaining logical grouping with other exporter-related operations.
71-76: LGTM!The protobuf message definitions are well-structured:
- UpdateExporterRequest captures the exporter ID and new configuration
- UpdateExporterResponse is appropriately empty (aligns with 204 No Content semantics)
- Field numbering is consistent with protobuf conventions
internal/replication/store_generated_test.go (1)
244-256: LGTM! Generated mock follows standard pattern.The UpdateExporter mock method and recorder are correctly generated following the same pattern as other storage methods.
internal/api/v2/controllers_exporters_update_test.go (1)
22-108: LGTM! Comprehensive test coverage.The test covers all key scenarios: success (204), validation error (400), not found (404), and internal error (500). The test structure is clean and follows best practices with proper mocking and parallel execution.
internal/replication/controller_grpc_server.go (1)
72-89: LGTM! Error handling follows established patterns.The UpdateExporter handler correctly maps domain errors to gRPC status codes and follows the same pattern as other handlers in this file.
internal/api/v2/controllers_exporters_update.go (1)
13-33: LGTM! Clean HTTP handler implementation.The handler properly extracts parameters, decodes the request body, delegates to the controller, and maps errors to appropriate HTTP status codes. The 204 No Content response for successful updates follows REST conventions.
internal/controller/system/controller.go (1)
88-93: LGTM! Interface and implementation follow established patterns.The UpdateExporter addition to the ReplicationBackend interface and DefaultController implementation follows the same pattern as other exporter operations. The documentation correctly identifies possible errors.
test/e2e/api_exporters_update_test.go (1)
46-172: LGTM! Comprehensive test coverage for basic update scenarios.The test thoroughly covers invalid configuration, non-existent exporter ID, valid configuration updates, and driver changes. The nested context structure clearly organizes the test flow.
internal/api/common/mocks_system_controller_test.go (2)
206-218: LGTM! Mock additions follow standard patterns.The UpdateExporter mock methods for MockReplicationBackend and SystemController are correctly generated and consistent with other mock methods in the file.
477-489: LGTM! Mock recorder correctly implemented.The SystemController UpdateExporter mock follows the same pattern as other controller methods.
internal/api/v2/mocks_system_controller_test.go (1)
206-218: LGTM!Generated mock methods follow the standard gomock pattern and are consistent with other methods in the file.
Also applies to: 477-489
internal/api/v1/mocks_system_controller_test.go (1)
206-218: LGTM!Generated mock methods follow the standard gomock pattern and are consistent with other methods in the file.
Also applies to: 477-489
internal/replication/manager.go (2)
246-273: LGTM! withLock pattern improves synchronization.The
withLockhelper (lines 246-250) and its usage to wrapsynchronizePipelinescalls (lines 252-256, 269-273) ensures proper mutex protection for pipeline synchronization operations. This is cleaner than inline lock/unlock pairs.
373-401: UpdateExporter implementation is sound.The update flow is logical:
- Validate new configuration (line 377)
- Stop all pipelines using the exporter (line 381)
- Fetch exporter from storage (line 385)
- Update configuration (line 394)
- Persist to storage (line 396)
- Re-synchronize pipelines to restart with new config (line 400)
Note that the lock is held for the entire operation including DB calls. This is consistent with other methods (
DeleteExporter,CreatePipeline) but could cause contention during slow DB operations.Verify that
synchronizePipelinesat line 400 correctly restarts the stopped pipelines with the updated configuration by checking the storage and driver initialization paths.pkg/client/v2.go (1)
5863-6057: No issues found; code is correct as-is.The verification confirms that only one
UpdateExporterfunction exists in the file (line 5864) and the code compiles without any duplicate declaration errors. The AI summary's warning about duplicate declarations was incorrect. The implementation is consistent with other methods in the file and requires no changes.
| func (t ThroughGRPCBackend) UpdateExporter(ctx context.Context, id string, configuration ledger.ExporterConfiguration) error { | ||
| _, err := t.client.UpdateExporter(ctx, &grpc.UpdateExporterRequest{ | ||
| Id: id, | ||
| Config: mapExporterConfiguration(configuration), | ||
| }) | ||
| if err != nil { | ||
| switch { | ||
| case status.Code(err) == codes.InvalidArgument: | ||
| return system.NewErrInvalidDriverConfiguration(configuration.Driver, err) | ||
| case status.Code(err) == codes.NotFound: | ||
| return system.NewErrExporterNotFound(id) | ||
| default: | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Fix error handling inconsistency with CreateExporter.
The UpdateExporter error handling correctly maps codes.InvalidArgument to ErrInvalidDriverConfiguration, but CreateExporter has inverted logic at Line 33 (if status.Code(err) != codes.InvalidArgument). This inconsistency could lead to incorrect error responses.
Verify CreateExporter's error handling logic:
🏁 Script executed:
#!/bin/bash
# Search for similar error handling patterns in gRPC clients
rg -n -A3 -B3 'status\.Code.*InvalidArgument' --type=go -g 'internal/replication/*client*.go'Length of output: 1156
Fix inverted error handling logic in CreateExporter (Line 33).
The script output confirms an inconsistency: CreateExporter at Line 33 uses != (inverted logic) while UpdateExporter at Line 50 correctly uses ==. CreateExporter incorrectly returns ErrInvalidDriverConfiguration when the error code is not InvalidArgument, which is backwards. Both methods should check for == codes.InvalidArgument to return ErrInvalidDriverConfiguration consistently.
Line 33 should be changed from if status.Code(err) != codes.InvalidArgument { to if status.Code(err) == codes.InvalidArgument {.
🤖 Prompt for AI Agents
internal/replication/controller_grpc_client.go around lines 43 to 59:
CreateExporter (around line 33) currently uses inverted logic for
InvalidArgument; update it to mirror UpdateExporter by checking status.Code(err)
== codes.InvalidArgument and returning
system.NewErrInvalidDriverConfiguration(configuration.Driver, err) in that case;
also ensure NotFound returns system.NewErrExporterNotFound(id) and any other
error is returned unchanged so CreateExporter and UpdateExporter use consistent
error handling.
| t.Parallel() | ||
|
|
||
| t.Run("nominal", func(t *testing.T) { | ||
| ctx := logging.TestingContext() | ||
| ctrl := gomock.NewController(t) | ||
| storage := NewMockStorage(ctrl) | ||
| logFetcher := NewMockLogFetcher(ctrl) | ||
| exporterConfigValidator := NewMockConfigValidator(ctrl) | ||
| driverFactory := drivers.NewMockFactory(ctrl) | ||
| driver := drivers.NewMockDriver(ctrl) | ||
|
|
||
| pipelineConfiguration := ledger.NewPipelineConfiguration("module1", "exporter") | ||
| pipeline := ledger.NewPipeline(pipelineConfiguration) | ||
|
|
||
| driverFactory.EXPECT(). | ||
| Create(gomock.Any(), pipelineConfiguration.ExporterID). | ||
| AnyTimes(). | ||
| Return(driver, nil, nil) | ||
| driver.EXPECT(). | ||
| Start(gomock.Any()). | ||
| AnyTimes(). | ||
| Return(nil) | ||
|
|
||
| log := ledger.NewLog(ledger.CreatedTransaction{ | ||
| Transaction: ledger.NewTransaction(), | ||
| }) | ||
| log.ID = pointer.For(uint64(1)) | ||
|
|
||
| logFetcher.EXPECT(). | ||
| ListLogs(gomock.Any(), common.InitialPaginatedQuery[any]{ | ||
| PageSize: 100, | ||
| Column: "id", | ||
| Options: common.ResourceQuery[any]{}, | ||
| Order: pointer.For(bunpaginate.Order(bunpaginate.OrderAsc)), | ||
| }). | ||
| AnyTimes(). | ||
| DoAndReturn(func(ctx context.Context, paginatedQuery common.PaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) { | ||
| return &bunpaginate.Cursor[ledger.Log]{}, nil | ||
| }) | ||
|
|
||
| storage.EXPECT(). | ||
| ListEnabledPipelines(gomock.Any()). | ||
| AnyTimes(). | ||
| Return([]ledger.Pipeline{pipeline}, nil) | ||
|
|
||
| storage.EXPECT(). | ||
| GetPipeline(gomock.Any(), pipeline.ID). | ||
| AnyTimes(). | ||
| Return(&pipeline, nil) | ||
|
|
||
| storage.EXPECT(). | ||
| OpenLedger(gomock.Any(), pipelineConfiguration.Ledger). | ||
| AnyTimes(). | ||
| Return(logFetcher, &ledger.Ledger{}, nil) | ||
|
|
||
| manager := startManager( | ||
| t, | ||
| ctx, | ||
| storage, | ||
| driverFactory, | ||
| exporterConfigValidator, | ||
| ) | ||
| <-manager.Started() | ||
|
|
||
| err := manager.StartPipeline(ctx, pipeline.ID) | ||
| require.Error(t, err) | ||
|
|
||
| require.Eventually(t, func() bool { | ||
| return manager.GetDriver("exporter") != nil | ||
| }, 5*time.Second, 10*time.Millisecond) | ||
|
|
||
| select { | ||
| case <-manager.GetDriver("exporter").Ready(): | ||
| case <-time.After(time.Second): | ||
| require.Fail(t, "exporter should be ready") | ||
| } | ||
|
|
||
| require.Eventually(t, ctrl.Satisfied, 2*time.Second, 10*time.Millisecond) | ||
|
|
||
| // notes(gfyrag): add this expectation AFTER the previous Eventually. | ||
| // If configured before the Eventually, it will never finish as the stop call is made in a t.Cleanup defined earlier | ||
| driver.EXPECT().Stop(gomock.Any()).Return(nil) | ||
|
|
||
| require.NoError(t, manager.Stop(ctx)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race detector regression in TestManagerStop
The latest pipeline run failed because go test -race now trips a data race while executing TestManagerStop (see the CI log pointing at lines 260‑341). I can reproduce it locally with:
go test -race ./internal/replication -run TestManagerStop -count=1The race happens between the manager’s background goroutine and the new stop logic introduced here. We need to make the manager shutdown path race-free (or adjust the test so it no longer triggers unsynchronized access) before we can merge this PR.
🧰 Tools
🪛 GitHub Actions: Default
[error] 260-341: Race detected during execution of TestManagerStop (and related tests) in replication package.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1110 +/- ##
==========================================
+ Coverage 81.98% 82.06% +0.08%
==========================================
Files 186 187 +1
Lines 8942 9027 +85
==========================================
+ Hits 7331 7408 +77
- Misses 1187 1191 +4
- Partials 424 428 +4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
internal/replication/controller_grpc_client.go(2 hunks)internal/storage/system/store.go(2 hunks)test/e2e/suite_test.go(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/storage/system/store.go (2)
internal/replication/grpc/replication_service.pb.go (3)
Exporter(181-188)Exporter(201-201)Exporter(216-218)internal/controller/ledger/errors.go (1)
ErrNotFound(14-14)
internal/replication/controller_grpc_client.go (2)
internal/replication/grpc/replication_service.pb.go (6)
ExporterConfiguration(409-415)ExporterConfiguration(428-428)ExporterConfiguration(443-445)UpdateExporterRequest(549-555)UpdateExporterRequest(568-568)UpdateExporterRequest(583-585)internal/controller/system/errors.go (2)
NewErrInvalidDriverConfiguration(69-74)NewErrExporterNotFound(47-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Tests
- GitHub Check: Dirty
🔇 Additional comments (3)
internal/storage/system/store.go (1)
199-199: LGTM! Consistent error handling.Wrapping the error with
postgres.ResolveErrorensures consistent error handling across the store, matching the pattern used in other methods.internal/replication/controller_grpc_client.go (2)
33-33: LGTM! Error handling logic corrected.The fix changes from inverted logic (
!=) to correct logic (==), now properly mappingcodes.InvalidArgumenttoErrInvalidDriverConfiguration. This addresses the inconsistency flagged in the previous review and aligns withUpdateExporter's error handling pattern.
43-59: LGTM! Clean error handling with proper gRPC status code mapping.The implementation correctly:
- Maps
codes.InvalidArgumentto driver configuration error- Maps
codes.NotFoundto exporter not found error- Propagates all other errors unchanged
The switch-based approach is clean and consistent with the project's error handling patterns.
| func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error { | ||
| ret, err := d.db.NewUpdate(). | ||
| Model(&exporter). | ||
| Where("id = ?", exporter.ID). | ||
| Exec(ctx) | ||
| if err != nil { | ||
| return postgres.ResolveError(err) | ||
| } | ||
|
|
||
| rowsAffected, err := ret.RowsAffected() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if rowsAffected == 0 { | ||
| return postgres.ErrNotFound | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix inconsistencies with DeleteExporter error handling.
The method now includes the rows affected check (addressing the previous review), but has two inconsistencies with DeleteExporter (lines 172-190):
- Line 322 returns the error directly, but
DeleteExporterpanics at line 183 whenRowsAffected()fails - Line 326 returns
postgres.ErrNotFound, butDeleteExporterreturnssql.ErrNoRowsat line 186
Apply this diff to match DeleteExporter's pattern:
rowsAffected, err := ret.RowsAffected()
if err != nil {
- return err
+ panic(err)
}
if rowsAffected == 0 {
- return postgres.ErrNotFound
+ return sql.ErrNoRows
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error { | |
| ret, err := d.db.NewUpdate(). | |
| Model(&exporter). | |
| Where("id = ?", exporter.ID). | |
| Exec(ctx) | |
| if err != nil { | |
| return postgres.ResolveError(err) | |
| } | |
| rowsAffected, err := ret.RowsAffected() | |
| if err != nil { | |
| return err | |
| } | |
| if rowsAffected == 0 { | |
| return postgres.ErrNotFound | |
| } | |
| return nil | |
| } | |
| func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error { | |
| ret, err := d.db.NewUpdate(). | |
| Model(&exporter). | |
| Where("id = ?", exporter.ID). | |
| Exec(ctx) | |
| if err != nil { | |
| return postgres.ResolveError(err) | |
| } | |
| rowsAffected, err := ret.RowsAffected() | |
| if err != nil { | |
| panic(err) | |
| } | |
| if rowsAffected == 0 { | |
| return sql.ErrNoRows | |
| } | |
| return nil | |
| } |
🤖 Prompt for AI Agents
In internal/storage/system/store.go around lines 311 to 330, UpdateExporter
currently returns the RowsAffected() error and returns postgres.ErrNotFound when
no rows were affected, which is inconsistent with DeleteExporter; change it to
panic if ret.RowsAffected() returns an error (matching DeleteExporter’s
behavior) and return sql.ErrNoRows when rowsAffected == 0 instead of
postgres.ErrNotFound; ensure the sql package is imported if not already.
Feature: Update Exporter Configuration
This PR adds the ability to update an exporter's configuration without needing to delete and recreate it. This is particularly useful when exporters are used by running pipelines, as it allows updating the configuration while maintaining pipeline continuity.
Changes
API Endpoint
PUT /v2/_/exporters/{exporterID}endpoint to update an exporter's configurationV2UpdateExporterRequest(separate fromV2CreateExporterRequestfor better API design)204 No Contenton success400), not found (404), and internal errors (500)Backend Implementation
UpdateExportermethod to the replication manager that:UpdateExporterto the system store interface and implementationUpdateExporterto the system controller interface and implementationUpdateExporterRPC to the proto definitionOpenAPI Specification
V2UpdateExporterRequestschema (separate fromV2CreateExporterRequest)PUT /v2/_/exporters/{exporterID}endpoint documentationTesting
internal/api/v2/controllers_exporters_update_test.gocovering:test/e2e/api_exporters_update_test.gocovering:Bug Fix
Technical Details
The update process:
This ensures that: