Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 12 additions & 81 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/cluster"
clusterpkg "github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/cluster/manifest"
manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
"github.com/obolnetwork/charon/core"
Expand Down Expand Up @@ -118,7 +118,7 @@ type TestConfig struct {
p2p.TestPingConfig

// Lock provides the lock explicitly, skips loading from disk.
Lock *cluster.Lock
Lock *clusterpkg.Lock
// P2PKey provides the p2p privkey explicitly, skips loading from keystore on disk.
P2PKey *k1.PrivateKey
// ParSigExFunc provides an in-memory partial signature exchange.
Expand Down Expand Up @@ -398,14 +398,14 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,

// wireCoreWorkflow wires the core workflow components.
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, nodeIdx cluster.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
cluster *manifestpb.Cluster, nodeIdx clusterpkg.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
eth2Cl, submissionEth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
consensusDebugger consensus.Debugger, pubkeys []core.PubKey, seenPubkeys func(core.PubKey),
sseListener sse.Listener, vapiCalls func(),
) error {
// Convert and prep public keys and public shares
var (
corePubkeys []core.PubKey
builderRegistrations []clusterpkg.BuilderRegistration
eth2Pubkeys []eth2p0.BLSPubKey
pubshares []eth2p0.BLSPubKey
allPubSharesByKey = make(map[core.PubKey]map[int]tbls.PublicKey) // map[pubkey]map[shareIdx]pubshare
Expand Down Expand Up @@ -445,10 +445,16 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
eth2Pubkey := eth2p0.BLSPubKey(pubkey)

eth2Pubkeys = append(eth2Pubkeys, eth2Pubkey)
corePubkeys = append(corePubkeys, corePubkey)
pubshares = append(pubshares, eth2Share)
allPubSharesByKey[corePubkey] = allPubShares
feeRecipientAddrByCorePubkey[corePubkey] = val.GetFeeRecipientAddress()

var builderRegistration clusterpkg.BuilderRegistration
if err := json.Unmarshal(val.GetBuilderRegistrationJson(), &builderRegistration); err != nil {
return errors.Wrap(err, "unmarshal builder registration")
}

builderRegistrations = append(builderRegistrations, builderRegistration)
}

peers, err := manifest.ClusterPeers(cluster)
Expand All @@ -465,7 +471,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return core.NewDeadliner(ctx, label, deadlineFunc)
}

sched, err := scheduler.New(corePubkeys, eth2Cl, conf.BuilderAPI)
sched, err := scheduler.New(builderRegistrations, eth2Cl, conf.BuilderAPI)
if err != nil {
return err
}
Expand Down Expand Up @@ -625,11 +631,6 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

if err = wireRecaster(ctx, eth2Cl, sched, sigAgg, broadcaster, cluster.GetValidators(),
conf.BuilderAPI, conf.TestConfig.BroadcastCallback); err != nil {
return errors.Wrap(err, "wire recaster")
}

track, err := newTracker(ctx, life, deadlineFunc, peers, eth2Cl)
if err != nil {
return err
Expand Down Expand Up @@ -749,76 +750,6 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, p
return nil
}

// wireRecaster wires the rebroadcaster component to scheduler, sigAgg and broadcaster.
// This is not done in core.Wire since recaster isn't really part of the official core workflow (yet).
//
// When builderAPI is enabled, pre-generated builder registrations found in the
// cluster manifest validators are stored in the recaster up front. The optional
// callback (provided via TestConfig.BroadcastCallback) is subscribed in addition
// to the regular broadcaster.
func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Scheduler, sigAgg core.SigAgg,
	broadcaster core.Broadcaster, validators []*manifestpb.Validator, builderAPI bool,
	callback func(context.Context, core.Duty, core.SignedDataSet) error,
) error {
	// Give the recaster a provider of the currently-active validator pubkey set;
	// presumably it uses this to limit recasts to active validators — confirm
	// against bcast.NewRecaster.
	recaster, err := bcast.NewRecaster(func(ctx context.Context) (map[eth2p0.BLSPubKey]struct{}, error) {
		valList, err := eth2Cl.ActiveValidators(ctx)
		if err != nil {
			return nil, err
		}

		ret := make(map[eth2p0.BLSPubKey]struct{})

		for _, v := range valList {
			ret[v] = struct{}{}
		}

		return ret, nil
	})
	if err != nil {
		return errors.Wrap(err, "recaster init")
	}

	// Wire the recaster into the duty pipeline: slot ticks drive it, aggregated
	// signatures are stored into it, and its output goes to the broadcaster.
	sched.SubscribeSlots(recaster.SlotTicked)
	sigAgg.Subscribe(recaster.Store)
	recaster.Subscribe(broadcaster.Broadcast)

	if callback != nil {
		recaster.Subscribe(callback)
	}

	// Pre-generated registrations below are only relevant with the builder API enabled.
	if !builderAPI {
		return nil
	}

	for _, val := range validators {
		// Check if the current cluster manifest supports pre-generate validator registrations.
		if len(val.GetBuilderRegistrationJson()) == 0 {
			continue
		}

		reg := new(eth2api.VersionedSignedValidatorRegistration)
		if err := json.Unmarshal(val.GetBuilderRegistrationJson(), reg); err != nil {
			return errors.Wrap(err, "unmarshal validator registration")
		}

		pubkey, err := core.PubKeyFromBytes(val.GetPublicKey())
		if err != nil {
			return errors.Wrap(err, "core pubkey from bytes")
		}

		signedData, err := core.NewVersionedSignedValidatorRegistration(reg)
		if err != nil {
			return errors.Wrap(err, "new versioned signed validator registration")
		}

		// Derive the duty slot from the registration's signed timestamp so the
		// stored duty lines up with the on-chain slot schedule.
		slot, err := validatorapi.SlotFromTimestamp(ctx, eth2Cl, reg.V1.Message.Timestamp)
		if err != nil {
			return errors.Wrap(err, "calculate slot from timestamp")
		}

		if err = recaster.Store(ctx, core.NewBuilderRegistrationDuty(uint64(slot)), core.SignedDataSet{pubkey: signedData}); err != nil {
			return errors.Wrap(err, "recaster store registration")
		}
	}

	return nil
}

// newTracker creates and starts a new tracker instance.
func newTracker(ctx context.Context, life *lifecycle.Manager, deadlineFunc func(duty core.Duty) (time.Time, bool),
peers []p2p.Peer, eth2Cl eth2wrap.Client,
Expand Down
2 changes: 1 addition & 1 deletion app/health/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var checks = []check{
Description: "High rate of failed validator registrations. Please check the logs for more details.",
Severity: severityWarning,
Func: func(q query, _ Metadata) (bool, error) {
increase, err := q("core_bcast_recast_errors_total", sumLabels(), increase)
increase, err := q("core_scheduler_submit_registration_errors_total", sumLabels(), increase)
if err != nil {
return false, err
}
Expand Down
29 changes: 5 additions & 24 deletions app/health/checks_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,7 @@ func TestWarnLogsCheck(t *testing.T) {
func TestHighRegistrationFailuresRateCheck(t *testing.T) {
m := Metadata{}
checkName := "high_registration_failures_rate"
metricName := "core_bcast_recast_errors_total"
pregenLabel := genLabels("source", "pregen")
downsteamLabel := genLabels("source", "downstream")
metricName := "core_scheduler_submit_registration_errors_total"

t.Run("no data", func(t *testing.T) {
testCheck(t, m, checkName, false, nil)
Expand All @@ -344,33 +342,16 @@ func TestHighRegistrationFailuresRateCheck(t *testing.T) {
t.Run("same errors count", func(t *testing.T) {
testCheck(t, m, checkName, false,
genFam(metricName,
genGauge(pregenLabel, 1, 1, 1), // No increments
genCounter(genLabels(), 1, 1, 1), // No increments
),
)
})

t.Run("incrementing errors count", func(t *testing.T) {
t.Run("have increasing errors count", func(t *testing.T) {
testCheck(t, m, checkName, true,
genFam(metricName,
genGauge(downsteamLabel, 0, 1, 2, 10),
),
)
})

t.Run("both labels have stable errors count", func(t *testing.T) {
testCheck(t, m, checkName, false,
genFam(metricName,
genGauge(pregenLabel, 1, 1, 1),
genGauge(downsteamLabel, 1, 1, 1),
),
)
})

t.Run("both labels have increasing errors count", func(t *testing.T) {
testCheck(t, m, checkName, true,
genFam(metricName,
genGauge(pregenLabel, 10, 15, 18),
genGauge(downsteamLabel, 1, 2, 3),
genCounter(genLabels(), 10, 15, 18),
genCounter(genLabels(), 1, 2, 3),
),
)
})
Expand Down
58 changes: 2 additions & 56 deletions core/bcast/bcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,28 +213,8 @@ func (b Broadcaster) Broadcast(ctx context.Context, duty core.Duty, set core.Sig
return core.ErrDeprecatedDutyBuilderProposer

case core.DutyBuilderRegistration:
slot, err := firstSlotInCurrentEpoch(ctx, b.eth2Cl)
if err != nil {
return errors.Wrap(err, "calculate first slot in epoch")
}

// Use first slot in current epoch for accurate delay calculations while submitting builder registrations.
// This is because builder registrations are submitted in first slot of every epoch.
duty.Slot = slot

registrations, err := setToRegistrations(set)
if err != nil {
return err
}

err = b.eth2Cl.SubmitValidatorRegistrations(ctx, registrations)
if err == nil {
log.Info(ctx, "Successfully submitted validator registrations to beacon node",
z.Any("delay", b.delayFunc(duty.Slot, core.DutyBuilderRegistration)),
)
}

return err
// Builder registrations are submitted by the scheduler.
return nil

case core.DutyExit:
var err error // Try submitting all exits and return last error.
Expand Down Expand Up @@ -360,21 +340,6 @@ func setToAggAndProof(set core.SignedDataSet) (*eth2api.SubmitAggregateAttestati
return &eth2api.SubmitAggregateAttestationsOpts{SignedAggregateAndProofs: resp}, nil
}

// setToRegistrations extracts the versioned signed validator registrations
// from the given signed data set. It fails if any entry in the set is not a
// core.VersionedSignedValidatorRegistration.
func setToRegistrations(set core.SignedDataSet) ([]*eth2api.VersionedSignedValidatorRegistration, error) {
	var registrations []*eth2api.VersionedSignedValidatorRegistration

	for _, data := range set {
		registration, ok := data.(core.VersionedSignedValidatorRegistration)
		if !ok {
			return nil, errors.New("invalid registration")
		}

		registrations = append(registrations, &registration.VersionedSignedValidatorRegistration)
	}

	return registrations, nil
}

// setToOne converts a set of signed data into a single signed data.
func setToOne(set core.SignedDataSet) (core.PubKey, core.SignedData, error) {
if len(set) != 1 {
Expand Down Expand Up @@ -431,25 +396,6 @@ func newDelayFunc(ctx context.Context, eth2Cl eth2wrap.Client) (func(slot uint64
}, nil
}

// firstSlotInCurrentEpoch returns the slot number of the first slot of the
// epoch currently in progress, derived from the chain genesis time and the
// slot/epoch configuration fetched via the beacon node client.
func firstSlotInCurrentEpoch(ctx context.Context, eth2Cl eth2wrap.Client) (uint64, error) {
	genesis, err := eth2wrap.FetchGenesisTime(ctx, eth2Cl)
	if err != nil {
		return 0, err
	}

	slotDuration, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl)
	if err != nil {
		return 0, err
	}

	// Slots elapsed since genesis, then the epoch those slots fall into.
	slot := uint64(time.Since(genesis) / slotDuration)
	epoch := slot / slotsPerEpoch

	return epoch * slotsPerEpoch, nil
}

// resolveActiveValidatorsIndices returns the active validators (including their validator index) for the slot.
func resolveActiveValidatorsIndices(ctx context.Context, eth2Cl eth2wrap.Client, epoch eth2p0.Epoch) ([]eth2p0.ValidatorIndex, error) {
eth2Resp, err := eth2Cl.CompleteValidators(ctx)
Expand Down
24 changes: 0 additions & 24 deletions core/bcast/bcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestBroadcast(t *testing.T) {
attData, // Attestation
proposalData, // BeaconBlock
blindedProposalData, // BlindedBlock
validatorRegistrationData, // ValidatorRegistration
validatorExitData, // ValidatorExit
aggregateAttestationData, // AggregateAttestation
beaconCommitteeSelections, // BeaconCommitteeSelections
Expand Down Expand Up @@ -188,29 +187,6 @@ func blindedProposalData(t *testing.T, mock *beaconmock.Mock) test {
}
}

// validatorRegistrationData builds the broadcast test case for a signed
// validator registration, wiring the beacon mock to assert that exactly the
// generated registration is submitted and to signal completion via a channel.
func validatorRegistrationData(t *testing.T, mock *beaconmock.Mock) test {
	t.Helper()

	done := make(chan struct{})
	reg := testutil.RandomCoreVersionedSignedValidatorRegistration(t).VersionedSignedValidatorRegistration
	signed := core.VersionedSignedValidatorRegistration{VersionedSignedValidatorRegistration: reg}

	mock.SubmitValidatorRegistrationsFunc = func(ctx context.Context, registrations []*eth2api.VersionedSignedValidatorRegistration) error {
		require.Equal(t, signed.VersionedSignedValidatorRegistration, *registrations[0])
		close(done)

		return nil
	}

	return test{
		name:     "Broadcast Validator Registration",
		aggData:  signed,
		duty:     core.DutyBuilderRegistration,
		bcastCnt: 1,
		asserted: done,
	}
}

func validatorExitData(t *testing.T, mock *beaconmock.Mock) test {
t.Helper()

Expand Down
21 changes: 0 additions & 21 deletions core/bcast/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,6 @@ var (
Help: "Duty broadcast delay since the expected duty submission in seconds by type",
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60},
}, []string{"duty"})

recastRegistrationCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "bcast",
Name: "recast_registration_total",
Help: "The total number of unique validator registration stored in recaster per pubkey",
}, []string{"pubkey"})

recastTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "bcast",
Name: "recast_total",
Help: "The total count of recasted registrations by source; 'pregen' vs 'downstream'",
}, []string{"source"})

recastErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "bcast",
Name: "recast_errors_total",
Help: "The total count of failed recasted registrations by source; 'pregen' vs 'downstream'",
}, []string{"source"})
)

// instrumentDuty increments the duty counter.
Expand Down
Loading
Loading