Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe StreamEvents write loop #14557

Merged
merged 10 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
StreamHandler waits for write loop exit
  • Loading branch information
kasey committed Oct 24, 2024
commit 1bd313cfff24c09f30c9b5c730b1aabb1addb27a
2 changes: 2 additions & 0 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"events.go",
"log.go",
"server.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/events",
Expand Down Expand Up @@ -58,5 +59,6 @@ go_test(
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
133 changes: 98 additions & 35 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)

const DefaultEventFeedDepth = 1000
Expand Down Expand Up @@ -74,13 +73,6 @@ var (
errWriterUnusable = errors.New("http response writer is unusable")
)

// StreamingResponseWriter defines a type that can be used by the eventStreamer.
// This must be an http.ResponseWriter that supports flushing and hijacking.
type StreamingResponseWriter interface {
http.ResponseWriter
http.Flusher
}

// The eventStreamer uses lazyReaders to defer serialization until the moment the value is ready to be written to the client.
type lazyReader func() io.Reader

Expand Down Expand Up @@ -159,47 +151,53 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
return
}

sw, ok := w.(StreamingResponseWriter)
if !ok {
msg := "beacon node misconfiguration: http stack may not support required response handling features, like flushing"
httputil.HandleError(w, msg, http.StatusInternalServerError)
return
timeout := s.EventWriteTimeout
if timeout == 0 {
timeout = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
depth := s.EventFeedDepth
if depth == 0 {
depth = DefaultEventFeedDepth
ka := s.KeepAliveInterval
if ka == 0 {
ka = timeout
}
es, err := newEventStreamer(depth, s.KeepAliveInterval)
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
buffSize := s.EventFeedDepth
if buffSize == 0 {
buffSize = DefaultEventFeedDepth
}

api.SetSSEHeaders(w)
sw := NewStreamingResponseController(w, timeout)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
api.SetSSEHeaders(sw)
defer func() {
cancel()
}()
es := newEventStreamer(buffSize, ka)

go es.outboxWriteLoop(ctx, cancel, sw)
if err := es.recvEventLoop(ctx, cancel, topics, s); err != nil {
log.WithError(err).Debug("Shutting down StreamEvents handler.")
}
es.waitForCleanup()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for other reviewers, this is key to fixing the panic here. previously StreamEvents would exit and the outboxWriteLoop could still be writing to a writer that was cleaned up via regular HTTP processes. now there is this wait and inside the writes wrapped with deadlines

}

func newEventStreamer(buffSize int, ka time.Duration) (*eventStreamer, error) {
if ka == 0 {
ka = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
func newEventStreamer(buffSize int, ka time.Duration) *eventStreamer {
return &eventStreamer{
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
}, nil
cleanedUp: make(chan struct{}),
}
}

type eventStreamer struct {
outbox chan lazyReader
keepAlive time.Duration
cleanedUp chan struct{}
}

func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error {
defer close(es.outbox)
defer func() {
cancel()
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need es.exit() here. Otherwise this function will return with an error, the Shutting down StreamEvents handler log will be printed and the handler will be stuck on es.waitForExit() indefinitely because outboxWriteLoop will just keep looping. On the other hand, I am not sure if there aren't negative consequences of exiting here instead of in outboxWriteLoop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed offline the context cancellation in recvEventLoop causes outboxWriteLoop to break out of the loop and call exit on its own.

eventsChan := make(chan *feed.Event, len(es.outbox))
if req.needOpsFeed {
opsSub := s.OperationNotifier.OperationFeed().Subscribe(eventsChan)
Expand Down Expand Up @@ -228,7 +226,6 @@ func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.Cance
// channel should stay relatively empty, which gives this loop time to unsubscribe
// and cleanup before the event stream channel fills and disrupts other readers.
if err := es.safeWrite(ctx, lr); err != nil {
cancel()
// note: we could hijack the connection and close it here. Does that cause issues? What are the benefits?
// A benefit of hijack and close is that it may force an error on the remote end, however just closing the context of the
// http handler may be sufficient to cause the remote http response reader to close.
Expand Down Expand Up @@ -265,14 +262,17 @@ func newlineReader() io.Reader {

// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to
// the client as fast as the client can read them.
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w StreamingResponseWriter) {
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *StreamingResponseWriterController) {
var err error
defer func() {
if err != nil {
log.WithError(err).Debug("Event streamer shutting down due to error.")
}
es.cleanup()
}()
defer func() {
cancel()
}()
defer cancel()
// Write a keepalive at the start to test the connection and simplify test setup.
if err = es.writeOutbox(ctx, w, nil); err != nil {
return
Expand Down Expand Up @@ -310,18 +310,36 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can
}
}

func writeLazyReaderWithRecover(w StreamingResponseWriter, lr lazyReader) (err error) {
func (es *eventStreamer) cleanup() {
drained := 0
for range es.outbox {
drained += 1
}
log.WithField("undelivered_events", drained).Debug("Event stream outbox drained.")
close(es.cleanedUp)
}

func (es *eventStreamer) waitForCleanup() {
<-es.cleanedUp
}

func writeLazyReaderWithRecover(w *StreamingResponseWriterController, lr lazyReader) (err error) {
defer func() {
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
}
}()
_, err = io.Copy(w, lr())
r := lr()
out, err := io.ReadAll(r)
if err != nil {
return err
}
_, err = w.Write(out)
return err
}

func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error {
func (es *eventStreamer) writeOutbox(ctx context.Context, w *StreamingResponseWriterController, first lazyReader) error {
needKeepAlive := true
if first != nil {
if err := writeLazyReaderWithRecover(w, first); err != nil {
Expand All @@ -347,8 +365,7 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWri
return err
}
}
w.Flush()
return nil
return w.Flush()
}
}
}
Expand Down Expand Up @@ -638,3 +655,49 @@ func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, erro
})
}, nil
}

func NewStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *StreamingResponseWriterController {
rc := http.NewResponseController(rw)
return &StreamingResponseWriterController{
timeout: timeout,
rw: rw,
rc: rc,
}
}

type StreamingResponseWriterController struct {
timeout time.Duration
rw http.ResponseWriter
rc *http.ResponseController
}

func (c *StreamingResponseWriterController) Write(b []byte) (int, error) {
if err := c.setDeadline(); err != nil {
return 0, err
}
out, err := c.rw.Write(b)
if err != nil {
return out, err
}
return out, c.clearDeadline()
}

func (c *StreamingResponseWriterController) setDeadline() error {
return c.rc.SetWriteDeadline(time.Now().Add(c.timeout))
}

func (c *StreamingResponseWriterController) clearDeadline() error {
return c.rc.SetWriteDeadline(time.Time{})
}

func (c *StreamingResponseWriterController) Flush() error {
if err := c.setDeadline(); err != nil {
return err
}
if err := c.rc.Flush(); err != nil {
return err
}
return c.clearDeadline()
}

var _ io.Writer = &StreamingResponseWriterController{}
Loading