Skip to content

Commit

Permalink
Node/CCQ: Solana min context slot support (#3747)
Browse files Browse the repository at this point in the history
* Node/CCQ: Solana min context slot support

* Code review rework

* Add port number to solana test URL
  • Loading branch information
bruce-riley authored Jan 27, 2024
1 parent 5f54773 commit 10b83f7
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 13 deletions.
10 changes: 7 additions & 3 deletions node/pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ const (
// RequestTimeout indicates how long before a request is considered to have timed out.
RequestTimeout = 1 * time.Minute

// RetryInterval specifies how long we will wait between retry intervals. This is the interval of our ticker.
// RetryInterval specifies how long we will wait between retry intervals.
RetryInterval = 10 * time.Second

// AuditInterval specifies how often to audit the list of pending queries.
AuditInterval = time.Second

// SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
SignedQueryRequestChannelSize = 50

Expand Down Expand Up @@ -105,7 +108,7 @@ func (qh *QueryHandler) Start(ctx context.Context) error {

// handleQueryRequests multiplexes observation requests to the appropriate chain
func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval)
return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
}

// handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
Expand All @@ -120,6 +123,7 @@ func handleQueryRequestsImpl(
env common.Environment,
requestTimeoutImpl time.Duration,
retryIntervalImpl time.Duration,
auditIntervalImpl time.Duration,
) error {
qLogger := logger.With(zap.String("component", "ccqhandler"))
qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
Expand Down Expand Up @@ -165,7 +169,7 @@ func handleQueryRequestsImpl(
}
}

ticker := time.NewTicker(retryIntervalImpl)
ticker := time.NewTicker(auditIntervalImpl)
defer ticker.Stop()

for {
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// Speed things up for testing purposes.
requestTimeoutForTest = 100 * time.Millisecond
retryIntervalForTest = 10 * time.Millisecond
auditIntervalForTest = 10 * time.Millisecond
pollIntervalForTest = 5 * time.Millisecond
)

Expand Down Expand Up @@ -436,7 +437,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context

go func() {
err := handleQueryRequestsImpl(ctx, logger, md.signedQueryReqReadC, md.chainQueryReqC, ccqAllowedRequestersList,
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest)
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest, auditIntervalForTest)
assert.NoError(t, err)
}()

Expand Down
151 changes: 142 additions & 9 deletions node/pkg/watchers/solana/ccq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,30 @@ package solana
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

"github.com/certusone/wormhole/node/pkg/query"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
)

const (
// CCQ_RETRY_SLOP gets subtracted from the query retry interval to determine how long we can continue fast retries.
// We don't want the fast retry time to be too close to the query retry interval.
CCQ_RETRY_SLOP = 250 * time.Millisecond

// CCQ_ESTIMATED_SLOT_TIME is the estimated Solana slot time used for estimating how long until the MinContextSlot will be reached.
CCQ_ESTIMATED_SLOT_TIME = 400 * time.Millisecond

// CCQ_FAST_RETRY_INTERVAL is how long we sleep between fast retry attempts.
CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond
)

// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
Expand All @@ -37,7 +53,8 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.

switch req := queryRequest.Request.Query.(type) {
case *query.SolanaAccountQueryRequest:
w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req)
giveUpTime := start.Add(query.RetryInterval).Add(-CCQ_RETRY_SLOP)
w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, false)
default:
w.ccqLogger.Warn("received unsupported request type",
zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())),
Expand All @@ -49,15 +66,17 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.
}

// ccqHandleSolanaAccountQueryRequest is the query handler for a sol_account request.
func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest) {
func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, giveUpTime time.Time, isRetry bool) {
requestId := "sol_account:" + queryRequest.ID()
w.ccqLogger.Info("received a sol_account query",
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Uint64("dataSliceOffset", req.DataSliceOffset),
zap.Uint64("dataSliceLength", req.DataSliceLength),
zap.Int("numAccounts", len(req.Accounts)),
zap.String("requestId", requestId),
)
if !isRetry {
w.ccqLogger.Info("received a sol_account query",
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Uint64("dataSliceOffset", req.DataSliceOffset),
zap.Uint64("dataSliceLength", req.DataSliceLength),
zap.Int("numAccounts", len(req.Accounts)),
zap.String("requestId", requestId),
)
}

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
Expand Down Expand Up @@ -88,6 +107,10 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
// Read the accounts.
info, err := w.getMultipleAccountsWithOpts(rCtx, accounts, &params)
if err != nil {
if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry) {
// Return without posting a response because a go routine was created to handle it.
return
}
w.ccqLogger.Error("read failed for sol_account query request",
zap.String("requestId", requestId),
zap.Any("accounts", accounts),
Expand Down Expand Up @@ -182,6 +205,116 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, resp)
}

// ccqCheckForMinSlotContext checks to see if the returned error was due to the min context slot not being reached.
// If so, and the estimated time in the future is not too great, it kicks off a go routine to sleep and do a retry.
// In that case, it returns true, telling the caller that it is handling the request so it should not post a response.
// Note that the go routine only does a single retry, but may result in another go routine being initiated to do another, and so on.
func (w *SolanaWatcher) ccqCheckForMinSlotContext(
ctx context.Context,
queryRequest *query.PerChainQueryInternal,
req *query.SolanaAccountQueryRequest,
requestId string,
err error,
giveUpTime time.Time,
log bool,
) bool {
if req.MinContextSlot == 0 {
return false
}

if time.Now().After(giveUpTime) {
w.ccqLogger.Info("giving up on fast retry", zap.String("requestId", requestId))
return false
}

isMinContext, currentSlot, err := ccqIsMinContextSlotError(err)
if err != nil {
w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(err))
return false
}

if !isMinContext {
return false
}

// Estimate how far in the future the requested slot is, using our estimated slot time.
futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME

// If the requested slot is more than ten seconds in the future, use the regular retry mechanism.
if futureSlotEstimate > query.RetryInterval {
w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
zap.String("requestId", requestId),
zap.Uint64("currentSlot", currentSlot),
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Stringer("futureSlotEstimate", futureSlotEstimate),
)
return false
}

// Kick off the retry after a short delay.
go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log)
return true
}

// ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry.
func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) {
if log {
w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
zap.String("requestId", requestId),
zap.Uint64("currentSlot", currentSlot),
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL),
)
}

time.Sleep(CCQ_FAST_RETRY_INTERVAL)

if log {
w.ccqLogger.Info("initiating fast retry", zap.String("requestId", requestId))
}

w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, true)
}

// ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number
func ccqIsMinContextSlotError(err error) (bool, uint64, error) {
/*
A MinContextSlot error looks like this (and contains the context slot):
"(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
*/
var rpcErr *jsonrpc.RPCError
if !errors.As(err, &rpcErr) {
return false, 0, nil // Some other kind of error. That's okay.
}

if rpcErr.Code != -32016 { // Minimum context slot has not been reached
return false, 0, nil // Some other kind of RPC error. That's okay.
}

// From here on down, any error is bad because the MinContextSlot error is not in the expected format.
m, ok := rpcErr.Data.(map[string]interface{})
if !ok {
return false, 0, fmt.Errorf("failed to extract data from min context slot error")
}

contextSlot, ok := m["contextSlot"]
if !ok {
return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`)
}

currentSlotAsJson, ok := contextSlot.(json.Number)
if !ok {
return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`)
}

currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
if typeErr != nil {
return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, err)
}

return true, currentSlot, nil
}

type M map[string]interface{}

// getMultipleAccountsWithOpts is a work-around for the fact that the library call doesn't honor MinContextSlot.
Expand Down
103 changes: 103 additions & 0 deletions node/pkg/watchers/solana/ccq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package solana

import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/certusone/wormhole/node/pkg/query"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRetrySlopIsValid(t *testing.T) {
assert.Less(t, CCQ_RETRY_SLOP, query.RetryInterval)
}

func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32016,
Message: "Minimum context slot has not been reached",
Data: map[string]interface{}{
"contextSlot": json.Number("13526"),
},
}

isMinContext, currentSlot, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
require.True(t, isMinContext)
assert.Equal(t, uint64(13526), currentSlot)
}

func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
myErr := fmt.Errorf("Some other error")
isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
require.False(t, isMinContext)
}

func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32000,
Message: "Some other RPC error",
Data: map[string]interface{}{
"contextSlot": json.Number("13526"),
},
}

isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
require.NoError(t, err)
require.False(t, isMinContext)
}

func TestCcqIsMinContextSlotErrorNoData(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32016,
Message: "Minimum context slot has not been reached",
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `failed to extract data from min context slot error`)
}

func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32016,
Message: "Minimum context slot has not been reached",
Data: map[string]interface{}{
"someOtherField": json.Number("13526"),
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `min context slot error does not contain "contextSlot"`)
}

func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32016,
Message: "Minimum context slot has not been reached",
Data: map[string]interface{}{
"contextSlot": "13526",
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.EqualError(t, err, `min context slot error "contextSlot" is not json.Number`)
}

func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
myErr := &jsonrpc.RPCError{
Code: -32016,
Message: "Minimum context slot has not been reached",
Data: map[string]interface{}{
"contextSlot": json.Number("HelloWorld"),
},
}

_, _, err := ccqIsMinContextSlotError(error(myErr))
assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`))
}
Loading

0 comments on commit 10b83f7

Please sign in to comment.