Skip to content

Commit c0c32f6

Browse files
committed
fix: invalid date format
1 parent eb07b24 commit c0c32f6

File tree

10 files changed

+154
-7
lines changed

10 files changed

+154
-7
lines changed

internal/storage/bucket/bucket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Bucket interface {
1717
IsUpToDate(ctx context.Context, db bun.IDB) (bool, error)
1818
GetMigrationsInfo(ctx context.Context, db bun.IDB) ([]migrations.Info, error)
1919
IsInitialized(context.Context, bun.IDB) (bool, error)
20+
GetLastVersion(ctx context.Context, db bun.IDB) (int, error)
2021
}
2122

2223
type Factory interface {

internal/storage/bucket/bucket_generated_test.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ select
1212
('{'
1313
'"transaction": {'
1414
'"id": ' || (seq/5) + (seq % 5) || ','
15-
'"timestamp": ' || to_json(now()::timestamp without time zone) || ','
15+
'"timestamp": "' || (to_json(now()::timestamp without time zone)#>>'{}') || 'Z",'
1616
'"postings": ['
1717
'{'
1818
'"destination": "sellers:' || (seq % 5) || '",'
@@ -58,7 +58,7 @@ select
5858
('{'
5959
'"transaction": {'
6060
'"id": ' || (seq/5) + (seq % 5) || ','
61-
'"timestamp": ' || to_json(now()::timestamp without time zone) || ','
61+
'"timestamp": "' || (to_json(now()::timestamp without time zone)#>>'{}') || 'Z",'
6262
'"postings": ['
6363
'{'
6464
'"source": "sellers:' || (seq % 5) || '",'

internal/storage/bucket/migrations/32-fix-log-data-for-reverted-transactions/up_tests_after.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
do $$
22
declare
33
expected varchar = '{"transaction": {"id": 22, "metadata": {"tax": "1%"}, "postings": [{"asset": "USD", "amount": 99, "source": "sellers:0", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "fees", "destination": "orders:10"}, {"asset": "USD", "amount": 100, "source": "orders:10", "destination": "world"}, {"asset": "SELL", "amount": 1, "source": "sellers:0", "destination": "world"}], ' ||
4-
'"timestamp": ' ||
5-
(select to_json(timestamp) from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0')
6-
|| '}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": ' ||
4+
'"timestamp": "' ||
5+
(select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0')
6+
|| 'Z"}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": ' ||
77
(select to_json(timestamp) from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') ||
88
', "insertedAt": ' ||
99
(select to_json(inserted_at) from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') ||
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
name: Fix invalid date format
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
do $$
2+
declare
3+
_offset integer := 0;
4+
_batch_size integer := 1000;
5+
begin
6+
set search_path = '{{ .Schema }}';
7+
8+
drop table if exists txs_view;
9+
10+
create temp table txs_view as
11+
with reversed as (
12+
select
13+
ledger,
14+
id,
15+
(convert_from(memento, 'UTF-8')::jsonb ->> 'revertedTransactionID')::numeric as revertedTransactionID
16+
from logs
17+
where type = 'REVERTED_TRANSACTION' and data->>'revertedTransactionID' is not null
18+
)
19+
select reversed.id as log_id, transactions.*
20+
from transactions
21+
join reversed on
22+
reversed.revertedTransactionID = transactions.id and
23+
reversed.ledger = transactions.ledger;
24+
25+
create index txs_view_idx on txs_view(log_id, id);
26+
27+
if (select count(*) from txs_view) = 0 then
28+
return;
29+
end if;
30+
31+
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from txs_view));
32+
33+
loop
34+
with data as (
35+
select *
36+
from txs_view
37+
order by ledger, log_id, id
38+
offset _offset
39+
limit _batch_size
40+
)
41+
update logs
42+
set data = data || jsonb_build_object('revertedTransaction', jsonb_build_object(
43+
'id', data.id,
44+
'postings', data.postings::jsonb,
45+
'metadata', data.metadata,
46+
'reverted', true,
47+
'revertedAt', to_json(data.reverted_at)#>>'{}' || 'Z',
48+
'insertedAt', to_json(data.inserted_at)#>>'{}' || 'Z',
49+
'timestamp', to_json(data.timestamp)#>>'{}' || 'Z',
50+
'reference', case when data.reference is not null and data.reference <> '' then data.reference end,
51+
'postCommitVolumes', data.post_commit_volumes
52+
))
53+
from data
54+
where logs.id = data.log_id and
55+
logs.ledger = data.ledger;
56+
57+
exit when not found;
58+
59+
_offset = _offset + _batch_size;
60+
61+
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
62+
63+
commit;
64+
end loop;
65+
66+
drop table if exists txs_view;
67+
end
68+
$$;
69+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
do $$
2+
declare
3+
expected varchar = '{"transaction": {"id": 22, "metadata": {"tax": "1%"}, "postings": [{"asset": "USD", "amount": 99, "source": "sellers:0", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "fees", "destination": "orders:10"}, {"asset": "USD", "amount": 100, "source": "orders:10", "destination": "world"}, {"asset": "SELL", "amount": 1, "source": "sellers:0", "destination": "world"}], ' ||
4+
'"timestamp": "' ||
5+
(select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0')
6+
|| 'Z"}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": "' ||
7+
(select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') ||
8+
'Z", "insertedAt": "' ||
9+
(select to_json(inserted_at)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') ||
10+
'Z", "revertedAt": "' ||
11+
(select to_json(reverted_at)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') ||
12+
'Z", "postCommitVolumes": {"fees": {"USD": {"input": 3, "output": 0}}, "world": {"USD": {"input": 0, "output": 300}, "SELL": {"input": 0, "output": 3}}, "orders:10": {"USD": {"input": 100, "output": 100}}, "sellers:0": {"USD": {"input": 297, "output": 0}, "SELL": {"input": 3, "output": 0}}}}, "revertedTransactionID": "2"}';
13+
begin
14+
set search_path = '{{.Schema}}';
15+
assert (select data::varchar from logs where id = 22 and ledger = 'ledger0') = expected,
16+
'data should be equals to ' || expected || ' but was ' || (select data::varchar from logs where id = 22 and ledger = 'ledger0');
17+
end;
18+
$$

internal/storage/bucket/migrations_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,24 @@
33
package bucket_test
44

55
import (
6+
"context"
67
"errors"
78
"fmt"
89
"github.com/formancehq/go-libs/v2/bun/bunconnect"
10+
"github.com/formancehq/go-libs/v2/bun/bunpaginate"
911
"github.com/formancehq/go-libs/v2/logging"
1012
"github.com/formancehq/go-libs/v2/migrations"
1113
"github.com/formancehq/go-libs/v2/pointer"
1214
ledger "github.com/formancehq/ledger/internal"
15+
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
1316
"github.com/formancehq/ledger/internal/storage/bucket"
17+
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
1418
"github.com/formancehq/ledger/internal/storage/system"
1519
"github.com/google/uuid"
1620
_ "github.com/jackc/pgx/v5/stdlib"
1721
"github.com/stretchr/testify/require"
1822
"github.com/uptrace/bun/extra/bundebug"
23+
"go.opentelemetry.io/otel/trace/noop"
1924
"io/fs"
2025
"testing"
2126
)
@@ -35,13 +40,16 @@ func TestMigrations(t *testing.T) {
3540

3641
bucketName := uuid.NewString()[:8]
3742
migrator := bucket.GetMigrator(db, bucketName)
43+
ledgers := make([]ledger.Ledger, 0)
3844

3945
for i := 0; i < 5; i++ {
4046
l, err := ledger.New(fmt.Sprintf("ledger%d", i), ledger.Configuration{
4147
Bucket: bucketName,
4248
})
4349
require.NoError(t, err)
4450
require.NoError(t, system.New(db).CreateLedger(ctx, l))
51+
52+
ledgers = append(ledgers, *l)
4553
}
4654

4755
_, err = bucket.WalkMigrations(bucket.MigrationsFS, func(entry fs.DirEntry) (*struct{}, error) {
@@ -79,4 +87,22 @@ func TestMigrations(t *testing.T) {
7987
return pointer.For(struct{}{}), nil
8088
})
8189
require.NoError(t, err)
90+
91+
for i := 0; i < 5; i++ {
92+
store := ledgerstore.New(db, bucket.NewDefault(noop.Tracer{}, bucketName), ledgers[i])
93+
94+
require.NoError(t, bunpaginate.Iterate(
95+
ctx,
96+
ledgercontroller.ColumnPaginatedQuery[any]{
97+
PageSize: 100,
98+
Order: pointer.For(bunpaginate.Order(bunpaginate.OrderAsc)),
99+
},
100+
func(ctx context.Context, q ledgercontroller.ColumnPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) {
101+
return store.Logs().Paginate(ctx, q)
102+
},
103+
func(cursor *bunpaginate.Cursor[ledger.Log]) error {
104+
return nil
105+
},
106+
))
107+
}
82108
}

internal/storage/driver/buckets_generated_test.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/ledger/balances.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
1515
)
1616

17+
const fillAccountsVolumeHistoryMigration = 21
18+
1719
func (store *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQuery) (ledgercontroller.Balances, error) {
18-
isUpToDate, err := store.bucket.IsUpToDate(ctx, store.db)
20+
lastVersion, err := store.bucket.GetLastVersion(ctx, store.db)
1921
if err != nil {
2022
return nil, err
2123
}
2224

23-
if isUpToDate {
25+
if lastVersion >= fillAccountsVolumeHistoryMigration {
2426
return store.GetBalancesAfterUpgrade(ctx, query)
2527
} else {
2628
return store.GetBalancesWhenUpgrading(ctx, query)

0 commit comments

Comments
 (0)