-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Access] Use local event for AccessAPI get events endpoints #4851
Changes from 8 commits
49cdcb3
ed94b0f
3f2e8ae
9621040
5039f9e
05ea47c
ee25f6e
ce9c12a
7cb283e
f189f3b
8465305
b8983cf
b590617
1d05588
0141908
faaf82e
87efcdd
f8dc637
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |||||||||||||||
"encoding/hex" | ||||||||||||||||
"errors" | ||||||||||||||||
"fmt" | ||||||||||||||||
"sort" | ||||||||||||||||
"time" | ||||||||||||||||
|
||||||||||||||||
"github.com/onflow/flow/protobuf/go/flow/entities" | ||||||||||||||||
|
@@ -25,6 +26,7 @@ import ( | |||||||||||||||
|
||||||||||||||||
type backendEvents struct { | ||||||||||||||||
headers storage.Headers | ||||||||||||||||
events storage.Events | ||||||||||||||||
executionReceipts storage.ExecutionReceipts | ||||||||||||||||
state protocol.State | ||||||||||||||||
connFactory connection.ConnectionFactory | ||||||||||||||||
|
@@ -43,32 +45,42 @@ func (b *backendEvents) GetEventsForHeightRange( | |||||||||||||||
) ([]flow.BlockEvents, error) { | ||||||||||||||||
|
||||||||||||||||
if endHeight < startHeight { | ||||||||||||||||
return nil, status.Error(codes.InvalidArgument, "invalid start or end height") | ||||||||||||||||
return nil, status.Error(codes.InvalidArgument, "start height must not be larger than end height") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
rangeSize := endHeight - startHeight + 1 // range is inclusive on both ends | ||||||||||||||||
if rangeSize > uint64(b.maxHeightRange) { | ||||||||||||||||
return nil, status.Errorf(codes.InvalidArgument, "requested block range (%d) exceeded maximum (%d)", rangeSize, b.maxHeightRange) | ||||||||||||||||
return nil, status.Errorf(codes.InvalidArgument, | ||||||||||||||||
"requested block range (%d) exceeded maximum (%d)", rangeSize, b.maxHeightRange) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// get the latest sealed block header | ||||||||||||||||
head, err := b.state.Sealed().Head() | ||||||||||||||||
sealed, err := b.state.Sealed().Head() | ||||||||||||||||
if err != nil { | ||||||||||||||||
// sealed block must be in the store, so return an Internal code even if we got NotFound | ||||||||||||||||
// sealed block must be in the store, so throw an exception for any error | ||||||||||||||||
err := irrecoverable.NewExceptionf("failed to lookup sealed header: %w", err) | ||||||||||||||||
irrecoverable.Throw(ctx, err) | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// start height should not be beyond the last sealed height | ||||||||||||||||
if head.Height < startHeight { | ||||||||||||||||
if startHeight > sealed.Height { | ||||||||||||||||
return nil, status.Errorf(codes.OutOfRange, | ||||||||||||||||
"start height %d is greater than the last sealed block height %d", startHeight, head.Height) | ||||||||||||||||
"start height %d is greater than the last sealed block height %d", startHeight, sealed.Height) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// limit max height to last sealed block in the chain | ||||||||||||||||
if head.Height < endHeight { | ||||||||||||||||
endHeight = head.Height | ||||||||||||||||
// | ||||||||||||||||
// Note: this causes unintuitive behavior for clients making requests through a proxy that | ||||||||||||||||
// fronts multiple nodes. With that setup, clients may receive responses for a smaller range | ||||||||||||||||
// than requested because the node serving the request has a slightly delayed view of the chain. | ||||||||||||||||
// | ||||||||||||||||
// An alternative option is to return an error here, but that's likely to cause more pain for | ||||||||||||||||
// these clients since the requests would intermittently fail. it's recommended instead to | ||||||||||||||||
// check the block height of the last message in the response. this will be the last block | ||||||||||||||||
// height searched, and can be used to determine the start height for the next range. | ||||||||||||||||
if endHeight > sealed.Height { | ||||||||||||||||
endHeight = sealed.Height | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// find the block headers for all the blocks between min and max height (inclusive) | ||||||||||||||||
|
@@ -83,7 +95,7 @@ func (b *backendEvents) GetEventsForHeightRange( | |||||||||||||||
blockHeaders = append(blockHeaders, header) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// GetEventsForBlockIDs retrieves events for all the specified block IDs that have the given type | ||||||||||||||||
|
@@ -109,10 +121,103 @@ func (b *backendEvents) GetEventsForBlockIDs( | |||||||||||||||
blockHeaders = append(blockHeaders, header) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// forward the request to the execution node | ||||||||||||||||
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// getBlockEvents retrieves events for all the specified blocks that have the given type | ||||||||||||||||
// It gets all events available on storage, and requests the rest to an execution node. | ||||||||||||||||
func (b *backendEvents) getBlockEvents( | ||||||||||||||||
ctx context.Context, | ||||||||||||||||
blockHeaders []*flow.Header, | ||||||||||||||||
eventType string, | ||||||||||||||||
requiredEventEncodingVersion entities.EventEncodingVersion, | ||||||||||||||||
) ([]flow.BlockEvents, error) { | ||||||||||||||||
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if len(missingHeaders) == 0 { | ||||||||||||||||
return localResponse, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingHeaders, eventType, requiredEventEncodingVersion) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// sort ascending by block height | ||||||||||||||||
// this is needed because some blocks may be retrieved from storage and others from execution nodes. | ||||||||||||||||
// most likely, the earlier blocks will all be found in local storage, but that's not guaranteed, | ||||||||||||||||
// especially for nodes started after a spork, or once pruning is enabled. | ||||||||||||||||
// Note: this may not match the order of the original request for clients using GetEventsForBlockIDs | ||||||||||||||||
// that provide out of order block IDs | ||||||||||||||||
response := append(localResponse, enResponse...) | ||||||||||||||||
sort.Slice(response, func(i, j int) bool { | ||||||||||||||||
return response[i].BlockHeight < response[j].BlockHeight | ||||||||||||||||
}) | ||||||||||||||||
|
||||||||||||||||
return response, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// getBlockEventsFromStorage retrieves events for all the specified blocks that have the given type | ||||||||||||||||
// from the local storage | ||||||||||||||||
func (b *backendEvents) getBlockEventsFromStorage( | ||||||||||||||||
ctx context.Context, | ||||||||||||||||
blockHeaders []*flow.Header, | ||||||||||||||||
eventType string, | ||||||||||||||||
requiredEventEncodingVersion entities.EventEncodingVersion, | ||||||||||||||||
) ([]flow.BlockEvents, []*flow.Header, error) { | ||||||||||||||||
target := flow.EventType(eventType) | ||||||||||||||||
|
||||||||||||||||
missing := make([]*flow.Header, 0) | ||||||||||||||||
resp := make([]flow.BlockEvents, 0) | ||||||||||||||||
for _, header := range blockHeaders { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to restrict the number of blocks to query events from? Or the caller has checked that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the caller checks that here flow-go/engine/access/rpc/backend/backend_events.go Lines 52 to 55 in ce9c12a
and here flow-go/engine/access/rpc/backend/backend_events.go Lines 109 to 111 in ce9c12a
|
||||||||||||||||
if ctx.Err() != nil { | ||||||||||||||||
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
events, err := b.events.ByBlockID(header.ID()) | ||||||||||||||||
if err != nil { | ||||||||||||||||
// Note: if there are no events for a block, an empty slice is returned | ||||||||||||||||
if errors.Is(err, storage.ErrNotFound) { | ||||||||||||||||
peterargue marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
missing = append(missing, header) | ||||||||||||||||
continue | ||||||||||||||||
} | ||||||||||||||||
return nil, nil, rpc.ConvertError(err, "failed to get events from storage", codes.Internal) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
filteredEvents := make([]flow.Event, 0) | ||||||||||||||||
for _, e := range events { | ||||||||||||||||
if e.Type != target { | ||||||||||||||||
continue | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// events are encoded in CCF format in storage. convert to JSON-CDC if requested | ||||||||||||||||
if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 { | ||||||||||||||||
payload, err := convert.CcfPayloadToJsonPayload(e.Payload) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal) | ||||||||||||||||
} | ||||||||||||||||
e.Payload = payload | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
filteredEvents = append(filteredEvents, e) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
resp = append(resp, flow.BlockEvents{ | ||||||||||||||||
BlockID: header.ID(), | ||||||||||||||||
BlockHeight: header.Height, | ||||||||||||||||
BlockTimestamp: header.Timestamp, | ||||||||||||||||
Events: filteredEvents, | ||||||||||||||||
}) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
return resp, missing, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// getBlockEventsFromExecutionNode retrieves events for all the specified blocks that have the given type | ||||||||||||||||
// from an execution node | ||||||||||||||||
func (b *backendEvents) getBlockEventsFromExecutionNode( | ||||||||||||||||
ctx context.Context, | ||||||||||||||||
blockHeaders []*flow.Header, | ||||||||||||||||
|
@@ -223,7 +328,8 @@ func verifyAndConvertToAccessEvents( | |||||||||||||||
// error aggregating all failures is returned. | ||||||||||||||||
func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context, | ||||||||||||||||
execNodes flow.IdentityList, | ||||||||||||||||
req *execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) { | ||||||||||||||||
req *execproto.GetEventsForBlockIDsRequest, | ||||||||||||||||
) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) { | ||||||||||||||||
var resp *execproto.GetEventsForBlockIDsResponse | ||||||||||||||||
var execNode *flow.Identity | ||||||||||||||||
errToReturn := b.nodeCommunicator.CallAvailableNode( | ||||||||||||||||
|
@@ -259,16 +365,13 @@ func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context, | |||||||||||||||
|
||||||||||||||||
func (b *backendEvents) tryGetEvents(ctx context.Context, | ||||||||||||||||
execNode *flow.Identity, | ||||||||||||||||
req *execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, error) { | ||||||||||||||||
req *execproto.GetEventsForBlockIDsRequest, | ||||||||||||||||
) (*execproto.GetEventsForBlockIDsResponse, error) { | ||||||||||||||||
execRPCClient, closer, err := b.connFactory.GetExecutionAPIClient(execNode.Address) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
defer closer.Close() | ||||||||||||||||
|
||||||||||||||||
resp, err := execRPCClient.GetEventsForBlockIDs(ctx, req) | ||||||||||||||||
if err != nil { | ||||||||||||||||
return nil, err | ||||||||||||||||
} | ||||||||||||||||
return resp, nil | ||||||||||||||||
return execRPCClient.GetEventsForBlockIDs(ctx, req) | ||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to validate the event type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the
target
is only used for effectively a string comparison, so it's pretty benign, but I can add it.