Skip to content

Commit 332d0b1

Browse files
feat(indexer/postgres)!: add basic support for header, txs and events (#22695)
1 parent 34f407d commit 332d0b1

File tree

34 files changed

+187
-52
lines changed

34 files changed

+187
-52
lines changed

baseapp/abci.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
999999
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
10001000
if app.streamingManager.StopNodeOnErr {
10011001
err = fmt.Errorf("Commit listening hook failed: %w", err)
1002+
if blockHeight == 1 {
1003+
// can't rollback to height 0, so just return the error
1004+
return nil, fmt.Errorf("failed to commit block 1, can't automatically rollback: %w", err)
1005+
}
10021006
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
10031007
if rollbackErr != nil {
10041008
return nil, errors.Join(err, rollbackErr)

baseapp/streaming.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package baseapp
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"sort"
78
"strconv"
@@ -20,6 +21,7 @@ import (
2021

2122
"github.com/cosmos/cosmos-sdk/client/flags"
2223
servertypes "github.com/cosmos/cosmos-sdk/server/types"
24+
sdk "github.com/cosmos/cosmos-sdk/types"
2325
)
2426

2527
const (
@@ -48,7 +50,7 @@ func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*stor
4850
app.cms.AddListeners(exposedKeys)
4951

5052
app.streamingManager = storetypes.StreamingManager{
51-
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener}},
53+
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener, app.txDecoder}},
5254
StopNodeOnErr: true,
5355
}
5456

@@ -144,9 +146,10 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
144146
return exposeStoreKeys
145147
}
146148

147-
func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
149+
func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error) {
148150
appdataEvent := appdata.Event{
149-
Type: event.Type,
151+
BlockNumber: uint64(height),
152+
Type: event.Type,
150153
Attributes: func() ([]appdata.EventAttribute, error) {
151154
attrs := make([]appdata.EventAttribute, len(event.Attributes))
152155
for j, attr := range event.Attributes {
@@ -197,7 +200,8 @@ func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
197200
}
198201

199202
type listenerWrapper struct {
200-
listener appdata.Listener
203+
listener appdata.Listener
204+
txDecoder sdk.TxDecoder
201205
}
202206

203207
// NewListenerWrapper creates a new listenerWrapper.
@@ -208,20 +212,35 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {
208212

209213
func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
210214
if p.listener.StartBlock != nil {
215+
// clean up redundant data
216+
reqWithoutTxs := req
217+
reqWithoutTxs.Txs = nil
218+
211219
if err := p.listener.StartBlock(appdata.StartBlockData{
212220
Height: uint64(req.Height),
213221
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
214-
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
222+
HeaderJSON: func() (json.RawMessage, error) {
223+
return json.Marshal(reqWithoutTxs)
224+
},
215225
}); err != nil {
216226
return err
217227
}
218228
}
219229
if p.listener.OnTx != nil {
220230
for i, tx := range req.Txs {
221231
if err := p.listener.OnTx(appdata.TxData{
222-
TxIndex: int32(i),
223-
Bytes: func() ([]byte, error) { return tx, nil },
224-
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
232+
BlockNumber: uint64(req.Height),
233+
TxIndex: int32(i),
234+
Bytes: func() ([]byte, error) { return tx, nil },
235+
JSON: func() (json.RawMessage, error) {
236+
sdkTx, err := p.txDecoder(tx)
237+
if err != nil {
238+
// if the transaction cannot be decoded, return the error as JSON
239+
// as there are some txs that might not be decodeable by the txDecoder
240+
return json.Marshal(err)
241+
}
242+
return json.Marshal(sdkTx)
243+
},
225244
}); err != nil {
226245
return err
227246
}
@@ -231,14 +250,14 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
231250
events := make([]appdata.Event, len(res.Events))
232251
var err error
233252
for i, event := range res.Events {
234-
events[i], err = eventToAppDataEvent(event)
253+
events[i], err = eventToAppDataEvent(event, req.Height)
235254
if err != nil {
236255
return err
237256
}
238257
}
239258
for _, txResult := range res.TxResults {
240259
for _, event := range txResult.Events {
241-
appdataEvent, err := eventToAppDataEvent(event)
260+
appdataEvent, err := eventToAppDataEvent(event, req.Height)
242261
if err != nil {
243262
return err
244263
}

codec/collections.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ func protoCol(f protoreflect.FieldDescriptor) schema.Field {
398398
col.Kind = schema.StringKind
399399
case protoreflect.BytesKind:
400400
col.Kind = schema.BytesKind
401+
col.Nullable = true
401402
case protoreflect.EnumKind:
402403
// TODO: support enums
403404
col.Kind = schema.EnumKind

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ require (
186186
// TODO remove after all modules have their own go.mods
187187
replace (
188188
cosmossdk.io/api => ./api
189+
cosmossdk.io/schema => ./schema
189190
cosmossdk.io/store => ./store
190191
cosmossdk.io/x/bank => ./x/bank
191192
cosmossdk.io/x/staking => ./x/staking

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
1818
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
1919
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
2020
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
21-
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
22-
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
2321
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
2422
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
2523
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=

indexer/postgres/base_sql.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,19 @@ CREATE TABLE IF NOT EXISTS tx
1717
id BIGSERIAL PRIMARY KEY,
1818
block_number BIGINT NOT NULL REFERENCES block (number),
1919
index_in_block BIGINT NOT NULL,
20-
data JSONB NOT NULL
20+
data JSONB NULL,
21+
bytes BYTEA NULL
2122
);
2223
2324
CREATE TABLE IF NOT EXISTS event
2425
(
2526
id BIGSERIAL PRIMARY KEY,
2627
block_number BIGINT NOT NULL REFERENCES block (number),
27-
tx_id BIGINT NULL REFERENCES tx (id),
28-
msg_index BIGINT NULL,
29-
event_index BIGINT NULL,
30-
type TEXT NOT NULL,
31-
data JSONB NOT NULL
28+
block_stage INTEGER NOT NULL,
29+
tx_index BIGINT NOT NULL,
30+
msg_index BIGINT NOT NULL,
31+
event_index BIGINT NOT NULL,
32+
type TEXT NULL,
33+
data JSONB NULL
3234
);
3335
`

indexer/postgres/listener.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package postgres
22

33
import (
4+
"encoding/json"
45
"fmt"
56

67
"cosmossdk.io/schema/appdata"
@@ -81,5 +82,72 @@ func (i *indexerImpl) listener() appdata.Listener {
8182
i.tx, err = i.db.BeginTx(i.ctx, nil)
8283
return nil, err
8384
},
85+
OnTx: txListener(i),
86+
OnEvent: eventListener(i),
87+
}
88+
}
89+
90+
func txListener(i *indexerImpl) func(data appdata.TxData) error {
91+
return func(td appdata.TxData) error {
92+
var bz []byte
93+
if td.Bytes != nil {
94+
var err error
95+
bz, err = td.Bytes()
96+
if err != nil {
97+
return err
98+
}
99+
}
100+
101+
var jsonData json.RawMessage
102+
if td.JSON != nil {
103+
var err error
104+
jsonData, err = td.JSON()
105+
if err != nil {
106+
return err
107+
}
108+
}
109+
110+
_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
111+
td.BlockNumber, td.TxIndex, jsonData, bz)
112+
113+
return err
114+
}
115+
}
116+
117+
func eventListener(i *indexerImpl) func(data appdata.EventData) error {
118+
return func(data appdata.EventData) error {
119+
for _, e := range data.Events {
120+
var jsonData json.RawMessage
121+
122+
if e.Data != nil {
123+
var err error
124+
jsonData, err = e.Data()
125+
if err != nil {
126+
return fmt.Errorf("failed to get event data: %w", err)
127+
}
128+
} else if e.Attributes != nil {
129+
attrs, err := e.Attributes()
130+
if err != nil {
131+
return fmt.Errorf("failed to get event attributes: %w", err)
132+
}
133+
134+
attrsMap := map[string]interface{}{}
135+
for _, attr := range attrs {
136+
attrsMap[attr.Key] = attr.Value
137+
}
138+
139+
jsonData, err = json.Marshal(attrsMap)
140+
if err != nil {
141+
return fmt.Errorf("failed to marshal event attributes: %w", err)
142+
}
143+
}
144+
145+
_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data) VALUES ($1, $2, $3, $4, $5, $6, $7)",
146+
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData)
147+
if err != nil {
148+
return fmt.Errorf("failed to index event: %w", err)
149+
}
150+
}
151+
return nil
84152
}
85153
}

runtime/v2/app.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ func (a *App[T]) SchemaDecoderResolver() decoding.DecoderResolver {
9494
for moduleName, module := range a.moduleManager.Modules() {
9595
moduleSet[moduleName] = module
9696
}
97+
98+
for _, overrideKey := range a.config.OverrideStoreKeys {
99+
moduleSet[overrideKey.KvStoreKey] = moduleSet[overrideKey.ModuleName]
100+
}
101+
97102
return decoding.ModuleSetDecoderResolver(moduleSet)
98103
}
99104

runtime/v2/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go 1.23
66
replace (
77
cosmossdk.io/api => ../../api
88
cosmossdk.io/core/testing => ../../core/testing
9+
cosmossdk.io/schema => ../../schema
910
cosmossdk.io/server/v2/appmanager => ../../server/v2/appmanager
1011
cosmossdk.io/server/v2/stf => ../../server/v2/stf
1112
cosmossdk.io/store/v2 => ../../store/v2

runtime/v2/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF
1010
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc=
1111
cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
1212
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
13-
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
14-
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
1513
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
1614
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
1715
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=

0 commit comments

Comments
 (0)