Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Enable Event streaming on REST API #4547

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
61be7dd
Added websocket handler, common http handler, refactored rest, added …
UlyanaAndrukhiv Jul 5, 2023
6fcc308
Updated last commit
UlyanaAndrukhiv Jul 7, 2023
3040fe0
Reverted back flag name
UlyanaAndrukhiv Jul 7, 2023
82b29ab
Added comments
UlyanaAndrukhiv Jul 7, 2023
39e8768
Refactored handlers, added filters for subscribe_events endpoint
UlyanaAndrukhiv Jul 13, 2023
334f752
Merged with master, updated tests, moved creating state stream backen…
UlyanaAndrukhiv Jul 24, 2023
6a68c01
Fixed flags
UlyanaAndrukhiv Jul 24, 2023
d6f51da
Updated tests
UlyanaAndrukhiv Jul 24, 2023
6f298e6
Added test, added Hijack impl for response writer in metrics
UlyanaAndrukhiv Jul 24, 2023
feb6dda
Updated subscribe_events rest route, remove subscribe_handler
UlyanaAndrukhiv Jul 28, 2023
88da0b1
Added unit tests
UlyanaAndrukhiv Aug 2, 2023
fbbc240
Added part of intagration tests
UlyanaAndrukhiv Aug 2, 2023
d07b5fa
Added integration test for rest event streaming, updated unit tests, …
UlyanaAndrukhiv Aug 3, 2023
40bac0a
Merged with master
UlyanaAndrukhiv Aug 3, 2023
b372ed9
Updated routeUrlMap init for rest
UlyanaAndrukhiv Aug 3, 2023
379ec43
Removed unnecessary log
UlyanaAndrukhiv Aug 4, 2023
8068f97
Added more comments
UlyanaAndrukhiv Aug 4, 2023
7c6442c
Moved part of state_stream impl back to access/state_stream package
UlyanaAndrukhiv Aug 9, 2023
296afff
Updated rest test according to comments, removed unnecessary empty li…
UlyanaAndrukhiv Aug 9, 2023
efcf292
Reverted back imports order
UlyanaAndrukhiv Aug 9, 2023
7f3297b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 9, 2023
cae72f0
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 10, 2023
9d73bfb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 15, 2023
31ba626
Refactored subscribeEvents function
UlyanaAndrukhiv Aug 16, 2023
1a9f924
Added more comments, linted
UlyanaAndrukhiv Aug 16, 2023
48eed8b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 16, 2023
4477f31
Updated unit tests, linted, added more comments
UlyanaAndrukhiv Aug 17, 2023
a9a4f62
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 17, 2023
6322ba9
Updated tests, linted
UlyanaAndrukhiv Aug 17, 2023
7908805
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 18, 2023
3cd7b32
Upgraded state streaming impl
UlyanaAndrukhiv Aug 18, 2023
83f9805
Remove unnecessary comment
UlyanaAndrukhiv Aug 18, 2023
f70ba98
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 21, 2023
75c757b
Removed unnecessary check for event types
UlyanaAndrukhiv Aug 22, 2023
f387b4f
Linted integration test
UlyanaAndrukhiv Aug 23, 2023
1053dc7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 23, 2023
86b66c9
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 25, 2023
12a19de
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 30, 2023
df96439
Refactored according to comments
UlyanaAndrukhiv Sep 4, 2023
212d98f
Added checking connection for closing from client side
UlyanaAndrukhiv Sep 5, 2023
a425791
Fixed unit test for subscribe events
UlyanaAndrukhiv Sep 5, 2023
ec3ec18
Merged with master
UlyanaAndrukhiv Sep 6, 2023
f4f172c
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 7, 2023
23cad55
Updated according to comments
UlyanaAndrukhiv Sep 12, 2023
20b10fb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 12, 2023
f6325b6
Updated error according to comment
UlyanaAndrukhiv Sep 12, 2023
3252ec4
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 13, 2023
c630e4a
Added RouterBuilder and updated rest unit tests. Added fixes accordin…
UlyanaAndrukhiv Sep 13, 2023
b9e4bef
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
47dc938
Updated according to commits
UlyanaAndrukhiv Sep 14, 2023
4d60ad7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
durkmurder Sep 14, 2023
b20cb94
Updated according to last comments
UlyanaAndrukhiv Sep 14, 2023
1022c09
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
303c02f
Added small fixes according to last comments
UlyanaAndrukhiv Sep 15, 2023
5cbbec6
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/ingestion"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
common_state_stream "github.com/onflow/flow-go/engine/common/state_stream"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
Expand Down Expand Up @@ -115,7 +117,8 @@ type AccessNodeConfig struct {
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf state_stream.Config
stateStreamConf common_state_stream.Config
stateStreamBackend common_state_stream.API
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
Expand Down Expand Up @@ -161,15 +164,16 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
ArchiveAddressList: nil,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
stateStreamConf: state_stream.Config{
stateStreamConf: common_state_stream.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
ExecutionDataCacheSize: state_stream.DefaultCacheSize,
ClientSendTimeout: state_stream.DefaultSendTimeout,
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams,
EventFilterConfig: state_stream.DefaultEventFilterConfig,
ResponseLimit: state_stream.DefaultResponseLimit,
ExecutionDataCacheSize: common_state_stream.DefaultCacheSize,
ClientSendTimeout: common_state_stream.DefaultSendTimeout,
ClientSendBufferSize: common_state_stream.DefaultSendBufferSize,
MaxGlobalStreams: common_state_stream.DefaultMaxGlobalStreams,
EventFilterConfig: common_state_stream.DefaultEventFilterConfig,
ResponseLimit: common_state_stream.DefaultResponseLimit,
},
stateStreamBackend: nil,
stateStreamFilterConf: nil,
ExecutionNodeAddress: "localhost:9000",
logTxTimeToFinalized: false,
Expand Down Expand Up @@ -600,20 +604,35 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}

stateStreamEng, err := state_stream.NewEng(
broadcaster := engine.NewBroadcaster()

backend, err := common_state_stream.New(
node.Logger,
builder.stateStreamConf,
builder.ExecutionDataStore,
executionDataCache,
node.State,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Results,
node.RootChainID,
builder.ExecutionDataStore,
executionDataCache,
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamConf,
executionDataCache,
node.Storage.Headers,
node.RootChainID,
builder.apiRatelimits,
builder.apiBurstlimits,
backend,
broadcaster,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -892,6 +911,10 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}

builder.
BuildConsensusFollower().
Module("collection node client", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1008,6 +1031,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.apiRatelimits,
builder.apiBurstlimits,
builder.Me,
builder.stateStreamBackend,
builder.stateStreamConf.EventFilterConfig,
builder.stateStreamConf.MaxGlobalStreams,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1093,10 +1119,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
Expand Down
4 changes: 4 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/state_stream"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/protocol"
"github.com/onflow/flow-go/model/encodable"
Expand Down Expand Up @@ -862,6 +863,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.apiRatelimits,
builder.apiBurstlimits,
builder.Me,
nil,
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
state_stream.DefaultEventFilterConfig,
0,
)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions engine/access/rest/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package rest
import (
"fmt"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/models"
"github.com/onflow/flow-go/engine/access/rest/request"

"github.com/onflow/flow-go/access"
)

const blockQueryParam = "block_ids"
Expand Down
112 changes: 10 additions & 102 deletions engine/access/rest/handler.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
package rest

import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/models"
"github.com/onflow/flow-go/engine/access/rest/request"
"github.com/onflow/flow-go/engine/access/rest/util"
fvmErrors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/access"
)

const MaxRequestSize = 2 << 20 // 2MB

// ApiHandlerFunc is a function that contains endpoint handling logic,
// it fetches necessary resources and returns an error or response model.
type ApiHandlerFunc func(
Expand All @@ -33,11 +24,10 @@ type ApiHandlerFunc func(
// Handler function allows easier handling of errors and responses as it
// wraps functionality for handling error and responses outside of endpoint handling.
type Handler struct {
logger zerolog.Logger
*HttpHandler
backend access.API
linkGenerator models.LinkGenerator
apiHandlerFunc ApiHandlerFunc
chain flow.Chain
}

func NewHandler(
Expand All @@ -47,31 +37,26 @@ func NewHandler(
generator models.LinkGenerator,
chain flow.Chain,
) *Handler {
return &Handler{
logger: logger,
handler := &Handler{
backend: backend,
apiHandlerFunc: handlerFunc,
linkGenerator: generator,
chain: chain,
}
handler.HttpHandler = NewHttpHandler(logger, chain)
return handler
}

// ServerHTTP function acts as a wrapper to each request providing common handling functionality
// such as logging, error handling, request decorators
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create a logger
errLog := h.logger.With().Str("request_url", r.URL.String()).Logger()
errLog := h.Logger.With().Str("request_url", r.URL.String()).Logger()

// limit requested body size
r.Body = http.MaxBytesReader(w, r.Body, MaxRequestSize)
err := r.ParseForm()
err := h.VerifyRequest(w, r)
if err != nil {
h.errorHandler(w, err, errLog)
return
}

// create request decorator with parsed values
decoratedRequest := request.Decorate(r, h.chain)
decoratedRequest := request.Decorate(r, h.Chain)

// execute handler function and check for error
response, err := h.apiHandlerFunc(decoratedRequest, h.backend, h.linkGenerator)
Expand All @@ -90,80 +75,3 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// write response to response stream
h.jsonResponse(w, http.StatusOK, response, errLog)
}

func (h *Handler) errorHandler(w http.ResponseWriter, err error, errorLogger zerolog.Logger) {
// rest status type error should be returned with status and user message provided
var statusErr StatusError
if errors.As(err, &statusErr) {
h.errorResponse(w, statusErr.Status(), statusErr.UserMessage(), errorLogger)
return
}

// handle cadence errors
cadenceError := fvmErrors.Find(err, fvmErrors.ErrCodeCadenceRunTimeError)
if cadenceError != nil {
msg := fmt.Sprintf("Cadence error: %s", cadenceError.Error())
h.errorResponse(w, http.StatusBadRequest, msg, errorLogger)
return
}

// handle grpc status error returned from the backend calls, we are forwarding the message to the client
if se, ok := status.FromError(err); ok {
if se.Code() == codes.NotFound {
msg := fmt.Sprintf("Flow resource not found: %s", se.Message())
h.errorResponse(w, http.StatusNotFound, msg, errorLogger)
return
}
if se.Code() == codes.InvalidArgument {
msg := fmt.Sprintf("Invalid Flow argument: %s", se.Message())
h.errorResponse(w, http.StatusBadRequest, msg, errorLogger)
return
}
if se.Code() == codes.Internal {
msg := fmt.Sprintf("Invalid Flow request: %s", se.Message())
h.errorResponse(w, http.StatusBadRequest, msg, errorLogger)
return
}
}

// stop going further - catch all error
msg := "internal server error"
errorLogger.Error().Err(err).Msg(msg)
h.errorResponse(w, http.StatusInternalServerError, msg, errorLogger)
}

// jsonResponse builds a JSON response and send it to the client
func (h *Handler) jsonResponse(w http.ResponseWriter, code int, response interface{}, errLogger zerolog.Logger) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")

// serialize response to JSON and handler errors
encodedResponse, err := json.MarshalIndent(response, "", "\t")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
errLogger.Error().Err(err).Str("response", string(encodedResponse)).Msg("failed to indent response")
return
}

w.WriteHeader(code)
// write response to response stream
_, err = w.Write(encodedResponse)
if err != nil {
errLogger.Error().Err(err).Str("response", string(encodedResponse)).Msg("failed to write http response")
}
}

// errorResponse sends an HTTP error response to the client with the given return code
// and a model error with the given response message in the response body
func (h *Handler) errorResponse(
w http.ResponseWriter,
returnCode int,
responseMessage string,
logger zerolog.Logger,
) {
// create error response model
modelError := models.ModelError{
Code: int32(returnCode),
Message: responseMessage,
}
h.jsonResponse(w, returnCode, modelError, logger)
}
Loading