Skip to content

Commit

Permalink
*: integrating go-eth2-client@v0.21.1 (#2986)
Browse files Browse the repository at this point in the history
Integrating the new version of go-eth2-client, resolving breaking changes, etc.

Known issues: ~attestantio/go-eth2-client#118 & ~attestantio/go-eth2-client#119

TODOs
- [x] Integrate v0.21.0
- [x] Integrate v0.21.1 (with bugfixes)
- [x] Test with simnet
- [x] Improve test coverage
- [x] Investigate latest teku VC not querying BN

category: feature
ticket: #2936
  • Loading branch information
pinebit authored May 6, 2024
1 parent aaf1730 commit 11e3000
Show file tree
Hide file tree
Showing 65 changed files with 3,880 additions and 2,071 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

fetch, err := fetcher.New(eth2Cl, feeRecipientFunc)
fetch, err := fetcher.New(eth2Cl, feeRecipientFunc, mutableConf.BuilderAPI)
if err != nil {
return err
}
Expand Down
151 changes: 1 addition & 150 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/obolnetwork/charon/app/forkjoin"
"github.com/obolnetwork/charon/app/promauto"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/eth2util/eth2exp"
)

//go:generate go run genwrap/genwrap.go
Expand Down Expand Up @@ -80,6 +79,7 @@ func NewMultiHTTP(timeout time.Duration, addresses ...string) (Client, error) {
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
Expand All @@ -98,155 +98,6 @@ func NewMultiHTTP(timeout time.Duration, addresses ...string) (Client, error) {
return Instrument(clients...)
}

func newMulti(clients []Client) Client {
return multi{
clients: clients,
selector: newBestSelector(bestPeriod),
}
}

// multi implements Client by wrapping multiple clients, calling them in parallel
// and returning the first successful response.
// It also adds prometheus metrics and error wrapping.
// It also implements a best client selector.
type multi struct {
clients []Client
selector *bestSelector
}

func (multi) Name() string {
return "eth2wrap.multi"
}

func (m multi) Address() string {
address, ok := m.selector.BestAddress()
if !ok {
return m.clients[0].Address()
}

return address
}

func (m multi) SetValidatorCache(valCache func(context.Context) (ActiveValidators, error)) {
for _, cl := range m.clients {
cl.SetValidatorCache(valCache)
}
}

func (m multi) SetForkVersion(forkVersion [4]byte) {
for _, c := range m.clients {
c.SetForkVersion(forkVersion)
}
}

func (m multi) ActiveValidators(ctx context.Context) (ActiveValidators, error) {
const label = "active_validators"
// No latency since this is a cached endpoint.

res0, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) (ActiveValidators, error) {
return cl.ActiveValidators(ctx)
},
nil, nil,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res0, err
}

func (m multi) ProposerConfig(ctx context.Context) (*eth2exp.ProposerConfigResponse, error) {
const label = "proposer_config"
defer latency(label)()

res0, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) (*eth2exp.ProposerConfigResponse, error) {
return cl.ProposerConfig(ctx)
},
nil, m.selector,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res0, err
}

func (m multi) AggregateBeaconCommitteeSelections(ctx context.Context, selections []*eth2exp.BeaconCommitteeSelection) ([]*eth2exp.BeaconCommitteeSelection, error) {
const label = "aggregate_beacon_committee_selections"
defer latency(label)()

res0, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) ([]*eth2exp.BeaconCommitteeSelection, error) {
return cl.AggregateBeaconCommitteeSelections(ctx, selections)
},
nil, m.selector,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res0, err
}

func (m multi) AggregateSyncCommitteeSelections(ctx context.Context, selections []*eth2exp.SyncCommitteeSelection) ([]*eth2exp.SyncCommitteeSelection, error) {
const label = "aggregate_sync_committee_selections"
defer latency(label)()

res, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) ([]*eth2exp.SyncCommitteeSelection, error) {
return cl.AggregateSyncCommitteeSelections(ctx, selections)
},
nil, m.selector,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res, err
}

func (m multi) BlockAttestations(ctx context.Context, stateID string) ([]*eth2p0.Attestation, error) {
const label = "block_attestations"
defer latency(label)()

res, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) ([]*eth2p0.Attestation, error) {
return cl.BlockAttestations(ctx, stateID)
},
nil, m.selector,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res, err
}

func (m multi) NodePeerCount(ctx context.Context) (int, error) {
const label = "node_peer_count"
defer latency(label)()

res, err := provide(ctx, m.clients,
func(ctx context.Context, cl Client) (int, error) {
return cl.NodePeerCount(ctx)
},
nil, m.selector,
)
if err != nil {
incError(label)
err = wrapError(ctx, err, label)
}

return res, err
}

// provide calls the work function with each client in parallel, returning the
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
Expand Down
47 changes: 8 additions & 39 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions app/eth2wrap/eth2wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ func TestSyncState(t *testing.T) {

func TestErrors(t *testing.T) {
ctx := context.Background()

t.Run("network dial error", func(t *testing.T) {
cl, err := eth2wrap.NewMultiHTTP(time.Hour, "localhost:22222")
require.NoError(t, err)

_, err = cl.SlotsPerEpoch(ctx)
log.Error(ctx, "See this error log for fields", err)
require.Error(t, err)
require.ErrorContains(t, err, "beacon api new eth2 client: network operation error: dial: connect: connection refused")
require.ErrorContains(t, err, "beacon api slots_per_epoch: client is not active")
})

// Test http server that just hangs until request cancelled
Expand All @@ -189,7 +190,7 @@ func TestErrors(t *testing.T) {
_, err = cl.SlotsPerEpoch(ctx)
log.Error(ctx, "See this error log for fields", err)
require.Error(t, err)
require.ErrorContains(t, err, "beacon api new eth2 client: http request timeout: context deadline exceeded")
require.ErrorContains(t, err, "beacon api slots_per_epoch: client is not active")
})

t.Run("caller cancelled", func(t *testing.T) {
Expand Down Expand Up @@ -226,7 +227,7 @@ func TestErrors(t *testing.T) {
bmock.SignedBeaconBlockFunc = func(_ context.Context, blockID string) (*eth2spec.VersionedSignedBeaconBlock, error) {
return nil, &eth2api.Error{
Method: http.MethodGet,
Endpoint: fmt.Sprintf("/eth/v2/beacon/blocks/%s", blockID),
Endpoint: "/eth/v3/beacon/blocks/" + blockID,
StatusCode: http.StatusNotFound,
Data: []byte(fmt.Sprintf(`{"code":404,"message":"NOT_FOUND: beacon block at slot %s","stacktraces":[]}`, blockID)),
}
Expand Down Expand Up @@ -373,6 +374,9 @@ func TestOnlyTimeout(t *testing.T) {
require.Fail(t, "Expect this only to return after main ctx cancelled")
}()

// Allow the above goroutine to block on the .Spec() call.
time.Sleep(10 * time.Millisecond)

// testCtxCancel tests that no concurrent calls block if the user cancels the context.
testCtxCancel := func(t *testing.T, timeout time.Duration) {
t.Helper()
Expand Down Expand Up @@ -426,7 +430,6 @@ func TestLazy(t *testing.T) {
// Both proxies are disabled, so this should fail.
_, err = eth2Cl.NodeSyncing(ctx, nil)
require.Error(t, err)
require.Equal(t, "", eth2Cl.Address())

enabled1.Store(true)

Expand Down
30 changes: 30 additions & 0 deletions app/eth2wrap/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ import (
"github.com/obolnetwork/charon/eth2util/eth2exp"
)

//go:generate mockery --name=Client --output=mocks --outpkg=mocks --case=underscore

// NewLazyForT creates a new lazy client for testing.
func NewLazyForT(client Client) Client {
return &lazy{
provider: func(context.Context) (Client, error) {
return client, nil
},
client: client,
}
}

// newLazy creates a new lazy client.
func newLazy(provider func(context.Context) (Client, error)) *lazy {
return &lazy{
Expand Down Expand Up @@ -107,6 +119,24 @@ func (l *lazy) Address() string {
return cl.Address()
}

func (l *lazy) IsActive() bool {
cl, ok := l.getClient()
if !ok {
return false
}

return cl.IsActive()
}

func (l *lazy) IsSynced() bool {
cl, ok := l.getClient()
if !ok {
return false
}

return cl.IsSynced()
}

func (l *lazy) ActiveValidators(ctx context.Context) (ActiveValidators, error) {
cl, err := l.getOrCreateClient(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 11e3000

Please sign in to comment.