Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Use local event for AccessAPI get events endpoints #4851

Merged
merged 18 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
MaxFailures: 5,
MaxRequests: 1,
},
ScriptExecutionMode: backend.ScriptExecutionModeExecutionNodesOnly.String(), // default to ENs only for now
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
EventQueryMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
},
RestConfig: rest.Config{
ListenAddress: "",
Expand Down Expand Up @@ -669,10 +670,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
return nil
Expand Down Expand Up @@ -825,7 +822,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore)
builder.RegistersAsyncStore,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}
Expand Down Expand Up @@ -1063,6 +1061,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")

flags.StringVar(&builder.rpcConf.BackendConfig.EventQueryMode,
"event-query-mode",
defaultConfig.rpcConf.BackendConfig.EventQueryMode,
"mode to use when querying events. one of (local-only, execution-nodes-only, failover)")
peterargue marked this conversation as resolved.
Show resolved Hide resolved

// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1402,6 +1405,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1434,17 +1441,26 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
),
}

scriptExecMode, err := backend.ParseScriptExecutionMode(config.BackendConfig.ScriptExecutionMode)
scriptExecMode, err := backend.ParseIndexQueryMode(config.BackendConfig.ScriptExecutionMode)
if err != nil {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
}

eventQueryMode, err := backend.ParseIndexQueryMode(config.BackendConfig.EventQueryMode)
if err != nil {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
}
if eventQueryMode == backend.IndexQueryModeCompare {
return nil, fmt.Errorf("event query mode 'compare' is not supported")
}

nodeBackend, err := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Events: node.Storage.Events,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
Expand All @@ -1463,6 +1479,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (suite *Suite) TestExecuteScript() {
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.ScriptExecutionModeExecutionNodesOnly,
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxErrorMessagesCacheSize: 1000,
})
require.NoError(suite.T(), err)
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SameGRPCPortTestSuite struct {
// storage
blocks *storagemock.Blocks
headers *storagemock.Headers
events *storagemock.Events
collections *storagemock.Collections
transactions *storagemock.Transactions
receipts *storagemock.ExecutionReceipts
Expand Down Expand Up @@ -101,6 +102,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe()
suite.blocks = new(storagemock.Blocks)
suite.headers = new(storagemock.Headers)
suite.events = new(storagemock.Events)
suite.transactions = new(storagemock.Transactions)
suite.collections = new(storagemock.Collections)
suite.receipts = new(storagemock.ExecutionReceipts)
Expand Down
26 changes: 14 additions & 12 deletions engine/access/rest/routes/subscribe_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type testType struct {
headers http.Header
}

var chainID = flow.Testnet
var testEventTypes = []flow.EventType{
"A.0123456789abcdef.flow.event",
"B.0123456789abcdef.flow.event",
"C.0123456789abcdef.flow.event",
unittest.EventTypeFixture(chainID),
unittest.EventTypeFixture(chainID),
unittest.EventTypeFixture(chainID),
}

type SubscribeEventsSuite struct {
Expand Down Expand Up @@ -83,6 +84,8 @@ func (s *SubscribeEventsSuite) SetupTest() {
// update payloads with valid CCF encoded data
for i := range blockEvents.Events {
blockEvents.Events[i].Payload = eventsGenerator.New().Payload

s.T().Logf("block events %d %v => %v", block.Header.Height, block.ID(), blockEvents.Events[i].Type)
}

s.blocks = append(s.blocks, block)
Expand Down Expand Up @@ -143,7 +146,6 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {
},
},
}
chain := flow.MonotonicEmulator.Chain()

// create variations for each of the base test
tests := make([]testType, 0, len(testVectors)*2)
Expand All @@ -159,7 +161,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {

t3 := test
t3.name = fmt.Sprintf("%s - non existing events", test.name)
t3.eventTypes = []string{"A.0123456789abcdff.flow.event"}
t3.eventTypes = []string{fmt.Sprintf("%s_new", testEventTypes[0])}
tests = append(tests, t3)
}

Expand All @@ -170,7 +172,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {

filter, err := state_stream.NewEventFilter(
state_stream.DefaultEventFilterConfig,
chain,
chainID.Chain(),
test.eventTypes,
test.addresses,
test.contracts)
Expand Down Expand Up @@ -245,9 +247,9 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {
// closing the connection after 1 second
go func() {
time.Sleep(1 * time.Second)
close(respRecorder.closed)
respRecorder.Close()
}()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireResponse(s.T(), respRecorder, expectedEventsResponses)
})
}
Expand All @@ -259,7 +261,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), s.blocks[0].Header.Height, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "can only provide either block ID or start height")
})

Expand All @@ -284,7 +286,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), invalidBlock.ID(), request.EmptyHeight, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "stream encountered an error: subscription error")
})

Expand All @@ -293,7 +295,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), request.EmptyHeight, []string{"foo"}, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "invalid event type format")
})

Expand All @@ -318,7 +320,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), request.EmptyHeight, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "subscription channel closed")
})
}
Expand Down
14 changes: 12 additions & 2 deletions engine/access/rest/routes/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c fakeNetConn) Close() error {
}
return nil
}

func (c fakeNetConn) LocalAddr() net.Addr { return localAddr }
func (c fakeNetConn) RemoteAddr() net.Addr { return remoteAddr }
func (c fakeNetConn) SetDeadline(t time.Time) error { return nil }
Expand Down Expand Up @@ -101,6 +102,15 @@ func (w *testHijackResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, erro
return fakeNetConn{w.responseBuff, w.closed}, bufio.NewReadWriter(br, bw), nil
}

func (w *testHijackResponseRecorder) Close() error {
select {
case <-w.closed:
default:
close(w.closed)
}
return nil
}

// newTestHijackResponseRecorder creates a new instance of testHijackResponseRecorder.
func newTestHijackResponseRecorder() *testHijackResponseRecorder {
return &testHijackResponseRecorder{
Expand All @@ -122,7 +132,7 @@ func executeRequest(req *http.Request, backend access.API) *httptest.ResponseRec
return rr
}

func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *testHijackResponseRecorder) {
func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *testHijackResponseRecorder, chain flow.Chain) {
restCollector := metrics.NewNoopCollector()

config := backend.Config{
Expand All @@ -133,7 +143,7 @@ func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, respon

router := NewRouterBuilder(unittest.Logger(), restCollector).AddWsRoutes(
stateStreamApi,
flow.Testnet.Chain(), config).Build()
chain, config).Build()
router.ServeHTTP(responseRecorder, req)
}

Expand Down
15 changes: 10 additions & 5 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Params struct {
HistoricalAccessNodes []accessproto.AccessAPIClient
Blocks storage.Blocks
Headers storage.Headers
Events storage.Events
Collections storage.Collections
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
Expand All @@ -101,7 +102,8 @@ type Params struct {
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
ScriptExecutor execution.ScriptExecutor
ScriptExecutionMode ScriptExecutionMode
ScriptExecutionMode IndexQueryMode
EventQueryMode IndexQueryMode
}

// New creates backend instance
Expand Down Expand Up @@ -146,18 +148,19 @@ func New(params Params) (*Backend, error) {
state: params.State,
// create the sub-backends
backendScripts: backendScripts{
log: params.Log,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
state: params.State,
log: params.Log,
metrics: params.AccessMetrics,
loggedScripts: loggedScripts,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
state: params.State,
chainID: params.ChainID,
Expand All @@ -171,19 +174,21 @@ func New(params Params) (*Backend, error) {
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
log: params.Log,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
},
backendEvents: backendEvents{
log: params.Log,
chain: params.ChainID.Chain(),
state: params.State,
headers: params.Headers,
events: params.Events,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
maxHeightRange: params.MaxHeightRange,
nodeCommunicator: params.Communicator,
queryMode: params.EventQueryMode,
},
backendBlockHeaders: backendBlockHeaders{
headers: params.Headers,
Expand All @@ -194,11 +199,11 @@ func New(params Params) (*Backend, error) {
state: params.State,
},
backendAccounts: backendAccounts{
log: params.Log,
state: params.State,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
Expand Down
10 changes: 5 additions & 5 deletions engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type backendAccounts struct {
connFactory connection.ConnectionFactory
nodeCommunicator Communicator
scriptExecutor execution.ScriptExecutor
scriptExecMode ScriptExecutionMode
scriptExecMode IndexQueryMode
}

// GetAccount returns the account details at the latest sealed block.
Expand Down Expand Up @@ -93,13 +93,13 @@ func (b *backendAccounts) getAccountAtBlock(
height uint64,
) (*flow.Account, error) {
switch b.scriptExecMode {
case ScriptExecutionModeExecutionNodesOnly:
case IndexQueryModeExecutionNodesOnly:
return b.getAccountFromAnyExeNode(ctx, address, blockID)

case ScriptExecutionModeLocalOnly:
case IndexQueryModeLocalOnly:
return b.getAccountFromLocalStorage(ctx, address, height)

case ScriptExecutionModeFailover:
case IndexQueryModeFailover:
localResult, localErr := b.getAccountFromLocalStorage(ctx, address, height)
if localErr == nil {
return localResult, nil
Expand All @@ -110,7 +110,7 @@ func (b *backendAccounts) getAccountAtBlock(

return execResult, execErr

case ScriptExecutionModeCompare:
case IndexQueryModeCompare:
execResult, execErr := b.getAccountFromAnyExeNode(ctx, address, blockID)
// Only compare actual get account errors from the EN, not system errors
if execErr != nil && !isInvalidArgumentError(execErr) {
Expand Down
Loading
Loading