88 changes: 71 additions & 17 deletions bootstrap/bootstrap.go
@@ -22,6 +22,8 @@ import (
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"

"github.com/onflow/flow-evm-gateway/api"
@@ -41,13 +43,13 @@ const (
// DefaultMaxMessageSize is the default maximum message size for gRPC responses
DefaultMaxMessageSize = 1024 * 1024 * 1024

// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
// a ResourceExhausted error.
DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond
// DefaultRetryDelay is the default delay between retries when a gRPC request
// to one of the Access Nodes has errored out.
DefaultRetryDelay = 100 * time.Millisecond

// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
// ResourceExhausted errors.
DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
// DefaultMaxRetryDelay is the default max request duration when retrying failed
// gRPC requests to one of the Access Nodes.
DefaultMaxRetryDelay = 30 * time.Second

Collaborator Author:
Now that we are adding load-balancing functionality, should we maybe decrease the max retry duration?

Contributor:
Is this load balancing or failover? I'm not aware of a demand for load-balancing requests to different backends, but there is a clear need for failing over when the primary is unavailable.

Collaborator Author (@m-Peter, Oct 6, 2025):
Given that we use pick_first as the load-balancing strategy, this effectively works as a failover mechanism: it will stick to the same backend until that backend is unable to serve any requests (due to connectivity issues), at which point it will pick the next available backend.

I only changed the name of the constant from DefaultResourceExhaustedMaxRetryDelay to DefaultMaxRetryDelay, because previously the retryInterceptor would only retry on ResourceExhausted errors.

But I've updated that condition in f30b0b3 to account for more related errors that we can retry on the same AN.

)

type Storages struct {
@@ -474,16 +476,59 @@ func StartEngine(
// setupCrossSporkClient sets up a cross-spork AN client.
func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*requester.CrossSporkClient, error) {
// create access client with cross-spork capabilities
currentSporkClient, err := grpc.NewClient(
config.AccessNodeHost,
grpc.WithGRPCDialOptions(
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
grpcOpts.WithUnaryInterceptor(retryInterceptor(
DefaultResourceExhaustedMaxRetryDelay,
DefaultResourceExhaustedRetryDelay,
)),
),
)
var currentSporkClient *grpc.Client
var err error

if len(config.AccessNodeBackupHosts) > 0 {
mr := manual.NewBuilderWithScheme("dns")
defer mr.Close()

// `pick_first` tries to connect to the first address and uses it for all
// RPCs if it connects, or tries the next address if it fails
// (and keeps doing that until one connection is successful).
// Because of this, all RPCs will be sent to the same backend. See more at:
// https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first
json := `{"loadBalancingConfig": [{"pick_first":{}}]}`

Contributor:

Can you document what the expected behavior is? I'm assuming that pick_first means it will always use the first one unless it fails, and then it will go to the next. It would be good to make that clear here.

Collaborator Author:
Good point; I've added comments in 117deb1, with a link to the official example: https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first.

endpoints := []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: config.AccessNodeHost}}},
}

for _, accessNodeBackupAddr := range config.AccessNodeBackupHosts {
endpoints = append(endpoints, resolver.Endpoint{
Addresses: []resolver.Address{{Addr: accessNodeBackupAddr}},
})
}

mr.InitialState(resolver.State{
Endpoints: endpoints,
})

targetHost := fmt.Sprintf("%s:///%s", mr.Scheme(), "flow-access")
currentSporkClient, err = grpc.NewClient(
targetHost,
grpc.WithGRPCDialOptions(
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
grpcOpts.WithResolvers(mr),
grpcOpts.WithDefaultServiceConfig(json),
grpcOpts.WithUnaryInterceptor(retryInterceptor(
DefaultMaxRetryDelay,
DefaultRetryDelay,
)),
),
)
} else {
currentSporkClient, err = grpc.NewClient(
config.AccessNodeHost,
grpc.WithGRPCDialOptions(
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
grpcOpts.WithUnaryInterceptor(retryInterceptor(
DefaultMaxRetryDelay,
DefaultRetryDelay,
)),
),
)
}

if err != nil {
return nil, fmt.Errorf(
"failed to create client connection for host: %s, with error: %w",
@@ -536,7 +581,16 @@ func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor
return nil
}

if status.Code(err) != codes.ResourceExhausted {

Collaborator Author:
Since we added the load-balancing config:

`{"loadBalancingConfig": [{"pick_first":{}}]}`

I removed this error code check entirely.

The reason is that, if we receive any kind of gRPC error from one of the ANs:

  1. The request will be retried for the max specified duration on the same AN
  2. Then the configured pick_first load-balancing strategy will try the next ANs until it finds one that responds without an error

However, I just noticed that the retryInterceptor is used even when there aren't any configured backup ANs.
Should we just change the DefaultMaxRetryDelay instead?

Contributor:
Is there a way to customize the behavior?

For instance, if the backend returned ResourceExhausted, retrying immediately will only make it worse and the node will eventually have to fail over, whereas pausing briefly may allow the next request to succeed.

Similarly, if the error is OutOfRange or NotFound, retrying immediately is not likely to succeed.

Canceled or DeadlineExceeded are guaranteed to fail all requests if the source was a local context.

Collaborator Author (@m-Peter, Oct 6, 2025):

> The reason is that, if we receive any kind of gRPC error from one of the ANs:
>
>   1. The request will be retried for the max specified duration on the same AN
>   2. Then the configured pick_first load-balancing strategy will try the next ANs until it finds one that responds without an error

Sorry about that, but I had a misconception about how these two relate to each other.

The pick_first load-balancing strategy only checks the node's connectivity state (Ready / Connecting / Idle / TransientFailure, etc.), and that's the only signal it uses to decide when to connect to the next node.
The errors that might be returned from specific node API calls do not affect the node selection in any way, so we can't direct the load balancer to connect to a different node from inside the retryInterceptor.

In other words, the retryInterceptor and the pick_first load-balancing strategy don't step on each other's toes.

I have added some error handling in retryInterceptor in f30b0b3, to include the errors you mentioned above.
Is that what you had in mind?
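
To make that distinction concrete, here is a minimal, hypothetical sketch (not part of this PR) that watches the only signal pick_first reacts to: the ClientConn's connectivity state, via GetState and WaitForStateChange. The address and timeout are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
)

// watchConnectivity logs every connectivity-state transition of the
// ClientConn. pick_first bases its failover decisions on these transitions
// (e.g. Ready -> TransientFailure), not on the status codes returned by
// individual RPCs.
func watchConnectivity(ctx context.Context, conn *grpc.ClientConn) {
	state := conn.GetState()
	for {
		fmt.Printf("connectivity state: %s\n", state)
		if !conn.WaitForStateChange(ctx, state) {
			return // context cancelled or timed out
		}
		state = conn.GetState()
		if state == connectivity.TransientFailure {
			fmt.Println("current backend unreachable; pick_first will try the next address")
		}
	}
}

func main() {
	// "localhost:3569" stands in for an Access Node address.
	conn, err := grpc.NewClient("localhost:3569",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn.Connect() // leave Idle and start connecting
	watchConnectivity(ctx, conn)
}
```

Killing the backend behind the first address drives the state to TransientFailure, at which point pick_first moves on to the next address in the resolver's list, independently of whatever the retry interceptor is doing per RPC.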

switch status.Code(err) {
case codes.Canceled, codes.DeadlineExceeded:
// these kinds of errors are guaranteed to fail all requests
// if the source was a local context
return err
case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound:
// when we receive these errors, we pause briefly, so that
// the next request on the same AN has a higher chance
// of success
default:
return err
}
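
For context, here is a self-contained sketch of how a retry interceptor along these lines fits together: the loop, the deadline, and the switch above. It is an illustrative reconstruction under the PR's naming (maxDuration, pauseDuration), not the repository's exact code.

```go
// Illustrative sketch only: the overall shape of a retry interceptor like
// the one in bootstrap.go. The switch mirrors the diff above; the loop and
// timing logic are a reconstruction.
package bootstrap

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func retryInterceptorSketch(maxDuration, pauseDuration time.Duration) grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply any,
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		start := time.Now()
		for {
			err := invoker(ctx, method, req, reply, cc, opts...)
			if err == nil {
				return nil
			}

			switch status.Code(err) {
			case codes.Canceled, codes.DeadlineExceeded:
				// guaranteed to keep failing if the source was a local context
				return err
			case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound:
				// potentially transient on this AN: pause and retry below
			default:
				return err
			}

			if time.Since(start) >= maxDuration {
				return err
			}
			time.Sleep(pauseDuration)
		}
	}
}
```

Note that the retries here stay on the same backend; switching backends is left entirely to the pick_first policy reacting to connectivity changes, as discussed above.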

6 changes: 6 additions & 0 deletions cmd/run/cmd.go
@@ -201,6 +201,10 @@ func parseConfigFromFlags() error {
}
cfg.FilterExpiry = exp

if accessNodeBackupHosts != "" {
cfg.AccessNodeBackupHosts = strings.Split(accessNodeBackupHosts, ",")
}

if accessSporkHosts != "" {
heightHosts := strings.Split(accessSporkHosts, ",")
cfg.AccessNodePreviousSporkHosts = append(cfg.AccessNodePreviousSporkHosts, heightHosts...)
@@ -242,6 +246,7 @@ var (
logWriter,
filterExpiry,
accessSporkHosts,
accessNodeBackupHosts,
cloudKMSKey,
cloudKMSProjectID,
cloudKMSLocationID,
@@ -259,6 +264,7 @@ func init() {
Cmd.Flags().IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server")
Cmd.Flags().BoolVar(&cfg.WSEnabled, "ws-enabled", false, "Enable websocket connections")
Cmd.Flags().StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API")
Cmd.Flags().StringVar(&accessNodeBackupHosts, "access-node-backup-hosts", "", `Backup AN hosts to use in case of connectivity issues, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`)
Cmd.Flags().StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`)
Cmd.Flags().StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet, flow-testnet, flow-mainnet)")
Cmd.Flags().StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection")
3 changes: 3 additions & 0 deletions config/config.go
@@ -45,6 +45,9 @@ type Config struct {
DatabaseDir string
// AccessNodeHost defines the current spork Flow network AN host.
AccessNodeHost string
// AccessNodeBackupHosts contains a list of AN hosts to use as backup, in
// case of connectivity issues with `AccessNodeHost`.
AccessNodeBackupHosts []string
// AccessNodePreviousSporkHosts contains a list of the ANs hosts for each spork
AccessNodePreviousSporkHosts []string
// GRPCPort for the RPC API server
35 changes: 25 additions & 10 deletions services/ingestion/event_subscriber.go
@@ -224,23 +224,38 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
// we can get not found when reconnecting after a disconnect/restart before the
// next block is finalized. just wait briefly and try again
time.Sleep(200 * time.Millisecond)
case codes.DeadlineExceeded, codes.Internal:
case codes.DeadlineExceeded, codes.Internal, codes.Unavailable:

Collaborator Author:
During some local testing with 2 Flow Emulator processes, when I killed the 1st process, which was configured as the main AN (AccessNodeHost), the EVM GW crashed with:

failure in event subscription with: recoverable: disconnected:
error receiving event: rpc error:
code = Unavailable desc = error reading from server: EOF

Adding the codes.Unavailable case solved this issue.

Contributor:
I think Unavailable is different, since it is unlikely that the node will suddenly be available on reconnect. If we receive Unavailable, it should fail over to a new node. We could let it retry for some period first, but at the expense of delaying data.

Collaborator Author (@m-Peter, Oct 6, 2025):
I am not really sure how SubscribeEventsByBlockHeight is implemented under the hood, but I've observed that it doesn't go through the retryInterceptor, unlike other AN calls. For example:

[METHOD]:  /flow.access.AccessAPI/GetLatestBlock
[METHOD]:  /flow.access.AccessAPI/GetAccountAtLatestBlock
[METHOD]:  /flow.access.AccessAPI/SendTransaction
[METHOD]:  /flow.access.AccessAPI/ExecuteScriptAtLatestBlock

I have verified, though, that we do need codes.Unavailable in the switch case above, so that it triggers a reconnect with:

if err := connect(lastReceivedHeight); err != nil {
	eventsChan <- models.NewBlockEventsError(
		fmt.Errorf(
			"failed to resubscribe for events on height: %d, with: %w",
			lastReceivedHeight,
			err,
		),
	)
	return
}

Note that if we trigger a reconnect, this will prompt the pick_first load balancer to search for the next node that can serve the given request, even if it is unlikely that the current node will suddenly become available.
This will save the EVM Gateway from a fatal error, provided the configured backup ANs are indeed available.
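
As an aside, the `[METHOD]:` lines above were presumably produced by something like the following hypothetical logging interceptor (not part of this PR), which is one way to confirm which unary calls pass through the client interceptor chain. Streaming RPCs such as SubscribeEventsByBlockHeight go through stream interceptors, not unary ones, which is consistent with the subscription never hitting the retryInterceptor.

```go
// Hypothetical helper: prints the full gRPC method name of every unary call
// passing through the client interceptor chain. Streaming RPCs (e.g. event
// subscriptions) bypass unary interceptors, so they never show up here.
package bootstrap

import (
	"context"
	"fmt"

	"google.golang.org/grpc"
)

func logMethodsInterceptor() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply any,
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		fmt.Println("[METHOD]: ", method)
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}
```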

// these are sometimes returned when the stream is disconnected by a middleware or the server
default:
// skip reconnect on all other errors
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
return
}

if err := connect(lastReceivedHeight + 1); err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to resubscribe for events on height: %d, with: %w",
lastReceivedHeight+1,
err,
),
)
return
start := time.Now()
attempts := 0
pauseDuration, maxDuration := 200*time.Millisecond, 30*time.Second
// Allow reconnect retries for up to 30 seconds, with retry
// attempts every 200 ms.
for {
err := connect(lastReceivedHeight)
if err == nil {
break
}

attempts++
duration := time.Since(start)
if duration >= maxDuration {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to resubscribe for events on height: %d, with: %w",
lastReceivedHeight,
err,
),
)
return
}
time.Sleep(pauseDuration)
}
}
}
29 changes: 18 additions & 11 deletions tests/helpers.go
@@ -67,23 +67,18 @@ func testLogWriter() io.Writer {
return zerolog.NewConsoleWriter()
}

func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) {
func defaultServerConfig() *server.Config {
pkey, err := crypto.DecodePrivateKeyHex(sigAlgo, servicePrivateKey)
if err != nil {
return nil, err
panic(err)
}

genesisToken, err := cadence.NewUFix64("10000.0")
if err != nil {
return nil, err
panic(err)
}

log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel)
if logOutput == "false" {
log = zerolog.Nop()
}

srv := server.NewEmulatorServer(&log, &server.Config{
return &server.Config{
ServicePrivateKey: pkey,
ServiceKeySigAlgo: sigAlgo,
ServiceKeyHashAlgo: hashAlgo,
@@ -94,7 +89,19 @@ func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) {
TransactionMaxGasLimit: flow.DefaultMaxTransactionGasLimit,
SetupEVMEnabled: true,
SetupVMBridgeEnabled: false,
})
}
}

func startEmulator(createTestAccounts bool, conf *server.Config) (
*server.EmulatorServer,
error,
) {
log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel)
if logOutput == "false" {
log = zerolog.Nop()
}

srv := server.NewEmulatorServer(&log, conf)

go func() {
srv.Start()
@@ -133,7 +140,7 @@ func runWeb3TestWithSetup(
// servicesSetup starts up an emulator and the gateway
// engines required for operation of the evm gateway.
func servicesSetup(t *testing.T) (emulator.Emulator, func()) {
srv, err := startEmulator(true)
srv, err := startEmulator(true, defaultServerConfig())
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
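
Building on the defaultServerConfig/startEmulator refactor above, a hypothetical test could start a second emulator as a backup AN and exercise the failover path described in the review thread. This is only a sketch intended to live alongside tests/helpers.go: the GRPCPort field, the port numbers, and the Stop calls are assumptions about the emulator's server API, not code from this PR.

```go
// Hypothetical sketch: start a primary and a backup emulator, then point
// the gateway config at both so that killing the primary exercises the
// pick_first failover path. Field and helper names marked below are assumptions.
func servicesSetupWithBackup(t *testing.T) {
	primaryConf := defaultServerConfig()

	backupConf := defaultServerConfig()
	backupConf.GRPCPort = 3570 // assumed field; the primary stays on the default 3569

	primary, err := startEmulator(true, primaryConf)
	require.NoError(t, err)
	defer primary.Stop() // assumed cleanup method

	backup, err := startEmulator(false, backupConf)
	require.NoError(t, err)
	defer backup.Stop()

	cfg := config.Config{
		AccessNodeHost:        "localhost:3569",
		AccessNodeBackupHosts: []string{"localhost:3570"},
		// ... remaining gateway configuration elided ...
	}
	_ = cfg // the gateway would be bootstrapped with cfg here
}
```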