Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tool: tree recovery #29

Merged
merged 4 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/recovery/etc/config.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Postgres:
DataSource: host=127.0.0.1 user=postgres password=Zkbas@123 dbname=zkbas port=5432 sslmode=disable

CacheRedis:
- Host: 127.0.0.1:6379
# Pass: myredis
Type: node

TreeDB:
Driver: memorydb
21 changes: 21 additions & 0 deletions cmd/recovery/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package config

import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/cache"

"github.com/bnb-chain/zkbas/pkg/treedb"
)

type Config struct {
Postgres struct {
DataSource string
}
CacheRedis cache.CacheConf
TreeDB struct {
Driver treedb.Driver
LevelDBOption treedb.LevelDBOption `json:",optional"`
RedisDBOption treedb.RedisDBOption `json:",optional"`
}
LogConf logx.LogConf
}
46 changes: 46 additions & 0 deletions cmd/recovery/internal/svc/servicecontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package svc

import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"gorm.io/driver/postgres"
"gorm.io/gorm"

"github.com/bnb-chain/zkbas/cmd/recovery/internal/config"
"github.com/bnb-chain/zkbas/common/model/account"
"github.com/bnb-chain/zkbas/common/model/liquidity"
"github.com/bnb-chain/zkbas/common/model/nft"
)

type ServiceContext struct {
Config config.Config

AccountModel account.AccountModel
AccountHistoryModel account.AccountHistoryModel
LiquidityHistoryModel liquidity.LiquidityHistoryModel
NftHistoryModel nft.L2NftHistoryModel
}

func WithRedis(redisType string, redisPass string) redis.Option {
return func(p *redis.Redis) {
p.Type = redisType
p.Pass = redisPass
}
}

func NewServiceContext(c config.Config) *ServiceContext {
gormPointer, err := gorm.Open(postgres.Open(c.Postgres.DataSource))
if err != nil {
logx.Errorf("gorm connect db error, err = %s", err.Error())
}
conn := sqlx.NewSqlConn("postgres", c.Postgres.DataSource)

return &ServiceContext{
Config: c,
AccountModel: account.NewAccountModel(conn, c.CacheRedis, gormPointer),
AccountHistoryModel: account.NewAccountHistoryModel(conn, c.CacheRedis, gormPointer),
LiquidityHistoryModel: liquidity.NewLiquidityHistoryModel(conn, c.CacheRedis, gormPointer),
NftHistoryModel: nft.NewL2NftHistoryModel(conn, c.CacheRedis, gormPointer),
}
}
100 changes: 100 additions & 0 deletions cmd/recovery/recovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"flag"
"fmt"

bsmt "github.com/bnb-chain/bas-smt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/proc"

"github.com/bnb-chain/zkbas/cmd/recovery/internal/config"
"github.com/bnb-chain/zkbas/cmd/recovery/internal/svc"
"github.com/bnb-chain/zkbas/common/tree"
"github.com/bnb-chain/zkbas/pkg/treedb"
)

var (
configFile = flag.String("f", "./etc/recovery.yaml", "the config file")
blockHeight = flag.Int64("height", 0, "block height")
serviceName = flag.String("service", "", "service name(committer, witness)")
batchSize = flag.Int("batch", 1000, "batch size")
)

func main() {
flag.Parse()

var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
logx.MustSetup(c.LogConf)
logx.DisableStat()
proc.AddShutdownListener(func() {
logx.Close()
})

if *blockHeight < 0 {
fmt.Println("-height must be greater than 0")
flag.Usage()
return
}

if *batchSize <= 0 {
fmt.Println("-batch must be greater than 0")
flag.Usage()
return
}

if *serviceName == "" {
fmt.Println("-service must be set")
flag.Usage()
return
}

// init tree database
treeCtx := &treedb.Context{
Name: *serviceName,
Driver: c.TreeDB.Driver,
LevelDBOption: &c.TreeDB.LevelDBOption,
RedisDBOption: &c.TreeDB.RedisDBOption,
Reload: true,
}
treeCtx.SetOptions(bsmt.InitializeVersion(bsmt.Version(*blockHeight) - 1))
treeCtx.SetBatchReloadSize(*batchSize)
err := treedb.SetupTreeDB(treeCtx)
if err != nil {
logx.Errorf("Init tree database failed: %s", err)
return
}

// init accountTree and accountStateTrees
_, _, err = tree.InitAccountTree(
ctx.AccountModel,
ctx.AccountHistoryModel,
*blockHeight,
treeCtx,
)
if err != nil {
logx.Error("InitMerkleTree error:", err)
return
}
// init liquidityTree
_, err = tree.InitLiquidityTree(
ctx.LiquidityHistoryModel,
*blockHeight,
treeCtx)
if err != nil {
logx.Errorf("InitLiquidityTree error: %s", err.Error())
return
}
// init nftTree
_, err = tree.InitNftTree(
ctx.NftHistoryModel,
*blockHeight,
treeCtx)
if err != nil {
logx.Errorf("InitNftTree error: %s", err.Error())
return
}
}
42 changes: 20 additions & 22 deletions common/model/account/accountHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type (
GetAccountsList(limit int, offset int64) (accounts []*AccountHistory, err error)
GetAccountsTotalCount() (count int64, err error)
GetLatestAccountIndex() (accountIndex int64, err error)
GetValidAccounts(height int64) (rowsAffected int64, accounts []*AccountHistory, err error)
GetValidAccountNums(height int64) (accounts int64, err error)
GetValidAccounts(height int64, limit int, offset int) (rowsAffected int64, accounts []*AccountHistory, err error)
GetValidAccountCount(height int64) (accounts int64, err error)
GetLatestAccountInfoByAccountIndex(accountIndex int64) (account *AccountHistory, err error)
}

Expand Down Expand Up @@ -304,37 +304,35 @@ func (m *defaultAccountHistoryModel) CreateNewAccount(nAccount *AccountHistory)
return nil
}

func (m *defaultAccountHistoryModel) GetValidAccounts(height int64) (rowsAffected int64, accounts []*AccountHistory, err error) {
func (m *defaultAccountHistoryModel) GetValidAccounts(height int64, limit int, offset int) (rowsAffected int64, accounts []*AccountHistory, err error) {
subQuery := m.DB.Table(m.table).Select("*").
Where("account_index = a.account_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height AND l2_block_height != -1", height)

dbTx := m.DB.Table(m.table).
Raw("SELECT a.* FROM account_history a WHERE NOT EXISTS"+
"(SELECT * FROM account_history WHERE account_index = a.account_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height AND l2_block_height != -1) "+
"AND l2_block_height <= ? AND l2_block_height != -1 ORDER BY account_index", height, height).
Find(&accounts)
if dbTx.Error != nil {
dbTx := m.DB.Table(m.table+" as a").Select("*").
Where("NOT EXISTS (?) AND l2_block_height <= ? AND l2_block_height != -1", subQuery, height).
Limit(limit).Offset(offset).
Order("account_index")

if dbTx.Find(&accounts).Error != nil {
logx.Errorf("[GetValidAccounts] unable to get related accounts: %s", dbTx.Error.Error())
return 0, nil, dbTx.Error
}
return dbTx.RowsAffected, accounts, nil

}

type countResult struct {
Count int `json:"count"`
}
func (m *defaultAccountHistoryModel) GetValidAccountCount(height int64) (count int64, err error) {
subQuery := m.DB.Table(m.table).Select("*").
Where("account_index = a.account_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height AND l2_block_height != -1", height)

func (m *defaultAccountHistoryModel) GetValidAccountNums(height int64) (accounts int64, err error) {
var countResult countResult
dbTx := m.DB.Table(m.table).
Raw("SELECT count(a.*) FROM account_history a WHERE NOT EXISTS"+
"(SELECT * FROM account_history WHERE account_index = a.account_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height AND l2_block_height != -1) "+
"AND l2_block_height <= ? AND l2_block_height != -1", height, height).
Scan(&countResult)
if dbTx.Error != nil {
logx.Errorf("[GetValidAccountNums] unable to get related accounts: %s", dbTx.Error.Error())
dbTx := m.DB.Table(m.table+" as a").
Where("NOT EXISTS (?) AND l2_block_height <= ? AND l2_block_height != -1", subQuery, height)

if dbTx.Count(&count).Error != nil {
logx.Errorf("[GetValidAccountCount] unable to get related accounts: %s", dbTx.Error.Error())
return 0, dbTx.Error
}
return int64(countResult.Count), nil
return count, nil
}

func (m *defaultAccountHistoryModel) GetLatestAccountInfoByAccountIndex(accountIndex int64) (account *AccountHistory, err error) {
Expand Down
34 changes: 26 additions & 8 deletions common/model/liquidity/liquidityHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type (
CreateLiquidityHistory(liquidity *LiquidityHistory) error
CreateLiquidityHistoryInBatches(entities []*LiquidityHistory) error
GetAccountLiquidityHistoryByPairIndex(pairIndex int64) (entities []*LiquidityHistory, err error)
GetLatestLiquidityByBlockHeight(blockHeight int64) (entities []*LiquidityHistory, err error)
GetLatestLiquidityCountByBlockHeight(blockHeight int64) (count int64, err error)
GetLatestLiquidityByBlockHeight(blockHeight int64, limit int, offset int) (entities []*LiquidityHistory, err error)
GetLatestLiquidityByPairIndex(pairIndex int64) (entity *LiquidityHistory, err error)
}

Expand Down Expand Up @@ -156,13 +157,30 @@ func (m *defaultLiquidityHistoryModel) GetAccountLiquidityHistoryByPairIndex(pai
return entities, nil
}

func (m *defaultLiquidityHistoryModel) GetLatestLiquidityByBlockHeight(blockHeight int64) (entities []*LiquidityHistory, err error) {
dbTx := m.DB.Table(m.table).
Raw("SELECT a.* FROM liquidity_history a WHERE NOT EXISTS"+
"(SELECT * FROM liquidity_history WHERE pair_index = a.pair_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height) "+
"AND l2_block_height <= ? ORDER BY pair_index", blockHeight, blockHeight).
Find(&entities)
if dbTx.Error != nil {
func (m *defaultLiquidityHistoryModel) GetLatestLiquidityCountByBlockHeight(blockHeight int64) (count int64, err error) {
subQuery := m.DB.Table(m.table).Select("*").
Where("pair_index = a.pair_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height", blockHeight)

dbTx := m.DB.Table(m.table+" as a").
Where("NOT EXISTS (?) AND l2_block_height <= ?", subQuery, blockHeight)

if dbTx.Count(&count).Error != nil {
logx.Errorf("[GetLatestLiquidityCountByBlockHeight] unable to get related accounts: %s", dbTx.Error.Error())
return 0, dbTx.Error
}
return count, nil
}

func (m *defaultLiquidityHistoryModel) GetLatestLiquidityByBlockHeight(blockHeight int64, limit int, offset int) (entities []*LiquidityHistory, err error) {
subQuery := m.DB.Table(m.table).Select("*").
Where("pair_index = a.pair_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height", blockHeight)

dbTx := m.DB.Table(m.table+" as a").Select("*").
Where("NOT EXISTS (?) AND l2_block_height <= ?", subQuery, blockHeight).
Limit(limit).Offset(offset).
Order("pair_index")

if dbTx.Find(&entities).Error != nil {
logx.Errorf("[GetValidAccounts] unable to get related accounts: %s", dbTx.Error.Error())
return nil, errorcode.DbErrSqlOperation
} else if dbTx.RowsAffected == 0 {
Expand Down
40 changes: 31 additions & 9 deletions common/model/nft/nftHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type (
L2NftHistoryModel interface {
CreateL2NftHistoryTable() error
DropL2NftHistoryTable() error
GetLatestNftAssetsByBlockHeight(height int64) (
GetLatestNftAssetCountByBlockHeight(height int64) (
count int64, err error,
)
GetLatestNftAssetsByBlockHeight(height int64, limit int, offset int) (
rowsAffected int64, nftAssets []*L2NftHistory, err error,
)
GetLatestNftAsset(nftIndex int64) (
Expand Down Expand Up @@ -94,16 +97,35 @@ func (m *defaultL2NftHistoryModel) DropL2NftHistoryTable() error {
return m.DB.Migrator().DropTable(m.table)
}

func (m *defaultL2NftHistoryModel) GetLatestNftAssetsByBlockHeight(height int64) (
func (m *defaultL2NftHistoryModel) GetLatestNftAssetCountByBlockHeight(height int64) (
count int64, err error,
) {
subQuery := m.DB.Table(m.table).Select("*").
Where("nft_index = a.nft_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height", height)

dbTx := m.DB.Table(m.table+" as a").
Where("NOT EXISTS (?) AND l2_block_height <= ?", subQuery, height)

if dbTx.Count(&count).Error != nil {
logx.Errorf("[GetLatestNftAssetCountByBlockHeight] unable to get related nft assets: %s", dbTx.Error.Error())
return 0, dbTx.Error
}

return count, nil
}

func (m *defaultL2NftHistoryModel) GetLatestNftAssetsByBlockHeight(height int64, limit int, offset int) (
rowsAffected int64, accountNftAssets []*L2NftHistory, err error,
) {
// TODO sql
dbTx := m.DB.Table(m.table).
Raw("SELECT a.* FROM l2_nft_history a WHERE NOT EXISTS"+
"(SELECT * FROM l2_nft_history WHERE nft_index = a.nft_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height) "+
"AND l2_block_height <= ? ORDER BY nft_index", height, height).
Find(&accountNftAssets)
if dbTx.Error != nil {
subQuery := m.DB.Table(m.table).Select("*").
Where("nft_index = a.nft_index AND l2_block_height <= ? AND l2_block_height > a.l2_block_height", height)

dbTx := m.DB.Table(m.table+" as a").Select("*").
Where("NOT EXISTS (?) AND l2_block_height <= ?", subQuery, height).
Limit(limit).Offset(offset).
Order("nft_index")

if dbTx.Find(&accountNftAssets).Error != nil {
logx.Errorf("[GetLatestNftAssetsByBlockHeight] unable to get related nft assets: %s", dbTx.Error.Error())
return 0, nil, dbTx.Error
}
Expand Down
Loading