Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

Commit

Permalink
Cherrypick/fix convert process batch response (#2479)
Browse files Browse the repository at this point in the history
* remove need of decoded txs to process a batch response

* fix

* fix typo

* fix

* fix fb convert handling

* handle invalid rlp error

* improve logs

* another fix

* change IsStateRootChanged

* fix INVALID_RLP

* new rlp error

---------

Co-authored-by: Alonso <ARR551@protonmail.com>
  • Loading branch information
ToniRamirezM and ARR552 authored Sep 5, 2023
1 parent 1da2eea commit 285ff0f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 70 deletions.
19 changes: 12 additions & 7 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,9 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat
// Handle Transaction Error
if txResp.RomError != nil {
romErr := executor.RomErrorCode(txResp.RomError)
if executor.IsIntrinsicError(romErr) {
// If we have an intrinsic error, we should continue processing the batch, but skip the transaction
if executor.IsIntrinsicError(romErr) || romErr == executor.RomError_ROM_ERROR_INVALID_RLP {
// If we have an intrinsic error or the RLP is invalid
// we should continue processing the batch, but skip the transaction
log.Errorf("handleForcedTxsProcessResp: ROM error: %s", txResp.RomError)
continue
}
Expand Down Expand Up @@ -1006,12 +1007,16 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta

if len(response.Responses) > 0 && !response.IsRomOOCError {
for _, txResponse := range response.Responses {
sender, err := state.GetSender(txResponse.Tx)
if err != nil {
log.Warnf("failed trying to add forced tx (%s) to worker. Error getting sender from tx, Err: %v", txResponse.TxHash, err)
continue
if !errors.Is(txResponse.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) {
sender, err := state.GetSender(txResponse.Tx)
if err != nil {
log.Warnf("failed trying to add forced tx (%s) to worker. Error getting sender from tx, Err: %v", txResponse.TxHash, err)
continue
}
f.worker.AddForcedTx(txResponse.TxHash, sender)
} else {
log.Warnf("ROM_ERROR_INVALID_RLP error received from executor for forced batch %d", forcedBatch.ForcedBatchNumber)
}
f.worker.AddForcedTx(txResponse.TxHash, sender)
}

f.handleForcedTxsProcessResp(ctx, request, response, stateRoot)
Expand Down
24 changes: 3 additions & 21 deletions state/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,7 @@ func (s *State) ProcessSequencerBatch(ctx context.Context, batchNumber uint64, b
return nil, err
}

txs := []types.Transaction{}
forkID := s.GetForkIDByBatchNumber(batchNumber)

if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
txs, _, _, err = DecodeTxs(batchL2Data, forkID)
if err != nil && !errors.Is(err, ErrInvalidData) {
return nil, err
}
}

result, err := s.convertToProcessBatchResponse(txs, processBatchResponse)
result, err := s.convertToProcessBatchResponse(processBatchResponse)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,14 +188,8 @@ func (s *State) ProcessBatch(ctx context.Context, request ProcessRequest, update
return nil, err
}

txs, _, effP, err := DecodeTxs(request.Transactions, forkID)
if err != nil && !errors.Is(err, ErrInvalidData) {
return nil, err
}
log.Infof("ProcessBatch: %d txs, %#v effP", len(txs), effP)

var result *ProcessBatchResponse
result, err = s.convertToProcessBatchResponse(txs, res)
result, err = s.convertToProcessBatchResponse(res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -456,16 +440,14 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr
// Remove unprocessed tx
if i == len(processed.Responses)-1 {
processed.Responses = processed.Responses[:i]
decodedTransactions = decodedTransactions[:i]
} else {
processed.Responses = append(processed.Responses[:i], processed.Responses[i+1:]...)
decodedTransactions = append(decodedTransactions[:i], decodedTransactions[i+1:]...)
}
i--
}
}

processedBatch, err := s.convertToProcessBatchResponse(decodedTransactions, processed)
processedBatch, err := s.convertToProcessBatchResponse(processed)
if err != nil {
return common.Hash{}, noFlushID, noProverID, err
}
Expand Down
78 changes: 45 additions & 33 deletions state/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func ConvertToCounters(resp *executor.ProcessBatchResponse) ZKCounters {
}

// TestConvertToProcessBatchResponse for test purposes
func (s *State) TestConvertToProcessBatchResponse(txs []types.Transaction, response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) {
return s.convertToProcessBatchResponse(txs, response)
func (s *State) TestConvertToProcessBatchResponse(response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) {
return s.convertToProcessBatchResponse(response)
}

func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) {
responses, err := s.convertToProcessTransactionResponse(txs, response.Responses)
func (s *State) convertToProcessBatchResponse(response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) {
responses, err := s.convertToProcessTransactionResponse(response.Responses)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response

// IsStateRootChanged returns true if the transaction changes the state root
func IsStateRootChanged(err executor.RomError) bool {
return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err)
return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) && err != executor.RomError_ROM_ERROR_INVALID_RLP
}

func convertToReadWriteAddresses(addresses map[string]*executor.InfoReadWrite) (map[common.Address]*InfoReadWrite, error) {
Expand Down Expand Up @@ -123,9 +123,9 @@ func convertToReadWriteAddresses(addresses map[string]*executor.InfoReadWrite) (
return results, nil
}

func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, responses []*executor.ProcessTransactionResponse) ([]*ProcessTransactionResponse, error) {
func (s *State) convertToProcessTransactionResponse(responses []*executor.ProcessTransactionResponse) ([]*ProcessTransactionResponse, error) {
results := make([]*ProcessTransactionResponse, 0, len(responses))
for i, response := range responses {
for _, response := range responses {
trace, err := convertToStructLogArray(response.ExecutionTrace)
if err != nil {
return nil, err
Expand All @@ -151,39 +151,51 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res
result.CallTrace = *callTrace
result.EffectiveGasPrice = response.EffectiveGasPrice
result.EffectivePercentage = response.EffectivePercentage
result.Tx = txs[i]

_, err = DecodeTx(common.Bytes2Hex(response.GetRlpTx()))
if err != nil {
timestamp := time.Now()
log.Errorf("error decoding rlp returned by executor %v at %v", err, timestamp)

event := &event.Event{
ReceivedAt: timestamp,
Source: event.Source_Node,
Level: event.Level_Error,
EventID: event.EventID_ExecutorRLPError,
Json: string(response.GetRlpTx()),
}
tx := new(types.Transaction)

err = s.eventLog.LogEvent(context.Background(), event)
if response.Error != executor.RomError_ROM_ERROR_INVALID_RLP && len(response.GetRlpTx()) > 0 {
tx, err = DecodeTx(common.Bytes2Hex(response.GetRlpTx()))
if err != nil {
log.Errorf("error storing payload: %v", err)
timestamp := time.Now()
log.Errorf("error decoding rlp returned by executor %v at %v", err, timestamp)

event := &event.Event{
ReceivedAt: timestamp,
Source: event.Source_Node,
Level: event.Level_Error,
EventID: event.EventID_ExecutorRLPError,
Json: string(response.GetRlpTx()),
}

eventErr := s.eventLog.LogEvent(context.Background(), event)
if eventErr != nil {
log.Errorf("error storing payload: %v", err)
}

return nil, err
}
} else {
log.Warnf("ROM_ERROR_INVALID_RLP returned by the executor")
}

if tx != nil {
result.Tx = *tx
log.Debugf("ProcessTransactionResponse[TxHash]: %v", result.TxHash)
if response.Error == executor.RomError_ROM_ERROR_NO_ERROR {
log.Debugf("ProcessTransactionResponse[Nonce]: %v", result.Tx.Nonce())
}
log.Debugf("ProcessTransactionResponse[StateRoot]: %v", result.StateRoot.String())
log.Debugf("ProcessTransactionResponse[Error]: %v", result.RomError)
log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed)
log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft)
log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded)
log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot)
log.Debugf("ProcessTransactionResponse[EffectiveGasPrice]: %v", result.EffectiveGasPrice)
log.Debugf("ProcessTransactionResponse[EffectivePercentage]: %v", result.EffectivePercentage)
}

results = append(results, result)

log.Debugf("ProcessTransactionResponse[TxHash]: %v", result.TxHash)
log.Debugf("ProcessTransactionResponse[Nonce]: %v", result.Tx.Nonce())
log.Debugf("ProcessTransactionResponse[StateRoot]: %v", result.StateRoot.String())
log.Debugf("ProcessTransactionResponse[Error]: %v", result.RomError)
log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed)
log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft)
log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded)
log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot)
log.Debugf("ProcessTransactionResponse[EffectiveGasPrice]: %v", result.EffectiveGasPrice)
log.Debugf("ProcessTransactionResponse[EffectivePercentage]: %v", result.EffectivePercentage)
}

return results, nil
Expand Down
6 changes: 3 additions & 3 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ func TestExecutorTransfer(t *testing.T) {
require.Equal(t, "21002", data.Balance)

// Read Modified Addresses from converted response
converted, err := testState.TestConvertToProcessBatchResponse([]types.Transaction{*signedTx}, processBatchResponse)
converted, err := testState.TestConvertToProcessBatchResponse(processBatchResponse)
require.NoError(t, err)
convertedData := converted.ReadWriteAddresses[receiverAddress]
require.Equal(t, uint64(21002), convertedData.Balance.Uint64())
Expand Down Expand Up @@ -2037,7 +2037,7 @@ func TestExecutorEstimateGas(t *testing.T) {
require.NoError(t, err)
assert.NotEqual(t, "", processBatchResponse.Responses[0].Error)

convertedResponse, err := testState.TestConvertToProcessBatchResponse([]types.Transaction{*signedTx0, *signedTx1}, processBatchResponse)
convertedResponse, err := testState.TestConvertToProcessBatchResponse(processBatchResponse)
require.NoError(t, err)
log.Debugf("%v", len(convertedResponse.Responses))

Expand Down Expand Up @@ -2411,7 +2411,7 @@ func TestExecutorGasEstimationMultisig(t *testing.T) {
require.Equal(t, uint64(1000000000), balance.Uint64())

// Preparation to be able to estimate gas
convertedResponse, err := testState.TestConvertToProcessBatchResponse(transactions, processBatchResponse)
convertedResponse, err := testState.TestConvertToProcessBatchResponse(processBatchResponse)
require.NoError(t, err)
log.Debugf("%v", len(convertedResponse.Responses))

Expand Down
8 changes: 5 additions & 3 deletions state/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, proce
// if the transaction has an intrinsic invalid tx error it means
// the transaction has not changed the state, so we don't store it
// and just move to the next
if executor.IsIntrinsicError(executor.RomErrorCode(processedTx.RomError)) {
if executor.IsIntrinsicError(executor.RomErrorCode(processedTx.RomError)) || errors.Is(processedTx.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) {
continue
}

Expand Down Expand Up @@ -300,6 +300,8 @@ func (s *State) DebugTransaction(ctx context.Context, transactionHash common.Has
return nil, err
}

// Transactions are decoded only for logging purposes
// as they are not longer needed in the convertToProcessBatchResponse function
txs, _, _, err := DecodeTxs(batchL2Data, forkId)
if err != nil && !errors.Is(err, ErrInvalidData) {
return nil, err
Expand All @@ -309,7 +311,7 @@ func (s *State) DebugTransaction(ctx context.Context, transactionHash common.Has
log.Debugf(tx.Hash().String())
}

convertedResponse, err := s.convertToProcessBatchResponse(txs, processBatchResponse)
convertedResponse, err := s.convertToProcessBatchResponse(processBatchResponse)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -874,7 +876,7 @@ func (s *State) internalProcessUnsignedTransaction(ctx context.Context, tx *type
return nil, err
}

response, err := s.convertToProcessBatchResponse([]types.Transaction{*tx}, processBatchResponse)
response, err := s.convertToProcessBatchResponse(processBatchResponse)
if err != nil {
return nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
stateMetrics "github.com/0xPolygonHermez/zkevm-node/state/metrics"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/synchronizer/metrics"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -1483,9 +1484,11 @@ func (s *ClientSynchronizer) processAndStoreTxs(trustedBatch *types.Batch, reque

log.Debugf("Storing transactions %d for batch %v", len(processBatchResp.Responses), trustedBatch.Number)
for _, tx := range processBatchResp.Responses {
if err = s.state.StoreTransaction(s.ctx, uint64(trustedBatch.Number), tx, trustedBatch.Coinbase, uint64(trustedBatch.Timestamp), dbTx); err != nil {
log.Errorf("failed to store transactions for batch: %v", trustedBatch.Number)
return nil, err
if !errors.Is(tx.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) {
if err = s.state.StoreTransaction(s.ctx, uint64(trustedBatch.Number), tx, trustedBatch.Coinbase, uint64(trustedBatch.Timestamp), dbTx); err != nil {
log.Errorf("failed to store transactions for batch: %v", trustedBatch.Number)
return nil, err
}
}
}
return processBatchResp, nil
Expand Down

0 comments on commit 285ff0f

Please sign in to comment.