Skip to content

Commit 583caba

Browse files
committed
Merge branch 'master' into fxamacker/improve-payload-file
2 parents fc709b9 + c4831ee commit 583caba

File tree

16 files changed

+472
-71
lines changed

16 files changed

+472
-71
lines changed

cmd/util/ledger/migrations/account_based_migration.go

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -73,51 +73,71 @@ func MigrateByAccount(
7373
return allPayloads, nil
7474
}
7575

76+
log.Info().
77+
Int("inner_migrations", len(migrations)).
78+
Int("nWorker", nWorker).
79+
Msgf("created account migrations")
80+
81+
// group the Payloads by account
82+
accountGroups := util.GroupPayloadsByAccount(log, allPayloads, nWorker)
83+
7684
for i, migrator := range migrations {
77-
if err := migrator.InitMigration(
85+
err := migrator.InitMigration(
7886
log.With().
7987
Int("migration_index", i).
8088
Logger(),
8189
allPayloads,
8290
nWorker,
83-
); err != nil {
91+
)
92+
if err != nil {
8493
return nil, fmt.Errorf("could not init migration: %w", err)
8594
}
8695
}
8796

97+
var migrated []*ledger.Payload
98+
err := withMigrations(log, migrations, func() error {
99+
var err error
100+
// migrate the Payloads under accounts
101+
migrated, err = MigrateGroupConcurrently(log, migrations, accountGroups, nWorker)
102+
return err
103+
})
104+
88105
log.Info().
89-
Int("inner_migrations", len(migrations)).
90-
Int("nWorker", nWorker).
91-
Msgf("created account migrations")
106+
Int("account_count", accountGroups.Len()).
107+
Int("payload_count", len(allPayloads)).
108+
Msgf("finished migrating Payloads")
92109

110+
if err != nil {
111+
return nil, fmt.Errorf("could not migrate accounts: %w", err)
112+
}
113+
114+
return migrated, nil
115+
}
116+
117+
// withMigrations calls the given function and then closes the given migrations.
118+
func withMigrations(
119+
log zerolog.Logger,
120+
migrations []AccountBasedMigration,
121+
f func() error,
122+
) (err error) {
93123
defer func() {
94124
for i, migrator := range migrations {
95125
log.Info().
96126
Int("migration_index", i).
97127
Type("migration", migrator).
98128
Msg("closing migration")
99-
if err := migrator.Close(); err != nil {
100-
log.Error().Err(err).Msg("error closing migration")
129+
if cerr := migrator.Close(); cerr != nil {
130+
log.Error().Err(cerr).Msg("error closing migration")
131+
if err == nil {
132+
// only set the error if it's not already set
133+
// so that we don't overwrite the original error
134+
err = cerr
135+
}
101136
}
102137
}
103138
}()
104139

105-
// group the Payloads by account
106-
accountGroups := util.GroupPayloadsByAccount(log, allPayloads, nWorker)
107-
108-
// migrate the Payloads under accounts
109-
migrated, err := MigrateGroupConcurrently(log, migrations, accountGroups, nWorker)
110-
111-
if err != nil {
112-
return nil, fmt.Errorf("could not migrate accounts: %w", err)
113-
}
114-
115-
log.Info().
116-
Int("account_count", accountGroups.Len()).
117-
Int("payload_count", len(allPayloads)).
118-
Msgf("finished migrating Payloads")
119-
120-
return migrated, nil
140+
return f()
121141
}
122142

123143
// MigrateGroupConcurrently migrate the Payloads in the given account groups.
@@ -282,8 +302,14 @@ func MigrateGroupConcurrently(
282302
Array("top_longest_migrations", durations.Array()).
283303
Msgf("Top longest migrations")
284304

285-
if ctx.Err() != nil {
286-
return nil, fmt.Errorf("fail to migrate payload: %w", ctx.Err())
305+
err := ctx.Err()
306+
if err != nil {
307+
cause := context.Cause(ctx)
308+
if cause != nil {
309+
err = cause
310+
}
311+
312+
return nil, fmt.Errorf("failed to migrate payload: %w", err)
287313
}
288314

289315
return migrated, nil
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package migrations_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"testing"
8+
9+
"github.com/onflow/cadence/runtime/common"
10+
"github.com/rs/zerolog"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/onflow/flow-go/cmd/util/ledger/migrations"
14+
"github.com/onflow/flow-go/ledger"
15+
)
16+
17+
func TestErrorPropagation(t *testing.T) {
18+
t.Parallel()
19+
20+
log := zerolog.New(zerolog.NewTestWriter(t))
21+
22+
address, err := common.HexToAddress("0x1")
23+
require.NoError(t, err)
24+
25+
migrateWith := func(mig migrations.AccountBasedMigration) error {
26+
_, err := migrations.MigrateByAccount(
27+
log,
28+
10,
29+
[]*ledger.Payload{
30+
// at least one payload otherwise the migration will not get called
31+
accountStatusPayload(address),
32+
},
33+
[]migrations.AccountBasedMigration{
34+
mig,
35+
},
36+
)
37+
return err
38+
}
39+
40+
t.Run("no err", func(t *testing.T) {
41+
err := migrateWith(
42+
testMigration{},
43+
)
44+
require.NoError(t, err)
45+
})
46+
47+
t.Run("err on close", func(t *testing.T) {
48+
49+
desiredErr := fmt.Errorf("test close error")
50+
err := migrateWith(
51+
testMigration{
52+
CloseFN: func() error {
53+
return desiredErr
54+
},
55+
},
56+
)
57+
require.ErrorIs(t, err, desiredErr)
58+
})
59+
60+
t.Run("err on init", func(t *testing.T) {
61+
desiredErr := fmt.Errorf("test init error")
62+
err := migrateWith(
63+
testMigration{
64+
InitMigrationFN: func(log zerolog.Logger, allPayloads []*ledger.Payload, nWorkers int) error {
65+
return desiredErr
66+
},
67+
},
68+
)
69+
require.ErrorIs(t, err, desiredErr)
70+
})
71+
72+
t.Run("err on migrate", func(t *testing.T) {
73+
desiredErr := fmt.Errorf("test migrate error")
74+
err := migrateWith(
75+
testMigration{
76+
MigrateAccountFN: func(ctx context.Context, address common.Address, payloads []*ledger.Payload) ([]*ledger.Payload, error) {
77+
return nil, desiredErr
78+
},
79+
},
80+
)
81+
require.ErrorIs(t, err, desiredErr)
82+
})
83+
}
84+
85+
type testMigration struct {
86+
InitMigrationFN func(log zerolog.Logger, allPayloads []*ledger.Payload, nWorkers int) error
87+
MigrateAccountFN func(ctx context.Context, address common.Address, payloads []*ledger.Payload) ([]*ledger.Payload, error)
88+
CloseFN func() error
89+
}
90+
91+
func (t testMigration) InitMigration(log zerolog.Logger, allPayloads []*ledger.Payload, nWorkers int) error {
92+
if t.InitMigrationFN != nil {
93+
return t.InitMigrationFN(log, allPayloads, nWorkers)
94+
}
95+
return nil
96+
}
97+
98+
func (t testMigration) MigrateAccount(ctx context.Context, address common.Address, payloads []*ledger.Payload) ([]*ledger.Payload, error) {
99+
100+
if t.MigrateAccountFN != nil {
101+
return t.MigrateAccountFN(ctx, address, payloads)
102+
}
103+
return payloads, nil
104+
}
105+
106+
func (t testMigration) Close() error {
107+
if t.CloseFN != nil {
108+
return t.CloseFN()
109+
}
110+
return nil
111+
}
112+
113+
var _ migrations.AccountBasedMigration = &testMigration{}

cmd/util/ledger/migrations/deduplicate_contract_names_migration_test.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,6 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
3333

3434
accountStatus := environment.NewAccountStatus()
3535
accountStatus.SetStorageUsed(1000)
36-
accountStatusPayload := ledger.NewPayload(
37-
convert.RegisterIDToLedgerKey(
38-
flow.AccountStatusRegisterID(flow.ConvertAddress(address)),
39-
),
40-
accountStatus.ToBytes(),
41-
)
4236

4337
contractNamesPayload := func(contractNames []byte) *ledger.Payload {
4438
return ledger.NewPayload(
@@ -75,7 +69,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
7569
t.Run("no contract names", func(t *testing.T) {
7670
payloads, err := migration.MigrateAccount(ctx, address,
7771
[]*ledger.Payload{
78-
accountStatusPayload,
72+
accountStatusPayload(address),
7973
},
8074
)
8175

@@ -90,7 +84,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
9084

9185
payloads, err := migration.MigrateAccount(ctx, address,
9286
[]*ledger.Payload{
93-
accountStatusPayload,
87+
accountStatusPayload(address),
9488
contractNamesPayload(newContractNames),
9589
},
9690
)
@@ -111,7 +105,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
111105

112106
payloads, err := migration.MigrateAccount(ctx, address,
113107
[]*ledger.Payload{
114-
accountStatusPayload,
108+
accountStatusPayload(address),
115109
contractNamesPayload(newContractNames),
116110
},
117111
)
@@ -133,7 +127,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
133127

134128
payloads, err := migration.MigrateAccount(ctx, address,
135129
[]*ledger.Payload{
136-
accountStatusPayload,
130+
accountStatusPayload(address),
137131
contractNamesPayload(newContractNames),
138132
},
139133
)
@@ -154,7 +148,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
154148

155149
_, err = migration.MigrateAccount(ctx, address,
156150
[]*ledger.Payload{
157-
accountStatusPayload,
151+
accountStatusPayload(address),
158152
contractNamesPayload(newContractNames),
159153
},
160154
)
@@ -169,7 +163,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
169163

170164
payloads, err := migration.MigrateAccount(ctx, address,
171165
[]*ledger.Payload{
172-
accountStatusPayload,
166+
accountStatusPayload(address),
173167
contractNamesPayload(newContractNames),
174168
},
175169
)
@@ -206,7 +200,7 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
206200

207201
payloads, err := migration.MigrateAccount(ctx, address,
208202
[]*ledger.Payload{
209-
accountStatusPayload,
203+
accountStatusPayload(address),
210204
contractNamesPayload(newContractNames),
211205
},
212206
)
@@ -219,3 +213,14 @@ func TestDeduplicateContractNamesMigration(t *testing.T) {
219213
})
220214
})
221215
}
216+
217+
func accountStatusPayload(address common.Address) *ledger.Payload {
218+
accountStatus := environment.NewAccountStatus()
219+
220+
return ledger.NewPayload(
221+
convert.RegisterIDToLedgerKey(
222+
flow.AccountStatusRegisterID(flow.ConvertAddress(address)),
223+
),
224+
accountStatus.ToBytes(),
225+
)
226+
}

engine/access/rpc/backend/backend_events_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package backend
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"sort"
68
"testing"
79

810
"github.com/rs/zerolog"
@@ -115,10 +117,19 @@ func (s *BackendEventsSuite) SetupTest() {
115117
s.blockEvents = generator.GetEventsWithEncoding(10, entities.EventEncodingVersion_CCF_V0)
116118
targetEvent = string(s.blockEvents[0].Type)
117119

120+
// events returned from the db are sorted by txID, txIndex, then eventIndex.
121+
// reproduce that here to ensure output order works as expected
122+
returnBlockEvents := make([]flow.Event, len(s.blockEvents))
123+
copy(returnBlockEvents, s.blockEvents)
124+
125+
sort.Slice(returnBlockEvents, func(i, j int) bool {
126+
return bytes.Compare(returnBlockEvents[i].TransactionID[:], returnBlockEvents[j].TransactionID[:]) < 0
127+
})
128+
118129
s.events.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) ([]flow.Event, error) {
119130
for _, headerID := range s.blockIDs {
120131
if blockID == headerID {
121-
return s.blockEvents, nil
132+
return returnBlockEvents, nil
122133
}
123134
}
124135
return nil, storage.ErrNotFound

0 commit comments

Comments
 (0)