Skip to content

Commit

Permalink
try a different design
Browse files Browse the repository at this point in the history
  • Loading branch information
rkapka committed Dec 23, 2024
1 parent abf7e6d commit 023c99d
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 280 deletions.
6 changes: 1 addition & 5 deletions beacon-chain/rpc/eth/beacon/handlers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,7 @@ func (s *Server) handleAttestationsElectra(
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
committeeIndex, err := att.GetCommitteeIndex()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to retrieve attestation committee index")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), committeeIndex, att.Data.Slot)
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
Expand Down
6 changes: 1 addition & 5 deletions beacon-chain/rpc/eth/validator/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,7 @@ func matchingAtts(atts []ethpbalpha.Att, slot primitives.Slot, attDataRoot []byt
// compare the committee index separately.
if postElectra {
if att.Version() >= version.Electra {
ci, err := att.GetCommitteeIndex()
if err != nil {
return nil, err
}
if ci != index {
if att.GetCommitteeIndex() != index {
continue
}
} else {
Expand Down
5 changes: 1 addition & 4 deletions beacon-chain/rpc/prysm/v1alpha1/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, att *ethpb.Sing
if err != nil {
return nil, status.Error(codes.Internal, "Could not get target state")
}
committeeIndex, err := att.GetCommitteeIndex()
if err != nil {
return nil, status.Error(codes.Internal, "Could not get committee index")
}
committeeIndex := att.GetCommitteeIndex()
committee, err := helpers.BeaconCommitteeFromState(ctx, targetState, att.Data.Slot, committeeIndex)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get committee")
Expand Down
2 changes: 0 additions & 2 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ go_library(
"validate_aggregate_proof.go",
"validate_attester_slashing.go",
"validate_beacon_attestation.go",
"validate_beacon_attestation_electra.go",
"validate_beacon_blocks.go",
"validate_blob.go",
"validate_bls_to_execution_change.go",
Expand Down Expand Up @@ -179,7 +178,6 @@ go_test(
"sync_test.go",
"validate_aggregate_proof_test.go",
"validate_attester_slashing_test.go",
"validate_beacon_attestation_electra_test.go",
"validate_beacon_attestation_test.go",
"validate_beacon_blocks_test.go",
"validate_blob_test.go",
Expand Down
12 changes: 10 additions & 2 deletions beacon-chain/sync/pending_attestations_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,20 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
log.WithError(err).Debug("Could not retrieve attestation prestate")
continue
}

valid, err := s.validateUnaggregatedAttWithState(ctx, aggregate, preState)
committee, valid, err := s.validateUnaggregatedAttWithState(ctx, aggregate, preState)
if err != nil {
log.WithError(err).Debug("Pending unaggregated attestation failed validation")
continue
}
if aggregate.Version() >= version.Electra {
singleAtt, ok := aggregate.(*ethpb.SingleAttestation)
if !ok {
log.Debugf("Attestation has wrong type (expected %T, got %T)", &ethpb.SingleAttestation{}, aggregate)
continue
}
aggregate = singleAtt.ToAttestationElectra(committee)
}

if valid == pubsub.ValidationAccept {
if err := s.cfg.attPool.SaveUnaggregatedAttestation(aggregate); err != nil {
log.WithError(err).Debug("Could not save unaggregated attestation")
Expand Down
5 changes: 1 addition & 4 deletions beacon-chain/sync/subscriber_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
if data == nil {
return errors.New("nil attestation")
}
committeeIndex, err := a.GetCommitteeIndex()
if err != nil {
return errors.Wrap(err, "committeeIndexBeaconAttestationSubscriber failed to get committee index")
}
committeeIndex := a.GetCommitteeIndex()
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, a.GetAggregationBits())

exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
Expand Down
15 changes: 11 additions & 4 deletions beacon-chain/sync/validate_aggregate_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,23 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
return pubsub.ValidationIgnore, err
}

committeeIndex, _, result, err := s.validateCommitteeIndexAndCount(ctx, aggregate, bs)
committeeIndex, _, result, err := s.validateCommitteeIndex(ctx, aggregate, bs)
if result != pubsub.ValidationAccept {
wrappedErr := errors.Wrapf(err, "could not validate committee index")
tracing.AnnotateError(span, wrappedErr)
return result, err
}

committee, result, err := s.validateBitLength(ctx, bs, aggregate.GetData().Slot, committeeIndex, aggregate.GetAggregationBits())
if result != pubsub.ValidationAccept {
return result, err
committee, err := helpers.BeaconCommitteeFromState(ctx, bs, aggregate.GetData().Slot, committeeIndex)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}

// Verify number of aggregation bits matches the committee size.
if err = helpers.VerifyBitfieldLength(aggregate.GetAggregationBits(), uint64(len(committee))); err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}

// Verify validator index is within the beacon committee.
Expand Down
141 changes: 63 additions & 78 deletions beacon-chain/sync/validate_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
Expand Down Expand Up @@ -83,12 +82,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return pubsub.ValidationReject, err
}

committeeIndex, result, err := s.validateCommitteeIndex(ctx, att)
if result != pubsub.ValidationAccept {
wrappedErr := errors.Wrapf(err, "could not validate committee index for %s version", version.String(att.Version()))
tracing.AnnotateError(span, wrappedErr)
return result, wrappedErr
}
committeeIndex := att.GetCommitteeIndex()

if !features.Get().EnableSlasher {
// Verify this the first attestation received for the participating validator for the slot.
Expand All @@ -112,12 +106,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if !s.hasBlockAndState(ctx, blockRoot) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
if att.Version() >= version.Electra {
a, ok := att.(*eth.AttestationElectra)
a, ok := att.(*eth.SingleAttestation)
// This will never fail in practice because we asserted the version
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.AttestationElectra{}, att)
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
}
s.savePendingAtt(&eth.SignedAggregateAttestationAndProofElectra{Message: &eth.AggregateAttestationAndProofElectra{Aggregate: a}})
s.savePendingAtt(&eth.SignedAggregateAttestationAndProofSingle{Message: &eth.AggregateAttestationAndProofSingle{Aggregate: a}})
} else {
a, ok := att.(*eth.Attestation)
// This will never fail in practice because we asserted the version
Expand All @@ -133,7 +127,6 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized)
return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized
}

if err = s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
Expand All @@ -145,38 +138,25 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.GetData().Slot, committeeIndex)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}

var singleAtt *eth.SingleAttestation
if att.Version() >= version.Electra {
singleAtt, ok = att.(*eth.SingleAttestation)
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
}
att = singleAtt.ToAttestationElectra(committee)
}

validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
}

if singleAtt != nil {
validationRes, err = validateAttestingIndex(ctx, singleAtt, committee)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
}
}

validationRes, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
committee, validationRes, err := s.validateUnaggregatedAttWithState(ctx, att, preState)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
}

if att.Version() >= version.Electra {
singleAtt, ok := att.(*eth.SingleAttestation)
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
}
att = singleAtt.ToAttestationElectra(committee)
}

if features.Get().EnableSlasher {
// Feed the indexed attestation to slasher if enabled. This action
// is done in the background to avoid adding more load to this critical code path.
Expand Down Expand Up @@ -227,7 +207,7 @@ func (s *Service) validateUnaggregatedAttTopic(ctx context.Context, a eth.Att, b
ctx, span := trace.StartSpan(ctx, "sync.validateUnaggregatedAttTopic")
defer span.End()

_, valCount, result, err := s.validateCommitteeIndexAndCount(ctx, a, bs)
_, valCount, result, err := s.validateCommitteeIndex(ctx, a, bs)
if result != pubsub.ValidationAccept {
return result, err
}
Expand All @@ -245,79 +225,84 @@ func (s *Service) validateUnaggregatedAttTopic(ctx context.Context, a eth.Att, b
return pubsub.ValidationAccept, nil
}

func (s *Service) validateCommitteeIndexAndCount(
func (s *Service) validateCommitteeIndex(
ctx context.Context,
a eth.Att,
bs state.ReadOnlyBeaconState,
) (primitives.CommitteeIndex, uint64, pubsub.ValidationResult, error) {
ci, result, err := s.validateCommitteeIndex(ctx, a)
if result != pubsub.ValidationAccept {
return 0, 0, result, err
}
valCount, err := helpers.ActiveValidatorCount(ctx, bs, slots.ToEpoch(a.GetData().Slot))
if err != nil {
return 0, 0, pubsub.ValidationIgnore, err
}
count := helpers.SlotCommitteeCount(valCount)
ci := a.GetCommitteeIndex()
if uint64(ci) > count {
return 0, 0, pubsub.ValidationReject, fmt.Errorf("committee index %d > %d", ci, count)
}
return ci, valCount, pubsub.ValidationAccept, nil
}

func (s *Service) validateCommitteeIndex(ctx context.Context, a eth.Att) (primitives.CommitteeIndex, pubsub.ValidationResult, error) {
if a.Version() >= version.Electra {
return validateCommitteeIndexElectra(ctx, a)
}
return a.GetData().CommitteeIndex, pubsub.ValidationAccept, nil
}

// This validates beacon unaggregated attestation using the given state, the validation consists of bitfield length and count consistency
// and signature verification.
func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a eth.Att, bs state.ReadOnlyBeaconState) (pubsub.ValidationResult, error) {
func (s *Service) validateUnaggregatedAttWithState(
ctx context.Context,
a eth.Att,
bs state.ReadOnlyBeaconState,
) ([]primitives.ValidatorIndex, pubsub.ValidationResult, error) {
ctx, span := trace.StartSpan(ctx, "sync.validateUnaggregatedAttWithState")
defer span.End()

committeeIndex, err := a.GetCommitteeIndex()
if err != nil {
return pubsub.ValidationIgnore, err
}
committeeIndex := a.GetCommitteeIndex()

committee, result, err := s.validateBitLength(ctx, bs, a.GetData().Slot, committeeIndex, a.GetAggregationBits())
if result != pubsub.ValidationAccept {
return result, err
}
// Attestation must be unaggregated and the bit index must exist in the range of committee indices.
// Note: The Ethereum Beacon chain spec suggests (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1)
// however this validation can be achieved without use of get_attesting_indices which is an O(n) lookup.
if a.GetAggregationBits().Count() != 1 || a.GetAggregationBits().BitIndices()[0] >= len(committee) {
return pubsub.ValidationReject, errors.New("attestation bitfield is invalid")
var committee []primitives.ValidatorIndex
var err error

// TODO: add test
// - [REJECT] attestation.data.index == 0
if a.Version() >= version.Electra {
if committeeIndex != 0 {
return nil, pubsub.ValidationReject, errors.New("attestation data's committee index must be 0")
}
singleAtt, ok := a.(*eth.SingleAttestation)
if !ok {
return nil, pubsub.ValidationReject, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, a)
}
committee, err = helpers.BeaconCommitteeFromState(ctx, bs, a.GetData().Slot, committeeIndex)
if err != nil {
return nil, pubsub.ValidationIgnore, err
}
result, err := validateAttestingIndex(ctx, singleAtt, committee)
if result != pubsub.ValidationAccept {
return nil, result, err
}
} else {
committee, err = helpers.BeaconCommitteeFromState(ctx, bs, a.GetData().Slot, committeeIndex)
if err != nil {
return nil, pubsub.ValidationIgnore, err
}
// Verify number of aggregation bits matches the committee size.
if err = helpers.VerifyBitfieldLength(a.GetAggregationBits(), uint64(len(committee))); err != nil {
return nil, pubsub.ValidationReject, err
}
// Attestation must be unaggregated and the bit index must exist in the range of committee indices.
// Note: The Ethereum Beacon chain spec suggests (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1)
// however this validation can be achieved without use of get_attesting_indices which is an O(n) lookup.
if a.GetAggregationBits().Count() != 1 || a.GetAggregationBits().BitIndices()[0] >= len(committee) {
return nil, pubsub.ValidationReject, errors.New("attestation bitfield is invalid")
}
}

set, err := blocks.AttestationSignatureBatch(ctx, bs, []eth.Att{a})
if err != nil {
tracing.AnnotateError(span, err)
attBadSignatureBatchCount.Inc()
return pubsub.ValidationReject, err
}
return s.validateWithBatchVerifier(ctx, "attestation", set)
}

func (s *Service) validateBitLength(
ctx context.Context,
bs state.ReadOnlyBeaconState,
slot primitives.Slot,
committeeIndex primitives.CommitteeIndex,
aggregationBits bitfield.Bitlist,
) ([]primitives.ValidatorIndex, pubsub.ValidationResult, error) {
committee, err := helpers.BeaconCommitteeFromState(ctx, bs, slot, committeeIndex)
if err != nil {
return nil, pubsub.ValidationIgnore, err
return nil, pubsub.ValidationReject, err
}

// Verify number of aggregation bits matches the committee size.
if err := helpers.VerifyBitfieldLength(aggregationBits, uint64(len(committee))); err != nil {
return nil, pubsub.ValidationReject, err
result, err := s.validateWithBatchVerifier(ctx, "attestation", set)
if result != pubsub.ValidationAccept {
tracing.AnnotateError(span, err)
return nil, result, err
}

return committee, pubsub.ValidationAccept, nil
Expand Down
27 changes: 0 additions & 27 deletions beacon-chain/sync/validate_beacon_attestation_electra.go

This file was deleted.

Loading

0 comments on commit 023c99d

Please sign in to comment.