
Merge pull request #88805 from aadityasondhi/backport22.2-87533-88764
release-22.2: sqlliveness: add timeouts to heartbeats
aadityasondhi authored Sep 28, 2022
2 parents dc3139f + 741a058 commit 2ff878e
Showing 5 changed files with 214 additions and 32 deletions.
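At a high level, the change gives each heartbeat attempt (session creation and session extension) a bounded time budget of half the session TTL plus one heartbeat interval, so a stalled storage layer can no longer wedge the heartbeat loop. Below is a minimal, self-contained sketch of that pattern. It uses the standard library's context deadline rather than the contextutil.RunWithTimeout helper the commit itself uses, and the heartbeat callback, function name, and printed output are hypothetical stand-ins, not code from this PR:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// heartbeatOnce gives one heartbeat attempt the same budget the commit
// computes in heartbeatLoop: half the session TTL plus one heartbeat
// interval. The heartbeat argument stands in for the guarded storage call.
func heartbeatOnce(
	ctx context.Context, ttl, hb time.Duration, heartbeat func(context.Context) error,
) error {
	timeout := ttl/2 + hb
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := heartbeat(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			// The write may or may not have been applied; the caller should
			// retry rather than discard its session state.
			return fmt.Errorf("heartbeat timed out after %s: %w", timeout, err)
		}
		return err
	}
	return nil
}

func main() {
	stuck := func(ctx context.Context) error {
		select {
		case <-time.After(time.Second): // simulate a blocked storage layer
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// With DefaultTTL = 20ms and DefaultHeartBeat = 10ms (the values the
	// tests below use), the budget is 20ms, so the stuck call times out.
	fmt.Println(heartbeatOnce(context.Background(), 20*time.Millisecond, 10*time.Millisecond, stuck))
}

The ttl/2 + hb budget matches the timeout the diff computes in (*Instance).heartbeatLoop; the TODO in the diff notes that deriving it from two independent settings is fragile. In the reconstructed diffs below, changed lines carry -/+ markers; unmarked lines are context.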
28 changes: 13 additions & 15 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
@@ -322,7 +322,7 @@ func (ts *testState) request(
return ""
}

-func (ts *testState) externalIngress(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) externalIngress(t *testing.T, _ *datadriven.TestData, args cmdArgs) string {
usage := multitenant.ExternalIOUsage{IngressBytes: args.bytes}
if err := ts.controller.OnExternalIOWait(context.Background(), usage); err != nil {
t.Errorf("OnExternalIOWait error: %s", err)
@@ -341,12 +341,12 @@ func (ts *testState) externalEgress(t *testing.T, d *datadriven.TestData, args c
return ""
}

-func (ts *testState) enableRUAccounting(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string {
+func (ts *testState) enableRUAccounting(_ *testing.T, _ *datadriven.TestData, _ cmdArgs) string {
tenantcostclient.ExternalIORUAccountingMode.Override(context.Background(), &ts.settings.SV, "on")
return ""
}

-func (ts *testState) disableRUAccounting(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string {
+func (ts *testState) disableRUAccounting(_ *testing.T, _ *datadriven.TestData, _ cmdArgs) string {
tenantcostclient.ExternalIORUAccountingMode.Override(context.Background(), &ts.settings.SV, "off")
return ""
}
@@ -424,7 +424,7 @@ func (ts *testState) advance(t *testing.T, d *datadriven.TestData, args cmdArgs)

// waitForEvent waits until the tenant controller reports the given event
// type(s), at the current time.
-func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, _ cmdArgs) string {
typs := make(map[string]tenantcostclient.TestEventType)
for ev, evStr := range eventTypeStr {
typs[evStr] = ev
@@ -444,7 +444,7 @@ func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, args cmd

// unblockRequest resumes a token bucket request that was blocked by the
// "blockRequest" configuration option.
-func (ts *testState) unblockRequest(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) unblockRequest(t *testing.T, _ *datadriven.TestData, _ cmdArgs) string {
ts.provider.unblockRequest(t)
return ""
}
@@ -461,7 +461,7 @@ func (ts *testState) unblockRequest(t *testing.T, d *datadriven.TestData, args c
// ----
// 00:00:01.000
// 00:00:02.000
-func (ts *testState) timers(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) timers(t *testing.T, d *datadriven.TestData, _ cmdArgs) string {
// If we are rewriting the test, just sleep a bit before returning the
// timers.
if d.Rewrite {
@@ -491,7 +491,7 @@ func timesToString(times []time.Time) string {
}

// configure the test provider.
-func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) configure(t *testing.T, d *datadriven.TestData, _ cmdArgs) string {
var cfg testProviderConfig
if err := yaml.UnmarshalStrict([]byte(d.Input), &cfg); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
@@ -501,13 +501,13 @@ func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArg
}

// tokenBucket dumps the current state of the tenant's token bucket.
-func (ts *testState) tokenBucket(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) tokenBucket(*testing.T, *datadriven.TestData, cmdArgs) string {
return tenantcostclient.TestingTokenBucketString(ts.controller)
}

// cpu adds CPU usage which will be observed by the controller on the next main
// loop tick.
-func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, _ cmdArgs) string {
duration, err := time.ParseDuration(d.Input)
if err != nil {
d.Fatalf(t, "error parsing cpu duration: %v", err)
@@ -518,7 +518,7 @@ func (ts *testState) cpu(t *testing.T, d *datadriven.TestData, args cmdArgs) str

// pgwire adds PGWire egress usage which will be observed by the controller on the next
// main loop tick.
-func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, _ cmdArgs) string {
bytes, err := strconv.Atoi(d.Input)
if err != nil {
d.Fatalf(t, "error parsing pgwire bytes value: %v", err)
@@ -529,7 +529,7 @@ func (ts *testState) pgwireEgress(t *testing.T, d *datadriven.TestData, args cmd

// usage prints out the latest consumption. Callers are responsible for
// triggering calls to the token bucket provider and waiting for responses.
-func (ts *testState) usage(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
+func (ts *testState) usage(*testing.T, *datadriven.TestData, cmdArgs) string {
c := ts.provider.consumption()
return fmt.Sprintf(""+
"RU: %.2f\n"+
@@ -695,7 +695,7 @@ func (tp *testProvider) unblockRequest(t *testing.T) {

// TokenBucket implements the kvtenant.TokenBucketProvider interface.
func (tp *testProvider) TokenBucket(
-	ctx context.Context, in *roachpb.TokenBucketRequest,
+	_ context.Context, in *roachpb.TokenBucketRequest,
) (*roachpb.TokenBucketResponse, error) {
tp.mu.Lock()
defer tp.mu.Unlock()
@@ -929,8 +929,7 @@ func TestSQLLivenessExemption(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
// Make the tenant heartbeat like crazy.
ctx := context.Background()
-//slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
-slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond)
+slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

_, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: tenantID,
@@ -960,7 +959,6 @@ func TestSQLLivenessExemption(t *testing.T) {

// Verify that heartbeats can go through and update the expiration time.
val := livenessValue()
-time.Sleep(2 * time.Millisecond)
testutils.SucceedsSoon(
t,
func() error {
3 changes: 3 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
@@ -18,6 +19,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

@@ -35,6 +37,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
58 changes: 45 additions & 13 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
@@ -151,15 +153,21 @@ func (l *Instance) setSession(s *session) {
}

func (l *Instance) clearSession(ctx context.Context) {
+l.checkExpiry(ctx)
+l.mu.Lock()
+defer l.mu.Unlock()
+l.mu.s = nil
+l.mu.blockCh = make(chan struct{})
+}
+
+func (l *Instance) checkExpiry(ctx context.Context) {
l.mu.Lock()
defer l.mu.Unlock()
if expiration := l.mu.s.Expiration(); expiration.Less(l.clock.Now()) {
// If the session has expired, invoke the session expiry callbacks
// associated with the session.
l.mu.s.invokeSessionExpiryCallbacks(ctx)
}
-l.mu.s = nil
-l.mu.blockCh = make(chan struct{})
}

// createSession tries until it can create a new session and returns an error
@@ -252,9 +260,17 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
case <-t.C:
t.Read = true
s, _ := l.getSessionOrBlockCh()
+// TODO(aaditya): consider combining `DefaultTTL` and `DefaultHeartBeat` into a single knob to make these
+// timeouts less fragile
+timeout := l.ttl()/2 + l.hb()
if s == nil {
-newSession, err := l.createSession(ctx)
-if err != nil {
+var newSession *session
+if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", timeout, func(ctx context.Context) error {
+var err error
+newSession, err = l.createSession(ctx)
+return err
+}); err != nil {
+log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
@@ -270,21 +286,37 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
-found, err := l.extendSession(ctx, s)
-if err != nil {
+var found bool
+err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", timeout, func(ctx context.Context) error {
+var err error
+found, err = l.extendSession(ctx, s)
+return err
+})
+switch {
+case errors.HasType(err, (*contextutil.TimeoutError)(nil)):
+// Retry without clearing the session because we don't know the current status.
+l.checkExpiry(ctx)
+t.Reset(0)
+continue
+case err != nil && ctx.Err() == nil:
+log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
+fallthrough
+case err != nil:
+// TODO(ajwerner): Decide whether we actually should exit the heartbeat loop here if the context is not
+// canceled. Consider the case of an ambiguous result error: shouldn't we try again?
l.clearSession(ctx)
return
-}
-if !found {
+case !found:
+// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
continue
-}
-if log.V(2) {
-log.Infof(ctx, "extended SQL liveness session %s", s.ID())
+default:
+if log.V(2) {
+log.Infof(ctx, "extended SQL liveness session %s", s.ID())
+}
+t.Reset(l.hb())
}
-t.Reset(l.hb())
}
}
}
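The switch above concentrates the retry policy: a timeout means the outcome of the extend call is unknown, so the loop only runs the expiry callbacks via checkExpiry and retries immediately rather than discarding the session; any other error clears the session and exits the loop; a successful call that finds no session row clears the session and recreates it. A condensed, hypothetical restatement of that policy follows — the helper name and result strings are illustrative only, not part of the commit — using the same contextutil and errors packages this file imports:

package slpolicy

import (
	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
	"github.com/cockroachdb/errors"
)

// extendPolicy mirrors the decision switch in heartbeatLoop.
func extendPolicy(err error, found bool) string {
	switch {
	case errors.HasType(err, (*contextutil.TimeoutError)(nil)):
		return "outcome unknown: run expiry callbacks, retry immediately"
	case err != nil:
		return "hard failure: clear session, exit heartbeat loop"
	case !found:
		return "session row missing: clear session, recreate immediately"
	default:
		return "extended: wait one heartbeat interval"
	}
}

Factoring checkExpiry out of clearSession (the first hunk in this file) is what makes the timeout branch possible: expiry callbacks can fire without tearing down the session state.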
116 changes: 114 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
@@ -12,6 +12,7 @@ package slinstance_test

import (
"context"
"sync/atomic"
"testing"
"time"

@@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -40,8 +42,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
@@ -91,3 +93,113 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines tests that we have proper deadlines set on the
// create and extend session operations. This is done by blocking the fake
// storage layer and ensuring that no sessions get created because the
// timeouts are constantly triggered.
func TestSQLInstanceDeadlines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
// block the fake storage
fakeStorage.SetBlockCh()
cleanUpFunc := func() {
fakeStorage.CloseBlockCh()
}
defer cleanUpFunc()

sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that we do not create a session
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
return err == nil
},
100*time.Millisecond, 10*time.Millisecond,
)
}

// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the
// create and extend session operations. This tests the case where the session is
// successfully created first and then blocks indefinitely.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

mt := timeutil.NewManualTime(timeutil.Unix(0, 42))
clock := hlc.NewClock(mt, time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that eventually session is created successfully
testutils.SucceedsSoon(
t,
func() error {
_, err := sqlInstance.Session(ctx)
return err
},
)

// verify that session is also extended successfully a few times
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
return err != nil
},
100*time.Millisecond, 10*time.Millisecond,
)

// register a callback for verification that this session expired
var sessionExpired atomic.Bool
s, _ := sqlInstance.Session(ctx)
s.RegisterCallbackForSessionExpiry(func(ctx context.Context) {
sessionExpired.Store(true)
})

// block the fake storage
fakeStorage.SetBlockCh()
cleanUpFunc := func() {
fakeStorage.CloseBlockCh()
}
defer cleanUpFunc()
// advance manual clock so that session expires
mt.Advance(slinstance.DefaultTTL.Get(&settings.SV))

// expect session to expire
require.Eventually(
t,
func() bool {
return sessionExpired.Load()
},
testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond,
)
}
