Skip to content

Commit

Permalink
Merge pull request #4488 from onflow/petera/explicitly-check-header-e…
Browse files Browse the repository at this point in the history
…xists-en-rpc

[Execution] Check if block exists locally in RPC endpoints
  • Loading branch information
peterargue authored Sep 1, 2023
2 parents 310dac3 + 5c6bc1f commit d9ef5ca
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 138 deletions.
4 changes: 3 additions & 1 deletion engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func (b *backendAccounts) getAccountFromAnyExeNode(ctx context.Context, execNode
execNodes,
func(node *flow.Identity) error {
var err error
// TODO: use the GRPC Client interceptor
start := time.Now()

resp, err = b.tryGetAccount(ctx, node, req)
duration := time.Since(start)

if err == nil {
// return if any execution node replied successfully
b.log.Debug().
Expand All @@ -129,13 +129,15 @@ func (b *backendAccounts) getAccountFromAnyExeNode(ctx context.Context, execNode
Msg("Successfully got account info")
return nil
}

b.log.Error().
Str("execution_node", node.String()).
Hex("block_id", req.GetBlockId()).
Hex("address", req.GetAddress()).
Int64("rtt_ms", duration.Milliseconds()).
Err(err).
Msg("failed to execute GetAccount")

return err
},
nil,
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func verifyAndConvertToAccessEvents(

events, err := convert.MessagesToEventsFromVersion(result.GetEvents(), version)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall events in event %d with encoding version %s: %w",
return nil, fmt.Errorf("failed to unmarshal events in event %d with encoding version %s: %w",
i, version.String(), err)
}

Expand Down
4 changes: 2 additions & 2 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func (suite *Suite) TestTransactionStatusTransition() {
suite.execClient.
On("GetTransactionResult", ctx, exeEventReq).
Return(exeEventResp, status.Errorf(codes.NotFound, "not found")).
Once()
Times(len(fixedENIDs)) // should call each EN once

// first call - when block under test is greater height than the sealed head, but execution node does not know about Tx
result, err := backend.GetTransactionResult(ctx, txID, flow.ZeroID, flow.ZeroID)
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() {
suite.execClient.
On("GetTransactionResult", ctx, exeEventReq).
Return(exeEventResp, status.Errorf(codes.NotFound, "not found")).
Once()
Times(len(enIDs)) // should call each EN once

// create a mock connection factory
connFactory := suite.setupConnectionFactory()
Expand Down
12 changes: 3 additions & 9 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,9 +794,7 @@ func (b *backendTransactions) getTransactionResultFromAnyExeNode(
}
return err
},
func(_ *flow.Identity, err error) bool {
return status.Code(err) == codes.NotFound
},
nil,
)

return resp, errToReturn
Expand Down Expand Up @@ -855,9 +853,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromAnyExeNode(
}
return err
},
func(_ *flow.Identity, err error) bool {
return status.Code(err) == codes.NotFound
},
nil,
)

return resp, errToReturn
Expand Down Expand Up @@ -914,9 +910,7 @@ func (b *backendTransactions) getTransactionResultByIndexFromAnyExeNode(
}
return err
},
func(_ *flow.Identity, err error) bool {
return status.Code(err) == codes.NotFound
},
nil,
)

return resp, errToReturn
Expand Down
5 changes: 4 additions & 1 deletion engine/access/rpc/backend/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() {
suite.colClient.On("SendTransaction", mock.Anything, mock.Anything).Return(&access.SendTransactionResponse{}, nil)

// return not found to return finalized status
suite.execClient.On("GetTransactionResult", ctx, &exeEventReq).Return(&exeEventResp, status.Errorf(codes.NotFound, "not found")).Once()
suite.execClient.On("GetTransactionResult", ctx, &exeEventReq).
Return(&exeEventResp, status.Errorf(codes.NotFound, "not found")).
Times(len(enIDs)) // should call each EN once

// first call - when block under test is greater height than the sealed head, but execution node does not know about Tx
result, err := backend.GetTransactionResult(ctx, txID, flow.ZeroID, flow.ZeroID)
suite.checkResponse(result, err)
Expand Down
57 changes: 50 additions & 7 deletions engine/execution/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/onflow/flow-go/storage"
)

const DefaultMaxBlockRange = 300

// Config defines the configurable options for the gRPC server.
type Config struct {
ListenAddr string
Expand Down Expand Up @@ -98,6 +100,7 @@ func New(
transactionResults: txResults,
commits: commits,
log: log,
maxBlockRange: DefaultMaxBlockRange,
},
server: server,
config: config,
Expand Down Expand Up @@ -157,12 +160,16 @@ type handler struct {
transactionResults storage.TransactionResults
log zerolog.Logger
commits storage.Commits
maxBlockRange int
}

var _ execution.ExecutionAPIServer = &handler{}

// Ping responds to requests when the server is up.
func (h *handler) Ping(_ context.Context, _ *execution.PingRequest) (*execution.PingResponse, error) {
func (h *handler) Ping(
_ context.Context,
_ *execution.PingRequest,
) (*execution.PingResponse, error) {
return &execution.PingResponse{}, nil
}

Expand All @@ -176,6 +183,14 @@ func (h *handler) ExecuteScriptAtBlockID(
return nil, err
}

// return a more user friendly error if block has not been executed
if _, err = h.commits.ByBlockID(blockID); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "block %s has not been executed by node or was pruned", blockID)
}
return nil, status.Errorf(codes.Internal, "state commitment for block ID %s could not be retrieved", blockID)
}

value, err := h.engine.ExecuteScriptAtBlockID(ctx, req.GetScript(), req.GetArguments(), blockID)
if err != nil {
// return code 3 as this passes the litmus test in our context
Expand Down Expand Up @@ -214,8 +229,10 @@ func (h *handler) GetRegisterAtBlockID(
return res, nil
}

func (h *handler) GetEventsForBlockIDs(_ context.Context,
req *execution.GetEventsForBlockIDsRequest) (*execution.GetEventsForBlockIDsResponse, error) {
func (h *handler) GetEventsForBlockIDs(
_ context.Context,
req *execution.GetEventsForBlockIDsRequest,
) (*execution.GetEventsForBlockIDsResponse, error) {

// validate request
blockIDs := req.GetBlockIds()
Expand All @@ -229,14 +246,18 @@ func (h *handler) GetEventsForBlockIDs(_ context.Context,
return nil, err
}

if len(blockIDs) > h.maxBlockRange {
return nil, status.Errorf(codes.InvalidArgument, "too many block IDs requested: %d > %d", len(blockIDs), h.maxBlockRange)
}

results := make([]*execution.GetEventsForBlockIDsResponse_Result, len(blockIDs))

// collect all the events and create a EventsResponse_Result for each block
for i, bID := range flowBlockIDs {
// Check if block has been executed
if _, err := h.commits.ByBlockID(bID); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "state commitment for block ID %s does not exist", bID)
return nil, status.Errorf(codes.NotFound, "block %s has not been executed by node or was pruned", bID)
}
return nil, status.Errorf(codes.Internal, "state commitment for block ID %s could not be retrieved", bID)
}
Expand Down Expand Up @@ -394,6 +415,15 @@ func (h *handler) GetTransactionResultsByBlockID(
return nil, status.Errorf(codes.InvalidArgument, "invalid blockID: %v", err)
}

// must verify block was locally executed first since transactionResults.ByBlockID will return
// an empty slice if block does not exist
if _, err = h.commits.ByBlockID(blockID); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "block %s has not been executed by node or was pruned", blockID)
}
return nil, status.Errorf(codes.Internal, "state commitment for block ID %s could not be retrieved", blockID)
}

// Get all tx results
txResults, err := h.transactionResults.ByBlockID(blockID)
if err != nil {
Expand Down Expand Up @@ -461,8 +491,10 @@ func (h *handler) GetTransactionResultsByBlockID(
}

// eventResult creates EventsResponse_Result from flow.Event for the given blockID
func (h *handler) eventResult(blockID flow.Identifier,
flowEvents []flow.Event) (*execution.GetEventsForBlockIDsResponse_Result, error) {
func (h *handler) eventResult(
blockID flow.Identifier,
flowEvents []flow.Event,
) (*execution.GetEventsForBlockIDsResponse_Result, error) {

// convert events to event message
events := convert.EventsToMessages(flowEvents)
Expand Down Expand Up @@ -496,6 +528,14 @@ func (h *handler) GetAccountAtBlockID(
return nil, status.Errorf(codes.InvalidArgument, "invalid address: %v", err)
}

// return a more user friendly error if block has not been executed
if _, err = h.commits.ByBlockID(blockFlowID); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "block %s has not been executed by node or was pruned", blockFlowID)
}
return nil, status.Errorf(codes.Internal, "state commitment for block ID %s could not be retrieved", blockFlowID)
}

value, err := h.engine.GetAccount(ctx, flowAddress, blockFlowID)
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "account with address %s not found", flowAddress)
Expand Down Expand Up @@ -540,7 +580,10 @@ func (h *handler) GetLatestBlockHeader(
header, err = h.state.Final().Head()
}
if err != nil {
return nil, status.Errorf(codes.NotFound, "not found: %v", err)
// this header MUST exist in the db, otherwise the node likely has inconsistent state.
// Don't crash as a result of an external API request, but other components will likely panic.
h.log.Err(err).Msg("failed to get latest block header. potentially inconsistent protocol state.")
return nil, status.Errorf(codes.Internal, "unable to get latest header: %v", err)
}

return h.blockHeaderResponse(header)
Expand Down
Loading

0 comments on commit d9ef5ca

Please sign in to comment.