Skip to content

Conversation

@gfyrag
Copy link
Contributor

@gfyrag gfyrag commented Nov 7, 2025

Feature: Update Exporter Configuration

This PR adds the ability to update an exporter's configuration without needing to delete and recreate it. This is particularly useful when exporters are used by running pipelines, as it allows updating the configuration while maintaining pipeline continuity.

Changes

API Endpoint

  • Added PUT /v2/_/exporters/{exporterID} endpoint to update an exporter's configuration
  • The endpoint accepts a V2UpdateExporterRequest (separate from V2CreateExporterRequest for better API design)
  • Returns 204 No Content on success
  • Proper error handling for validation errors (400), not found (404), and internal errors (500)

Backend Implementation

  • Added UpdateExporter method to the replication manager that:
    • Validates the new configuration
    • Stops the exporter if it's currently running
    • Updates the exporter configuration in the database
    • The exporter will be restarted automatically by the pipeline manager if needed
  • Added UpdateExporter to the system store interface and implementation
  • Added UpdateExporter to the system controller interface and implementation
  • Updated gRPC service to support exporter updates:
    • Added UpdateExporter RPC to the proto definition
    • Implemented the RPC handler in the gRPC server
    • Implemented the RPC client in the gRPC client adapter

OpenAPI Specification

  • Added V2UpdateExporterRequest schema (separate from V2CreateExporterRequest)
  • Updated the PUT /v2/_/exporters/{exporterID} endpoint documentation
  • Regenerated the SDK with the new endpoint and request type

Testing

  • Unit Tests: Added comprehensive unit tests in internal/api/v2/controllers_exporters_update_test.go covering:
    • Successful update
    • Invalid configuration validation
    • Exporter not found
    • Internal errors
  • Integration Tests: Added integration tests in test/e2e/api_exporters_update_test.go covering:
    • Basic update scenarios (valid/invalid configuration, not found)
    • Update with different drivers
    • Update with running pipelines: Tests that updating an exporter configuration while pipelines are running correctly redirects new transactions to the new endpoint (using two HTTP test servers)

Bug Fix

  • Fixed a blocking issue where the application would stop if an exporter configuration was erroneous during startup

Technical Details

The update process:

  1. Validates the new configuration using the exporter's driver validator
  2. Stops the exporter if it's currently running (to prevent race conditions)
  3. Updates the exporter record in the database
  4. The pipeline manager will automatically restart the exporter with the new configuration when needed

This ensures that:

  • Configuration changes are validated before being applied
  • Running exporters are cleanly stopped before reconfiguration
  • Pipelines continue to work correctly after the update
  • The update is atomic (either succeeds completely or fails without partial state)

@gfyrag gfyrag requested a review from a team as a code owner November 7, 2025 12:54
@coderabbitai
Copy link

coderabbitai bot commented Nov 7, 2025

Walkthrough

Adds an "Update exporter" capability end-to-end: new HTTP v2 endpoint, gRPC RPC, controller and manager logic, storage update, client SDK and docs, mocks, unit and e2e tests, and related runtime/locking and driver lifecycle adjustments to apply exporter configuration changes at runtime.

Changes

Cohort / File(s) Change Summary
API Documentation
docs/api/README.md, pkg/client/README.md, pkg/client/docs/models/operations/v2updateexporterrequest.md, pkg/client/docs/models/operations/v2updateexporterresponse.md, pkg/client/docs/sdks/v2/README.md
Adds Update exporter docs and example (PUT /v2/_/exporters/{exporterID}) and model pages; duplicate README blocks present.
HTTP API V2 & Routing
internal/api/v2/controllers_exporters_update.go, internal/api/v2/controllers_exporters_update_test.go, internal/api/v2/routes.go
Adds handler for PUT /exporters/{exporterID} decoding ledger.ExporterConfiguration, calling controller.UpdateExporter, mapping errors (400/404/500), and tests.
API Mocks
internal/api/v2/mocks_system_controller_test.go, internal/api/common/mocks_system_controller_test.go, internal/api/v1/mocks_system_controller_test.go
Adds UpdateExporter(ctx, id, configuration ledger.ExporterConfiguration) error and recorder methods to replication backend and system controller mocks.
System Controller
internal/controller/system/controller.go
Adds UpdateExporter(ctx, id, configuration ledger.ExporterConfiguration) error to ReplicationBackend and implements it on DefaultController delegating to backend.
gRPC: proto / server / client
internal/replication/grpc/replication_service.proto, internal/replication/controller_grpc_server.go, internal/replication/controller_grpc_client.go
Adds UpdateExporter RPC and request/response messages; server calls manager.UpdateExporter and maps domain errors to gRPC codes; client stub calls RPC and maps gRPC errors back to domain errors.
Replication Manager & Runtime
internal/replication/manager.go, internal/replication/manager_test.go, internal/replication/driver_facade.go, internal/replication/drivers/registry.go, internal/replication/exporters.go, internal/replication/exporters_generated.go
Implements manager-side UpdateExporter (validate config, stop exporter/pipelines, persist update, resync), introduces withLock and stopExporter, switches not-found checks to postgres.ErrNotFound, adjusts retry cancellation source, and updates tests and mockgen flags.
Storage Layer & Mocks
internal/replication/store.go, internal/storage/system/store.go, internal/replication/store_generated_test.go
Adds Storage.UpdateExporter(ctx, exporter) to interface and adapter; implements DefaultStore.UpdateExporter using postgres error resolution; updates generated mock to include UpdateExporter.
Client SDK
pkg/client/models/operations/v2updateexporter.go, pkg/client/v2.go, pkg/client/docs/...
Adds V2UpdateExporter request/response models and V2.UpdateExporter client method implementing HTTP lifecycle, retries, hooks; note: duplicate UpdateExporter insertion observed in pkg/client/v2.go.
Test Infra & E2E
pkg/testserver/server.go, test/e2e/api_exporters_update_test.go, test/e2e/suite_test.go
Adds experimental instrumentation for pipelines sync period, comprehensive e2e tests for update scenarios (validation, not-found, success, driver switch, runtime redirection), and expands ParallelExecutionContext fields.
Common errors & cleanup
internal/storage/common/errors.go
Removes exported ErrNotFound alias to sql.ErrNoRows; callers moved to postgres-specific error handling.
Client docs/models (generated)
pkg/client/docs/models/operations/v2updateexporterrequest.md, pkg/client/docs/models/operations/v2updateexporterresponse.md, pkg/client/docs/sdks/v2/README.md
Adds generated model docs and SDK README updates for UpdateExporter.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP as "HTTP V2 API"
    participant Ctrl as "System Controller"
    participant Mgr as "Replication Manager"
    participant Store as "Storage"
    participant Driver as "Exporter Driver"

    Client->>HTTP: PUT /v2/_/exporters/{id} (V2ExporterConfiguration)
    HTTP->>HTTP: parse ID & body
    HTTP->>Ctrl: UpdateExporter(ctx, id, config)
    Ctrl->>Mgr: UpdateExporter(ctx, id, config)

    rect rgb(235,245,255)
      Note over Mgr: validate config, stop exporter/pipelines, persist update, resynchronize
      Mgr->>Driver: stopExporter(id)
      Mgr->>Store: UpdateExporter(exporter)
      Store-->>Mgr: ok / error
      Mgr->>Mgr: synchronizePipelines()
    end

    alt success
      Mgr-->>Ctrl: ok
      Ctrl-->>HTTP: ok
      HTTP-->>Client: 204 No Content
    else validation error
      Mgr-->>HTTP: ErrInvalidDriverConfiguration
      HTTP-->>Client: 400 Bad Request
    else not found
      Store-->>Mgr: ErrExporterNotFound
      HTTP-->>Client: 404 Not Found
    else other error
      HTTP-->>Client: 500 Internal Server Error
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Pay special attention to:
    • internal/replication/manager.go — UpdateExporter, new locking, lifecycle and concurrency changes
    • internal/replication/manager_test.go & test/e2e/api_exporters_update_test.go — timing, synchronization and test determinism
    • internal/replication/driver_facade.go — retry loop cancellation semantics
    • pkg/client/v2.go — duplicate UpdateExporter implementation (remove duplicate)
    • Error mapping after removal of internal/storage/common.ErrNotFound and consistent use of postgres.ResolveError

Poem

🐇 I hopped a patch through docs and code,
I nudged the driver while pipelines flowed.
Stop, update, sync — new routes to explore,
Messages rerouted, logs tell of more.
Thump-thump — exporters updated, hooray!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: add update of exporters config' directly describes the main feature being added - the ability to update exporter configurations.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, explaining the feature, API changes, backend implementation, gRPC updates, testing, and bug fixes.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/exporters-update

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6efde91 and d58161e.

📒 Files selected for processing (1)
  • internal/storage/system/store.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/storage/system/store.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Tests
  • GitHub Check: Dirty

Comment @coderabbitai help to get the list of available commands and usage tips.

Also, fix a blocking stop of the app when the exporter config is erroned.
@gfyrag gfyrag force-pushed the feat/exporters-update branch from 61e7c32 to ca7f5c9 Compare November 7, 2025 12:59
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (1)
docs/api/README.md (1)

2409-2409: V2UpdateExporterRequest schema defined but not referenced.

Line 2409 references V2ExporterConfiguration as the body parameter, but lines 5542-5559 define a separate V2UpdateExporterRequest schema that is never used. Either reference V2UpdateExporterRequest at line 2409, or remove the unused schema definition if both Create and Update endpoints intentionally use the same schema.

Also applies to: 5542-5559

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f38748e and ca7f5c9.

⛔ Files ignored due to path filters (7)
  • internal/replication/grpc/replication_service.pb.go is excluded by !**/*.pb.go, !**/*.pb.go
  • internal/replication/grpc/replication_service_grpc.pb.go is excluded by !**/*.pb.go, !**/*.pb.go
  • openapi.yaml is excluded by !**/*.yaml
  • openapi/v2.yaml is excluded by !**/*.yaml
  • pkg/client/.speakeasy/gen.lock is excluded by !**/*.lock, !**/*.lock
  • pkg/client/.speakeasy/logs/naming.log is excluded by !**/*.log, !**/*.log
  • pkg/client/speakeasyusagegen/.speakeasy/logs/naming.log is excluded by !**/*.log, !**/*.log
📒 Files selected for processing (29)
  • docs/api/README.md (2 hunks)
  • internal/api/common/mocks_system_controller_test.go (2 hunks)
  • internal/api/v1/mocks_system_controller_test.go (2 hunks)
  • internal/api/v2/controllers_exporters_update.go (1 hunks)
  • internal/api/v2/controllers_exporters_update_test.go (1 hunks)
  • internal/api/v2/mocks_system_controller_test.go (2 hunks)
  • internal/api/v2/routes.go (2 hunks)
  • internal/controller/system/controller.go (2 hunks)
  • internal/replication/controller_grpc_client.go (1 hunks)
  • internal/replication/controller_grpc_server.go (1 hunks)
  • internal/replication/driver_facade.go (2 hunks)
  • internal/replication/drivers/registry.go (2 hunks)
  • internal/replication/exporters.go (1 hunks)
  • internal/replication/exporters_generated.go (1 hunks)
  • internal/replication/grpc/replication_service.proto (2 hunks)
  • internal/replication/manager.go (9 hunks)
  • internal/replication/manager_test.go (5 hunks)
  • internal/replication/store.go (2 hunks)
  • internal/replication/store_generated_test.go (1 hunks)
  • internal/storage/common/errors.go (0 hunks)
  • internal/storage/system/store.go (2 hunks)
  • pkg/client/README.md (1 hunks)
  • pkg/client/docs/models/operations/v2updateexporterrequest.md (1 hunks)
  • pkg/client/docs/models/operations/v2updateexporterresponse.md (1 hunks)
  • pkg/client/docs/sdks/v2/README.md (2 hunks)
  • pkg/client/models/operations/v2updateexporter.go (1 hunks)
  • pkg/client/v2.go (1 hunks)
  • pkg/testserver/server.go (1 hunks)
  • test/e2e/api_exporters_update_test.go (1 hunks)
💤 Files with no reviewable changes (1)
  • internal/storage/common/errors.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-04-29T11:24:28.923Z
Learnt from: gfyrag
Repo: formancehq/ledger PR: 892
File: internal/controller/ledger/controller_default.go:196-196
Timestamp: 2025-04-29T11:24:28.923Z
Learning: In the ledger Import function, it's critical to maintain proper log ID tracking by updating lastLogID with the current log.ID after each processed log, rather than setting it to nil. This ensures the system can properly validate the ordering of logs and prevent duplicate or out-of-order processing, which is essential for maintaining data integrity in the ledger.

Applied to files:

  • internal/replication/manager.go
🧬 Code graph analysis (17)
internal/replication/drivers/registry.go (1)
internal/controller/ledger/errors.go (1)
  • ErrNotFound (14-14)
pkg/testserver/server.go (2)
deployments/pulumi/pkg/config/config.go (1)
  • Duration (258-258)
cmd/worker.go (1)
  • WorkerPipelinesSyncPeriod (25-25)
internal/api/v2/controllers_exporters_update_test.go (2)
internal/controller/system/errors.go (2)
  • NewErrInvalidDriverConfiguration (69-74)
  • NewErrExporterNotFound (47-49)
internal/api/v2/routes.go (2)
  • NewRouter (21-122)
  • WithExporters (158-162)
internal/replication/controller_grpc_server.go (2)
internal/replication/grpc/replication_service.pb.go (9)
  • UpdateExporterRequest (549-555)
  • UpdateExporterRequest (568-568)
  • UpdateExporterRequest (583-585)
  • UpdateExporterResponse (601-605)
  • UpdateExporterResponse (618-618)
  • UpdateExporterResponse (633-635)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
internal/controller/system/errors.go (2)
  • ErrInvalidDriverConfiguration (51-54)
  • ErrExporterNotFound (36-36)
internal/controller/system/controller.go (1)
internal/replication/grpc/replication_service.pb.go (3)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
internal/api/common/mocks_system_controller_test.go (2)
internal/api/v1/mocks_system_controller_test.go (4)
  • MockReplicationBackend (23-27)
  • MockReplicationBackendMockRecorder (30-32)
  • SystemController (221-225)
  • SystemControllerMockRecorder (228-230)
internal/replication/grpc/replication_service.pb.go (3)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
internal/api/v2/mocks_system_controller_test.go (2)
internal/api/common/mocks_system_controller_test.go (4)
  • MockReplicationBackend (23-27)
  • MockReplicationBackendMockRecorder (30-32)
  • SystemController (221-225)
  • SystemControllerMockRecorder (228-230)
internal/replication/grpc/replication_service.pb.go (3)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
test/e2e/api_exporters_update_test.go (8)
test/e2e/suite_test.go (1)
  • UseTemplatedDatabase (202-204)
pkg/testserver/server.go (6)
  • ExperimentalFeaturesInstrumentation (30-35)
  • ExperimentalExportersInstrumentation (37-42)
  • ExperimentalEnableWorker (44-49)
  • ExperimentalPipelinesPullIntervalInstrumentation (79-84)
  • ExperimentalPipelinesPushRetryPeriodInstrumentation (72-77)
  • ExperimentalPipelinesSyncPeriodInstrumentation (86-91)
pkg/client/models/operations/v2updateexporter.go (1)
  • V2UpdateExporterRequest (9-13)
pkg/client/models/components/v2errorsenum.go (2)
  • V2ErrorsEnumValidation (15-15)
  • V2ErrorsEnumNotFound (19-19)
pkg/client/models/operations/v2createledger.go (1)
  • V2CreateLedgerRequest (9-13)
pkg/client/models/operations/v2createtransaction.go (1)
  • V2CreateTransactionRequest (9-25)
pkg/client/models/components/v2posttransaction.go (1)
  • V2PostTransaction (58-68)
pkg/client/models/components/v2posting.go (1)
  • V2Posting (10-15)
internal/storage/system/store.go (1)
internal/replication/grpc/replication_service.pb.go (3)
  • Exporter (181-188)
  • Exporter (201-201)
  • Exporter (216-218)
internal/replication/store.go (1)
internal/storage/system/store.go (1)
  • DefaultStore (41-44)
internal/replication/manager_test.go (5)
internal/replication/manager.go (2)
  • NewManager (475-498)
  • WithSyncPeriod (508-512)
internal/replication/store_generated_test.go (2)
  • NewMockStorage (72-76)
  • NewMockLogFetcher (33-37)
internal/replication/exporters_generated.go (1)
  • NewMockConfigValidator (32-36)
internal/storage/bucket/bucket_generated_test.go (1)
  • NewMockFactory (165-169)
internal/storage/common/pagination.go (2)
  • InitialPaginatedQuery (9-14)
  • PaginatedQuery (25-28)
internal/api/v2/controllers_exporters_update.go (2)
internal/replication/grpc/replication_service.pb.go (3)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
internal/controller/system/errors.go (2)
  • ErrInvalidDriverConfiguration (51-54)
  • ErrExporterNotFound (36-36)
internal/replication/controller_grpc_client.go (2)
internal/replication/grpc/replication_service.pb.go (6)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
  • UpdateExporterRequest (549-555)
  • UpdateExporterRequest (568-568)
  • UpdateExporterRequest (583-585)
internal/controller/system/errors.go (2)
  • NewErrInvalidDriverConfiguration (69-74)
  • NewErrExporterNotFound (47-49)
internal/replication/manager.go (2)
internal/controller/ledger/errors.go (1)
  • ErrNotFound (14-14)
internal/controller/system/errors.go (2)
  • NewErrInvalidDriverConfiguration (69-74)
  • NewErrExporterNotFound (47-49)
internal/replication/store_generated_test.go (1)
internal/replication/grpc/replication_service.pb.go (3)
  • Exporter (181-188)
  • Exporter (201-201)
  • Exporter (216-218)
internal/api/v1/mocks_system_controller_test.go (2)
internal/api/common/mocks_system_controller_test.go (4)
  • MockReplicationBackend (23-27)
  • MockReplicationBackendMockRecorder (30-32)
  • SystemController (221-225)
  • SystemControllerMockRecorder (228-230)
internal/replication/grpc/replication_service.pb.go (3)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
pkg/client/v2.go (10)
pkg/client/models/operations/v2updateexporter.go (2)
  • V2UpdateExporterRequest (9-13)
  • V2UpdateExporterResponse (29-31)
pkg/client/models/operations/options.go (4)
  • Option (41-41)
  • Options (32-39)
  • SupportedOptionRetries (15-15)
  • SupportedOptionTimeout (16-16)
pkg/client/internal/utils/utils.go (4)
  • ReplaceParameters (51-60)
  • MatchStatusCodes (71-92)
  • ConsumeRawBody (350-360)
  • UnmarshalJsonFromResponseBody (39-49)
pkg/client/internal/utils/pathparams.go (1)
  • GenerateURL (19-38)
pkg/client/internal/hooks/hooks.go (4)
  • HookContext (27-35)
  • BeforeRequestContext (37-39)
  • AfterErrorContext (45-47)
  • AfterSuccessContext (41-43)
pkg/client/internal/config/sdkconfiguration.go (1)
  • SDKConfiguration (16-25)
pkg/client/internal/utils/requestbody.go (1)
  • SerializeRequestBody (31-42)
pkg/client/internal/utils/security.go (1)
  • PopulateSecurity (27-92)
pkg/client/internal/utils/retries.go (3)
  • RetryConfig (23-23)
  • Retries (25-28)
  • Retry (30-104)
pkg/client/models/components/v2errorresponse.go (1)
  • V2ErrorResponse (5-9)
🪛 GitHub Actions: Default
test/e2e/api_exporters_update_test.go

[error] 306-306: Exporters update API tests failed: race detected during execution with running pipelines; timeline shows repeated pipeline start/synchronization errors.

internal/replication/manager_test.go

[error] 260-341: Race detected during execution of TestManagerStop (and related tests) in replication package.

internal/replication/manager.go

[error] 301-301: Data race detected: Stop() and Run() goroutines racing in Manager.Stop/Manager.Run during tests.

🪛 markdownlint-cli2 (0.18.1)
pkg/client/docs/sdks/v2/README.md

1835-1835: Hard tabs
Column: 1

(MD010, no-hard-tabs)


1836-1836: Hard tabs
Column: 1

(MD010, no-hard-tabs)


1837-1837: Hard tabs
Column: 1

(MD010, no-hard-tabs)


1838-1838: Hard tabs
Column: 1

(MD010, no-hard-tabs)


1839-1839: Hard tabs
Column: 1

(MD010, no-hard-tabs)


1840-1840: Hard tabs
Column: 1

(MD010, no-hard-tabs)

docs/api/README.md

2413-2413: Blank line inside blockquote

(MD028, no-blanks-blockquote)


5557-5557: Multiple headings with the same content

(MD024, no-duplicate-heading)


5559-5559: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)

🔇 Additional comments (28)
internal/replication/exporters_generated.go (1)

6-6: Generated file correctly updated with -typed flag.

The mockgen command has been properly updated to include the -typed flag, which improves type safety and enables better IDE support for mock assertions. The mock implementation for ConfigValidator is correct and follows standard mockgen patterns.

internal/replication/drivers/registry.go (2)

7-7: LGTM: Import aligns with error standardization.

The addition of the postgres package import supports the error handling refactoring at line 71, standardizing on postgres.ErrNotFound as shown in the relevant code snippet from internal/controller/ledger/errors.go.


71-71: LGTM: Error comparison correctly standardized.

The change from common.ErrNotFound to postgres.ErrNotFound is correct and aligns with the codebase standard. This ensures proper error handling when GetExporter returns a not-found error.

pkg/client/README.md (1)

158-158: LGTM!

The documentation correctly lists the new UpdateExporter operation in the available operations section, consistent with the other exporter-related operations.

pkg/client/docs/sdks/v2/README.md (2)

36-36: LGTM!

The UpdateExporter operation is correctly positioned in the navigation between GetExporterState and DeleteExporter, maintaining logical ordering of exporter operations.


1825-1891: LGTM!

The UpdateExporter documentation section is comprehensive and follows the established pattern used for other operations. It includes example usage, parameter descriptions, response type, and error cases.

Note: The static analysis tool flagged hard tabs in the code example, but since this is auto-generated SDK documentation (created by Speakeasy), these formatting issues should be addressed in the generator configuration rather than manually.

pkg/client/docs/models/operations/v2updateexporterresponse.md (1)

1-8: LGTM!

The V2UpdateExporterResponse documentation is minimal and appropriate for an operation that returns 204 No Content on success (only HTTP metadata is relevant).

pkg/client/docs/models/operations/v2updateexporterrequest.md (1)

1-9: LGTM!

The V2UpdateExporterRequest documentation clearly defines the required fields: ExporterID for identifying which exporter to update, and V2ExporterConfiguration for the new configuration.

internal/storage/system/store.go (1)

199-199: LGTM!

Good consistency improvement. Wrapping the error with postgres.ResolveError ensures proper error classification (e.g., sql.ErrNoRows → postgres.ErrNotFound) consistent with other methods in this file.

internal/replication/driver_facade.go (2)

40-40: Verify the context cancellation semantics change.

The retry loop now observes c.startContext.Done() instead of ctx.Done(). This means the loop will only exit on internal cancellation (via Stop()) rather than on the outer context cancellation. This changes when the retry loop terminates after a Start error.

Was this intentional to enforce explicit lifecycle management through the Stop() method rather than relying on context cancellation?


55-55: LGTM!

The comment clarification accurately reflects the logic: if startingChan is closed, the facade has progressed past the starting phase into the running phase.

internal/replication/store.go (2)

34-34: LGTM!

The Storage interface correctly extends to include UpdateExporter, maintaining consistency with other exporter-related operations.


49-51: LGTM!

The storageAdapter properly delegates UpdateExporter to the underlying DefaultStore implementation, consistent with the delegation pattern used for other storage operations.

internal/replication/grpc/replication_service.proto (2)

12-12: LGTM!

The UpdateExporter RPC is correctly positioned in the service definition, maintaining logical grouping with other exporter-related operations.


71-76: LGTM!

The protobuf message definitions are well-structured:

  • UpdateExporterRequest captures the exporter ID and new configuration
  • UpdateExporterResponse is appropriately empty (aligns with 204 No Content semantics)
  • Field numbering is consistent with protobuf conventions
internal/replication/store_generated_test.go (1)

244-256: LGTM! Generated mock follows standard pattern.

The UpdateExporter mock method and recorder are correctly generated following the same pattern as other storage methods.

internal/api/v2/controllers_exporters_update_test.go (1)

22-108: LGTM! Comprehensive test coverage.

The test covers all key scenarios: success (204), validation error (400), not found (404), and internal error (500). The test structure is clean and follows best practices with proper mocking and parallel execution.

internal/replication/controller_grpc_server.go (1)

72-89: LGTM! Error handling follows established patterns.

The UpdateExporter handler correctly maps domain errors to gRPC status codes and follows the same pattern as other handlers in this file.

internal/api/v2/controllers_exporters_update.go (1)

13-33: LGTM! Clean HTTP handler implementation.

The handler properly extracts parameters, decodes the request body, delegates to the controller, and maps errors to appropriate HTTP status codes. The 204 No Content response for successful updates follows REST conventions.

internal/controller/system/controller.go (1)

88-93: LGTM! Interface and implementation follow established patterns.

The UpdateExporter addition to the ReplicationBackend interface and DefaultController implementation follows the same pattern as other exporter operations. The documentation correctly identifies possible errors.

test/e2e/api_exporters_update_test.go (1)

46-172: LGTM! Comprehensive test coverage for basic update scenarios.

The test thoroughly covers invalid configuration, non-existent exporter ID, valid configuration updates, and driver changes. The nested context structure clearly organizes the test flow.

internal/api/common/mocks_system_controller_test.go (2)

206-218: LGTM! Mock additions follow standard patterns.

The UpdateExporter mock methods for MockReplicationBackend and SystemController are correctly generated and consistent with other mock methods in the file.


477-489: LGTM! Mock recorder correctly implemented.

The SystemController UpdateExporter mock follows the same pattern as other controller methods.

internal/api/v2/mocks_system_controller_test.go (1)

206-218: LGTM!

Generated mock methods follow the standard gomock pattern and are consistent with other methods in the file.

Also applies to: 477-489

internal/api/v1/mocks_system_controller_test.go (1)

206-218: LGTM!

Generated mock methods follow the standard gomock pattern and are consistent with other methods in the file.

Also applies to: 477-489

internal/replication/manager.go (2)

246-273: LGTM! withLock pattern improves synchronization.

The withLock helper (lines 246-250) and its usage to wrap synchronizePipelines calls (lines 252-256, 269-273) ensures proper mutex protection for pipeline synchronization operations. This is cleaner than inline lock/unlock pairs.


373-401: UpdateExporter implementation is sound.

The update flow is logical:

  1. Validate new configuration (line 377)
  2. Stop all pipelines using the exporter (line 381)
  3. Fetch exporter from storage (line 385)
  4. Update configuration (line 394)
  5. Persist to storage (line 396)
  6. Re-synchronize pipelines to restart with new config (line 400)

Note that the lock is held for the entire operation including DB calls. This is consistent with other methods (DeleteExporter, CreatePipeline) but could cause contention during slow DB operations.

Verify that synchronizePipelines at line 400 correctly restarts the stopped pipelines with the updated configuration by checking the storage and driver initialization paths.

pkg/client/v2.go (1)

5863-6057: No issues found; code is correct as-is.

The verification confirms that only one UpdateExporter function exists in the file (line 5864) and the code compiles without any duplicate declaration errors. The AI summary's warning about duplicate declarations was incorrect. The implementation is consistent with other methods in the file and requires no changes.

Comment on lines +43 to +59
func (t ThroughGRPCBackend) UpdateExporter(ctx context.Context, id string, configuration ledger.ExporterConfiguration) error {
_, err := t.client.UpdateExporter(ctx, &grpc.UpdateExporterRequest{
Id: id,
Config: mapExporterConfiguration(configuration),
})
if err != nil {
switch {
case status.Code(err) == codes.InvalidArgument:
return system.NewErrInvalidDriverConfiguration(configuration.Driver, err)
case status.Code(err) == codes.NotFound:
return system.NewErrExporterNotFound(id)
default:
return err
}
}
return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Fix error handling inconsistency with CreateExporter.

The UpdateExporter error handling correctly maps codes.InvalidArgument to ErrInvalidDriverConfiguration, but CreateExporter has inverted logic at Line 33 (if status.Code(err) != codes.InvalidArgument). This inconsistency could lead to incorrect error responses.

Verify CreateExporter's error handling logic:


🏁 Script executed:

#!/bin/bash
# Search for similar error handling patterns in gRPC clients
rg -n -A3 -B3 'status\.Code.*InvalidArgument' --type=go -g 'internal/replication/*client*.go'

Length of output: 1156


Fix inverted error handling logic in CreateExporter (Line 33).

The script output confirms an inconsistency: CreateExporter at Line 33 uses != (inverted logic) while UpdateExporter at Line 50 correctly uses ==. CreateExporter incorrectly returns ErrInvalidDriverConfiguration when the error code is not InvalidArgument, which is backwards. Both methods should check for == codes.InvalidArgument to return ErrInvalidDriverConfiguration consistently.

Line 33 should be changed from if status.Code(err) != codes.InvalidArgument { to if status.Code(err) == codes.InvalidArgument {.

🤖 Prompt for AI Agents
internal/replication/controller_grpc_client.go around lines 43 to 59:
CreateExporter (around line 33) currently uses inverted logic for
InvalidArgument; update it to mirror UpdateExporter by checking status.Code(err)
== codes.InvalidArgument and returning
system.NewErrInvalidDriverConfiguration(configuration.Driver, err) in that case;
also ensure NotFound returns system.NewErrExporterNotFound(id) and any other
error is returned unchanged so CreateExporter and UpdateExporter use consistent
error handling.

Comment on lines +258 to +341
t.Parallel()

t.Run("nominal", func(t *testing.T) {
ctx := logging.TestingContext()
ctrl := gomock.NewController(t)
storage := NewMockStorage(ctrl)
logFetcher := NewMockLogFetcher(ctrl)
exporterConfigValidator := NewMockConfigValidator(ctrl)
driverFactory := drivers.NewMockFactory(ctrl)
driver := drivers.NewMockDriver(ctrl)

pipelineConfiguration := ledger.NewPipelineConfiguration("module1", "exporter")
pipeline := ledger.NewPipeline(pipelineConfiguration)

driverFactory.EXPECT().
Create(gomock.Any(), pipelineConfiguration.ExporterID).
AnyTimes().
Return(driver, nil, nil)
driver.EXPECT().
Start(gomock.Any()).
AnyTimes().
Return(nil)

log := ledger.NewLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
})
log.ID = pointer.For(uint64(1))

logFetcher.EXPECT().
ListLogs(gomock.Any(), common.InitialPaginatedQuery[any]{
PageSize: 100,
Column: "id",
Options: common.ResourceQuery[any]{},
Order: pointer.For(bunpaginate.Order(bunpaginate.OrderAsc)),
}).
AnyTimes().
DoAndReturn(func(ctx context.Context, paginatedQuery common.PaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) {
return &bunpaginate.Cursor[ledger.Log]{}, nil
})

storage.EXPECT().
ListEnabledPipelines(gomock.Any()).
AnyTimes().
Return([]ledger.Pipeline{pipeline}, nil)

storage.EXPECT().
GetPipeline(gomock.Any(), pipeline.ID).
AnyTimes().
Return(&pipeline, nil)

storage.EXPECT().
OpenLedger(gomock.Any(), pipelineConfiguration.Ledger).
AnyTimes().
Return(logFetcher, &ledger.Ledger{}, nil)

manager := startManager(
t,
ctx,
storage,
driverFactory,
exporterConfigValidator,
)
<-manager.Started()

err := manager.StartPipeline(ctx, pipeline.ID)
require.Error(t, err)

require.Eventually(t, func() bool {
return manager.GetDriver("exporter") != nil
}, 5*time.Second, 10*time.Millisecond)

select {
case <-manager.GetDriver("exporter").Ready():
case <-time.After(time.Second):
require.Fail(t, "exporter should be ready")
}

require.Eventually(t, ctrl.Satisfied, 2*time.Second, 10*time.Millisecond)

// notes(gfyrag): add this expectation AFTER the previous Eventually.
// If configured before the Eventually, it will never finish as the stop call is made in a t.Cleanup defined earlier
driver.EXPECT().Stop(gomock.Any()).Return(nil)

require.NoError(t, manager.Stop(ctx))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Race detector regression in TestManagerStop

The latest pipeline run failed because go test -race now trips a data race while executing TestManagerStop (see the CI log pointing at lines 260‑341). I can reproduce it locally with:

go test -race ./internal/replication -run TestManagerStop -count=1

The race happens between the manager’s background goroutine and the new stop logic introduced here. We need to make the manager shutdown path race-free (or adjust the test so it no longer triggers unsynchronized access) before we can merge this PR.

🧰 Tools
🪛 GitHub Actions: Default

[error] 260-341: Race detected during execution of TestManagerStop (and related tests) in replication package.

@codecov
Copy link

codecov bot commented Nov 7, 2025

Codecov Report

❌ Patch coverage is 53.09735% with 53 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.06%. Comparing base (63cd3d8) to head (6efde91).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
internal/replication/manager.go 63.82% 12 Missing and 5 partials ⚠️
internal/replication/controller_grpc_client.go 0.00% 15 Missing ⚠️
internal/replication/controller_grpc_server.go 0.00% 14 Missing ⚠️
internal/storage/system/store.go 57.14% 3 Missing and 3 partials ⚠️
internal/replication/drivers/registry.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1110      +/-   ##
==========================================
+ Coverage   81.98%   82.06%   +0.08%     
==========================================
  Files         186      187       +1     
  Lines        8942     9027      +85     
==========================================
+ Hits         7331     7408      +77     
- Misses       1187     1191       +4     
- Partials      424      428       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

paul-nicolas
paul-nicolas previously approved these changes Nov 7, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e91216b and 6efde91.

📒 Files selected for processing (3)
  • internal/replication/controller_grpc_client.go (2 hunks)
  • internal/storage/system/store.go (2 hunks)
  • test/e2e/suite_test.go (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/storage/system/store.go (2)
internal/replication/grpc/replication_service.pb.go (3)
  • Exporter (181-188)
  • Exporter (201-201)
  • Exporter (216-218)
internal/controller/ledger/errors.go (1)
  • ErrNotFound (14-14)
internal/replication/controller_grpc_client.go (2)
internal/replication/grpc/replication_service.pb.go (6)
  • ExporterConfiguration (409-415)
  • ExporterConfiguration (428-428)
  • ExporterConfiguration (443-445)
  • UpdateExporterRequest (549-555)
  • UpdateExporterRequest (568-568)
  • UpdateExporterRequest (583-585)
internal/controller/system/errors.go (2)
  • NewErrInvalidDriverConfiguration (69-74)
  • NewErrExporterNotFound (47-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Tests
  • GitHub Check: Dirty
🔇 Additional comments (3)
internal/storage/system/store.go (1)

199-199: LGTM! Consistent error handling.

Wrapping the error with postgres.ResolveError ensures consistent error handling across the store, matching the pattern used in other methods.

internal/replication/controller_grpc_client.go (2)

33-33: LGTM! Error handling logic corrected.

The fix changes from inverted logic (!=) to correct logic (==), now properly mapping codes.InvalidArgument to ErrInvalidDriverConfiguration. This addresses the inconsistency flagged in the previous review and aligns with UpdateExporter's error handling pattern.


43-59: LGTM! Clean error handling with proper gRPC status code mapping.

The implementation correctly:

  • Maps codes.InvalidArgument to driver configuration error
  • Maps codes.NotFound to exporter not found error
  • Propagates all other errors unchanged

The switch-based approach is clean and consistent with the project's error handling patterns.

Comment on lines 311 to 330
func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error {
ret, err := d.db.NewUpdate().
Model(&exporter).
Where("id = ?", exporter.ID).
Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}

rowsAffected, err := ret.RowsAffected()
if err != nil {
return err
}

if rowsAffected == 0 {
return postgres.ErrNotFound
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix inconsistencies with DeleteExporter error handling.

The method now includes the rows affected check (addressing the previous review), but has two inconsistencies with DeleteExporter (lines 172-190):

  1. Line 322 returns the error directly, but DeleteExporter panics at line 183 when RowsAffected() fails
  2. Line 326 returns postgres.ErrNotFound, but DeleteExporter returns sql.ErrNoRows at line 186

Apply this diff to match DeleteExporter's pattern:

 	rowsAffected, err := ret.RowsAffected()
 	if err != nil {
-		return err
+		panic(err)
 	}
 
 	if rowsAffected == 0 {
-		return postgres.ErrNotFound
+		return sql.ErrNoRows
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error {
ret, err := d.db.NewUpdate().
Model(&exporter).
Where("id = ?", exporter.ID).
Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}
rowsAffected, err := ret.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return postgres.ErrNotFound
}
return nil
}
func (d *DefaultStore) UpdateExporter(ctx context.Context, exporter ledger.Exporter) error {
ret, err := d.db.NewUpdate().
Model(&exporter).
Where("id = ?", exporter.ID).
Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}
rowsAffected, err := ret.RowsAffected()
if err != nil {
panic(err)
}
if rowsAffected == 0 {
return sql.ErrNoRows
}
return nil
}
🤖 Prompt for AI Agents
In internal/storage/system/store.go around lines 311 to 330, UpdateExporter
currently returns the RowsAffected() error and returns postgres.ErrNotFound when
no rows were affected, which is inconsistent with DeleteExporter; change it to
panic if ret.RowsAffected() returns an error (matching DeleteExporter’s
behavior) and return sql.ErrNoRows when rowsAffected == 0 instead of
postgres.ErrNotFound; ensure the sql package is imported if not already.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants