Skip to content

Commit d84cc02

Browse files
committed
feat(performance): batch accounts insertions
1 parent 819405f commit d84cc02

File tree

10 files changed

+65
-74
lines changed

10 files changed

+65
-74
lines changed

internal/controller/ledger/controller_default.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para
444444
}
445445

446446
func (ctrl *DefaultController) saveAccountMetadata(ctx context.Context, store Store, parameters Parameters[SaveAccountMetadata]) (*ledger.SavedMetadata, error) {
447-
if _, err := store.UpsertAccount(ctx, &ledger.Account{
447+
if err := store.UpsertAccounts(ctx, &ledger.Account{
448448
Address: parameters.Input.Address,
449449
Metadata: parameters.Input.Metadata,
450450
}); err != nil {

internal/controller/ledger/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Store interface {
4646
DeleteTransactionMetadata(ctx context.Context, transactionID int, key string) (*ledger.Transaction, bool, error)
4747
UpdateAccountsMetadata(ctx context.Context, m map[string]metadata.Metadata) error
4848
// UpsertAccount returns a boolean indicating if the account was upserted
49-
UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error)
49+
UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error
5050
DeleteAccountMetadata(ctx context.Context, address, key string) error
5151
InsertLog(ctx context.Context, log *ledger.Log) error
5252

internal/controller/ledger/store_generated_test.go

+13-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/ledger/accounts.go

+24-44
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package ledger
22

33
import (
44
"context"
5-
"database/sql"
65
"fmt"
76
. "github.com/formancehq/go-libs/v2/bun/bunpaginate"
87
"github.com/formancehq/ledger/pkg/features"
@@ -12,11 +11,8 @@ import (
1211

1312
"github.com/formancehq/go-libs/v2/metadata"
1413
"github.com/formancehq/go-libs/v2/platform/postgres"
15-
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
16-
"go.opentelemetry.io/otel/attribute"
17-
"go.opentelemetry.io/otel/trace"
18-
1914
"github.com/formancehq/go-libs/v2/time"
15+
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
2016

2117
"github.com/formancehq/go-libs/v2/query"
2218
ledger "github.com/formancehq/ledger/internal"
@@ -333,45 +329,29 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
333329
return err
334330
}
335331

336-
// todo: since we update first balances of an accounts in the transaction process, we can avoid nested sql txs
337-
// while upserting account and upsert them all in one shot
338-
func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
339-
return tracing.TraceWithMetric(
332+
func (s *Store) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error {
333+
return tracing.SkipResult(tracing.TraceWithMetric(
340334
ctx,
341-
"UpsertAccount",
335+
"UpsertAccounts",
342336
s.tracer,
343-
s.upsertAccountHistogram,
344-
func(ctx context.Context) (bool, error) {
345-
upserted := false
346-
err := s.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
347-
ret, err := tx.NewInsert().
348-
Model(account).
349-
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
350-
On("conflict (ledger, address) do update").
351-
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
352-
Set("metadata = accounts.metadata || excluded.metadata").
353-
Set("updated_at = excluded.updated_at").
354-
Value("ledger", "?", s.ledger.Name).
355-
Returning("*").
356-
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage).
357-
Exec(ctx)
358-
if err != nil {
359-
return err
360-
}
361-
rowsModified, err := ret.RowsAffected()
362-
if err != nil {
363-
return err
364-
}
365-
upserted = rowsModified > 0
366-
return nil
367-
})
368-
return upserted, postgres.ResolveError(err)
369-
},
370-
func(ctx context.Context, upserted bool) {
371-
trace.SpanFromContext(ctx).SetAttributes(
372-
attribute.String("address", account.Address),
373-
attribute.Bool("upserted", upserted),
374-
)
375-
},
376-
)
337+
s.upsertAccountsHistogram,
338+
tracing.NoResult(func(ctx context.Context) error {
339+
_, err := s.db.NewInsert().
340+
Model(&accounts).
341+
ModelTableExpr(s.GetPrefixedRelationName("accounts")).
342+
On("conflict (ledger, address) do update").
343+
Set("first_usage = case when excluded.first_usage < accounts.first_usage then excluded.first_usage else accounts.first_usage end").
344+
Set("metadata = accounts.metadata || excluded.metadata").
345+
Set("updated_at = excluded.updated_at").
346+
Value("ledger", "?", s.ledger.Name).
347+
Returning("*").
348+
Where("(excluded.first_usage < accounts.first_usage) or not accounts.metadata @> excluded.metadata").
349+
Exec(ctx)
350+
if err != nil {
351+
return fmt.Errorf("upserting accounts: %w", postgres.ResolveError(err))
352+
}
353+
354+
return nil
355+
}),
356+
))
377357
}

internal/storage/ledger/accounts_test.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -402,22 +402,30 @@ func TestAccountsUpsert(t *testing.T) {
402402
store := newLedgerStore(t)
403403
ctx := logging.TestingContext()
404404

405-
account := ledger.Account{
405+
account1 := ledger.Account{
406406
Address: "foo",
407407
}
408408

409+
account2 := ledger.Account{
410+
Address: "foo2",
411+
}
412+
409413
// Initial insert
410-
upserted, err := store.UpsertAccount(ctx, &account)
414+
err := store.UpsertAccounts(ctx, &account1, &account2)
411415
require.NoError(t, err)
412-
require.True(t, upserted)
413-
require.NotEmpty(t, account.FirstUsage)
414-
require.NotEmpty(t, account.InsertionDate)
415-
require.NotEmpty(t, account.UpdatedAt)
416+
417+
require.NotEmpty(t, account1.FirstUsage)
418+
require.NotEmpty(t, account1.InsertionDate)
419+
require.NotEmpty(t, account1.UpdatedAt)
420+
421+
require.NotEmpty(t, account2.FirstUsage)
422+
require.NotEmpty(t, account2.InsertionDate)
423+
require.NotEmpty(t, account2.UpdatedAt)
416424

417425
now := time.Now()
418426

419427
// Reset the account model
420-
account = ledger.Account{
428+
account1 = ledger.Account{
421429
Address: "foo",
422430
// The account will be upserted on the timeline after its initial usage.
423431
// The upsert should not modify anything, but, it should retrieve and load the account entity
@@ -427,7 +435,6 @@ func TestAccountsUpsert(t *testing.T) {
427435
}
428436

429437
// Upsert with no modification
430-
upserted, err = store.UpsertAccount(ctx, &account)
438+
err = store.UpsertAccounts(ctx, &account1)
431439
require.NoError(t, err)
432-
require.False(t, upserted)
433440
}

internal/storage/ledger/balances_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestBalancesGet(t *testing.T) {
3333
UpdatedAt: time.Now(),
3434
FirstUsage: time.Now(),
3535
}
36-
_, err := store.UpsertAccount(ctx, world)
36+
err := store.UpsertAccounts(ctx, world)
3737
require.NoError(t, err)
3838

3939
_, err = store.UpdateVolumes(ctx, ledger.AccountsVolumes{
@@ -146,7 +146,7 @@ func TestBalancesGet(t *testing.T) {
146146
InsertionDate: tx.InsertedAt,
147147
UpdatedAt: tx.InsertedAt,
148148
}
149-
_, err = store.UpsertAccount(ctx, &bankAccount)
149+
err = store.UpsertAccounts(ctx, &bankAccount)
150150
require.NoError(t, err)
151151

152152
err = store.InsertMoves(ctx, &ledger.Move{

internal/storage/ledger/legacy/adapters.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func (d *DefaultStoreAdapter) UpdateAccountsMetadata(ctx context.Context, m map[
4646
return d.newStore.UpdateAccountsMetadata(ctx, m)
4747
}
4848

49-
func (d *DefaultStoreAdapter) UpsertAccount(ctx context.Context, account *ledger.Account) (bool, error) {
50-
return d.newStore.UpsertAccount(ctx, account)
49+
func (d *DefaultStoreAdapter) UpsertAccounts(ctx context.Context, accounts ... *ledger.Account) error {
50+
return d.newStore.UpsertAccounts(ctx, accounts...)
5151
}
5252

5353
func (d *DefaultStoreAdapter) DeleteAccountMetadata(ctx context.Context, address, key string) error {

internal/storage/ledger/moves_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestMovesInsert(t *testing.T) {
3838
account := &ledger.Account{
3939
Address: "world",
4040
}
41-
_, err := store.UpsertAccount(ctx, account)
41+
err := store.UpsertAccounts(ctx, account)
4242
require.NoError(t, err)
4343

4444
now := time.Now()

internal/storage/ledger/store.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ type Store struct {
3131
getAccountHistogram metric.Int64Histogram
3232
countAccountsHistogram metric.Int64Histogram
3333
updateAccountsMetadataHistogram metric.Int64Histogram
34-
deleteAccountMetadataHistogram metric.Int64Histogram
35-
upsertAccountHistogram metric.Int64Histogram
36-
getBalancesHistogram metric.Int64Histogram
34+
deleteAccountMetadataHistogram metric.Int64Histogram
35+
upsertAccountsHistogram metric.Int64Histogram
36+
getBalancesHistogram metric.Int64Histogram
3737
insertLogHistogram metric.Int64Histogram
3838
listLogsHistogram metric.Int64Histogram
3939
readLogWithIdempotencyKeyHistogram metric.Int64Histogram
@@ -154,7 +154,7 @@ func New(db bun.IDB, bucket bucket.Bucket, ledger ledger.Ledger, opts ...Option)
154154
panic(err)
155155
}
156156

157-
ret.upsertAccountHistogram, err = ret.meter.Int64Histogram("store.upsertAccount")
157+
ret.upsertAccountsHistogram, err = ret.meter.Int64Histogram("store.upsertAccounts")
158158
if err != nil {
159159
panic(err)
160160
}

internal/storage/ledger/transactions.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
255255
}
256256

257257
for _, address := range tx.InvolvedAccounts() {
258-
_, err := s.UpsertAccount(ctx, &ledger.Account{
258+
err := s.UpsertAccounts(ctx, &ledger.Account{
259259
Address: address,
260260
FirstUsage: tx.Timestamp,
261261
Metadata: make(metadata.Metadata),

0 commit comments

Comments
 (0)