Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app/eth2wrap: fallback beacon nodes #3342

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 65 additions & 33 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,21 @@
return nil, errors.New("clients empty")
}

return newMulti(clients), nil
// TODO(gsora): remove once the implementation is agreed upon and
// wiring is complete.
fb := NewFallbackClient(0, [4]byte{}, nil)

return newMulti(clients, fb), nil
}

// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
// respectively.
func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) {
if len(clients) == 0 {
return nil, errors.New("clients empty")
}

Check warning on line 72 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L69-L72

Added lines #L69 - L72 were not covered by tests

return newMulti(clients, fallback), nil

Check warning on line 74 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L74

Added line #L74 was not covered by tests
}

// WithSyntheticDuties wraps the provided client adding synthetic duties.
Expand All @@ -71,43 +85,58 @@

// NewMultiHTTP returns a new instrumented multi eth2 http client.
func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) {
return Instrument(newClients(timeout, forkVersion, addresses)...)
}

// newClients returns a slice of Client initialized with the provided settings.
func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client {
var clients []Client
for _, address := range addresses {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}
clients = append(clients, newBeaconClient(timeout, forkVersion, address))
}

cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}
return clients
}

adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)
// newBeaconClient returns a Client with the provided settings.
func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}

return adaptedCl, nil
})
cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}

Check warning on line 115 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L114-L115

Added lines #L114 - L115 were not covered by tests
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}

Check warning on line 119 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L118-L119

Added lines #L118 - L119 were not covered by tests

clients = append(clients, cl)
}
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)

return adaptedCl, nil
})

return Instrument(clients...)
return cl
}

type provideArgs struct {
client Client
fallback *FallbackClient
}

// provide calls the work function with each client in parallel, returning the
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient,
work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
Expand All @@ -118,12 +147,15 @@
forkjoin.WithWorkers(len(clients)),
)
for _, client := range clients {
fork(client)
fork(provideArgs{
client: client,
fallback: fallback,
})
}
defer cancel()

var (
nokResp forkjoin.Result[Client, O]
nokResp forkjoin.Result[provideArgs, O]
hasNokResp bool
zero O
)
Expand All @@ -132,7 +164,7 @@
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
bestSelector.Increment(res.Input.client.Address())
}

return res.Output, nil
Expand All @@ -154,10 +186,10 @@
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error {
_, err := provide(ctx, clients, fallback,
func(ctx context.Context, args provideArgs) (empty, error) {
return empty{}, work(ctx, args)
},
nil, selector,
)
Expand Down
Loading
Loading