Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Use local event for AccessAPI get events endpoints #4851

Merged
merged 18 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,10 +670,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
return nil
Expand Down Expand Up @@ -826,7 +822,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore)
builder.RegistersAsyncStore,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}
Expand Down Expand Up @@ -1266,6 +1263,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1309,6 +1310,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Events: node.Storage.Events,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SameGRPCPortTestSuite struct {
// storage
blocks *storagemock.Blocks
headers *storagemock.Headers
events *storagemock.Events
collections *storagemock.Collections
transactions *storagemock.Transactions
receipts *storagemock.ExecutionReceipts
Expand Down Expand Up @@ -101,6 +102,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe()
suite.blocks = new(storagemock.Blocks)
suite.headers = new(storagemock.Headers)
suite.events = new(storagemock.Events)
suite.transactions = new(storagemock.Transactions)
suite.collections = new(storagemock.Collections)
suite.receipts = new(storagemock.ExecutionReceipts)
Expand Down
10 changes: 6 additions & 4 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Params struct {
HistoricalAccessNodes []accessproto.AccessAPIClient
Blocks storage.Blocks
Headers storage.Headers
Events storage.Events
Collections storage.Collections
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
Expand Down Expand Up @@ -146,18 +147,19 @@ func New(params Params) (*Backend, error) {
state: params.State,
// create the sub-backends
backendScripts: backendScripts{
log: params.Log,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
state: params.State,
log: params.Log,
metrics: params.AccessMetrics,
loggedScripts: loggedScripts,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
state: params.State,
chainID: params.ChainID,
Expand All @@ -171,17 +173,17 @@ func New(params Params) (*Backend, error) {
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
log: params.Log,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
},
backendEvents: backendEvents{
log: params.Log,
state: params.State,
headers: params.Headers,
events: params.Events,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
maxHeightRange: params.MaxHeightRange,
nodeCommunicator: params.Communicator,
},
Expand All @@ -194,11 +196,11 @@ func New(params Params) (*Backend, error) {
state: params.State,
},
backendAccounts: backendAccounts{
log: params.Log,
state: params.State,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
Expand Down
139 changes: 121 additions & 18 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"sort"
"time"

"github.com/onflow/flow/protobuf/go/flow/entities"
Expand All @@ -25,6 +26,7 @@ import (

type backendEvents struct {
headers storage.Headers
events storage.Events
executionReceipts storage.ExecutionReceipts
state protocol.State
connFactory connection.ConnectionFactory
Expand All @@ -43,32 +45,42 @@ func (b *backendEvents) GetEventsForHeightRange(
) ([]flow.BlockEvents, error) {

if endHeight < startHeight {
return nil, status.Error(codes.InvalidArgument, "invalid start or end height")
return nil, status.Error(codes.InvalidArgument, "start height must not be larger than end height")
}

rangeSize := endHeight - startHeight + 1 // range is inclusive on both ends
if rangeSize > uint64(b.maxHeightRange) {
return nil, status.Errorf(codes.InvalidArgument, "requested block range (%d) exceeded maximum (%d)", rangeSize, b.maxHeightRange)
return nil, status.Errorf(codes.InvalidArgument,
"requested block range (%d) exceeded maximum (%d)", rangeSize, b.maxHeightRange)
}

// get the latest sealed block header
head, err := b.state.Sealed().Head()
sealed, err := b.state.Sealed().Head()
if err != nil {
// sealed block must be in the store, so return an Internal code even if we got NotFound
// sealed block must be in the store, so throw an exception for any error
err := irrecoverable.NewExceptionf("failed to lookup sealed header: %w", err)
irrecoverable.Throw(ctx, err)
return nil, err
}

// start height should not be beyond the last sealed height
if head.Height < startHeight {
if startHeight > sealed.Height {
return nil, status.Errorf(codes.OutOfRange,
"start height %d is greater than the last sealed block height %d", startHeight, head.Height)
"start height %d is greater than the last sealed block height %d", startHeight, sealed.Height)
}

// limit max height to last sealed block in the chain
if head.Height < endHeight {
endHeight = head.Height
//
// Note: this causes unintuitive behavior for clients making requests through a proxy that
// fronts multiple nodes. With that setup, clients may receive responses for a smaller range
// than requested because the node serving the request has a slightly delayed view of the chain.
//
// An alternative option is to return an error here, but that's likely to cause more pain for
// these clients since the requests would intermittently fail. it's recommended instead to
// check the block height of the last message in the response. this will be the last block
// height searched, and can be used to determine the start height for the next range.
if endHeight > sealed.Height {
endHeight = sealed.Height
}

// find the block headers for all the blocks between min and max height (inclusive)
Expand All @@ -83,7 +95,7 @@ func (b *backendEvents) GetEventsForHeightRange(
blockHeaders = append(blockHeaders, header)
}

return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
}

// GetEventsForBlockIDs retrieves events for all the specified block IDs that have the given type
Expand All @@ -109,10 +121,103 @@ func (b *backendEvents) GetEventsForBlockIDs(
blockHeaders = append(blockHeaders, header)
}

// forward the request to the execution node
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
}

// getBlockEvents retrieves events for all the specified blocks that have the given type
// It gets all events available on storage, and requests the rest to an execution node.
func (b *backendEvents) getBlockEvents(
ctx context.Context,
blockHeaders []*flow.Header,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
if err != nil {
return nil, err
}

if len(missingHeaders) == 0 {
return localResponse, nil
}

enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingHeaders, eventType, requiredEventEncodingVersion)
if err != nil {
return nil, err
}

// sort ascending by block height
// this is needed because some blocks may be retrieved from storage and others from execution nodes.
// most likely, the earlier blocks will all be found in local storage, but that's not guaranteed,
// especially for nodes started after a spork, or once pruning is enabled.
// Note: this may not match the order of the original request for clients using GetEventsForBlockIDs
// that provide out of order block IDs
response := append(localResponse, enResponse...)
sort.Slice(response, func(i, j int) bool {
return response[i].BlockHeight < response[j].BlockHeight
})

return response, nil
}

// getBlockEventsFromStorage retrieves events for all the specified blocks that have the given type
// from the local storage
func (b *backendEvents) getBlockEventsFromStorage(
ctx context.Context,
blockHeaders []*flow.Header,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, []*flow.Header, error) {
target := flow.EventType(eventType)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate the event type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the target is only used for effectively a string comparison, so it's pretty benign, but I can add it.


missing := make([]*flow.Header, 0)
resp := make([]flow.BlockEvents, 0)
for _, header := range blockHeaders {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to restrict the number of blocks to query events from? Or the caller has checked that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the caller checks that

here

if rangeSize > uint64(b.maxHeightRange) {
return nil, status.Errorf(codes.InvalidArgument,
"requested block range (%d) exceeded maximum (%d)", rangeSize, b.maxHeightRange)
}

and here

if uint(len(blockIDs)) > b.maxHeightRange {
return nil, status.Errorf(codes.InvalidArgument, "requested block range (%d) exceeded maximum (%d)", len(blockIDs), b.maxHeightRange)
}

if ctx.Err() != nil {
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.events.ByBlockID(header.ID())
if err != nil {
// Note: if there are no events for a block, an empty slice is returned
if errors.Is(err, storage.ErrNotFound) {
peterargue marked this conversation as resolved.
Show resolved Hide resolved
missing = append(missing, header)
continue
}
return nil, nil, rpc.ConvertError(err, "failed to get events from storage", codes.Internal)
}

filteredEvents := make([]flow.Event, 0)
for _, e := range events {
if e.Type != target {
continue
}

// events are encoded in CCF format in storage. convert to JSON-CDC if requested
if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
payload, err := convert.CcfPayloadToJsonPayload(e.Payload)
if err != nil {
return nil, nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
}
e.Payload = payload
}

filteredEvents = append(filteredEvents, e)
}

resp = append(resp, flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
Events: filteredEvents,
})
}

return resp, missing, nil
}

// getBlockEventsFromExecutionNode retrieves events for all the specified blocks that have the given type
// from an execution node
func (b *backendEvents) getBlockEventsFromExecutionNode(
ctx context.Context,
blockHeaders []*flow.Header,
Expand Down Expand Up @@ -223,7 +328,8 @@ func verifyAndConvertToAccessEvents(
// error aggregating all failures is returned.
func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context,
execNodes flow.IdentityList,
req *execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) {
req *execproto.GetEventsForBlockIDsRequest,
) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) {
var resp *execproto.GetEventsForBlockIDsResponse
var execNode *flow.Identity
errToReturn := b.nodeCommunicator.CallAvailableNode(
Expand Down Expand Up @@ -259,16 +365,13 @@ func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context,

func (b *backendEvents) tryGetEvents(ctx context.Context,
execNode *flow.Identity,
req *execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, error) {
req *execproto.GetEventsForBlockIDsRequest,
) (*execproto.GetEventsForBlockIDsResponse, error) {
execRPCClient, closer, err := b.connFactory.GetExecutionAPIClient(execNode.Address)
if err != nil {
return nil, err
}
defer closer.Close()

resp, err := execRPCClient.GetEventsForBlockIDs(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
return execRPCClient.GetEventsForBlockIDs(ctx, req)
}
Loading
Loading