Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"fmt"
"time"

"github.com/formancehq/go-libs/v3/bun/bunconnect"
"github.com/formancehq/go-libs/v3/otlp/otlpmetrics"
"github.com/formancehq/go-libs/v3/otlp/otlptraces"
Expand All @@ -16,7 +18,6 @@ import (
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"time"
)

const (
Expand All @@ -28,6 +29,9 @@ const (
WorkerAsyncBlockHasherMaxBlockSizeFlag = "worker-async-block-hasher-max-block-size"
WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"

WorkerBucketCleanupRetentionPeriodFlag = "worker-bucket-cleanup-retention-period"
WorkerBucketCleanupScheduleFlag = "worker-bucket-cleanup-schedule"

WorkerGRPCAddressFlag = "worker-grpc-address"
)

Expand All @@ -43,6 +47,9 @@ type WorkerConfiguration struct {
PullInterval time.Duration `mapstructure:"worker-pipelines-pull-interval"`
SyncPeriod time.Duration `mapstructure:"worker-pipelines-sync-period"`
LogsPageSize uint64 `mapstructure:"worker-pipelines-logs-page-size"`

BucketCleanupRetentionPeriod time.Duration `mapstructure:"worker-bucket-cleanup-retention-period"`
BucketCleanupCRONSpec cron.Schedule `mapstructure:"worker-bucket-cleanup-schedule"`
}

type WorkerCommandConfiguration struct {
Expand All @@ -51,15 +58,22 @@ type WorkerCommandConfiguration struct {
WorkerGRPCConfig `mapstructure:",squash"`
}

// addWorkerFlags adds command-line flags to cmd to configure worker runtime behavior.
// The flags control async block hashing, pipeline pull/push/sync behavior and pagination, and bucket cleanup retention and schedule.
func addWorkerFlags(cmd *cobra.Command) {
cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
cmd.Flags().Duration(WorkerPipelinesPullIntervalFlag, 5*time.Second, "Pipelines pull interval")
cmd.Flags().Duration(WorkerPipelinesPushRetryPeriodFlag, 10*time.Second, "Pipelines push retry period")
cmd.Flags().Duration(WorkerPipelinesSyncPeriod, time.Minute, "Pipelines sync period")
cmd.Flags().Uint64(WorkerPipelinesLogsPageSize, 100, "Pipelines logs page size")
cmd.Flags().Duration(WorkerBucketCleanupRetentionPeriodFlag, 30*24*time.Hour, "Retention period for deleted buckets before hard delete")
cmd.Flags().String(WorkerBucketCleanupScheduleFlag, "0 0 * * * *", "Schedule for bucket cleanup (cron format)")
}

// NewWorkerCommand constructs the "worker" Cobra command which initializes and runs the worker service using loaded configuration and composed FX modules.
// The command registers worker-specific flags via addWorkerFlags and common service, bunconnect, and OTLP flags, and exposes the --worker-grpc-address flag (default ":8081").
// When executed it loads configuration and starts the service with the configured modules and a gRPC server.
func NewWorkerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "worker",
Expand Down Expand Up @@ -104,6 +118,8 @@ func NewWorkerCommand() *cobra.Command {
return cmd
}

// newWorkerModule creates an fx.Option that configures the worker module using the provided WorkerConfiguration.
// It maps the configuration into AsyncBlockRunnerConfig, ReplicationConfig, and BucketCleanupRunnerConfig for the worker.
func newWorkerModule(configuration WorkerConfiguration) fx.Option {
return worker.NewFXModule(worker.ModuleConfig{
AsyncBlockRunnerConfig: storage.AsyncBlockRunnerConfig{
Expand All @@ -116,5 +132,9 @@ func newWorkerModule(configuration WorkerConfiguration) fx.Option {
SyncPeriod: configuration.SyncPeriod,
LogsPageSize: configuration.LogsPageSize,
},
BucketCleanupRunnerConfig: storage.BucketCleanupRunnerConfig{
RetentionPeriod: configuration.BucketCleanupRetentionPeriod,
Schedule: configuration.BucketCleanupCRONSpec,
},
})
}
}
105 changes: 105 additions & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ Accept: application/json
|---|---|---|---|---|
|pageSize|query|integer(int64)|false|The maximum number of results to return per page.|
|cursor|query|string|false|Parameter used in pagination requests. Maximum page size is set to 15.|
|includeDeleted|query|boolean|false|If true, include deleted ledgers in the results. By default, deleted ledgers are excluded.|
|sort|query|string|false|Sort results using a field name and order (ascending or descending). |
|body|body|object|true|none|

Expand All @@ -148,6 +149,8 @@ Set to the value of next for the next page of results.
Set to the value of previous for the previous page of results.
No other parameters can be set when this parameter is set.

**includeDeleted**: If true, include deleted ledgers in the results. By default, deleted ledgers are excluded.

**sort**: Sort results using a field name and order (ascending or descending).
Format: `<field>:<order>`, where `<field>` is the field name and `<order>` is either `asc` or `desc`.

Expand All @@ -167,6 +170,7 @@ Format: `<field>:<order>`, where `<field>` is the field name and `<order>` is ei
"name": "string",
"addedAt": "2019-08-24T14:15:22Z",
"bucket": "string",
"deletedAt": "2019-08-24T14:15:22Z",
"metadata": {
"admin": "true"
},
Expand Down Expand Up @@ -224,6 +228,7 @@ Accept: application/json
"name": "string",
"addedAt": "2019-08-24T14:15:22Z",
"bucket": "string",
"deletedAt": "2019-08-24T14:15:22Z",
"metadata": {
"admin": "true"
},
Expand Down Expand Up @@ -2421,6 +2426,102 @@ Accept: application/json
This operation does not require authentication
</aside>

## Delete bucket

<a id="opIdv2DeleteBucket"></a>

> Code samples

```http
DELETE http://localhost:8080/v2/_/buckets/{bucket} HTTP/1.1
Host: localhost:8080
Accept: application/json

```

`DELETE /v2/_/buckets/{bucket}`

Delete a bucket by marking all ledgers in the bucket as deleted (soft delete). All ledgers in the bucket will have their deleted_at field set to the current timestamp.

<h3 id="delete-bucket-parameters">Parameters</h3>

|Name|In|Type|Required|Description|
|---|---|---|---|---|
|bucket|path|string|true|The bucket name|

> Example responses

> 404 Response

```json
{
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}
```

<h3 id="delete-bucket-responses">Responses</h3>

|Status|Meaning|Description|Schema|
|---|---|---|---|
|204|[No Content](https://tools.ietf.org/html/rfc7231#section-6.3.5)|Bucket deleted|None|
|404|[Not Found](https://tools.ietf.org/html/rfc7231#section-6.5.4)|Bucket not found|[V2ErrorResponse](#schemav2errorresponse)|
|default|Default|Error|[V2ErrorResponse](#schemav2errorresponse)|

<aside class="warning">
To perform this operation, you must be authenticated by means of one of the following methods:
Authorization ( Scopes: ledger:write )
</aside>

## Restore bucket

<a id="opIdv2RestoreBucket"></a>

> Code samples

```http
POST http://localhost:8080/v2/_/buckets/{bucket}/restore HTTP/1.1
Host: localhost:8080
Accept: application/json

```

`POST /v2/_/buckets/{bucket}/restore`

Restore a deleted bucket by unmarking all ledgers in the bucket as deleted. All ledgers in the bucket will have their deleted_at field set to NULL.

<h3 id="restore-bucket-parameters">Parameters</h3>

|Name|In|Type|Required|Description|
|---|---|---|---|---|
|bucket|path|string|true|The bucket name|

> Example responses

> 404 Response

```json
{
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}
```

<h3 id="restore-bucket-responses">Responses</h3>

|Status|Meaning|Description|Schema|
|---|---|---|---|
|204|[No Content](https://tools.ietf.org/html/rfc7231#section-6.3.5)|Bucket restored|None|
|404|[Not Found](https://tools.ietf.org/html/rfc7231#section-6.5.4)|Bucket not found|[V2ErrorResponse](#schemav2errorresponse)|
|default|Default|Error|[V2ErrorResponse](#schemav2errorresponse)|

<aside class="warning">
To perform this operation, you must be authenticated by means of one of the following methods:
Authorization ( Scopes: ledger:write )
</aside>

## List pipelines

<a id="opIdv2ListPipelines"></a>
Expand Down Expand Up @@ -5281,6 +5382,7 @@ and
"name": "string",
"addedAt": "2019-08-24T14:15:22Z",
"bucket": "string",
"deletedAt": "2019-08-24T14:15:22Z",
"metadata": {
"admin": "true"
},
Expand All @@ -5300,6 +5402,7 @@ and
|name|string|true|none|none|
|addedAt|string(date-time)|true|none|none|
|bucket|string|true|none|none|
|deletedAt|string(date-time)¦null|false|none|none|
|metadata|[V2Metadata](#schemav2metadata)|false|none|none|
|features|object|false|none|none|
|» **additionalProperties**|string|false|none|none|
Expand All @@ -5324,6 +5427,7 @@ and
"name": "string",
"addedAt": "2019-08-24T14:15:22Z",
"bucket": "string",
"deletedAt": "2019-08-24T14:15:22Z",
"metadata": {
"admin": "true"
},
Expand Down Expand Up @@ -5381,6 +5485,7 @@ and
"name": "string",
"addedAt": "2019-08-24T14:15:22Z",
"bucket": "string",
"deletedAt": "2019-08-24T14:15:22Z",
"metadata": {
"admin": "true"
},
Expand Down
29 changes: 15 additions & 14 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets
```

<a name="Configuration"></a>
## type [Configuration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L92-L96>)
## type [Configuration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L93-L97>)



Expand All @@ -349,7 +349,7 @@ type Configuration struct {
```

<a name="NewDefaultConfiguration"></a>
### func [NewDefaultConfiguration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L123>)
### func [NewDefaultConfiguration](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L124>)

```go
func NewDefaultConfiguration() Configuration
Expand All @@ -358,7 +358,7 @@ func NewDefaultConfiguration() Configuration


<a name="Configuration.SetDefaults"></a>
### func \(\*Configuration\) [SetDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L98>)
### func \(\*Configuration\) [SetDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L99>)

```go
func (c *Configuration) SetDefaults()
Expand All @@ -367,7 +367,7 @@ func (c *Configuration) SetDefaults()


<a name="Configuration.Validate"></a>
### func \(\*Configuration\) [Validate](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L113>)
### func \(\*Configuration\) [Validate](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L114>)

```go
func (c *Configuration) Validate() error
Expand Down Expand Up @@ -648,7 +648,7 @@ func NewExporterConfiguration(driver string, config json.RawMessage) ExporterCon


<a name="Ledger"></a>
## type [Ledger](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L18-L26>)
## type [Ledger](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L18-L27>)



Expand All @@ -657,15 +657,16 @@ type Ledger struct {
bun.BaseModel `bun:"_system.ledgers,alias:ledgers"`

Configuration
ID int `json:"id" bun:"id,type:int,scanonly"`
Name string `json:"name" bun:"name,type:varchar(255),pk"`
AddedAt time.Time `json:"addedAt" bun:"added_at,type:timestamp,nullzero"`
State string `json:"-" bun:"state,type:varchar(255),nullzero"`
ID int `json:"id" bun:"id,type:int,scanonly"`
Name string `json:"name" bun:"name,type:varchar(255),pk"`
AddedAt time.Time `json:"addedAt" bun:"added_at,type:timestamp,nullzero"`
State string `json:"-" bun:"state,type:varchar(255),nullzero"`
DeletedAt *time.Time `json:"deletedAt,omitempty" bun:"deleted_at,type:timestamp,nullzero"`
}
```

<a name="MustNewWithDefault"></a>
### func [MustNewWithDefault](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L68>)
### func [MustNewWithDefault](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L69>)

```go
func MustNewWithDefault(name string) Ledger
Expand All @@ -674,7 +675,7 @@ func MustNewWithDefault(name string) Ledger


<a name="New"></a>
### func [New](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L41>)
### func [New](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L42>)

```go
func New(name string, configuration Configuration) (*Ledger, error)
Expand All @@ -683,7 +684,7 @@ func New(name string, configuration Configuration) (*Ledger, error)


<a name="NewWithDefaults"></a>
### func [NewWithDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L64>)
### func [NewWithDefaults](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L65>)

```go
func NewWithDefaults(name string) (*Ledger, error)
Expand All @@ -692,7 +693,7 @@ func NewWithDefaults(name string) (*Ledger, error)


<a name="Ledger.HasFeature"></a>
### func \(Ledger\) [HasFeature](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L28>)
### func \(Ledger\) [HasFeature](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L29>)

```go
func (l Ledger) HasFeature(feature, value string) bool
Expand All @@ -701,7 +702,7 @@ func (l Ledger) HasFeature(feature, value string) bool


<a name="Ledger.WithMetadata"></a>
### func \(Ledger\) [WithMetadata](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L36>)
### func \(Ledger\) [WithMetadata](<https://github.com/formancehq/ledger/blob/main/internal/ledger.go#L37>)

```go
func (l Ledger) WithMetadata(m metadata.Metadata) Ledger
Expand Down
28 changes: 28 additions & 0 deletions internal/api/common/mocks_system_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading