Skip to content

Commit

Permalink
fix(api): make stick sessions actually work and make them non-racy (#…
Browse files Browse the repository at this point in the history
…12665)

* fix(api): make stick sessions actually work and make them non-racy

We apparently have a way to specify that all "related" requests should
go to the same node. However:

1. It didn't work at all. All future requests would go to the first successful
node from the first request. Because that's how stack variables work.
2. It was racy if the context was re-used concurrently. But only the
first time, see point 1.

* test(api): test the API merge proxy

1. Test whether or not it works.
2. Test stickiness.

* fix(api): update OnSingleNode documentation
  • Loading branch information
Stebalien authored Nov 1, 2024
1 parent dcc903c commit 2657108
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Fix issue where F3 wouldn't start participating again if Lotus restarted without restarting the Miner ([filecoin-project/lotus#12640](https://github.com/filecoin-project/lotus/pull/12640)).
- Change the F3 HeadLookback parameter to 4 ([filecoin-project/lotus#12648](https://github.com/filecoin-project/lotus/pull/12648)).
- Upgrade go-f3 to 0.7.1 to resolve Tipset not found errors when trying to establish instance start time ([filecoin-project/lotus#12651](https://github.com/filecoin-project/lotus/pull/12651)).
- The mining loop will now correctly "stick" to the same upstream lotus node for all operations pertaining to mining a single block ([filecoin-project/lotus#12665](https://github.com/filecoin-project/lotus/pull/12665)).

## Deps

Expand Down
36 changes: 21 additions & 15 deletions cli/util/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"reflect"
"strings"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -229,9 +230,12 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err

type contextKey string

// OnSingleNode is not thread safe
// OnSingleNode returns a modified context that, when passed to a method on a FullNodeProxy, will
// cause all calls to be directed at the same node when possible.
//
// Think "sticky sessions".
func OnSingleNode(ctx context.Context) context.Context {
return context.WithValue(ctx, contextKey("retry-node"), new(*int))
return context.WithValue(ctx, contextKey("retry-node"), new(atomic.Int32))
}

func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
Expand Down Expand Up @@ -262,27 +266,29 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {

ctx := args[0].Interface().(context.Context)

curr := -1

// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
*ctx.Value(key).(**int) = &curr
} else {
curr = **ctx.Value(key).(**int) - 1
}
var curr *atomic.Int32
if v, ok := ctx.Value(contextKey("retry-node")).(*atomic.Int32); ok {
curr = v
} else {
curr = new(atomic.Int32)
}

total := len(rins)
total := int32(len(rins))
result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
curr = (curr + 1) % total

result := fns[curr].Call(args)
idx := curr.Load()
result := fns[idx].Call(args)
if result[len(result)-1].IsNil() {
return result, nil
}
// On failure, switch to the next node.
//
// We CAS instead of incrementing because this might have
// already been incremented by a concurrent call if we have
// a shared `curr` (we're sticky to a single node).
curr.CompareAndSwap(idx, (idx+1)%total)

e := result[len(result)-1].Interface().(error)
return result, e
})
Expand Down
71 changes: 71 additions & 0 deletions itests/merged_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package itests

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/itests/kit"
)

func TestAPIMergeProxy(t *testing.T) {
ctx := context.Background()

// The default is too high for many nodes.
initialBalance := types.MustParseFIL("100000FIL")

nopts := []kit.NodeOpt{
kit.ThroughRPC(),
kit.WithAllSubsystems(),
kit.OwnerBalance(big.Int(initialBalance)),
}
ens := kit.NewEnsemble(t, kit.MockProofs())
nodes := make([]*kit.TestFullNode, 10)
for i := range nodes {
var nd kit.TestFullNode
ens.FullNode(&nd, nopts...)
nodes[i] = &nd
}
merged := kit.MergeFullNodes(nodes)

var miner kit.TestMiner
ens.Miner(&miner, merged, nopts...)

ens.Start()

nd1ID, err := nodes[0].ID(ctx)
require.NoError(t, err)
nd2ID, err := nodes[1].ID(ctx)
require.NoError(t, err)

// Expect to start on node 1, and switch to node 2 on failure.
mergedID, err := merged.ID(ctx)
require.NoError(t, err)
require.Equal(t, nd1ID, mergedID)
require.NoError(t, nodes[0].Stop(ctx))
mergedID, err = merged.ID(ctx)
require.NoError(t, err)
require.Equal(t, nd2ID, mergedID)

// Now see if sticky sessions work
stickyCtx := cliutil.OnSingleNode(ctx)
for i, nd := range nodes[1:] {
// kill off the previous node.
require.NoError(t, nodes[i].Stop(ctx))

got, err := merged.ID(stickyCtx)
require.NoError(t, err)
expected, err := nd.ID(ctx)
require.NoError(t, err)
require.Equal(t, expected, got)
}

// This should fail because we'll run out of retries because it's _not_ sticky!
_, err = merged.ID(ctx)
require.Error(t, err)
}

0 comments on commit 2657108

Please sign in to comment.