9 changes: 8 additions & 1 deletion cmd/worker.go
@@ -58,6 +58,8 @@ type WorkerCommandConfiguration struct {
WorkerGRPCConfig `mapstructure:",squash"`
}

// addWorkerFlags adds command-line flags to cmd to configure worker runtime behavior.
// The flags control async block hashing, pipeline pull/push/sync behavior and pagination, and bucket cleanup retention and schedule.
func addWorkerFlags(cmd *cobra.Command) {
cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
@@ -69,6 +71,9 @@ func addWorkerFlags(cmd *cobra.Command) {
cmd.Flags().String(WorkerBucketCleanupScheduleFlag, "0 0 * * * *", "Schedule for bucket cleanup (cron format)")
}

// NewWorkerCommand constructs the "worker" Cobra command which initializes and runs the worker service using loaded configuration and composed FX modules.
// The command registers worker-specific flags via addWorkerFlags, along with the common service, bunconnect, and OTLP flags, and exposes the --worker-grpc-address flag (default ":8081").
// When executed it loads configuration and starts the service with the configured modules and a gRPC server.
func NewWorkerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "worker",
@@ -113,6 +118,8 @@ func NewWorkerCommand() *cobra.Command {
return cmd
}

// newWorkerModule creates an fx.Option that configures the worker module using the provided WorkerConfiguration.
// It maps the configuration into AsyncBlockRunnerConfig, ReplicationConfig, and BucketCleanupRunnerConfig for the worker.
func newWorkerModule(configuration WorkerConfiguration) fx.Option {
return worker.NewFXModule(worker.ModuleConfig{
AsyncBlockRunnerConfig: storage.AsyncBlockRunnerConfig{
@@ -130,4 +137,4 @@ func newWorkerModule(configuration WorkerConfiguration) fx.Option {
Schedule: configuration.BucketCleanupCRONSpec,
},
})
}
}
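For context on how the documented NewWorkerCommand is consumed, here is a minimal sketch of wiring it into a Cobra root command. The root command name and the import path are assumptions for illustration, not part of this diff.

package main

import (
	"os"

	"github.com/formancehq/ledger/cmd" // import path assumed from the repository layout
	"github.com/spf13/cobra"
)

func main() {
	// Hypothetical root command; the real binary defines its own root and subcommands.
	root := &cobra.Command{Use: "ledger"}
	root.AddCommand(cmd.NewWorkerCommand())
	if err := root.Execute(); err != nil {
		os.Exit(1)
	}
}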
3 changes: 2 additions & 1 deletion internal/api/v2/controllers_buckets_delete.go
@@ -9,6 +9,8 @@ import (
"github.com/formancehq/go-libs/v3/api"
)

// deleteBucket constructs an HTTP handler that deletes the bucket specified by the "bucket" URL parameter.
// The handler invokes systemController.DeleteBucket with the request context; if deletion fails it responds with an internal server error, otherwise it responds with 204 No Content.
func deleteBucket(systemController system.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
@@ -22,4 +24,3 @@ func deleteBucket(systemController system.Controller) http.HandlerFunc {
api.NoContent(w)
}
}

4 changes: 3 additions & 1 deletion internal/api/v2/controllers_buckets_restore.go
@@ -9,6 +9,9 @@ import (
"github.com/formancehq/go-libs/v3/api"
)

// restoreBucket returns an HTTP handler that restores the bucket identified by the URL parameter "bucket".
// It invokes the provided system.Controller's RestoreBucket with the request context and the extracted bucket name.
// On success it responds with HTTP 204 No Content; on failure it writes an internal server error response.
func restoreBucket(systemController system.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
bucket := chi.URLParam(r, "bucket")
@@ -22,4 +25,3 @@ func restoreBucket(systemController system.Controller) http.HandlerFunc {
api.NoContent(w)
}
}
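The delete and restore handlers documented in this file and the previous one share the same chi pattern: read the "bucket" URL parameter, call the system controller with the request context, and answer 204 No Content on success. Below is a minimal standalone sketch of that shape; the route path and port are assumptions, and the real handlers also map controller failures to an internal server error response.

package main

import (
	"net/http"

	"github.com/go-chi/chi/v5"
)

func main() {
	r := chi.NewRouter()
	// Stand-in for deleteBucket/restoreBucket: the real handlers delegate to
	// system.Controller.DeleteBucket / RestoreBucket with req.Context().
	r.Delete("/_/buckets/{bucket}", func(w http.ResponseWriter, req *http.Request) {
		bucket := chi.URLParam(req, "bucket")
		_ = bucket // passed to the controller in the real implementation
		w.WriteHeader(http.StatusNoContent)
	})
	_ = http.ListenAndServe(":8080", r)
}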

6 changes: 5 additions & 1 deletion internal/api/v2/controllers_ledgers_list.go
@@ -12,6 +12,10 @@ import (
"github.com/formancehq/ledger/internal/controller/system"
)

// listLedgers constructs an HTTP handler that lists ledgers with pagination.
// The handler applies the provided pagination configuration (sorted by "id" ascending),
// reads the "includeDeleted" query parameter to include deleted ledgers when set,
// invokes the controller's ListLedgers, and renders the resulting paginated cursor.
func listLedgers(b system.Controller, paginationConfig common.PaginationConfig) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

@@ -39,4 +43,4 @@ func listLedgers(b system.Controller, paginationConfig common.PaginationConfig)

api.RenderCursor(w, *ledgers)
}
}
}
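A hedged client-side sketch of exercising the includeDeleted behavior described above; the base URL, port, and /v2 listing path are assumptions about the deployment, not taken from this diff.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Ask the v2 ledger listing endpoint to include soft-deleted ledgers.
	resp, err := http.Get("http://localhost:3068/v2?includeDeleted=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}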
10 changes: 9 additions & 1 deletion internal/api/v2/routes.go
@@ -18,6 +18,14 @@ import (
"github.com/go-chi/chi/v5"
)

// NewRouter creates a chi.Router configured with the v2 HTTP API routes for the ledger service.
// It registers authentication-protected top-level endpoints (including /_info), an "/_" group
// that may expose exporter management and bucket operations, ledger-scoped routes (ledger creation,
// metadata, and nested ledger subroutes such as bulk operations, info, stats, pipelines when
// enabled, logs, accounts, transactions, aggregated balances, and volumes), and applies tracing
// attributes for the selected ledger on ledger-scoped requests.
// The behavior of tracing, bulking, bulk handler factories, pagination, and whether exporter-related
// endpoints are mounted is controlled via RouterOption arguments.
func NewRouter(
systemController systemcontroller.Controller,
authenticator auth.Authenticator,
@@ -180,4 +188,4 @@ var defaultRouterOptions = []RouterOption{
DefaultPageSize: bunpaginate.QueryDefaultPageSize,
MaxPageSize: bunpaginate.MaxPageSize,
}),
}
}
8 changes: 7 additions & 1 deletion internal/controller/system/controller.go
@@ -236,6 +236,12 @@ func (ctrl *DefaultController) RestoreBucket(ctx context.Context, bucket string)
})))
}

// NewDefaultController creates a DefaultController configured with the provided
// store, listener, replication backend, and optional functional options.
//
// The controller is initialized with a new StateRegistry and a default Numscript
// parser; these defaults (and other fields) can be overridden by passing Option
// values. The returned controller is then ready for use.
func NewDefaultController(
store Driver,
listener ledgercontroller.Listener,
@@ -296,4 +302,4 @@ func WithEnableFeatures(v bool) Option {
var defaultOptions = []Option{
WithMeterProvider(noopmetrics.MeterProvider{}),
WithTracerProvider(nooptracer.TracerProvider{}),
}
}
4 changes: 3 additions & 1 deletion internal/storage/system/migrations.go
@@ -11,6 +11,8 @@ import (
"github.com/uptrace/bun"
)

// GetMigrator creates a Migrator configured with the package's system schema migrations for the given database.
// It appends the system schema option to any provided migration options, registers all system migrations, and returns the configured *migrations.Migrator.
func GetMigrator(db bun.IDB, options ...migrations.Option) *migrations.Migrator {

// configuration table has been removed, we keep the model to keep migrations consistent but the table is not used anymore.
@@ -373,4 +375,4 @@
BEGIN
END IF;
END;
$$ LANGUAGE plpgsql;
`
`
11 changes: 10 additions & 1 deletion internal/storage/worker_bucket_cleanup.go
@@ -109,6 +109,10 @@ func (r *BucketCleanupRunner) processBucket(ctx context.Context, bucket string)
return nil
}

// NewBucketCleanupRunner creates a BucketCleanupRunner configured with the provided logger,
// database handle, and configuration, applying any functional options.
//
// The returned runner is ready to be started; provided options override default behavior.
func NewBucketCleanupRunner(logger logging.Logger, db *bun.DB, cfg BucketCleanupRunnerConfig, opts ...BucketCleanupRunnerOption) *BucketCleanupRunner {
ret := &BucketCleanupRunner{
stopChannel: make(chan chan struct{}),
@@ -126,6 +130,7 @@ func NewBucketCleanupRunner(logger logging.Logger, db *bun.DB, cfg BucketCleanup

type BucketCleanupRunnerOption func(*BucketCleanupRunner)

// WithBucketCleanupRunnerTracer returns a BucketCleanupRunnerOption that sets the OpenTelemetry tracer used by the BucketCleanupRunner.
func WithBucketCleanupRunnerTracer(tracer trace.Tracer) BucketCleanupRunnerOption {
return func(r *BucketCleanupRunner) {
r.tracer = tracer
@@ -136,6 +141,10 @@ var defaultBucketCleanupRunnerOptions = []BucketCleanupRunnerOption{
WithBucketCleanupRunnerTracer(noop.Tracer{}),
}

// NewBucketCleanupRunnerModule returns an Fx module that provides a configured BucketCleanupRunner
// and registers lifecycle hooks to start it in the background when the application starts and to stop
// it when the application shuts down. The background goroutine will panic if the runner's Run method
// returns an error.
func NewBucketCleanupRunnerModule(cfg BucketCleanupRunnerConfig) fx.Option {
return fx.Options(
fx.Provide(func(logger logging.Logger, db *bun.DB) (*BucketCleanupRunner, error) {
@@ -156,4 +165,4 @@ func NewBucketCleanupRunnerModule(cfg BucketCleanupRunnerConfig) fx.Option {
})
}),
)
}
}
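The lifecycle wiring described in the NewBucketCleanupRunnerModule comment (start the runner in a background goroutine on application start, stop it on shutdown, panic if Run fails) follows a common Fx pattern. The sketch below illustrates that pattern with a stand-in runner type; it is not the ledger's BucketCleanupRunner and the hook bodies are assumptions for illustration only.

package main

import (
	"context"

	"go.uber.org/fx"
)

// runner is a stand-in for BucketCleanupRunner, used only to illustrate the wiring.
type runner struct{ stop chan struct{} }

func (r *runner) Run(ctx context.Context) error  { <-r.stop; return nil }
func (r *runner) Stop(ctx context.Context) error { close(r.stop); return nil }

func main() {
	fx.New(
		fx.Provide(func() *runner { return &runner{stop: make(chan struct{})} }),
		fx.Invoke(func(lc fx.Lifecycle, r *runner) {
			lc.Append(fx.Hook{
				OnStart: func(ctx context.Context) error {
					go func() {
						// Mirrors the documented behavior: panic if Run returns an error.
						if err := r.Run(context.Background()); err != nil {
							panic(err)
						}
					}()
					return nil
				},
				OnStop: r.Stop,
			})
		}),
	).Run()
}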
5 changes: 4 additions & 1 deletion internal/worker/module.go
@@ -25,6 +25,9 @@ type ModuleConfig struct {
BucketCleanupRunnerConfig storage.BucketCleanupRunnerConfig
}

// NewFXModule constructs an fx.Option that installs the storage async block runner,
// the replication worker, and the bucket cleanup runner modules into an Fx application.
// The provided cfg supplies each submodule's configuration.
func NewFXModule(cfg ModuleConfig) fx.Option {
return fx.Options(
// todo: add auto discovery
@@ -69,4 +72,4 @@ func NewGRPCClientFxModule(
return client, nil
}),
)
}
}
8 changes: 7 additions & 1 deletion pkg/testserver/worker.go
@@ -8,30 +8,36 @@ import (
"time"
)

// LogsHashBlockMaxSizeInstrumentation returns an instrumentation function that appends the worker async block hasher max block size flag and the provided size to the run configuration's CLI arguments.
// The returned function adds the flag with the size formatted as a decimal string and always returns nil.
func LogsHashBlockMaxSizeInstrumentation(size int) testservice.InstrumentationFunc {
return func(ctx context.Context, runConfiguration *testservice.RunConfiguration) error {
runConfiguration.AppendArgs("--"+cmd.WorkerAsyncBlockHasherMaxBlockSizeFlag, strconv.Itoa(size))
return nil
}
}

// LogsHashBlockCRONSpecInstrumentation returns an instrumentation function that appends the async block hasher CRON schedule flag and the given spec to a run configuration.
// The spec parameter is the CRON schedule expression to be passed as the value for the WorkerAsyncBlockHasherScheduleFlag.
func LogsHashBlockCRONSpecInstrumentation(spec string) testservice.InstrumentationFunc {
return func(ctx context.Context, runConfiguration *testservice.RunConfiguration) error {
runConfiguration.AppendArgs("--"+cmd.WorkerAsyncBlockHasherScheduleFlag, spec)
return nil
}
}

// BucketCleanupRetentionPeriodInstrumentation creates an instrumentation function that appends the bucket cleanup retention period flag and its value to a test run configuration.
func BucketCleanupRetentionPeriodInstrumentation(retentionPeriod time.Duration) testservice.InstrumentationFunc {
return func(ctx context.Context, runConfiguration *testservice.RunConfiguration) error {
runConfiguration.AppendArgs("--"+cmd.WorkerBucketCleanupRetentionPeriodFlag, retentionPeriod.String())
return nil
}
}

// BucketCleanupCRONSpecInstrumentation returns an instrumentation function that appends the bucket cleanup CRON schedule flag and the provided CRON spec to a test run configuration.
func BucketCleanupCRONSpecInstrumentation(spec string) testservice.InstrumentationFunc {
return func(ctx context.Context, runConfiguration *testservice.RunConfiguration) error {
runConfiguration.AppendArgs("--"+cmd.WorkerBucketCleanupScheduleFlag, spec)
return nil
}
}
}
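As a usage sketch, the instrumentation helpers documented above can be applied by hand to a run configuration. The testservice import path and the zero-value construction of RunConfiguration are assumptions; real tests normally receive the configuration from the testservice harness.

package testserver_test

import (
	"context"
	"testing"
	"time"

	"github.com/formancehq/go-libs/v3/testing/testservice" // import path assumed
	"github.com/formancehq/ledger/pkg/testserver"
)

func TestWorkerInstrumentation(t *testing.T) {
	cfg := &testservice.RunConfiguration{} // zero value assumed to be usable here
	for _, instrument := range []testservice.InstrumentationFunc{
		testserver.LogsHashBlockMaxSizeInstrumentation(100),
		testserver.LogsHashBlockCRONSpecInstrumentation("0 * * * * *"),
		testserver.BucketCleanupRetentionPeriodInstrumentation(24 * time.Hour),
		testserver.BucketCleanupCRONSpecInstrumentation("0 0 * * * *"),
	} {
		if err := instrument(context.Background(), cfg); err != nil {
			t.Fatal(err)
		}
	}
}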