Skip to content

Commit

Permalink
Node/CCQ: Solana min context slot support
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jan 26, 2024
1 parent 0c15b07 commit 400e0b0
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 12 deletions.
8 changes: 6 additions & 2 deletions node/pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const (
// RetryInterval specifies how long we will wait between retries of a pending query. (The ticker itself runs at AuditInterval.)
RetryInterval = 10 * time.Second

// AuditInterval specifies how often to audit the list of pending queries.
AuditInterval = time.Second

// SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
SignedQueryRequestChannelSize = 50

Expand Down Expand Up @@ -105,7 +108,7 @@ func (qh *QueryHandler) Start(ctx context.Context) error {

// handleQueryRequests multiplexes observation requests to the appropriate chain
func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval)
return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
}

// handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
Expand All @@ -120,6 +123,7 @@ func handleQueryRequestsImpl(
env common.Environment,
requestTimeoutImpl time.Duration,
retryIntervalImpl time.Duration,
auditIntervalImpl time.Duration,
) error {
qLogger := logger.With(zap.String("component", "ccqhandler"))
qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
Expand Down Expand Up @@ -165,7 +169,7 @@ func handleQueryRequestsImpl(
}
}

ticker := time.NewTicker(retryIntervalImpl)
ticker := time.NewTicker(auditIntervalImpl)
defer ticker.Stop()

for {
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// Speed things up for testing purposes.
requestTimeoutForTest = 100 * time.Millisecond
retryIntervalForTest = 10 * time.Millisecond
auditIntervalForTest = 10 * time.Millisecond
pollIntervalForTest = 5 * time.Millisecond
)

Expand Down Expand Up @@ -436,7 +437,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context

go func() {
err := handleQueryRequestsImpl(ctx, logger, md.signedQueryReqReadC, md.chainQueryReqC, ccqAllowedRequestersList,
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest)
md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest, auditIntervalForTest)
assert.NoError(t, err)
}()

Expand Down
132 changes: 123 additions & 9 deletions node/pkg/watchers/solana/ccq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package solana
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

"github.com/certusone/wormhole/node/pkg/query"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
)

// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
Expand All @@ -37,7 +41,8 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.

switch req := queryRequest.Request.Query.(type) {
case *query.SolanaAccountQueryRequest:
w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req)
giveUpTime := start.Add(query.RetryInterval).Add(-250 * time.Millisecond)
w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, false)
default:
w.ccqLogger.Warn("received unsupported request type",
zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())),
Expand All @@ -49,15 +54,17 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.
}

// ccqHandleSolanaAccountQueryRequest is the query handler for a sol_account request.
func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest) {
func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, giveUpTime time.Time, isRetry bool) {
requestId := "sol_account:" + queryRequest.ID()
w.ccqLogger.Info("received a sol_account query",
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Uint64("dataSliceOffset", req.DataSliceOffset),
zap.Uint64("dataSliceLength", req.DataSliceLength),
zap.Int("numAccounts", len(req.Accounts)),
zap.String("requestId", requestId),
)
if !isRetry {
w.ccqLogger.Info("received a sol_account query",
zap.Uint64("minContextSlot", req.MinContextSlot),
zap.Uint64("dataSliceOffset", req.DataSliceOffset),
zap.Uint64("dataSliceLength", req.DataSliceLength),
zap.Int("numAccounts", len(req.Accounts)),
zap.String("requestId", requestId),
)
}

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
Expand Down Expand Up @@ -88,6 +95,10 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
// Read the accounts.
info, err := w.getMultipleAccountsWithOpts(rCtx, accounts, &params)
if err != nil {
if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry) {
// Return without posting a response because a go routine was created to handle it.
return
}
w.ccqLogger.Error("read failed for sol_account query request",
zap.String("requestId", requestId),
zap.Any("accounts", accounts),
Expand Down Expand Up @@ -182,6 +193,109 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, resp)
}

// ccqCheckForMinSlotContext decides whether a failed account read should be retried quickly because the
// node has not yet reached the requested minimum context slot.
//
// It returns true when it has started a go routine to sleep and retry the request; in that case the caller
// must not post a response. It returns false, leaving the failure to the caller, when the request did not
// specify a minimum context slot, when giveUpTime has passed, when err is not a well formed min context slot
// error, or when the requested slot is estimated to be more than ten seconds in the future.
//
// Note that each go routine only does a single retry, but that retry may result in another go routine being
// initiated to do another, and so on. The log flag is passed through to the retry go routine.
func (w *SolanaWatcher) ccqCheckForMinSlotContext(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, err error, giveUpTime time.Time, log bool) bool {
	const (
		slotTimeMs     = 400   // Assumed duration of a slot, in milliseconds.
		maxFastRetryMs = 10000 // Beyond this estimate the regular retry mechanism is used.
	)

	if req.MinContextSlot == 0 {
		return false
	}

	if time.Now().After(giveUpTime) {
		w.ccqLogger.Info("giving up on fast retry", zap.String("requestId", requestId))
		return false
	}

	isMinContext, currentSlot, parseErr := ccqIsMinContextSlotError(err)
	if parseErr != nil {
		w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(parseErr))
		return false
	}

	if !isMinContext {
		return false
	}

	// Work in slots rather than milliseconds. Both values are unsigned, so subtracting a reported slot that is
	// at or past the requested one would wrap around, and multiplying a very large difference by the slot time
	// could wrap as well. A difference of zero simply means "retry right away".
	var slotsInTheFuture uint64
	if req.MinContextSlot > currentSlot {
		slotsInTheFuture = req.MinContextSlot - currentSlot
	}

	// If the requested slot is more than ten seconds in the future, use the regular retry mechanism.
	if slotsInTheFuture > maxFastRetryMs/slotTimeMs {
		// Saturate the logged estimate instead of letting it overflow.
		msInTheFuture := ^uint64(0)
		if slotsInTheFuture <= msInTheFuture/slotTimeMs {
			msInTheFuture = slotsInTheFuture * slotTimeMs
		}
		w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
			zap.String("requestId", requestId),
			zap.Uint64("currentSlot", currentSlot),
			zap.Uint64("minContextSlot", req.MinContextSlot),
			zap.Uint64("msInTheFuture", msInTheFuture),
		)
		return false
	}

	// Kick off the retry after a short delay.
	go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log)
	return true
}

// CCQ_FAST_RETRY_INTERVAL is the delay before a fast retry of an account query, expressed in milliseconds
// (it is multiplied by time.Millisecond where it is used).
const CCQ_FAST_RETRY_INTERVAL = 200

// ccqSleepAndRetryAccountQuery waits for CCQ_FAST_RETRY_INTERVAL milliseconds and then re-runs the account
// query as a retry. It is meant to be run in its own go routine.
//
// The wait ends early if ctx is cancelled, so the go routine does not linger for the rest of the interval.
// The retry is still attempted in that case so that the request goes through the handler's usual path.
// NOTE(review): this presumes the handler fails promptly on a cancelled context - confirm.
//
// currentSlot is only used for logging, and log controls whether the two informational messages are written.
func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) {
	if log {
		w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
			zap.String("requestId", requestId),
			zap.Uint64("currentSlot", currentSlot),
			zap.Uint64("minContextSlot", req.MinContextSlot),
			zap.Int("retryInterval", CCQ_FAST_RETRY_INTERVAL),
		)
	}

	// Use a timer instead of time.Sleep so that cancellation of ctx can interrupt the wait.
	timer := time.NewTimer(CCQ_FAST_RETRY_INTERVAL * time.Millisecond)
	select {
	case <-timer.C:
	case <-ctx.Done():
		timer.Stop()
	}

	if log {
		w.ccqLogger.Info("initiating fast retry", zap.String("requestId", requestId))
	}

	w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, true)
}

// ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached".
//
// Return values:
//   - (true, slot, nil) if err is a JSON-RPC error with code -32016 whose data carries a parsable
//     "contextSlot"; slot is that value.
//   - (false, 0, nil) if err is not a JSON-RPC error, or is one with a different code.
//   - (false, 0, non-nil) if err has code -32016 but its data is not in the expected format.
func ccqIsMinContextSlotError(err error) (bool, uint64, error) {
	/*
	  A MinContextSlot error looks like this (and contains the context slot):
	  "(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
	*/
	var rpcErr *jsonrpc.RPCError
	if !errors.As(err, &rpcErr) {
		return false, 0, nil // Some other kind of error. That's okay.
	}

	if rpcErr.Code != -32016 { // Minimum context slot has not been reached
		return false, 0, nil // Some other kind of RPC error. That's okay.
	}

	// From here on down, any error is bad because the MinContextSlot error is not in the expected format.
	m, ok := rpcErr.Data.(map[string]interface{})
	if !ok {
		return false, 0, fmt.Errorf("failed to extract data from min context slot error")
	}

	contextSlot, ok := m["contextSlot"]
	if !ok {
		return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`)
	}

	currentSlotAsJson, ok := contextSlot.(json.Number)
	if !ok {
		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`)
	}

	currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
	if typeErr != nil {
		// Wrap the parse failure itself, not the RPC error that was passed in.
		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, typeErr)
	}

	return true, currentSlot, nil
}

// M is a shorthand for a map from string keys to values of any type.
// NOTE(review): presumably used to build JSON-RPC request parameters - confirm against its callers.
type M map[string]interface{}

// getMultipleAccountsWithOpts is a work-around for the fact that the library call doesn't honor MinContextSlot.
Expand Down
98 changes: 98 additions & 0 deletions node/pkg/watchers/solana/ccq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package solana

import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/gagliardetto/solana-go/rpc/jsonrpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestCcqIsMinContextSlotErrorSuccess verifies that a well formed -32016 RPC error is recognized and that the
// context slot it carries is returned.
func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) {
	data := map[string]interface{}{
		"contextSlot": json.Number("13526"),
	}
	rpcErr := &jsonrpc.RPCError{
		Code:    -32016,
		Message: "Minimum context slot has not been reached",
		Data:    data,
	}

	matched, slot, parseErr := ccqIsMinContextSlotError(rpcErr)
	require.NoError(t, parseErr)
	require.True(t, matched)
	assert.Equal(t, uint64(13526), slot)
}

// TestCcqIsMinContextSlotErrorSomeOtherError verifies that an error which is not an RPC error is reported as
// "not a min context slot error" without a parse failure.
func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
	matched, _, parseErr := ccqIsMinContextSlotError(fmt.Errorf("Some other error"))
	require.NoError(t, parseErr)
	require.False(t, matched)
}

// TestCcqIsMinContextSlotErrorSomeOtherRPCError verifies that an RPC error with a code other than -32016 is
// not treated as a min context slot error, even if its data contains a "contextSlot".
func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) {
	data := map[string]interface{}{
		"contextSlot": json.Number("13526"),
	}
	rpcErr := &jsonrpc.RPCError{
		Code:    -32000,
		Message: "Some other RPC error",
		Data:    data,
	}

	matched, _, parseErr := ccqIsMinContextSlotError(rpcErr)
	require.NoError(t, parseErr)
	require.False(t, matched)
}

// TestCcqIsMinContextSlotErrorNoData verifies that a -32016 RPC error without any data is rejected as malformed.
func TestCcqIsMinContextSlotErrorNoData(t *testing.T) {
	rpcErr := &jsonrpc.RPCError{
		Code:    -32016,
		Message: "Minimum context slot has not been reached",
	}

	_, _, parseErr := ccqIsMinContextSlotError(rpcErr)
	assert.EqualError(t, parseErr, `failed to extract data from min context slot error`)
}

// TestCcqIsMinContextSlotErrorContextSlotMissing verifies that a -32016 RPC error whose data lacks the
// "contextSlot" key is rejected as malformed.
func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
	data := map[string]interface{}{
		"someOtherField": json.Number("13526"),
	}
	rpcErr := &jsonrpc.RPCError{
		Code:    -32016,
		Message: "Minimum context slot has not been reached",
		Data:    data,
	}

	_, _, parseErr := ccqIsMinContextSlotError(rpcErr)
	assert.EqualError(t, parseErr, `min context slot error does not contain "contextSlot"`)
}

// TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber verifies that a "contextSlot" supplied as a plain
// string rather than a json.Number is rejected as malformed.
func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
	data := map[string]interface{}{
		"contextSlot": "13526",
	}
	rpcErr := &jsonrpc.RPCError{
		Code:    -32016,
		Message: "Minimum context slot has not been reached",
		Data:    data,
	}

	_, _, parseErr := ccqIsMinContextSlotError(rpcErr)
	assert.EqualError(t, parseErr, `min context slot error "contextSlot" is not json.Number`)
}

// TestCcqIsMinContextSlotErrorContextSlotIsNotUint64 verifies that a "contextSlot" which cannot be parsed as
// an unsigned integer is rejected as malformed.
func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
	myErr := &jsonrpc.RPCError{
		Code:    -32016,
		Message: "Minimum context slot has not been reached",
		Data: map[string]interface{}{
			"contextSlot": json.Number("HelloWorld"),
		},
	}

	_, _, err := ccqIsMinContextSlotError(error(myErr))
	// Stop here if no error came back; calling err.Error() on a nil error would panic instead of failing the test.
	require.Error(t, err)
	assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`))
}
Loading

0 comments on commit 400e0b0

Please sign in to comment.