Skip to content

Commit e18da07

Browse files
author
yihuang
committed
Problem: state streamers are not integrated (#702)
* Problem: state streaming is integrated Solution: - integration the basic file streamer * add integration test * changelog * fix build * fix lint * fix deliver tx event in cosmos-sdk * fix integration test * Update integration_tests/test_streamer.py Signed-off-by: yihuang <huang@crypto.com> * update ethermint and fix build * add a small cli utility into test_streamer.py * fix integration test * update sdk to upstream Signed-off-by: yihuang <huang@crypto.com>
1 parent af996ab commit e18da07

File tree

10 files changed

+150
-6
lines changed

10 files changed

+150
-6
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
- [cronos#719](https://github.com/crypto-org-chain/cronos/pull/719) Fix `eth_call` for legacy blocks (backport #713).
88

9+
### Improvements
10+
11+
- [cronos#702](https://github.com/crypto-org-chain/cronos/pull/702) Integrate the file state streamer (backport #702).
12+
913
*Sep 20, 2022*
1014

1115
## v0.9.0-beta3

app/app.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net/http"
66
"os"
77
"path/filepath"
8+
"sync"
89

910
"github.com/crypto-org-chain/cronos/x/cronos/middleware"
1011

@@ -19,11 +20,13 @@ import (
1920
dbm "github.com/tendermint/tm-db"
2021

2122
"github.com/cosmos/cosmos-sdk/baseapp"
23+
"github.com/cosmos/cosmos-sdk/client/flags"
2224
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
2325
"github.com/cosmos/cosmos-sdk/codec"
2426
"github.com/cosmos/cosmos-sdk/server/api"
2527
"github.com/cosmos/cosmos-sdk/server/config"
2628
servertypes "github.com/cosmos/cosmos-sdk/server/types"
29+
"github.com/cosmos/cosmos-sdk/store/streaming/file"
2730
storetypes "github.com/cosmos/cosmos-sdk/store/types"
2831
"github.com/cosmos/cosmos-sdk/testutil/testdata"
2932
sdk "github.com/cosmos/cosmos-sdk/types"
@@ -118,6 +121,7 @@ import (
118121
gravitytypes "github.com/peggyjv/gravity-bridge/module/v2/x/gravity/types"
119122

120123
// this line is used by starport scaffolding # stargate/app/moduleImport
124+
cronosappclient "github.com/crypto-org-chain/cronos/client"
121125
"github.com/crypto-org-chain/cronos/x/cronos"
122126
cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client"
123127
cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper"
@@ -142,6 +146,8 @@ const (
142146
//
143147
// NOTE: In the SDK, the default value is 255.
144148
AddrLen = 20
149+
150+
FileStreamerDirectory = "file_streamer"
145151
)
146152

147153
// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
@@ -341,6 +347,32 @@ func New(
341347
tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey, evmtypes.TransientKey, feemarkettypes.TransientKey)
342348
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey)
343349

350+
// configure state listening capabilities using AppOptions
351+
// we are doing nothing with the returned streamingServices and waitGroup in this case
352+
// Only support file streamer right now.
353+
if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" {
354+
streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory)
355+
if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil {
356+
panic(err)
357+
}
358+
359+
// default to exposing all
360+
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
361+
for _, storeKey := range keys {
362+
exposeStoreKeys = append(exposeStoreKeys, storeKey)
363+
}
364+
service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec)
365+
if err != nil {
366+
panic(err)
367+
}
368+
bApp.SetStreamingService(service)
369+
370+
wg := new(sync.WaitGroup)
371+
if err := service.Stream(wg); err != nil {
372+
panic(err)
373+
}
374+
}
375+
344376
app := &App{
345377
BaseApp: bApp,
346378
cdc: cdc,

client/flags.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package client
2+
3+
const FlagStreamers = "streamers"

cmd/cronosd/cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
ethermint "github.com/evmos/ethermint/types"
4646

4747
"github.com/crypto-org-chain/cronos/app"
48+
cronosclient "github.com/crypto-org-chain/cronos/client"
4849
// this line is used by starport scaffolding # stargate/root/import
4950
)
5051

@@ -147,6 +148,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
147148
func addModuleInitFlags(startCmd *cobra.Command) {
148149
crisis.AddModuleInitFlags(startCmd)
149150
cronos.AddModuleInitFlags(startCmd)
151+
startCmd.Flags().String(cronosclient.FlagStreamers, "", "Enable streamers, only file streamer is supported right now")
150152
// this line is used by starport scaffolding # stargate/root/initFlags
151153
}
152154

integration_tests/configs/default.jsonnet

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
dotenv: '../../scripts/.env',
33
'cronos_777-1': {
44
cmd: 'cronosd',
5-
'start-flags': '--trace',
5+
'start-flags': '--trace --streamers file',
66
config: {
77
mempool: {
88
version: 'v1',

integration_tests/network.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ def node_rpc(self, i):
5555
return "tcp://127.0.0.1:%d" % ports.rpc_port(self.base_port(i))
5656

5757
def cosmos_cli(self, i=0):
58-
return CosmosCLI(
59-
self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary
60-
)
58+
return CosmosCLI(self.node_home(i), self.node_rpc(i), self.chain_binary)
59+
60+
def node_home(self, i=0):
61+
return self.base_dir / f"node{i}"
6162

6263
def use_websocket(self, use=True):
6364
self._w3 = None

integration_tests/poetry.lock

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration_tests/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ toml = "^0.10.2"
2626
pysha3 = "^1.0.2"
2727
jsonnet = "^0.18.0"
2828
eth-account = { git = "https://github.com/mmsqe/eth-account.git", branch = "v0.5.8-rc0" }
29+
cprotobuf = { git = "https://github.com/yihuang/cprotobuf.git" }
2930

3031
[tool.poetry.dev-dependencies]
3132

integration_tests/test_streamer.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
from cprotobuf import Field, ProtoEntity, decode_primitive
2+
from hexbytes import HexBytes
3+
4+
from .utils import ADDRS
5+
6+
7+
class StoreKVPairs(ProtoEntity):
8+
# the store key for the KVStore this pair originates from
9+
store_key = Field("string", 1)
10+
# true indicates a delete operation
11+
delete = Field("bool", 2)
12+
key = Field("bytes", 3)
13+
value = Field("bytes", 4)
14+
15+
16+
def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None):
17+
"""
18+
header, body*, footer
19+
"""
20+
header = footer = None
21+
body = []
22+
offset = 0
23+
size, n = decode_primitive(data, "uint64")
24+
offset += n
25+
26+
# header
27+
if header_cls is not None:
28+
header = header_cls()
29+
header.ParseFromString(data[offset : offset + size])
30+
offset += size
31+
32+
while True:
33+
size, n = decode_primitive(data[offset:], "uint64")
34+
offset += n
35+
if offset + size == len(data):
36+
# footer
37+
if footer_cls is not None:
38+
footer = footer_cls()
39+
footer.ParseFromString(data[offset : offset + size])
40+
offset += size
41+
break
42+
else:
43+
# body
44+
if body_cls is not None:
45+
item = body_cls()
46+
item.ParseFromString(data[offset : offset + size])
47+
body.append(item)
48+
offset += size
49+
return header, body, footer
50+
51+
52+
def test_streamers(cronos):
53+
"""
54+
- check the streaming files are created
55+
- try to parse the state change sets
56+
"""
57+
# inspect the first state change of the first tx in genesis
58+
path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0"
59+
_, body, _ = decode_stream_file(open(path, "rb").read())
60+
# creation of the validator account
61+
assert body[0].store_key == "acc"
62+
# the order in gen_txs is undeterministic, could be either one.
63+
assert body[0].key in (
64+
b"\x01" + HexBytes(ADDRS["validator"]),
65+
b"\x01" + HexBytes(ADDRS["validator2"]),
66+
)
67+
68+
69+
if __name__ == "__main__":
70+
import binascii
71+
import sys
72+
73+
_, body, _ = decode_stream_file(open(sys.argv[1], "rb").read())
74+
for item in body:
75+
print(
76+
item.store_key,
77+
item.delete,
78+
binascii.hexlify(item.key).decode(),
79+
binascii.hexlify(item.value).decode(),
80+
)

nix/testenv.nix

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,11 @@ pkgs.poetry2nix.mkPoetryEnv {
1414
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.poetry ];
1515
}
1616
);
17+
18+
cprotobuf = super.cprotobuf.overridePythonAttrs (
19+
old: {
20+
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.cython ];
21+
}
22+
);
1723
});
1824
}

0 commit comments

Comments
 (0)