Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/consensus/instance"
Expand Down Expand Up @@ -514,8 +513,14 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
inst.ErrCh <- err // Send resulting error to errCh.
}()

var span trace.Span

ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance")

if !c.deadliner.Add(duty) {
span.AddEvent("Expired Duty Skipped")
log.Warn(ctx, "Skipping consensus for expired duty", nil)

return nil
}

Expand All @@ -527,12 +532,8 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
var (
decided bool
nodes = len(c.peers)
span trace.Span
)

_, span = tracer.Start(ctx, "qbft.runInstance")
span.SetAttributes(attribute.String("duty", duty.Type.String()))

defer func() {
if err != nil && !isContextErr(err) {
span.RecordError(err)
Expand Down Expand Up @@ -573,6 +574,15 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err

// Create a new qbft definition for this instance.
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations)
origLogRoundChange := def.LogRoundChange
def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
if origLogRoundChange != nil {
origLogRoundChange(ctx, instance, process, round, newRound, uponRule, msgs)
}

span.AddEvent("Round Changed")
span.SetAttributes(attribute.Int64("new_round", newRound))
}

// Create a new transport that handles sending and receiving for this instance.
t := newTransport(c, c.privkey, inst.ValueCh, make(chan qbft.Msg[core.Duty, [32]byte, proto.Message]), newSniffer(int64(def.Nodes), peerIdx))
Expand Down
11 changes: 8 additions & 3 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
eth2api "github.com/attestantio/go-eth2-client/api"
eth2spec "github.com/attestantio/go-eth2-client/spec"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
Expand Down Expand Up @@ -54,10 +55,14 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat
// Fetch triggers fetching of a proposed duty data set.
func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error {
var (
span trace.Span
unsignedSet core.UnsignedDataSet
err error
)

ctx, span = core.StartDutyTrace(ctx, duty, "core/fetcher.Fetch")
defer span.End()

switch duty.Type {
case core.DutyProposer:
unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet)
Expand Down Expand Up @@ -302,10 +307,10 @@ func (f *Fetcher) fetchProposerData(ctx context.Context, slot uint64, defSet cor
return nil, errors.Wrap(err, "new proposal")
}

// Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 0)
var blinded float64
// Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 2)
blinded := 2.0
if proposal.Blinded {
blinded = 1
blinded = 1.0
}
proposalBlindedGauge.Set(blinded)

Expand Down
14 changes: 6 additions & 8 deletions core/fetcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"github.com/obolnetwork/charon/app/promauto"
)

var (
proposalBlindedGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "core",
Subsystem: "fetcher",
Name: "proposal_blinded",
Help: "Whether the fetched proposal was blinded (1) or local (0)",
})
)
var proposalBlindedGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "core",
Subsystem: "fetcher",
Name: "proposal_blinded",
Help: "Whether the fetched proposal was blinded (1) or local (2)",
})
17 changes: 9 additions & 8 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,28 +230,29 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) {
Type: dutyType,
}

var span trace.Span

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")

defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
span.End()
// Nothing for this duty.
continue
}

// Trigger duty async
go func() {
defer span.End()

if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}

instrumentDuty(duty, defSet)

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))
if duty.Type == core.DutyProposer {
var span trace.Span

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")
defer span.End()
}

for _, sub := range s.dutySubs {
clone, err := defSet.Clone() // Clone for each subscriber.
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/validatorapi/router_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (a addr) Address() string {
func TestProxyShutdown(t *testing.T) {
// Start a server that will block until the request is cancelled.
serving := make(chan struct{})

target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(serving)
<-r.Context().Done()
Expand Down
62 changes: 60 additions & 2 deletions core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
"github.com/attestantio/go-eth2-client/spec/altair"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
ssz "github.com/ferranbt/fastssz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
Expand All @@ -33,7 +35,6 @@ import (

const (
defaultGasLimit = 30000000
zeroAddress = "0x0000000000000000000000000000000000000000"
)

// SlotFromTimestamp returns the Ethereum slot associated to a timestamp, given the genesis configuration fetched
Expand Down Expand Up @@ -260,6 +261,14 @@ func (c *Component) Subscribe(fn func(context.Context, core.Duty, core.ParSigned

// AttestationData implements the eth2client.AttesterDutiesProvider for the router.
func (c Component) AttestationData(ctx context.Context, opts *eth2api.AttestationDataOpts) (*eth2api.Response[*eth2p0.AttestationData], error) {
var span trace.Span

duty := core.NewAttesterDuty(uint64(opts.Slot))
ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AttestationData")

span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex)))
defer span.End()

att, err := c.awaitAttFunc(ctx, uint64(opts.Slot), uint64(opts.CommitteeIndex))
if err != nil {
return nil, err
Expand All @@ -270,6 +279,13 @@ func (c Component) AttestationData(ctx context.Context, opts *eth2api.Attestatio

// SubmitAttestations implements the eth2client.AttestationsSubmitter for the router.
func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2api.SubmitAttestationsOpts) error {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAttestations")

span.SetAttributes(attribute.Int("num_attestations", len(attestationOpts.Attestations)))
defer span.End()

attestations := attestationOpts.Attestations
setsBySlot := make(map[uint64]core.ParSignedDataSet)

Expand Down Expand Up @@ -386,6 +402,13 @@ func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2
}

func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.Response[*eth2api.VersionedProposal], error) {
var span trace.Span

duty := core.NewRandaoDuty(uint64(opts.Slot))

ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.Proposal")
defer span.End()

// Get proposer pubkey (this is a blocking query).
pubkey, err := c.getProposerPubkey(ctx, core.NewProposerDuty(uint64(opts.Slot)))
if err != nil {
Expand All @@ -402,7 +425,6 @@ func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*e
Signature: opts.RandaoReveal,
}

duty := core.NewRandaoDuty(uint64(opts.Slot))
parSig := core.NewPartialSignedRandao(sigEpoch.Epoch, sigEpoch.Signature, c.shareIdx)

// Verify randao signature
Expand Down Expand Up @@ -866,6 +888,14 @@ func (c Component) BeaconCommitteeSelections(ctx context.Context, opts *eth2api.
// AggregateAttestation returns the aggregate attestation for the given attestation root.
// It does a blocking query to DutyAggregator unsigned data from dutyDB.
func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.AggregateAttestationOpts) (*eth2api.Response[*eth2spec.VersionedAttestation], error) {
var span trace.Span

duty := core.NewAggregatorDuty(uint64(opts.Slot))
ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AggregateAttestation")

span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex)))
defer span.End()

aggAtt, err := c.awaitAggAttFunc(ctx, uint64(opts.Slot), opts.AttestationDataRoot)
if err != nil {
return nil, err
Expand All @@ -878,6 +908,13 @@ func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.Aggre
// - It verifies partial signature on AggregateAndProof.
// - It then calls all the subscribers for further steps on partially signed aggregate and proof.
func (c Component) SubmitAggregateAttestations(ctx context.Context, opts *eth2api.SubmitAggregateAttestationsOpts) error {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAggregateAttestations")

span.SetAttributes(attribute.Int("num_aggregates", len(opts.SignedAggregateAndProofs)))
defer span.End()

aggsAndProofs := opts.SignedAggregateAndProofs

vals, err := c.eth2Cl.ActiveValidators(ctx)
Expand Down Expand Up @@ -1139,6 +1176,13 @@ func (c Component) SyncCommitteeSelections(ctx context.Context, opts *eth2api.Sy

// ProposerDuties obtains proposer duties for the given options.
func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDutiesOpts) (*eth2api.Response[[]*eth2v1.ProposerDuty], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.ProposerDuties")

span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
defer span.End()

eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts)
if err != nil {
return nil, err
Expand All @@ -1165,6 +1209,13 @@ func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDut
}

func (c Component) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOpts) (*eth2api.Response[[]*eth2v1.AttesterDuty], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.AttesterDuties")

span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
defer span.End()

eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1216,6 +1267,13 @@ func (c Component) SyncCommitteeDuties(ctx context.Context, opts *eth2api.SyncCo
}

func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts) (*eth2api.Response[map[eth2p0.ValidatorIndex]*eth2v1.Validator], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.Validators")

span.SetAttributes(attribute.String("state", opts.State))
defer span.End()

if len(opts.PubKeys) == 0 && len(opts.Indices) == 0 {
// fetch all validators
eth2Resp, err := c.eth2Cl.Validators(ctx, opts)
Expand Down
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance.
| `core_consensus_duration_seconds` | Histogram | Duration of the consensus process by protocol, duty, and timer | `protocol, duty, timer` |
| `core_consensus_error_total` | Counter | Total count of consensus errors by protocol | `protocol` |
| `core_consensus_timeout_total` | Counter | Total count of consensus timeouts by protocol, duty, and timer | `protocol, duty, timer` |
| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (0) | |
| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (2) | |
| `core_parsigdb_exit_total` | Counter | Total number of partially signed voluntary exits per public key | `pubkey` |
| `core_scheduler_current_epoch` | Gauge | The current epoch | |
| `core_scheduler_current_slot` | Gauge | The current slot | |
Expand Down
Loading