From 285ff0fdf1031b3e6516e67686e3a732d095eaaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Tue, 5 Sep 2023 16:51:24 +0200 Subject: [PATCH] Cherrypick/fix convert process batch response (#2479) * remove need of decoded txs to process a batch response * fix * fix typo * fix * fix fb convert handling * handle invalid rlp error * improve logs * another fix * change IsStateRootChanged * fix INVALID_RLP * new rlp error --------- Co-authored-by: Alonso --- sequencer/finalizer.go | 19 +++++---- state/batch.go | 24 ++--------- state/converters.go | 78 +++++++++++++++++++++--------------- state/state_test.go | 6 +-- state/transaction.go | 8 ++-- synchronizer/synchronizer.go | 9 +++-- 6 files changed, 74 insertions(+), 70 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index c2f76a725c..4cb53a39da 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -735,8 +735,9 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat // Handle Transaction Error if txResp.RomError != nil { romErr := executor.RomErrorCode(txResp.RomError) - if executor.IsIntrinsicError(romErr) { - // If we have an intrinsic error, we should continue processing the batch, but skip the transaction + if executor.IsIntrinsicError(romErr) || romErr == executor.RomError_ROM_ERROR_INVALID_RLP { + // If we have an intrinsic error or the RLP is invalid + // we should continue processing the batch, but skip the transaction log.Errorf("handleForcedTxsProcessResp: ROM error: %s", txResp.RomError) continue } @@ -1006,12 +1007,16 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta if len(response.Responses) > 0 && !response.IsRomOOCError { for _, txResponse := range response.Responses { - sender, err := state.GetSender(txResponse.Tx) - if err != nil { - log.Warnf("failed trying to add forced tx (%s) to worker. Error getting sender from tx, Err: %v", txResponse.TxHash, err) - continue + if !errors.Is(txResponse.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) { + sender, err := state.GetSender(txResponse.Tx) + if err != nil { + log.Warnf("failed trying to add forced tx (%s) to worker. Error getting sender from tx, Err: %v", txResponse.TxHash, err) + continue + } + f.worker.AddForcedTx(txResponse.TxHash, sender) + } else { + log.Warnf("ROM_ERROR_INVALID_RLP error received from executor for forced batch %d", forcedBatch.ForcedBatchNumber) } - f.worker.AddForcedTx(txResponse.TxHash, sender) } f.handleForcedTxsProcessResp(ctx, request, response, stateRoot) diff --git a/state/batch.go b/state/batch.go index d18c6b0c42..1201f20387 100644 --- a/state/batch.go +++ b/state/batch.go @@ -149,17 +149,7 @@ func (s *State) ProcessSequencerBatch(ctx context.Context, batchNumber uint64, b return nil, err } - txs := []types.Transaction{} - forkID := s.GetForkIDByBatchNumber(batchNumber) - - if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 { - txs, _, _, err = DecodeTxs(batchL2Data, forkID) - if err != nil && !errors.Is(err, ErrInvalidData) { - return nil, err - } - } - - result, err := s.convertToProcessBatchResponse(txs, processBatchResponse) + result, err := s.convertToProcessBatchResponse(processBatchResponse) if err != nil { return nil, err } @@ -198,14 +188,8 @@ func (s *State) ProcessBatch(ctx context.Context, request ProcessRequest, update return nil, err } - txs, _, effP, err := DecodeTxs(request.Transactions, forkID) - if err != nil && !errors.Is(err, ErrInvalidData) { - return nil, err - } - log.Infof("ProcessBatch: %d txs, %#v effP", len(txs), effP) - var result *ProcessBatchResponse - result, err = s.convertToProcessBatchResponse(txs, res) + result, err = s.convertToProcessBatchResponse(res) if err != nil { return nil, err } @@ -456,16 +440,14 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr // Remove unprocessed tx if i == len(processed.Responses)-1 { processed.Responses = processed.Responses[:i] - decodedTransactions = decodedTransactions[:i] } else { processed.Responses = append(processed.Responses[:i], processed.Responses[i+1:]...) - decodedTransactions = append(decodedTransactions[:i], decodedTransactions[i+1:]...) } i-- } } - processedBatch, err := s.convertToProcessBatchResponse(decodedTransactions, processed) + processedBatch, err := s.convertToProcessBatchResponse(processed) if err != nil { return common.Hash{}, noFlushID, noProverID, err } diff --git a/state/converters.go b/state/converters.go index 9881d78567..951ed6de6c 100644 --- a/state/converters.go +++ b/state/converters.go @@ -32,12 +32,12 @@ func ConvertToCounters(resp *executor.ProcessBatchResponse) ZKCounters { } // TestConvertToProcessBatchResponse for test purposes -func (s *State) TestConvertToProcessBatchResponse(txs []types.Transaction, response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) { - return s.convertToProcessBatchResponse(txs, response) +func (s *State) TestConvertToProcessBatchResponse(response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) { + return s.convertToProcessBatchResponse(response) } -func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) { - responses, err := s.convertToProcessTransactionResponse(txs, response.Responses) +func (s *State) convertToProcessBatchResponse(response *executor.ProcessBatchResponse) (*ProcessBatchResponse, error) { + responses, err := s.convertToProcessTransactionResponse(response.Responses) if err != nil { return nil, err } @@ -86,7 +86,7 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response // IsStateRootChanged returns true if the transaction changes the state root func IsStateRootChanged(err executor.RomError) bool { - return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) + return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err) && err != executor.RomError_ROM_ERROR_INVALID_RLP } func convertToReadWriteAddresses(addresses map[string]*executor.InfoReadWrite) (map[common.Address]*InfoReadWrite, error) { @@ -123,9 +123,9 @@ func convertToReadWriteAddresses(addresses map[string]*executor.InfoReadWrite) ( return results, nil } -func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, responses []*executor.ProcessTransactionResponse) ([]*ProcessTransactionResponse, error) { +func (s *State) convertToProcessTransactionResponse(responses []*executor.ProcessTransactionResponse) ([]*ProcessTransactionResponse, error) { results := make([]*ProcessTransactionResponse, 0, len(responses)) - for i, response := range responses { + for _, response := range responses { trace, err := convertToStructLogArray(response.ExecutionTrace) if err != nil { return nil, err @@ -151,39 +151,51 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res result.CallTrace = *callTrace result.EffectiveGasPrice = response.EffectiveGasPrice result.EffectivePercentage = response.EffectivePercentage - result.Tx = txs[i] - _, err = DecodeTx(common.Bytes2Hex(response.GetRlpTx())) - if err != nil { - timestamp := time.Now() - log.Errorf("error decoding rlp returned by executor %v at %v", err, timestamp) - - event := &event.Event{ - ReceivedAt: timestamp, - Source: event.Source_Node, - Level: event.Level_Error, - EventID: event.EventID_ExecutorRLPError, - Json: string(response.GetRlpTx()), - } + tx := new(types.Transaction) - err = s.eventLog.LogEvent(context.Background(), event) + if response.Error != executor.RomError_ROM_ERROR_INVALID_RLP && len(response.GetRlpTx()) > 0 { + tx, err = DecodeTx(common.Bytes2Hex(response.GetRlpTx())) if err != nil { - log.Errorf("error storing payload: %v", err) + timestamp := time.Now() + log.Errorf("error decoding rlp returned by executor %v at %v", err, timestamp) + + event := &event.Event{ + ReceivedAt: timestamp, + Source: event.Source_Node, + Level: event.Level_Error, + EventID: event.EventID_ExecutorRLPError, + Json: string(response.GetRlpTx()), + } + + eventErr := s.eventLog.LogEvent(context.Background(), event) + if eventErr != nil { + log.Errorf("error storing payload: %v", err) + } + + return nil, err + } + } else { + log.Warnf("ROM_ERROR_INVALID_RLP returned by the executor") + } + + if tx != nil { + result.Tx = *tx + log.Debugf("ProcessTransactionResponse[TxHash]: %v", result.TxHash) + if response.Error == executor.RomError_ROM_ERROR_NO_ERROR { + log.Debugf("ProcessTransactionResponse[Nonce]: %v", result.Tx.Nonce()) } + log.Debugf("ProcessTransactionResponse[StateRoot]: %v", result.StateRoot.String()) + log.Debugf("ProcessTransactionResponse[Error]: %v", result.RomError) + log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed) + log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft) + log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded) + log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot) + log.Debugf("ProcessTransactionResponse[EffectiveGasPrice]: %v", result.EffectiveGasPrice) + log.Debugf("ProcessTransactionResponse[EffectivePercentage]: %v", result.EffectivePercentage) } results = append(results, result) - - log.Debugf("ProcessTransactionResponse[TxHash]: %v", result.TxHash) - log.Debugf("ProcessTransactionResponse[Nonce]: %v", result.Tx.Nonce()) - log.Debugf("ProcessTransactionResponse[StateRoot]: %v", result.StateRoot.String()) - log.Debugf("ProcessTransactionResponse[Error]: %v", result.RomError) - log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed) - log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft) - log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded) - log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot) - log.Debugf("ProcessTransactionResponse[EffectiveGasPrice]: %v", result.EffectiveGasPrice) - log.Debugf("ProcessTransactionResponse[EffectivePercentage]: %v", result.EffectivePercentage) } return results, nil diff --git a/state/state_test.go b/state/state_test.go index d72c82e89d..dd7b6c4de1 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -1071,7 +1071,7 @@ func TestExecutorTransfer(t *testing.T) { require.Equal(t, "21002", data.Balance) // Read Modified Addresses from converted response - converted, err := testState.TestConvertToProcessBatchResponse([]types.Transaction{*signedTx}, processBatchResponse) + converted, err := testState.TestConvertToProcessBatchResponse(processBatchResponse) require.NoError(t, err) convertedData := converted.ReadWriteAddresses[receiverAddress] require.Equal(t, uint64(21002), convertedData.Balance.Uint64()) @@ -2037,7 +2037,7 @@ func TestExecutorEstimateGas(t *testing.T) { require.NoError(t, err) assert.NotEqual(t, "", processBatchResponse.Responses[0].Error) - convertedResponse, err := testState.TestConvertToProcessBatchResponse([]types.Transaction{*signedTx0, *signedTx1}, processBatchResponse) + convertedResponse, err := testState.TestConvertToProcessBatchResponse(processBatchResponse) require.NoError(t, err) log.Debugf("%v", len(convertedResponse.Responses)) @@ -2411,7 +2411,7 @@ func TestExecutorGasEstimationMultisig(t *testing.T) { require.Equal(t, uint64(1000000000), balance.Uint64()) // Preparation to be able to estimate gas - convertedResponse, err := testState.TestConvertToProcessBatchResponse(transactions, processBatchResponse) + convertedResponse, err := testState.TestConvertToProcessBatchResponse(processBatchResponse) require.NoError(t, err) log.Debugf("%v", len(convertedResponse.Responses)) diff --git a/state/transaction.go b/state/transaction.go index ddda47d586..babed0f999 100644 --- a/state/transaction.go +++ b/state/transaction.go @@ -153,7 +153,7 @@ func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, proce // if the transaction has an intrinsic invalid tx error it means // the transaction has not changed the state, so we don't store it // and just move to the next - if executor.IsIntrinsicError(executor.RomErrorCode(processedTx.RomError)) { + if executor.IsIntrinsicError(executor.RomErrorCode(processedTx.RomError)) || errors.Is(processedTx.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) { continue } @@ -300,6 +300,8 @@ func (s *State) DebugTransaction(ctx context.Context, transactionHash common.Has return nil, err } + // Transactions are decoded only for logging purposes + // as they are not longer needed in the convertToProcessBatchResponse function txs, _, _, err := DecodeTxs(batchL2Data, forkId) if err != nil && !errors.Is(err, ErrInvalidData) { return nil, err @@ -309,7 +311,7 @@ func (s *State) DebugTransaction(ctx context.Context, transactionHash common.Has log.Debugf(tx.Hash().String()) } - convertedResponse, err := s.convertToProcessBatchResponse(txs, processBatchResponse) + convertedResponse, err := s.convertToProcessBatchResponse(processBatchResponse) if err != nil { return nil, err } @@ -874,7 +876,7 @@ func (s *State) internalProcessUnsignedTransaction(ctx context.Context, tx *type return nil, err } - response, err := s.convertToProcessBatchResponse([]types.Transaction{*tx}, processBatchResponse) + response, err := s.convertToProcessBatchResponse(processBatchResponse) if err != nil { return nil, err } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 987a2d93fb..630fad073a 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -16,6 +16,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/state" stateMetrics "github.com/0xPolygonHermez/zkevm-node/state/metrics" + "github.com/0xPolygonHermez/zkevm-node/state/runtime/executor" "github.com/0xPolygonHermez/zkevm-node/synchronizer/metrics" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" @@ -1483,9 +1484,11 @@ func (s *ClientSynchronizer) processAndStoreTxs(trustedBatch *types.Batch, reque log.Debugf("Storing transactions %d for batch %v", len(processBatchResp.Responses), trustedBatch.Number) for _, tx := range processBatchResp.Responses { - if err = s.state.StoreTransaction(s.ctx, uint64(trustedBatch.Number), tx, trustedBatch.Coinbase, uint64(trustedBatch.Timestamp), dbTx); err != nil { - log.Errorf("failed to store transactions for batch: %v", trustedBatch.Number) - return nil, err + if !errors.Is(tx.RomError, executor.RomErr(executor.RomError_ROM_ERROR_INVALID_RLP)) { + if err = s.state.StoreTransaction(s.ctx, uint64(trustedBatch.Number), tx, trustedBatch.Coinbase, uint64(trustedBatch.Timestamp), dbTx); err != nil { + log.Errorf("failed to store transactions for batch: %v", trustedBatch.Number) + return nil, err + } } } return processBatchResp, nil