Skip to content

Commit f07348c

Browse files
D4ryl00jefft0
andauthored
Feat: handle new Events from a tx's response (#194)
Co-authored-by: Jeff Thompson <jeff@thefirst.org>
1 parent 43f4b37 commit f07348c

File tree

16 files changed

+2467
-379
lines changed

16 files changed

+2467
-379
lines changed

client/http.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"context"
45
"fmt"
56

67
rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
@@ -33,39 +34,39 @@ func (c *Client) CreateBatch() clientTypes.Batch {
3334
}
3435
}
3536

36-
func (c *Client) GetLatestBlockNumber() (uint64, error) {
37-
status, err := c.client.Status()
37+
func (c *Client) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
38+
status, err := c.client.Status(ctx, nil)
3839
if err != nil {
3940
return 0, fmt.Errorf("unable to get chain status, %w", err)
4041
}
4142

4243
return uint64(status.SyncInfo.LatestBlockHeight), nil
4344
}
4445

45-
func (c *Client) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
46+
func (c *Client) GetBlock(ctx context.Context, blockNum uint64) (*core_types.ResultBlock, error) {
4647
bn := int64(blockNum)
4748

48-
block, err := c.client.Block(&bn)
49+
block, err := c.client.Block(ctx, &bn)
4950
if err != nil {
5051
return nil, fmt.Errorf("unable to get block, %w", err)
5152
}
5253

5354
return block, nil
5455
}
5556

56-
func (c *Client) GetGenesis() (*core_types.ResultGenesis, error) {
57-
genesis, err := c.client.Genesis()
57+
func (c *Client) GetGenesis(ctx context.Context) (*core_types.ResultGenesis, error) {
58+
genesis, err := c.client.Genesis(ctx)
5859
if err != nil {
5960
return nil, fmt.Errorf("unable to get genesis block, %w", err)
6061
}
6162

6263
return genesis, nil
6364
}
6465

65-
func (c *Client) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
66+
func (c *Client) GetBlockResults(ctx context.Context, blockNum uint64) (*core_types.ResultBlockResults, error) {
6667
bn := int64(blockNum)
6768

68-
results, err := c.client.BlockResults(&bn)
69+
results, err := c.client.BlockResults(ctx, &bn)
6970
if err != nil {
7071
return nil, fmt.Errorf("unable to get block results, %w", err)
7172
}

fetch/fetch.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func New(
7373
return f
7474
}
7575

76-
func (f *Fetcher) fetchGenesisData() error {
76+
func (f *Fetcher) fetchGenesisData(ctx context.Context) error {
7777
_, err := f.storage.GetLatestHeight()
7878
// Possible cases:
7979
// - err is ErrNotFound: the storage is empty, we execute the rest of the routine and fetch+write genesis data
@@ -86,12 +86,12 @@ func (f *Fetcher) fetchGenesisData() error {
8686

8787
f.logger.Info("Fetching genesis")
8888

89-
block, err := getGenesisBlock(f.client)
89+
block, err := getGenesisBlock(ctx, f.client)
9090
if err != nil {
9191
return fmt.Errorf("failed to fetch genesis block: %w", err)
9292
}
9393

94-
results, err := f.client.GetBlockResults(0)
94+
results, err := f.client.GetBlockResults(ctx, 0)
9595
if err != nil {
9696
return fmt.Errorf("failed to fetch genesis results: %w", err)
9797
}
@@ -131,7 +131,7 @@ func (f *Fetcher) fetchGenesisData() error {
131131
// blockchain data
132132
func (f *Fetcher) FetchChainData(ctx context.Context) error {
133133
// Attempt to fetch the genesis data
134-
if err := f.fetchGenesisData(); err != nil {
134+
if err := f.fetchGenesisData(ctx); err != nil {
135135
// We treat this error as soft, to ease migration, since
136136
// some versions of gno networks don't support this.
137137
// In the future, we should hard fail if genesis is not fetch-able
@@ -156,7 +156,7 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
156156
}
157157

158158
// Fetch the latest block from the chain
159-
latestRemote, latestErr := f.client.GetLatestBlockNumber()
159+
latestRemote, latestErr := f.client.GetLatestBlockNumber(ctx)
160160
if latestErr != nil {
161161
f.logger.Error("unable to fetch latest block number", zap.Error(latestErr))
162162

@@ -322,22 +322,22 @@ func (f *Fetcher) writeSlot(s *slot) error {
322322
return nil
323323
}
324324

325-
func (f *Fetcher) IsReady() (bool, error) {
325+
func (f *Fetcher) IsReady(ctx context.Context) (bool, error) {
326326
if f.latestChunkSize == int(f.maxChunkSize) {
327327
return false, fmt.Errorf("the data synchronization process is still in progress and hasn't "+
328328
"caught up with the current blockchain state. Chunk size: %d", f.latestChunkSize)
329329
}
330330

331-
_, err := f.client.GetLatestBlockNumber()
331+
_, err := f.client.GetLatestBlockNumber(ctx)
332332
if err != nil {
333333
return false, fmt.Errorf("node RPC method is not reachable: %w", err)
334334
}
335335

336336
return true, nil
337337
}
338338

339-
func getGenesisBlock(client Client) (*bft_types.Block, error) {
340-
gblock, err := client.GetGenesis()
339+
func getGenesisBlock(ctx context.Context, client Client) (*bft_types.Block, error) {
340+
gblock, err := client.GetGenesis(ctx)
341341
if err != nil {
342342
return nil, fmt.Errorf("unable to get genesis block: %w", err)
343343
}

fetch/fetch_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ func TestFetcher_Genesis(t *testing.T) {
11021102

11031103
f := New(mockStorage, mockClient, mockEvents)
11041104

1105-
require.NoError(t, f.fetchGenesisData())
1105+
require.NoError(t, f.fetchGenesisData(context.Background()))
11061106

11071107
require.Len(t, capturedEvents, 1)
11081108

@@ -1140,7 +1140,7 @@ func TestFetcher_GenesisAlreadyFetched(t *testing.T) {
11401140

11411141
f := New(mockStorage, mockClient, mockEvents)
11421142

1143-
require.NoError(t, f.fetchGenesisData())
1143+
require.NoError(t, f.fetchGenesisData(context.Background()))
11441144
}
11451145

11461146
func TestFetcher_GenesisFetchError(t *testing.T) {
@@ -1183,7 +1183,7 @@ func TestFetcher_GenesisFetchError(t *testing.T) {
11831183

11841184
f := New(mockStorage, mockClient, mockEvents)
11851185

1186-
require.ErrorIs(t, f.fetchGenesisData(), remoteErr)
1186+
require.ErrorIs(t, f.fetchGenesisData(context.Background()), remoteErr)
11871187
}
11881188

11891189
func TestFetcher_GenesisInvalidState(t *testing.T) {
@@ -1224,7 +1224,7 @@ func TestFetcher_GenesisInvalidState(t *testing.T) {
12241224

12251225
f := New(mockStorage, mockClient, mockEvents)
12261226

1227-
require.ErrorContains(t, f.fetchGenesisData(), "unknown genesis state kind 'int'")
1227+
require.ErrorContains(t, f.fetchGenesisData(context.Background()), "unknown genesis state kind 'int'")
12281228
}
12291229

12301230
func TestFetcher_GenesisFetchResultsError(t *testing.T) {
@@ -1267,7 +1267,7 @@ func TestFetcher_GenesisFetchResultsError(t *testing.T) {
12671267

12681268
f := New(mockStorage, mockClient, mockEvents)
12691269

1270-
require.ErrorIs(t, f.fetchGenesisData(), remoteErr)
1270+
require.ErrorIs(t, f.fetchGenesisData(context.Background()), remoteErr)
12711271
}
12721272

12731273
func TestFetcher_GenesisNilGenesisDoc(t *testing.T) {
@@ -1306,7 +1306,7 @@ func TestFetcher_GenesisNilGenesisDoc(t *testing.T) {
13061306

13071307
f := New(mockStorage, mockClient, mockEvents)
13081308

1309-
require.Error(t, f.fetchGenesisData())
1309+
require.Error(t, f.fetchGenesisData(context.Background()))
13101310
}
13111311

13121312
func TestFetcher_GenesisNilResults(t *testing.T) {
@@ -1347,7 +1347,7 @@ func TestFetcher_GenesisNilResults(t *testing.T) {
13471347

13481348
f := New(mockStorage, mockClient, mockEvents)
13491349

1350-
require.Error(t, f.fetchGenesisData())
1350+
require.Error(t, f.fetchGenesisData(context.Background()))
13511351
}
13521352

13531353
// generateTransactions generates dummy transactions

fetch/mocks_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,31 @@ type mockClient struct {
2727
createBatchFn createBatchDelegate
2828
}
2929

30-
func (m *mockClient) GetLatestBlockNumber() (uint64, error) {
30+
func (m *mockClient) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
3131
if m.getLatestBlockNumberFn != nil {
3232
return m.getLatestBlockNumberFn()
3333
}
3434

3535
return 0, nil
3636
}
3737

38-
func (m *mockClient) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
38+
func (m *mockClient) GetBlock(ctx context.Context, blockNum uint64) (*core_types.ResultBlock, error) {
3939
if m.getBlockFn != nil {
4040
return m.getBlockFn(blockNum)
4141
}
4242

4343
return nil, nil
4444
}
4545

46-
func (m *mockClient) GetGenesis() (*core_types.ResultGenesis, error) {
46+
func (m *mockClient) GetGenesis(ctx context.Context) (*core_types.ResultGenesis, error) {
4747
if m.getGenesisFn != nil {
4848
return m.getGenesisFn()
4949
}
5050

5151
return nil, nil
5252
}
5353

54-
func (m *mockClient) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
54+
func (m *mockClient) GetBlockResults(ctx context.Context, blockNum uint64) (*core_types.ResultBlockResults, error) {
5555
if m.getBlockResultsFn != nil {
5656
return m.getBlockResultsFn(blockNum)
5757
}

fetch/types.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package fetch
22

33
import (
4+
"context"
5+
46
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
57

68
clientTypes "github.com/gnolang/tx-indexer/client/types"
@@ -10,17 +12,17 @@ import (
1012
// Client defines the interface for the node (client) communication
1113
type Client interface {
1214
// GetLatestBlockNumber returns the latest block height from the chain
13-
GetLatestBlockNumber() (uint64, error)
15+
GetLatestBlockNumber(context.Context) (uint64, error)
1416

1517
// GetBlock returns specified block
16-
GetBlock(uint64) (*core_types.ResultBlock, error)
18+
GetBlock(context.Context, uint64) (*core_types.ResultBlock, error)
1719

1820
// GetGenesis returns the genesis block
19-
GetGenesis() (*core_types.ResultGenesis, error)
21+
GetGenesis(context.Context) (*core_types.ResultGenesis, error)
2022

2123
// GetBlockResults returns the results of executing the transactions
2224
// for the specified block
23-
GetBlockResults(uint64) (*core_types.ResultBlockResults, error)
25+
GetBlockResults(context.Context, uint64) (*core_types.ResultBlockResults, error)
2426

2527
// CreateBatch creates a new client batch
2628
CreateBatch() clientTypes.Batch

fetch/worker.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ func handleChunk(
3232
errs := make([]error, 0)
3333

3434
// Get block data from the node
35-
blocks, err := getBlocksFromBatch(info.chunkRange, client)
35+
blocks, err := getBlocksFromBatch(ctx, info.chunkRange, client)
3636
errs = append(errs, err)
3737

38-
results, err := getTxResultFromBatch(blocks, client)
38+
results, err := getTxResultFromBatch(ctx, blocks, client)
3939
errs = append(errs, err)
4040

4141
return &chunk{
@@ -61,7 +61,7 @@ func handleChunk(
6161
// getBlocksFromBatch gets the blocks using batch requests.
6262
// In case of encountering an error during fetching (remote temporarily closed, batch error...),
6363
// the fetch is attempted again using sequential block fetches
64-
func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, error) {
64+
func getBlocksFromBatch(ctx context.Context, chunkRange chunkRange, client Client) ([]*types.Block, error) {
6565
var (
6666
batch = client.CreateBatch()
6767
fetchedBlocks = make([]*types.Block, 0)
@@ -82,7 +82,7 @@ func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, e
8282
blocksRaw, err := batch.Execute(context.Background())
8383
if err != nil {
8484
// Try to fetch sequentially
85-
return getBlocksSequentially(chunkRange, client)
85+
return getBlocksSequentially(ctx, chunkRange, client)
8686
}
8787

8888
// Extract the blocks
@@ -100,15 +100,15 @@ func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, e
100100
}
101101

102102
// getBlocksSequentially attempts to fetch blocks from the client, using sequential requests
103-
func getBlocksSequentially(chunkRange chunkRange, client Client) ([]*types.Block, error) {
103+
func getBlocksSequentially(ctx context.Context, chunkRange chunkRange, client Client) ([]*types.Block, error) {
104104
var (
105105
errs = make([]error, 0)
106106
blocks = make([]*types.Block, 0)
107107
)
108108

109109
for blockNum := chunkRange.from; blockNum <= chunkRange.to; blockNum++ {
110110
// Get block info from the chain
111-
block, err := client.GetBlock(blockNum)
111+
block, err := client.GetBlock(ctx, blockNum)
112112
if err != nil {
113113
errs = append(errs, fmt.Errorf("unable to get block %d, %w", blockNum, err))
114114

@@ -124,7 +124,7 @@ func getBlocksSequentially(chunkRange chunkRange, client Client) ([]*types.Block
124124
// getTxResultFromBatch gets the tx results using batch requests.
125125
// In case of encountering an error during fetching (remote temporarily closed, batch error...),
126126
// the fetch is attempted again using sequential tx result fetches
127-
func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
127+
func getTxResultFromBatch(ctx context.Context, blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
128128
var (
129129
batch = client.CreateBatch()
130130
fetchedResults = make([][]*types.TxResult, len(blocks))
@@ -158,7 +158,7 @@ func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxRe
158158
blockResultsRaw, err := batch.Execute(context.Background())
159159
if err != nil {
160160
// Try to fetch sequentially
161-
return getTxResultsSequentially(blocks, client)
161+
return getTxResultsSequentially(ctx, blocks, client)
162162
}
163163

164164
indexOfBlockHeight := make(map[int64]int, len(blocks))
@@ -198,7 +198,7 @@ func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxRe
198198
}
199199

200200
// getTxResultsSequentially attempts to fetch tx results from the client, using sequential requests
201-
func getTxResultsSequentially(blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
201+
func getTxResultsSequentially(ctx context.Context, blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
202202
var (
203203
errs = make([]error, 0)
204204
results = make([][]*types.TxResult, len(blocks))
@@ -210,7 +210,7 @@ func getTxResultsSequentially(blocks []*types.Block, client Client) ([][]*types.
210210
}
211211

212212
// Get the transaction execution results
213-
blockResults, err := client.GetBlockResults(uint64(block.Height))
213+
blockResults, err := client.GetBlockResults(ctx, uint64(block.Height))
214214
if err != nil {
215215
errs = append(
216216
errs,

go.mod

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/99designs/gqlgen v0.17.56
77
github.com/ajnavarro/gqlfiltergen v0.1.2
88
github.com/cockroachdb/pebble v1.1.5
9-
github.com/gnolang/gno v0.0.0-20250716085632-95d5f5e743c9
9+
github.com/gnolang/gno v0.0.0-20250903085916-8441a1e81345
1010
github.com/go-chi/chi/v5 v5.2.1
1111
github.com/go-chi/httprate v0.15.0
1212
github.com/google/uuid v1.6.0
@@ -24,6 +24,7 @@ require (
2424
require (
2525
github.com/ajg/form v1.5.1 // indirect
2626
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
27+
github.com/gofrs/flock v0.12.1 // indirect
2728
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
2829
github.com/sig-0/insertion-queue v0.0.0-20241004125609-6b3ca841346b // indirect
2930
github.com/valyala/bytebufferpool v1.0.0 // indirect
@@ -81,13 +82,13 @@ require (
8182
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
8283
go.opentelemetry.io/otel/trace v1.34.0 // indirect
8384
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
84-
golang.org/x/crypto v0.37.0 // indirect
85+
golang.org/x/crypto v0.40.0 // indirect
8586
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
86-
golang.org/x/mod v0.24.0 // indirect
87-
golang.org/x/net v0.39.0 // indirect
88-
golang.org/x/sys v0.32.0 // indirect
89-
golang.org/x/text v0.24.0 // indirect
90-
golang.org/x/tools v0.32.0 // indirect
87+
golang.org/x/mod v0.26.0 // indirect
88+
golang.org/x/net v0.42.0 // indirect
89+
golang.org/x/sys v0.34.0 // indirect
90+
golang.org/x/text v0.28.0 // indirect
91+
golang.org/x/tools v0.35.0 // indirect
9192
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
9293
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
9394
google.golang.org/grpc v1.69.4 // indirect

0 commit comments

Comments
 (0)