Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/big"
"reflect"

. "github.com/formancehq/go-libs/v2/collectionutils"
"github.com/formancehq/go-libs/v2/pointer"
"github.com/formancehq/go-libs/v2/time"
"github.com/formancehq/ledger/pkg/features"
Expand Down Expand Up @@ -199,17 +198,10 @@ func (ctrl *DefaultController) importLog(ctx context.Context, log ledger.Log) er
switch payload := log.Data.(type) {
case ledger.CreatedTransaction:
logging.FromContext(ctx).Debugf("Importing transaction %d", *payload.Transaction.ID)
if err := ctrl.store.CommitTransaction(ctx, &payload.Transaction); err != nil {
if err := ctrl.store.CommitTransaction(ctx, &payload.Transaction, payload.AccountMetadata); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
logging.FromContext(ctx).Debugf("Imported transaction %d", *payload.Transaction.ID)

if len(payload.AccountMetadata) > 0 {
logging.FromContext(ctx).Debugf("Importing metadata of accounts '%s'", Keys(payload.AccountMetadata))
if err := ctrl.store.UpdateAccountsMetadata(ctx, payload.AccountMetadata); err != nil {
return fmt.Errorf("updating metadata of accounts '%s': %w", Keys(payload.AccountMetadata), err)
}
}
case ledger.RevertedTransaction:
logging.FromContext(ctx).Debugf("Reverting transaction %d", *payload.RevertedTransaction.ID)
_, _, err := ctrl.store.RevertTransaction(
Expand All @@ -220,7 +212,7 @@ func (ctrl *DefaultController) importLog(ctx context.Context, log ledger.Log) er
if err != nil {
return fmt.Errorf("failed to revert transaction: %w", err)
}
if err := ctrl.store.CommitTransaction(ctx, &payload.RevertTransaction); err != nil {
if err := ctrl.store.CommitTransaction(ctx, &payload.RevertTransaction, nil); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
case ledger.SavedMetadata:
Expand Down Expand Up @@ -332,17 +324,11 @@ func (ctrl *DefaultController) createTransaction(ctx context.Context, store Stor
WithMetadata(finalMetadata).
WithTimestamp(parameters.Input.Timestamp).
WithReference(parameters.Input.Reference)
err = store.CommitTransaction(ctx, &transaction)
err = store.CommitTransaction(ctx, &transaction, result.AccountMetadata)
if err != nil {
return nil, err
}

if len(result.AccountMetadata) > 0 {
if err := store.UpdateAccountsMetadata(ctx, result.AccountMetadata); err != nil {
return nil, fmt.Errorf("updating metadata of account '%s': %w", Keys(result.AccountMetadata), err)
}
}

return &ledger.CreatedTransaction{
Transaction: transaction,
AccountMetadata: result.AccountMetadata,
Expand Down Expand Up @@ -407,7 +393,7 @@ func (ctrl *DefaultController) revertTransaction(ctx context.Context, store Stor
}
}

err = store.CommitTransaction(ctx, &reversedTx)
err = store.CommitTransaction(ctx, &reversedTx, nil)
if err != nil {
return nil, fmt.Errorf("failed to insert transaction: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/ledger/controller_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCreateTransaction(t *testing.T) {
}, nil)

store.EXPECT().
CommitTransaction(gomock.Any(), gomock.Any()).
CommitTransaction(gomock.Any(), gomock.Any(), nil).
Return(nil)

store.EXPECT().
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestRevertTransaction(t *testing.T) {
Return(map[string]map[string]*big.Int{}, nil)

store.EXPECT().
CommitTransaction(gomock.Any(), gomock.Any()).
CommitTransaction(gomock.Any(), gomock.Any(), nil).
Return(nil)

store.EXPECT().
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Store interface {

// GetBalances must returns balance and lock account until the end of the TX
GetBalances(ctx context.Context, query BalanceQuery) (Balances, error)
CommitTransaction(ctx context.Context, transaction *ledger.Transaction) error
CommitTransaction(ctx context.Context, transaction *ledger.Transaction, accountMetadata map[string]metadata.Metadata) error
// RevertTransaction revert the transaction with identifier id
// It returns :
// * the reverted transaction
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/ledger/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestAccountsList(t *testing.T) {
err := store.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().
WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))).
WithTimestamp(now).
WithInsertedAt(now)))
WithInsertedAt(now)), nil)
require.NoError(t, err)

require.NoError(t, store.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{
Expand All @@ -55,19 +55,19 @@ func TestAccountsList(t *testing.T) {
err = store.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().
WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))).
WithTimestamp(now.Add(4*time.Minute)).
WithInsertedAt(now.Add(100*time.Millisecond))))
WithInsertedAt(now.Add(100*time.Millisecond))), nil)
require.NoError(t, err)

err = store.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().
WithPostings(ledger.NewPosting("account:1", "bank", "USD", big.NewInt(50))).
WithTimestamp(now.Add(3*time.Minute)).
WithInsertedAt(now.Add(200*time.Millisecond))))
WithInsertedAt(now.Add(200*time.Millisecond))), nil)
require.NoError(t, err)

err = store.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().
WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(0))).
WithTimestamp(now.Add(-time.Minute)).
WithInsertedAt(now.Add(200*time.Millisecond))))
WithInsertedAt(now.Add(200*time.Millisecond))), nil)
require.NoError(t, err)

t.Run("list all", func(t *testing.T) {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestAccountsGet(t *testing.T) {
tx1 := pointer.For(ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "multi", "USD/2", big.NewInt(100)),
).WithTimestamp(now))
err := store.CommitTransaction(ctx, tx1)
err := store.CommitTransaction(ctx, tx1, nil)
require.NoError(t, err)

// sleep for at least the time precision to ensure the next transaction is inserted with a different timestamp
Expand All @@ -320,7 +320,7 @@ func TestAccountsGet(t *testing.T) {
tx2 := pointer.For(ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "multi", "USD/2", big.NewInt(0)),
).WithTimestamp(now.Add(-time.Minute)))
err = store.CommitTransaction(ctx, tx2)
err = store.CommitTransaction(ctx, tx2, nil)
require.NoError(t, err)

t.Run("find account", func(t *testing.T) {
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestAccountsCount(t *testing.T) {

err := store.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "central_bank", "USD/2", big.NewInt(100)),
)))
)), nil)
require.NoError(t, err)

countAccounts, err := store.Accounts().Count(ctx, ledgercontroller.ResourceQuery[any]{})
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestBalancesAggregates(t *testing.T) {
).
WithTimestamp(now).
WithInsertedAt(now)
err := store.CommitTransaction(ctx, &tx1)
err := store.CommitTransaction(ctx, &tx1, nil)
require.NoError(t, err)

tx2 := ledger.NewTransaction().
Expand All @@ -185,7 +185,7 @@ func TestBalancesAggregates(t *testing.T) {
).
WithTimestamp(now.Add(-time.Minute)).
WithInsertedAt(now.Add(time.Minute))
err = store.CommitTransaction(ctx, &tx2)
err = store.CommitTransaction(ctx, &tx2, nil)
require.NoError(t, err)

require.NoError(t, store.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestMovesInsert(t *testing.T) {
tx := ledger.NewTransaction().WithPostings(
ledger.NewPosting(src, dst, "USD", big.NewInt(1)),
)
err = storeCP.CommitTransaction(ctx, &tx)
err = storeCP.CommitTransaction(ctx, &tx, nil)
if errors.Is(err, postgres.ErrDeadlockDetected) {
require.NoError(t, sqlTx.Rollback())
continue
Expand Down
7 changes: 5 additions & 2 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ var (
metadataRegex = regexp.MustCompile(`metadata\[(.+)]`)
)

func (store *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) error {
func (store *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction, accountMetadata map[string]metadata.Metadata) error {
if accountMetadata == nil {
accountMetadata = make(map[string]metadata.Metadata)
}

postCommitVolumes, err := store.UpdateVolumes(ctx, tx.VolumeUpdates()...)
if err != nil {
Expand All @@ -51,7 +54,7 @@ func (store *Store) CommitTransaction(ctx context.Context, tx *ledger.Transactio
return &ledger.Account{
Address: address,
FirstUsage: tx.Timestamp,
Metadata: make(metadata.Metadata),
Metadata: accountMetadata[address],
InsertionDate: tx.InsertedAt,
UpdatedAt: tx.InsertedAt,
}
Expand Down
Loading
Loading