Skip to content

Commit

Permalink
sse implementation that sheds stuck clients
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Sep 9, 2024
1 parent 4c14bd8 commit c2ae175
Show file tree
Hide file tree
Showing 11 changed files with 889 additions and 629 deletions.
16 changes: 15 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,21 @@ load("@com_github_atlassian_bazel_tools//gometalinter:deps.bzl", "gometalinter_d

gometalinter_dependencies()

load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies")
load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies", "go_repository")

go_repository(
name = "com_github_r3labs_sse_v2",
importpath = "github.com/r3labs/sse/v2",
sum = "h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=",
version = "v2.10.0",
)

go_repository(
name = "in_gopkg_cenkalti_backoff_v1",
importpath = "gopkg.in/cenkalti/backoff.v1",
sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=",
version = "v1.1.0",
)

gazelle_dependencies()

Expand Down
3 changes: 3 additions & 0 deletions api/server/structs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ go_library(
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/eth/v2:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand Down
100 changes: 100 additions & 0 deletions api/server/structs/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/math"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
ethv2 "github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
)

var errNilValue = errors.New("nil value")
Expand Down Expand Up @@ -1508,3 +1511,100 @@ func PendingConsolidationsFromConsensus(cs []*eth.PendingConsolidation) []*Pendi
}
return consolidations
}

func HeadEventFromV1(event *ethv1.EventHead) *HeadEvent {
return &HeadEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
EpochTransition: event.EpochTransition,
ExecutionOptimistic: event.ExecutionOptimistic,
PreviousDutyDependentRoot: hexutil.Encode(event.PreviousDutyDependentRoot),
CurrentDutyDependentRoot: hexutil.Encode(event.CurrentDutyDependentRoot),
}
}

func FinalizedCheckpointEventFromV1(event *ethv1.EventFinalizedCheckpoint) *FinalizedCheckpointEvent {
return &FinalizedCheckpointEvent{
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}

func LightClientFinalityUpdateEventFromV2(event *ethv2.LightClientFinalityUpdateWithVersion) (*LightClientFinalityUpdateEvent, error) {
var finalityBranch []string
for _, b := range event.Data.FinalityBranch {
finalityBranch = append(finalityBranch, hexutil.Encode(b))
}
attestedBeacon, err := event.Data.AttestedHeader.GetBeacon()
if err != nil {
return nil, errors.Wrap(err, "could not get attested header")
}
finalizedBeacon, err := event.Data.FinalizedHeader.GetBeacon()
if err != nil {
return nil, errors.Wrap(err, "could not get finalized header")
}
return &LightClientFinalityUpdateEvent{
Version: version.String(int(event.Version)),
Data: &LightClientFinalityUpdate{
AttestedHeader: &BeaconBlockHeader{
Slot: fmt.Sprintf("%d", attestedBeacon.Slot),
ProposerIndex: fmt.Sprintf("%d", attestedBeacon.ProposerIndex),
ParentRoot: hexutil.Encode(attestedBeacon.ParentRoot),
StateRoot: hexutil.Encode(attestedBeacon.StateRoot),
BodyRoot: hexutil.Encode(attestedBeacon.BodyRoot),
},
FinalizedHeader: &BeaconBlockHeader{
Slot: fmt.Sprintf("%d", finalizedBeacon.Slot),
ProposerIndex: fmt.Sprintf("%d", finalizedBeacon.ProposerIndex),
ParentRoot: hexutil.Encode(finalizedBeacon.ParentRoot),
StateRoot: hexutil.Encode(finalizedBeacon.StateRoot),
},
FinalityBranch: finalityBranch,
SyncAggregate: &SyncAggregate{
SyncCommitteeBits: hexutil.Encode(event.Data.SyncAggregate.SyncCommitteeBits),
SyncCommitteeSignature: hexutil.Encode(event.Data.SyncAggregate.SyncCommitteeSignature),
},
SignatureSlot: fmt.Sprintf("%d", event.Data.SignatureSlot),
},
}, nil
}

func LightClientOptimisticUpdateWithVersionFromV2(event *ethv2.LightClientOptimisticUpdateWithVersion) (*LightClientOptimisticUpdateEvent, error) {
attestedBeacon, err := event.Data.AttestedHeader.GetBeacon()
if err != nil {
return nil, errors.Wrap(err, "could not get attested header")
}
return &LightClientOptimisticUpdateEvent{
Version: version.String(int(event.Version)),
Data: &LightClientOptimisticUpdate{
AttestedHeader: &BeaconBlockHeader{
Slot: fmt.Sprintf("%d", attestedBeacon.Slot),
ProposerIndex: fmt.Sprintf("%d", attestedBeacon.ProposerIndex),
ParentRoot: hexutil.Encode(attestedBeacon.ParentRoot),
StateRoot: hexutil.Encode(attestedBeacon.StateRoot),
BodyRoot: hexutil.Encode(attestedBeacon.BodyRoot),
},
SyncAggregate: &SyncAggregate{
SyncCommitteeBits: hexutil.Encode(event.Data.SyncAggregate.SyncCommitteeBits),
SyncCommitteeSignature: hexutil.Encode(event.Data.SyncAggregate.SyncCommitteeSignature),
},
SignatureSlot: fmt.Sprintf("%d", event.Data.SignatureSlot),
},
}, nil
}

func EventChainReorgFromV1(event *ethv1.EventChainReorg) *ChainReorgEvent {
return &ChainReorgEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Depth: fmt.Sprintf("%d", event.Depth),
OldHeadBlock: hexutil.Encode(event.OldHeadBlock),
NewHeadBlock: hexutil.Encode(event.NewHeadBlock),
OldHeadState: hexutil.Encode(event.OldHeadState),
NewHeadState: hexutil.Encode(event.NewHeadState),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (msn *MockStateNotifier) StateFeed() *event.Feed {
return msn.feed
}

// NewSimpleStateNotifier makes a state feed without the custom mock feed machinery.
func NewSimpleStateNotifier() *MockStateNotifier {
return &MockStateNotifier{feed: new(event.Feed)}
}

// OperationNotifier mocks the same method in the chain service.
func (s *ChainService) OperationNotifier() opfeed.Notifier {
if s.opNotifier == nil {
Expand Down
8 changes: 6 additions & 2 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ go_library(
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["events_test.go"],
srcs = [
"events_test.go",
"http_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
Expand All @@ -49,9 +53,9 @@ go_test(
"//consensus-types/primitives:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
],
)
Loading

0 comments on commit c2ae175

Please sign in to comment.