Skip to content

Commit 970dd4c

Browse files
committed
Support SSE proxy
1 parent 3d497f0 commit 970dd4c

File tree

10 files changed

+187
-2
lines changed

10 files changed

+187
-2
lines changed

app/eth2wrap/eth2wrap_gen.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/eth2wrap/httpwrap.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,7 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo
135135
log.Debug(ctx, "Proxying request to beacon node", z.Any("url", h.address))
136136
return h.Service.Proxy(ctx, req)
137137
}
138+
139+
func (h *httpAdapter) Headers() map[string]string {
140+
return h.headers
141+
}

app/eth2wrap/lazy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ func (l *lazy) Address() string {
120120
return cl.Address()
121121
}
122122

123+
func (l *lazy) Headers() map[string]string {
124+
cl, ok := l.getClient()
125+
if !ok {
126+
return nil
127+
}
128+
129+
return cl.Headers()
130+
}
131+
123132
func (l *lazy) IsActive() bool {
124133
cl, ok := l.getClient()
125134
if !ok {

app/eth2wrap/mocks/client.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/eth2wrap/multi.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ func (m multi) Address() string {
5959
return address
6060
}
6161

62+
func (m multi) Headers() map[string]string {
63+
if len(m.clients) == 0 {
64+
return nil
65+
}
66+
return m.clients[0].Headers()
67+
}
68+
6269
func (m multi) IsActive() bool {
6370
for _, cl := range m.clients {
6471
if cl.IsActive() {

core/validatorapi/mocks/handler.go

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/validatorapi/router.go

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"encoding/json"
1212
"fmt"
1313
"io"
14+
stdlog "log"
1415
"maps"
1516
"math"
1617
"net/http"
18+
"net/http/httputil"
1719
"net/url"
1820
"slices"
1921
"strconv"
@@ -83,6 +85,11 @@ type Handler interface {
8385
eth2client.ValidatorRegistrationsSubmitter
8486
eth2client.VoluntaryExitSubmitter
8587
// Above sorted alphabetically.
88+
89+
// Address returns the address of the beacon node.
90+
Address() string
91+
// Headers returns custom headers to include in requests to the beacon node.
92+
Headers() map[string]string
8693
}
8794

8895
// NewRouter returns a new validator http server router. The http router
@@ -1599,12 +1606,21 @@ func nodeVersion(p eth2client.NodeVersionProvider) handlerFunc {
15991606
}
16001607
}
16011608

1602-
func proxy(p eth2client.ProxyProvider) http.HandlerFunc {
1609+
func proxy(h Handler) http.HandlerFunc {
16031610
return func(w http.ResponseWriter, r *http.Request) {
16041611
ctx := r.Context()
16051612
ctx = log.WithTopic(ctx, "vapi")
16061613
ctx = log.WithCtx(ctx, z.Str("vapi_proxy_method", r.Method), z.Str("vapi_proxy_path", r.URL.Path))
16071614
ctx = withCtxDuration(ctx)
1615+
1616+
// For SSE endpoints (/events), use direct reverse proxy to beacon node
1617+
if strings.HasSuffix(r.URL.Path, "/events") {
1618+
addr := h.Address()
1619+
proxyToBeaconNode(ctx, w, r, addr, h.Headers())
1620+
return
1621+
}
1622+
1623+
// For non-SSE endpoints, use eth2wrap logic with timeout
16081624
ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
16091625

16101626
defer func() {
@@ -1616,7 +1632,7 @@ func proxy(p eth2client.ProxyProvider) http.HandlerFunc {
16161632
cancel()
16171633
}()
16181634

1619-
res, err := p.Proxy(ctx, r)
1635+
res, err := h.Proxy(ctx, r)
16201636
if err != nil {
16211637
writeError(ctx, w, r.URL.Path, err)
16221638
return
@@ -1641,6 +1657,72 @@ func proxy(p eth2client.ProxyProvider) http.HandlerFunc {
16411657
}
16421658
}
16431659

1660+
// writeFlusher is copied from /net/http/httputil/reverseproxy.go.
1661+
// It is required to flush streaming responses.
1662+
type writeFlusher interface {
1663+
http.ResponseWriter
1664+
http.Flusher
1665+
}
1666+
1667+
// proxyResponseWriter wraps the writeFlusher interface and instruments errors.
1668+
type proxyResponseWriter struct {
1669+
writeFlusher
1670+
}
1671+
1672+
func (w proxyResponseWriter) WriteHeader(statusCode int) {
1673+
if statusCode/100 == 2 {
1674+
// 2XX isn't an error
1675+
return
1676+
}
1677+
1678+
incAPIErrors("proxy", statusCode)
1679+
w.writeFlusher.WriteHeader(statusCode)
1680+
}
1681+
1682+
// proxyToBeaconNode directly reverse proxies the request to the beacon node.
1683+
// This is used for SSE endpoints which require a persistent connection.
1684+
func proxyToBeaconNode(ctx context.Context, w http.ResponseWriter, r *http.Request, beaconNodeAddr string, headers map[string]string) {
1685+
targetURL, err := url.ParseRequestURI(beaconNodeAddr)
1686+
if err != nil {
1687+
log.Error(ctx, "Failed to parse beacon node address for proxying", err, z.Str("address", beaconNodeAddr))
1688+
writeError(ctx, w, "proxy", err)
1689+
return
1690+
}
1691+
1692+
proxy := httputil.NewSingleHostReverseProxy(targetURL)
1693+
1694+
// Extend default proxy director with basic auth and host header
1695+
defaultDirector := proxy.Director
1696+
proxy.Director = func(req *http.Request) {
1697+
if targetURL.User != nil {
1698+
password, _ := targetURL.User.Password()
1699+
req.SetBasicAuth(targetURL.User.Username(), password)
1700+
}
1701+
1702+
req.Host = targetURL.Host
1703+
defaultDirector(req)
1704+
1705+
// Apply custom beacon node headers (e.g., authentication)
1706+
for k, v := range headers {
1707+
req.Header.Set(k, v)
1708+
}
1709+
}
1710+
1711+
proxy.ErrorLog = stdlog.New(io.Discard, "", 0)
1712+
1713+
// Clone request with the provided context for soft shutdown support
1714+
clonedReq := r.Clone(ctx)
1715+
1716+
log.Debug(ctx, "Reverse proxying SSE request to beacon node",
1717+
z.Str("method", clonedReq.Method),
1718+
z.Str("path", clonedReq.URL.Path))
1719+
1720+
defer observeProxyAPILatency(clonedReq.URL.Path)()
1721+
defer observeAPILatency("proxy")()
1722+
1723+
proxy.ServeHTTP(proxyResponseWriter{w.(writeFlusher)}, clonedReq)
1724+
}
1725+
16441726
// writeError writes a http json error response object.
16451727
func writeError(ctx context.Context, w http.ResponseWriter, endpoint string, err error) {
16461728
if ctx.Err() != nil {

core/validatorapi/router_internal_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2191,6 +2191,14 @@ func (h testHandler) Proxy(ctx context.Context, req *http.Request) (*http.Respon
21912191
return h.ProxyFunc(ctx, req)
21922192
}
21932193

2194+
func (h testHandler) Address() string {
2195+
return "http://mock-beacon-node"
2196+
}
2197+
2198+
func (h testHandler) Headers() map[string]string {
2199+
return nil // Test handler doesn't use custom headers
2200+
}
2201+
21942202
// newBeaconHandler returns a mock beacon node handler. It registers a few mock handlers required by the
21952203
// eth2http service on startup, all other requests are routed to ProxyHandler if not nil.
21962204
func (h testHandler) newBeaconHandler(t *testing.T) http.Handler {

core/validatorapi/validatorapi.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,14 @@ func (c Component) Proxy(ctx context.Context, req *http.Request) (*http.Response
13041304
return c.eth2Cl.Proxy(ctx, req)
13051305
}
13061306

1307+
func (c Component) Address() string {
1308+
return c.eth2Cl.Address()
1309+
}
1310+
1311+
func (c Component) Headers() map[string]string {
1312+
return c.eth2Cl.Headers()
1313+
}
1314+
13071315
// wrapResponse wraps the provided data into an API Response and returns the response.
13081316
func wrapResponse[T any](data T) *eth2api.Response[T] {
13091317
return &eth2api.Response[T]{Data: data}

testutil/beaconmock/beaconmock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,10 @@ func (m Mock) Address() string {
463463
return "http://" + m.httpServer.Addr
464464
}
465465

466+
func (m Mock) Headers() map[string]string {
467+
return nil // Mock doesn't use custom headers
468+
}
469+
466470
func (m Mock) IsActive() bool {
467471
return m.IsActiveFunc()
468472
}

0 commit comments

Comments
 (0)