Skip to content

Commit

Permalink
Compute wip batch during stream delta calculation (#2861)
Browse files Browse the repository at this point in the history
* Compute wip batch during stream delta calculation (#2860)

* compute wip batch during stream delta calculation

* do not generate GER update for WIP Batch

* fix datastream bool
  • Loading branch information
ToniRamirezM authored Nov 29, 2023
1 parent 5e5cfb1 commit 8be247e
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 18 deletions.
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type stateInterface interface {
GetStoredFlushID(ctx context.Context) (uint64, string, error)
GetForkIDByBatchNumber(batchNumber uint64) uint64
GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
}
Expand Down
18 changes: 9 additions & 9 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) {
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state)
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true)
if err != nil {
log.Fatalf("failed to generate data streamer file, err: %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ func (g DSUpdateGER) Decode(data []byte) DSUpdateGER {
// DSState gathers the methods required to interact with the data stream state.
type DSState interface {
GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*DSL2Block, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error)
}

// GenerateDataStreamerFile generates or resumes a data stream file
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState) error {
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -314,7 +314,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
log.Debugf("Current entry number: %d", entry)
log.Debugf("Current batch number: %d", currentBatchNumber)
// Get Next Batch
batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, nil)
batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, readWIPBatch, nil)
if err != nil {
if err == ErrStateNotSynchronized {
break
Expand Down Expand Up @@ -352,6 +352,10 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
for _, batch := range fullBatches {
if len(batch.L2Blocks) == 0 {
// Empty batch
// Is WIP Batch?
if batch.StateRoot == (common.Hash{}) {
continue
}
// Check if there is a GER update
if batch.GlobalExitRoot != currentGER && batch.GlobalExitRoot != (common.Hash{}) {
updateGer := DSUpdateGER{
Expand Down
10 changes: 7 additions & 3 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2842,11 +2842,15 @@ func scanDSL2Transaction(row pgx.Row) (*DSL2Transaction, error) {
}

// GetDSBatches returns the DS batches
func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error) {
const getBatchByNumberSQL = `
func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error) {
var getBatchByNumberSQL = `
SELECT b.batch_num, b.global_exit_root, b.local_exit_root, b.acc_input_hash, b.state_root, b.timestamp, b.coinbase, b.raw_txs_data, b.forced_batch_num, f.fork_id
FROM state.batch b, state.fork_id f
WHERE b.state_root is not null AND b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num`
WHERE b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num`

if !readWIPBatch {
getBatchByNumberSQL += " AND b.state_root is not null"
}

e := p.getExecQuerier(dbTx)
rows, err := e.Query(ctx, getBatchByNumberSQL, firstBatchNumber, lastBatchNumber)
Expand Down
2 changes: 1 addition & 1 deletion tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func generate(cliCtx *cli.Context) error {
stateDB := state.NewPostgresStorage(state.Config{}, stateSqlDB)
log.Debug("Connected to the database")

err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB)
err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB, false)
if err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)
Expand Down

0 comments on commit 8be247e

Please sign in to comment.