Allow setting back-up ANs for networking resilience #890
@@ -22,6 +22,8 @@ import (
 	"github.com/sethvargo/go-limiter/memorystore"
 	grpcOpts "google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
 	"google.golang.org/grpc/status"

 	"github.com/onflow/flow-evm-gateway/api"
@@ -41,13 +43,13 @@ const (
 	// DefaultMaxMessageSize is the default maximum message size for gRPC responses
 	DefaultMaxMessageSize = 1024 * 1024 * 1024

-	// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
-	// a ResourceExhausted error.
-	DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond
+	// DefaultRetryDelay is the default delay between retries when a gRPC request
+	// to one of the Access Nodes has errored out.
+	DefaultRetryDelay = 100 * time.Millisecond

-	// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
-	// ResourceExhausted errors.
-	DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
+	// DefaultMaxRetryDelay is the default max request duration when retrying failed
+	// gRPC requests to one of the Access Nodes.
+	DefaultMaxRetryDelay = 30 * time.Second
 )

 type Storages struct {
@@ -474,16 +476,59 @@ func StartEngine(
 // setupCrossSporkClient sets up a cross-spork AN client.
 func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*requester.CrossSporkClient, error) {
 	// create access client with cross-spork capabilities
-	currentSporkClient, err := grpc.NewClient(
-		config.AccessNodeHost,
-		grpc.WithGRPCDialOptions(
-			grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
-			grpcOpts.WithUnaryInterceptor(retryInterceptor(
-				DefaultResourceExhaustedMaxRetryDelay,
-				DefaultResourceExhaustedRetryDelay,
-			)),
-		),
-	)
+	var currentSporkClient *grpc.Client
+	var err error
+
+	if len(config.AccessNodeBackupHosts) > 0 {
+		mr := manual.NewBuilderWithScheme("dns")
+		defer mr.Close()
+
+		// `pick_first` tries to connect to the first address, uses it for all RPCs
+		// if it connects, or try the next address if it fails
+		// (and keep doing that until one connection is successful).
+		// Because of this, all the RPCs will be sent to the same backend. See more on:
+		// https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first
+		json := `{"loadBalancingConfig": [{"pick_first":{}}]}`
Reviewer: can you document what the expected behavior is? I'm assuming that […]

Author: Good point, I've added comments in 117deb1, with a link to the official example: https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first
+
+		endpoints := []resolver.Endpoint{
+			{Addresses: []resolver.Address{{Addr: config.AccessNodeHost}}},
+		}
+
+		for _, accessNodeBackupAddr := range config.AccessNodeBackupHosts {
+			endpoints = append(endpoints, resolver.Endpoint{
+				Addresses: []resolver.Address{{Addr: accessNodeBackupAddr}},
+			})
+		}
+
+		mr.InitialState(resolver.State{
+			Endpoints: endpoints,
+		})
+
+		targetHost := fmt.Sprintf("%s:///%s", mr.Scheme(), "flow-access")
+		currentSporkClient, err = grpc.NewClient(
+			targetHost,
+			grpc.WithGRPCDialOptions(
+				grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
+				grpcOpts.WithResolvers(mr),
+				grpcOpts.WithDefaultServiceConfig(json),
+				grpcOpts.WithUnaryInterceptor(retryInterceptor(
+					DefaultMaxRetryDelay,
+					DefaultRetryDelay,
+				)),
+			),
+		)
+	} else {
+		currentSporkClient, err = grpc.NewClient(
+			config.AccessNodeHost,
+			grpc.WithGRPCDialOptions(
+				grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
+				grpcOpts.WithUnaryInterceptor(retryInterceptor(
+					DefaultMaxRetryDelay,
+					DefaultRetryDelay,
+				)),
+			),
+		)
+	}

 	if err != nil {
 		return nil, fmt.Errorf(
 			"failed to create client connection for host: %s, with error: %w",
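To make the failover setup above easier to follow, here is a minimal, self-contained sketch of the same grpc-go pattern: a manual resolver seeded with a primary and a backup address, combined with the `pick_first` service config. The `flow-access` target name and the service config mirror the diff; the placeholder host names, the insecure credentials, and the standalone `main` are assumptions made for the example, not how the gateway actually dials its Access Nodes.

```go
package main

import (
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
)

func main() {
	// Seed a manual resolver with the primary endpoint first and the
	// backup endpoint after it; pick_first walks this list in order.
	mr := manual.NewBuilderWithScheme("dns")
	defer mr.Close()

	mr.InitialState(resolver.State{
		Endpoints: []resolver.Endpoint{
			{Addresses: []resolver.Address{{Addr: "access-primary.example:9000"}}}, // placeholder hosts
			{Addresses: []resolver.Address{{Addr: "access-backup.example:9000"}}},
		},
	})

	// pick_first keeps every RPC on the first address that connects and only
	// moves to the next endpoint when that connection becomes unusable,
	// which is what turns this into failover rather than load balancing.
	svcConfig := `{"loadBalancingConfig": [{"pick_first":{}}]}`

	conn, err := grpc.NewClient(
		fmt.Sprintf("%s:///%s", mr.Scheme(), "flow-access"),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(mr),
		grpc.WithDefaultServiceConfig(svcConfig),
	)
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	defer conn.Close()

	// conn can now be handed to any generated gRPC client stub; RPCs go to
	// the primary until it fails, then to the backup.
	_ = conn
}
```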
@@ -536,7 +581,16 @@ func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor
 			return nil
 		}

-		if status.Code(err) != codes.ResourceExhausted {
Author: Since we added the load balancing config: `{"loadBalancingConfig": [{"pick_first":{}}]}`, I removed this error code check entirely. The reason being, if we receive any kind of gRPC error from one of the ANs: […] However, I just noticed that the […]

Reviewer: is there a way to customize the behavior? For instance, if the backend returned […] Similarly, if the error is […]

Author: Sorry about that, but I had a misconception about how these two relate to each other. The […] So the […] I have added some error handling in […]
+		switch status.Code(err) {
+		case codes.Canceled, codes.DeadlineExceeded:
+			// these kind of errors are guaranteed to fail all requests,
+			// if the source was a local context
+			return err
+		case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound:
+			// when we receive these errors, we pause briefly, so that
+			// the next request on the same AN, has a higher chance
+			// of success.
+		default:
+			return err
+		}
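The hunk above only shows the changed error classification, so for context, here is a hedged sketch of how a unary client interceptor of this shape typically wraps the call in a retry loop. The switch cases are taken from the diff; the loop structure, parameter handling, and comments are assumptions for illustration, not the gateway's exact retryInterceptor implementation.

```go
package retryexample

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryInterceptor sketches a client interceptor that retries transient
// errors on the same connection for up to maxDuration, pausing
// pauseDuration between attempts.
func retryInterceptor(maxDuration, pauseDuration time.Duration) grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply any,
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		start := time.Now()
		for {
			err := invoker(ctx, method, req, reply, cc, opts...)
			if err == nil {
				return nil
			}

			switch status.Code(err) {
			case codes.Canceled, codes.DeadlineExceeded:
				// a cancelled or expired local context will fail every
				// retry, so give up immediately
				return err
			case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound:
				// transient conditions: pause and retry on the same AN
			default:
				return err
			}

			if time.Since(start) >= maxDuration {
				return err
			}
			time.Sleep(pauseDuration)
		}
	}
}
```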
@@ -224,23 +224,38 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
 				// we can get not found when reconnecting after a disconnect/restart before the
 				// next block is finalized. just wait briefly and try again
 				time.Sleep(200 * time.Millisecond)
-			case codes.DeadlineExceeded, codes.Internal:
+			case codes.DeadlineExceeded, codes.Internal, codes.Unavailable:
Author: During some local testing with 2 Flow Emulator processes, when I killed the 1st process, which was configured as the main AN, I got a failure in the event subscription with:

    recoverable: disconnected: error receiving event: rpc error: code = Unavailable desc = error reading from server: EOF

Adding the […]

Reviewer: I think […]

Author: I am not really sure how […]

    [METHOD]: /flow.access.AccessAPI/GetLatestBlock
    [METHOD]: /flow.access.AccessAPI/GetAccountAtLatestBlock
    [METHOD]: /flow.access.AccessAPI/SendTransaction
    [METHOD]: /flow.access.AccessAPI/ExecuteScriptAtLatestBlock

I have verified though, that we do need

    if err := connect(lastReceivedHeight); err != nil {
        eventsChan <- models.NewBlockEventsError(
            fmt.Errorf(
                "failed to resubscribe for events on height: %d, with: %w",
                lastReceivedHeight,
                err,
            ),
        )
        return
    }

Note that if we trigger a reconnect, this will prompt the […]
 				// these are sometimes returned when the stream is disconnected by a middleware or the server
 			default:
 				// skip reconnect on all other errors
 				eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
 				return
 			}

-			if err := connect(lastReceivedHeight + 1); err != nil {
-				eventsChan <- models.NewBlockEventsError(
-					fmt.Errorf(
-						"failed to resubscribe for events on height: %d, with: %w",
-						lastReceivedHeight+1,
-						err,
-					),
-				)
-				return
+			start := time.Now()
+			attempts := 0
+			pauseDuration, maxDuration := 200*time.Millisecond, 30*time.Second
+			// Allow reconnect retries for up to 30 seconds, with retry
+			// attempts every 200 ms.
+			for {
+				err := connect(lastReceivedHeight)
+				if err == nil {
+					break
+				}
+
+				attempts++
+				duration := time.Since(start)
+				if duration >= maxDuration {
+					eventsChan <- models.NewBlockEventsError(
+						fmt.Errorf(
+							"failed to resubscribe for events on height: %d, with: %w",
+							lastReceivedHeight,
+							err,
+						),
+					)
+					return
+				}
+				time.Sleep(pauseDuration)
 			}
 		}
 	}
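The reconnect logic above follows a simple "retry until a deadline" pattern. As an illustration only, the same idea can be factored into a small helper; retryWithDeadline is a hypothetical function written for this example and is not part of the gateway codebase.

```go
package retryexample

import "time"

// retryWithDeadline keeps calling fn until it succeeds or maxDuration has
// elapsed since the first attempt, sleeping pauseDuration between attempts.
// It returns nil on success, or the last error once the deadline is hit.
func retryWithDeadline(maxDuration, pauseDuration time.Duration, fn func() error) error {
	start := time.Now()
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Since(start) >= maxDuration {
			return err
		}
		time.Sleep(pauseDuration)
	}
}
```

With such a helper, the resubscription above could be expressed as `retryWithDeadline(30*time.Second, 200*time.Millisecond, func() error { return connect(lastReceivedHeight) })`, keeping the 30 s / 200 ms budget from the diff.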
Author: Now that we are adding load-balancing functionality, should we maybe decrease the max retry duration?

Reviewer: is this load balancing or failovers? I'm not aware of a demand for load balancing requests to different backends, but there is a clear need for failing over when the primary is unavailable.

Author: Given that we use `pick_first` as the load balancing strategy, effectively this works as a failover mechanism. Because it will stick to the same backend, until that backend is unable to serve any requests (due to connectivity issues), in which case it will pick the next available backend.

I only changed the name of the constant, from `DefaultResourceExhaustedMaxRetryDelay` to `DefaultMaxRetryDelay`, because previously the `retryInterceptor` would only retry on `ResourceExhausted` errors. But I've updated that condition in f30b0b3, to account for more related errors that we can retry on the same AN.