Skip to content
6 changes: 3 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

if err := wireVAPIRouter(ctx, life, conf.ValidatorAPIAddr, eth2Cl, vapi, vapiCalls, &conf); err != nil {
if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, vapi, vapiCalls, &conf); err != nil {
return err
}

Expand Down Expand Up @@ -1084,10 +1084,10 @@ func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
}

// wireVAPIRouter constructs the validator API router and registers it with the life cycle manager.
func wireVAPIRouter(ctx context.Context, life *lifecycle.Manager, vapiAddr string, eth2Cl eth2wrap.Client,
func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string,
handler validatorapi.Handler, vapiCalls func(), conf *Config,
) error {
vrouter, err := validatorapi.NewRouter(ctx, handler, eth2Cl, conf.BuilderAPI)
vrouter, err := validatorapi.NewRouter(handler, conf.BuilderAPI)
if err != nil {
return errors.Wrap(err, "new monitoring server")
}
Expand Down
4 changes: 1 addition & 3 deletions app/app_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap/mocks"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/core"
vapimocks "github.com/obolnetwork/charon/core/validatorapi/mocks"
Expand All @@ -39,7 +38,6 @@ func TestWireVAPIRouterForTLS(t *testing.T) {
const testVersion = "v1.0.0"

life := new(lifecycle.Manager)
client := mocks.NewClient(t)
handler := vapimocks.NewHandler(t)
handler.On("NodeVersion", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
t.Log("NodeVersion called")
Expand All @@ -56,7 +54,7 @@ func TestWireVAPIRouterForTLS(t *testing.T) {

port := testutil.GetFreePort(t)
endpoint := fmt.Sprintf("localhost:%v", port)
err := wireVAPIRouter(t.Context(), life, endpoint, client, handler, vapiCalls, conf)
err := wireVAPIRouter(life, endpoint, handler, vapiCalls, conf)
require.NoError(t, err)

ctx, cancel := context.WithCancel(t.Context())
Expand Down
42 changes: 40 additions & 2 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package eth2wrap
import (
"context"
"net"
"net/http"
"net/url"
"strings"
"sync"
"syscall"
"time"

eth2api "github.com/attestantio/go-eth2-client/api"
Expand Down Expand Up @@ -205,8 +207,8 @@ func provide[O any](ctx context.Context, clients []Client, fallbacks []Client,
}
output, err := runForkJoin(clients)

// Call fallback nodes when request to beacon node timeout or if it's syncing
if err != nil && len(fallbacks) != 0 && (isTimeoutError(err) || isSyncingError(err)) {
// Call fallback nodes when request to beacon node timeout or if it's syncing or beacon node is unreachable
if err != nil && len(fallbacks) != 0 && (isTimeoutError(err) || isSyncingError(err) || isBadGateway(err)) {
usingFallbackGauge.Set(1)
return runForkJoin(fallbacks)
}
Expand All @@ -228,6 +230,42 @@ func isSyncingError(err error) bool {
return strings.Contains(msg, "syncing")
}

// isBadGateway returns true when the error indicates a connectivity or upstream gateway issue.
func isBadGateway(err error) bool {
if err == nil {
return false
}

for current := err; current != nil; current = errors.Unwrap(current) {
if errno := new(syscall.Errno); errors.As(current, errno) {
switch *errno {
case syscall.ECONNREFUSED, syscall.ECONNRESET, syscall.EHOSTDOWN, syscall.EHOSTUNREACH, syscall.ENETDOWN, syscall.ENETUNREACH:
return true
default:
// Ignore other errno values
}
}
if errors.Is(current, http.ErrAbortHandler) {
return true
}

var apiErr *eth2api.Error
if errors.As(current, &apiErr) {
switch apiErr.StatusCode {
case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
}
}

var netErr net.Error
if errors.As(current, &netErr) {
return true
}
}

return false
}

type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
Expand Down
12 changes: 12 additions & 0 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions app/eth2wrap/genwrap/genwrap.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions app/eth2wrap/httpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package eth2wrap
import (
"context"
"encoding/hex"
"net/http"
"sync"
"testing"
"time"
Expand All @@ -15,6 +16,8 @@ import (
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/eth2util"
)

Expand Down Expand Up @@ -127,3 +130,8 @@ func (h *httpAdapter) Domain(ctx context.Context, domainType eth2p0.DomainType,

return h.Service.Domain(ctx, domainType, epoch)
}

func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Response, error) {
log.Debug(ctx, "Proxying request to beacon node", z.Any("url", h.address))
return h.Service.Proxy(ctx, req)
}
13 changes: 13 additions & 0 deletions app/eth2wrap/lazy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package eth2wrap_test

import (
"context"
"net/http"
"testing"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -74,3 +75,15 @@ func TestLazy_SetValidatorCache(t *testing.T) {
l := eth2wrap.NewLazyForT(client)
l.SetValidatorCache(valCache)
}

func TestLazy_Proxy(t *testing.T) {
client := mocks.NewClient(t)
client.On("Proxy", mock.Anything, mock.Anything).Return(nil, nil).Once()

l := eth2wrap.NewLazyForT(client)

req, err := http.NewRequest("GET", "", nil)
require.NoError(t, err)
_, err = l.Proxy(t.Context(), req)
require.NoError(t, err)
}
31 changes: 31 additions & 0 deletions app/eth2wrap/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions app/eth2wrap/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
package eth2wrap

import (
"bytes"
"context"
"io"
"net/http"

"github.com/obolnetwork/charon/app/errors"
)

// NewMultiForT creates a new mutil client for testing.
Expand Down Expand Up @@ -119,3 +124,46 @@ func (m multi) CompleteValidators(ctx context.Context) (CompleteValidators, erro

return res0, err
}

func (m multi) Proxy(ctx context.Context, req *http.Request) (*http.Response, error) {
// Duplicate the request body so each backend gets an independent reader
// req.Clone(ctx) does NOT clone the body reader
var bodyBytes []byte
var hasBody bool
if req.Body != nil {
b, err := io.ReadAll(req.Body)
if err != nil {
return nil, errors.Wrap(err, "read request body")
}
// Close the original body
_ = req.Body.Close()
bodyBytes = b
hasBody = true
// Replace with reusable reader for safety
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
req.ContentLength = int64(len(bodyBytes))
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(bodyBytes)), nil
}
}

res0, err := provide(ctx, m.clients, m.fallbacks,
func(ctx context.Context, args provideArgs) (*http.Response, error) {
cloned := req.Clone(ctx)
if hasBody {
cloned.Body = io.NopCloser(bytes.NewReader(bodyBytes))
cloned.ContentLength = int64(len(bodyBytes))
cloned.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(bodyBytes)), nil
}
} else {
cloned.Body = nil
}
res, err := args.client.Proxy(ctx, cloned)
return res, err
},
nil, nil,
)

return res0, err
}
41 changes: 41 additions & 0 deletions app/eth2wrap/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package eth2wrap_test
import (
"context"
"errors"
"io"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -86,3 +89,41 @@ func TestMulti_SetValidatorCache(t *testing.T) {
m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil)
m.SetValidatorCache(valCache)
}

func TestMulti_Proxy(t *testing.T) {
client := mocks.NewClient(t)
client.On("Proxy", mock.Anything, mock.Anything).Return(nil, nil).Once()

m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil)

req, err := http.NewRequest("GET", "", nil)
require.NoError(t, err)

_, err = m.Proxy(t.Context(), req)
require.NoError(t, err)
}

func TestMulti_Proxy_ReadBody(t *testing.T) {
cl1 := mocks.NewClient(t)
cl1.On("Proxy", mock.Anything, mock.MatchedBy(func(req *http.Request) bool {
_, err := io.ReadAll(req.Body)
require.NoError(t, err)
return true
})).Return(nil, errors.New("syncing")).Once() // force fallback to also read body

cl2 := mocks.NewClient(t)
cl2.On("Proxy", mock.Anything, mock.MatchedBy(func(req *http.Request) bool {
_, err := io.ReadAll(req.Body)
require.NoError(t, err)
return true
})).Return(nil, nil).Once()

// Two clients reading the same body should not error since the body is duplicated for each backend.
m := eth2wrap.NewMultiForT([]eth2wrap.Client{cl1}, []eth2wrap.Client{cl2})
bodyReader := strings.NewReader("foo")
req, err := http.NewRequest("POST", "", bodyReader)
require.NoError(t, err)

_, err = m.Proxy(t.Context(), req)
require.NoError(t, err)
}
Loading
Loading