Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE implementation that sheds stuck clients #14413

Merged
merged 10 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Tests to ensure sepolia config matches the official upstream yaml
- HTTP endpoint for PublishBlobs
- GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case.
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)

### Changed

Expand Down
8 changes: 8 additions & 0 deletions api/headers.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "net/http"

const (
VersionHeader = "Eth-Consensus-Version"
ExecutionPayloadBlindedHeader = "Eth-Execution-Payload-Blinded"
Expand All @@ -10,3 +12,9 @@ const (
EventStreamMediaType = "text/event-stream"
KeepAlive = "keep-alive"
)

// SetSSEHeaders sets the headers needed for a server-sent event response.
func SetSSEHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", EventStreamMediaType)
w.Header().Set("Connection", KeepAlive)
}
35 changes: 35 additions & 0 deletions api/server/structs/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/math"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

Expand Down Expand Up @@ -1508,3 +1509,37 @@ func PendingConsolidationsFromConsensus(cs []*eth.PendingConsolidation) []*Pendi
}
return consolidations
}

func HeadEventFromV1(event *ethv1.EventHead) *HeadEvent {
return &HeadEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
EpochTransition: event.EpochTransition,
ExecutionOptimistic: event.ExecutionOptimistic,
PreviousDutyDependentRoot: hexutil.Encode(event.PreviousDutyDependentRoot),
CurrentDutyDependentRoot: hexutil.Encode(event.CurrentDutyDependentRoot),
}
}

func FinalizedCheckpointEventFromV1(event *ethv1.EventFinalizedCheckpoint) *FinalizedCheckpointEvent {
return &FinalizedCheckpointEvent{
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}

func EventChainReorgFromV1(event *ethv1.EventChainReorg) *ChainReorgEvent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to handle nil cases on these new functions? Any of them panic if the event is nil.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think nil is possible ( based on where it's used)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does seem to be the pattern overall for these functions to rely on their callers for nil checks. I added a paranoid check to the top of lazyReaderForEvent which will at least ensure this does not happen for the sse streamer code that calls it.

return &ChainReorgEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Depth: fmt.Sprintf("%d", event.Depth),
OldHeadBlock: hexutil.Encode(event.OldHeadBlock),
NewHeadBlock: hexutil.Encode(event.NewHeadBlock),
OldHeadState: hexutil.Encode(event.OldHeadState),
NewHeadState: hexutil.Encode(event.NewHeadState),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}
1 change: 1 addition & 0 deletions async/event/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"feed.go",
"interface.go",
"subscription.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/async/event",
Expand Down
1 change: 1 addition & 0 deletions async/event/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ import (

// Feed is a re-export of the go-ethereum event feed.
type Feed = geth_event.Feed
type Subscription = geth_event.Subscription
8 changes: 8 additions & 0 deletions async/event/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package event

// SubscriberSender is an abstract representation of an *event.Feed
// to use in describing types that accept or return an *event.Feed.
type SubscriberSender interface {
Subscribe(channel interface{}) Subscription
Send(value interface{}) (nsent int)
}
19 changes: 0 additions & 19 deletions async/event/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,6 @@ import (
// request backoff time.
const waitQuotient = 10

// Subscription represents a stream of events. The carrier of the events is typically a
// channel, but isn't part of the interface.
//
// Subscriptions can fail while established. Failures are reported through an error
// channel. It receives a value if there is an issue with the subscription (e.g. the
// network connection delivering the events has been closed). Only one value will ever be
// sent.
//
// The error channel is closed when the subscription ends successfully (i.e. when the
// source of events is closed). It is also closed when Unsubscribe is called.
//
// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
// cases to ensure that resources related to the subscription are released. It can be
// called any number of times.
type Subscription interface {
Err() <-chan error // returns the error channel
Unsubscribe() // cancels sending of events, closing the error channel
}

// NewSubscription runs a producer function as a subscription in a new goroutine. The
// channel given to the producer is closed when Unsubscribe is called. If fn returns an
// error, it is sent on the subscription's error channel.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type mockBeaconNode struct {
}

// StateFeed mocks the same method in the beacon node.
func (mbn *mockBeaconNode) StateFeed() *event.Feed {
func (mbn *mockBeaconNode) StateFeed() event.SubscriberSender {
mbn.mu.Lock()
defer mbn.mu.Unlock()
if mbn.stateFeed == nil {
Expand Down
59 changes: 57 additions & 2 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,44 @@ func (s *ChainService) BlockNotifier() blockfeed.Notifier {
return s.blockNotifier
}

type EventFeedWrapper struct {
feed *event.Feed
subscribed chan struct{} // this channel is closed once a subscription is made
}

func (w *EventFeedWrapper) Subscribe(channel interface{}) event.Subscription {
select {
case <-w.subscribed:
break // already closed
default:
close(w.subscribed)
}
return w.feed.Subscribe(channel)
}

func (w *EventFeedWrapper) Send(value interface{}) int {
return w.feed.Send(value)
}

// WaitForSubscription allows test to wait for the feed to have a subscription before beginning to send events.
func (w *EventFeedWrapper) WaitForSubscription(ctx context.Context) error {
select {
case <-w.subscribed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

var _ event.SubscriberSender = &EventFeedWrapper{}

func NewEventFeedWrapper() *EventFeedWrapper {
return &EventFeedWrapper{
feed: new(event.Feed),
subscribed: make(chan struct{}),
}
}

// MockBlockNotifier mocks the block notifier.
type MockBlockNotifier struct {
feed *event.Feed
Expand Down Expand Up @@ -131,7 +169,7 @@ func (msn *MockStateNotifier) ReceivedEvents() []*feed.Event {
}

// StateFeed returns a state feed.
func (msn *MockStateNotifier) StateFeed() *event.Feed {
func (msn *MockStateNotifier) StateFeed() event.SubscriberSender {
msn.feedLock.Lock()
defer msn.feedLock.Unlock()

Expand Down Expand Up @@ -159,6 +197,23 @@ func (msn *MockStateNotifier) StateFeed() *event.Feed {
return msn.feed
}

// NewSimpleStateNotifier makes a state feed without the custom mock feed machinery.
func NewSimpleStateNotifier() *MockStateNotifier {
return &MockStateNotifier{feed: new(event.Feed)}
}

type SimpleNotifier struct {
Feed event.SubscriberSender
}

func (n *SimpleNotifier) StateFeed() event.SubscriberSender {
return n.Feed
}

func (n *SimpleNotifier) OperationFeed() event.SubscriberSender {
return n.Feed
}

// OperationNotifier mocks the same method in the chain service.
func (s *ChainService) OperationNotifier() opfeed.Notifier {
if s.opNotifier == nil {
Expand All @@ -173,7 +228,7 @@ type MockOperationNotifier struct {
}

// OperationFeed returns an operation feed.
func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
func (mon *MockOperationNotifier) OperationFeed() event.SubscriberSender {
if mon.feed == nil {
mon.feed = new(event.Feed)
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/core/feed/operation/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event"

// Notifier interface defines the methods of the service that provides beacon block operation updates to consumers.
type Notifier interface {
OperationFeed() *event.Feed
OperationFeed() event.SubscriberSender
}
2 changes: 1 addition & 1 deletion beacon-chain/core/feed/state/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event"

// Notifier interface defines the methods of the service that provides state updates to consumers.
type Notifier interface {
StateFeed() *event.Feed
StateFeed() event.SubscriberSender
}
2 changes: 1 addition & 1 deletion beacon-chain/execution/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type goodNotifier struct {
MockStateFeed *event.Feed
}

func (g *goodNotifier) StateFeed() *event.Feed {
func (g *goodNotifier) StateFeed() event.SubscriberSender {
if g.MockStateFeed == nil {
g.MockStateFeed = new(event.Feed)
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error {
}

// StateFeed implements statefeed.Notifier.
func (b *BeaconNode) StateFeed() *event.Feed {
func (b *BeaconNode) StateFeed() event.SubscriberSender {
return b.stateFeed
}

Expand All @@ -408,7 +408,7 @@ func (b *BeaconNode) BlockFeed() *event.Feed {
}

// OperationFeed implements opfeed.Notifier.
func (b *BeaconNode) OperationFeed() *event.Feed {
func (b *BeaconNode) OperationFeed() event.SubscriberSender {
return b.opFeed
}

Expand Down
8 changes: 6 additions & 2 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ go_library(
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["events_test.go"],
srcs = [
"events_test.go",
"http_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
Expand All @@ -49,9 +53,9 @@ go_test(
"//consensus-types/primitives:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
],
)
Loading
Loading