-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
replace receive slot with event stream #13563
Changes from 91 commits
29a552b
86966f5
de96792
033448e
23ec534
91ea02a
b074356
b5670b6
b76e446
f3a0534
38bce44
87c62c9
1fb9a3e
8b0d2ed
17eb642
be92c89
0e403d1
34af48f
aa49ecc
800ce23
048bbcc
1595264
1859840
73022ae
9be3711
47c3f28
c179901
1b6581b
b7cab70
4ad4a46
0982c0b
dd0b4d9
7464d9a
0f4467f
6a828bb
ddd8560
68ceefa
40c566a
02ba06a
37da651
37da93f
be240c4
371aa4c
1c3e369
e802f62
44d5e67
4f4b473
2127691
472d211
0eb554c
e7776fc
1d03da3
4d4eafa
264282c
1e8901c
d698395
ba6c524
e880031
49af08b
86f5f48
243c833
182a683
cc08106
dde4265
ab89e47
2c63dba
32417a3
f0c3903
263a51b
9d08460
65b85ae
2f80739
8e90fbf
a35a12b
54bfd46
6729ae6
8c5200b
bc8e9e2
45693f2
98aa51e
c8bebda
4c4b669
da6fd76
42356da
79200f4
2d77069
e3fd6b8
aa144a0
d2f7df4
ae1720b
024624f
c12e68b
be7963f
6a8a9b0
0c7bb37
0d21889
7141f63
e249fb9
c49d870
8ae7314
fe599f8
effc418
414103e
92e813a
3b92c0b
4c5ba7b
db14009
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package beacon | ||
|
||
import ( | ||
"sync" | ||
) | ||
|
||
type NodeHealth struct { | ||
isHealthy bool | ||
healthChan chan bool | ||
sync.RWMutex | ||
} | ||
|
||
func NewNodeHealth(initialStatus bool) *NodeHealth { | ||
return &NodeHealth{ | ||
isHealthy: initialStatus, | ||
healthChan: make(chan bool, 1), | ||
} | ||
} | ||
|
||
// HealthUpdates provides a read-only channel for health updates. | ||
func (n *NodeHealth) HealthUpdates() <-chan bool { | ||
return n.healthChan | ||
} | ||
|
||
func (n *NodeHealth) IsHealthy() bool { | ||
n.RLock() | ||
defer n.RUnlock() | ||
return n.isHealthy | ||
} | ||
|
||
func (n *NodeHealth) UpdateNodeHealth(newStatus bool) { | ||
n.RLock() | ||
isStatusChanged := newStatus != n.isHealthy | ||
n.RUnlock() | ||
|
||
if isStatusChanged { | ||
n.Lock() | ||
// Double-check the condition to ensure it hasn't changed since the first check. | ||
if newStatus != n.isHealthy { | ||
n.isHealthy = newStatus | ||
n.Unlock() // It's better to unlock as soon as the protected section is over. | ||
n.healthChan <- newStatus | ||
} else { | ||
n.Unlock() | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package beacon | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
) | ||
|
||
func TestNodeHealth_IsHealthy(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
isHealthy bool | ||
want bool | ||
}{ | ||
{"initially healthy", true, true}, | ||
{"initially unhealthy", false, false}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
n := &NodeHealth{ | ||
isHealthy: tt.isHealthy, | ||
healthChan: make(chan bool, 1), | ||
} | ||
if got := n.IsHealthy(); got != tt.want { | ||
t.Errorf("IsHealthy() = %v, want %v", got, tt.want) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestNodeHealth_UpdateNodeHealth(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
initial bool // Initial health status | ||
newStatus bool // Status to update to | ||
shouldSend bool // Should a message be sent through the channel | ||
}{ | ||
{"healthy to unhealthy", true, false, true}, | ||
{"unhealthy to healthy", false, true, true}, | ||
{"remain healthy", true, true, false}, | ||
{"remain unhealthy", false, false, false}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
n := NewNodeHealth(tt.initial) | ||
n.isHealthy = tt.initial // Set initial health status | ||
n.UpdateNodeHealth(tt.newStatus) | ||
|
||
// Check if health status was updated | ||
if n.IsHealthy() != tt.newStatus { | ||
t.Errorf("UpdateNodeHealth() failed to update isHealthy from %v to %v", tt.initial, tt.newStatus) | ||
} | ||
|
||
select { | ||
case status := <-n.HealthUpdates(): | ||
if !tt.shouldSend { | ||
t.Errorf("UpdateNodeHealth() unexpectedly sent status %v to HealthCh", status) | ||
} else if status != tt.newStatus { | ||
t.Errorf("UpdateNodeHealth() sent wrong status %v, want %v", status, tt.newStatus) | ||
} | ||
default: | ||
if tt.shouldSend { | ||
t.Error("UpdateNodeHealth() did not send any status to HealthCh when expected") | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestNodeHealth_Concurrency(t *testing.T) { | ||
n := NewNodeHealth(true) | ||
var wg sync.WaitGroup | ||
|
||
// Number of goroutines to spawn for both reading and writing | ||
numGoroutines := 6 | ||
|
||
go func() { | ||
for range n.HealthUpdates() { | ||
// Consume values to avoid blocking on channel send. | ||
} | ||
}() | ||
|
||
wg.Add(numGoroutines * 2) // for readers and writers | ||
|
||
// Concurrently update health status | ||
for i := 0; i < numGoroutines; i++ { | ||
go func() { | ||
defer wg.Done() | ||
n.UpdateNodeHealth(false) // Flip health status | ||
n.UpdateNodeHealth(true) // And flip it back | ||
}() | ||
} | ||
|
||
// Concurrently read health status | ||
for i := 0; i < numGoroutines; i++ { | ||
go func() { | ||
defer wg.Done() | ||
_ = n.IsHealthy() // Just read the value | ||
}() | ||
} | ||
|
||
wg.Wait() // Wait for all goroutines to finish | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
load("@prysm//tools/go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "go_default_library", | ||
srcs = ["event_stream.go"], | ||
importpath = "github.com/prysmaticlabs/prysm/v5/api/client/event", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//api:go_default_library", | ||
"//api/client:go_default_library", | ||
"@com_github_pkg_errors//:go_default_library", | ||
"@com_github_sirupsen_logrus//:go_default_library", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "go_default_test", | ||
srcs = ["event_stream_test.go"], | ||
embed = [":go_default_library"], | ||
deps = [ | ||
"//testing/require:go_default_library", | ||
"@com_github_sirupsen_logrus//:go_default_library", | ||
], | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package event | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"net/http" | ||
"net/url" | ||
"strings" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/prysmaticlabs/prysm/v5/api" | ||
"github.com/prysmaticlabs/prysm/v5/api/client" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
const ( | ||
EventHead = "head" | ||
EventBlock = "block" | ||
EventAttestation = "attestation" | ||
EventVoluntaryExit = "voluntary_exit" | ||
EventBlsToExecutionChange = "bls_to_execution_change" | ||
EventProposerSlashing = "proposer_slashing" | ||
EventAttesterSlashing = "attester_slashing" | ||
EventFinalizedCheckpoint = "finalized_checkpoint" | ||
EventChainReorg = "chain_reorg" | ||
EventContributionAndProof = "contribution_and_proof" | ||
EventLightClientFinalityUpdate = "light_client_finality_update" | ||
EventLightClientOptimisticUpdate = "light_client_optimistic_update" | ||
EventPayloadAttributes = "payload_attributes" | ||
EventBlobSidecar = "blob_sidecar" | ||
EventError = "error" | ||
EventConnectionError = "connection_error" | ||
) | ||
|
||
var ( | ||
_ = EventStreamClient(&EventStream{}) | ||
) | ||
|
||
var DefaultEventTopics = []string{EventHead} | ||
|
||
type EventStreamClient interface { | ||
Subscribe(eventsChannel chan<- *Event) | ||
} | ||
|
||
type Event struct { | ||
EventType string | ||
Data []byte | ||
} | ||
|
||
// EventStream is responsible for subscribing to the Beacon API events endpoint | ||
// and dispatching received events to subscribers. | ||
type EventStream struct { | ||
ctx context.Context | ||
httpClient *http.Client | ||
host string | ||
topics []string | ||
} | ||
|
||
func NewEventStream(ctx context.Context, httpClient *http.Client, host string, topics []string) (*EventStream, error) { | ||
// Check if the host is a valid URL | ||
_, err := url.ParseRequestURI(host) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(topics) == 0 { | ||
return nil, errors.New("no topics provided") | ||
} | ||
|
||
return &EventStream{ | ||
ctx: ctx, | ||
httpClient: httpClient, | ||
host: host, | ||
topics: topics, | ||
}, nil | ||
} | ||
|
||
func (h *EventStream) Subscribe(eventsChannel chan<- *Event) { | ||
allTopics := strings.Join(h.topics, ",") | ||
log.WithField("topics", allTopics).Info("Listening to Beacon API events") | ||
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics | ||
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil) | ||
if err != nil { | ||
eventsChannel <- &Event{ | ||
EventType: EventConnectionError, | ||
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()), | ||
} | ||
} | ||
req.Header.Set("Accept", api.EventStreamMediaType) | ||
req.Header.Set("Connection", api.KeepAlive) | ||
resp, err := h.httpClient.Do(req) | ||
if err != nil { | ||
eventsChannel <- &Event{ | ||
EventType: EventConnectionError, | ||
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()), | ||
} | ||
} | ||
|
||
defer func() { | ||
if closeErr := resp.Body.Close(); closeErr != nil { | ||
log.WithError(closeErr).Error("Failed to close events response body") | ||
} | ||
}() | ||
// Create a new scanner to read lines from the response body | ||
scanner := bufio.NewScanner(resp.Body) | ||
|
||
var eventType, data string // Variables to store event type and data | ||
|
||
// Iterate over lines of the event stream | ||
for scanner.Scan() { | ||
select { | ||
case <-h.ctx.Done(): | ||
log.Info("Context canceled, stopping event stream") | ||
close(eventsChannel) | ||
return | ||
default: | ||
line := scanner.Text() // TODO: scanner does not handle /r and does not fully adhere to https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the TODO item? Should you fix that before merging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is an area i need some suggestions on, scanner does not handle /r only /n and /n/r , i don't think any implementations use /r , would it be better to use a sse library to handle this instead of our own implementation |
||
// Handle the event based on your specific format | ||
if line == "" { | ||
// Empty line indicates the end of an event | ||
if eventType != "" && data != "" { | ||
// Process the event when both eventType and data are set | ||
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)} | ||
} | ||
|
||
// Reset eventType and data for the next event | ||
eventType, data = "", "" | ||
continue | ||
} | ||
et, ok := strings.CutPrefix(line, "event: ") | ||
if ok { | ||
// Extract event type from the "event" field | ||
eventType = et | ||
} | ||
d, ok := strings.CutPrefix(line, "data: ") | ||
if ok { | ||
// Extract data from the "data" field | ||
data = d | ||
} | ||
} | ||
} | ||
|
||
if err := scanner.Err(); err != nil { | ||
eventsChannel <- &Event{ | ||
EventType: EventConnectionError, | ||
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why put this under api/client? Seems to be unrelated to making API calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the idea was from the client perspective this Node health object keeps track of the server ( the beacon node) so in this case the validator uses the package api/client to track the beacon node's health. happy to move it if it doesn't make sense.