Skip to content

Commit

Permalink
Merge #119739
Browse files Browse the repository at this point in the history
119739: catalog/lease: block liveness heartbeats if leases cannot be written r=fqazi a=fqazi

Previously, if we had any type of partition failure where a single node could not write / delete leases it could cause the schema changes to be blocked on descriptors held by it. This was made worse with the session based model where leases no longer expire, so any failure of this nature will last until the problematic node is restarted. To address this, this patch will prevent the sqlliveness from extending the expiration if the leases table cannot be accessed any longer.

Fixes: #119654
Fixes: #119347

Release note: None

Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
  • Loading branch information
craig[bot] and fqazi committed Mar 1, 2024
2 parents 033097f + 3fe7bf8 commit 96ccd91
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slbase",
"//pkg/storage",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand Down Expand Up @@ -120,8 +121,10 @@ go_test(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/regions",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/sqlliveness/slbase",
"//pkg/sql/sqlliveness/slprovider",
"//pkg/sql/types",
Expand All @@ -132,6 +135,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
21 changes: 10 additions & 11 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func acquireNodeLease(
var session sqlliveness.Session
if m.sessionBasedLeasingModeAtLeast(ctx, SessionBasedDualWrite) {
var err error
session, err = m.livenessProvider.Session(ctx)
session, err = m.storage.livenessProvider.Session(ctx)
if err != nil {
return false, errors.Wrapf(err, "lease acquisition was unable to resolve liveness session")
}
Expand Down Expand Up @@ -850,7 +850,6 @@ type Manager struct {
rangeFeedFactory *rangefeed.Factory
storage storage
settings *cluster.Settings
livenessProvider sqlliveness.Provider
mu struct {
syncutil.Mutex
// TODO(james): Track size of leased descriptors in memory.
Expand Down Expand Up @@ -902,14 +901,15 @@ func NewLeaseManager(
) *Manager {
lm := &Manager{
storage: storage{
nodeIDContainer: nodeIDContainer,
db: db,
clock: clock,
settings: settings,
codec: codec,
sysDBCache: catkv.NewSystemDatabaseCache(codec, settings),
group: singleflight.NewGroup("acquire-lease", "descriptor ID"),
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
nodeIDContainer: nodeIDContainer,
db: db,
clock: clock,
settings: settings,
codec: codec,
livenessProvider: livenessProvider,
sysDBCache: catkv.NewSystemDatabaseCache(codec, settings),
group: singleflight.NewGroup("acquire-lease", "descriptor ID"),
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
outstandingLeases: metric.NewGauge(metric.Metadata{
Name: "sql.leases.active",
Help: "The number of outstanding SQL schema leases.",
Expand All @@ -930,7 +930,6 @@ func NewLeaseManager(
}),
},
settings: settings,
livenessProvider: livenessProvider,
rangeFeedFactory: rangeFeedFactory,
testingKnobs: testingKnobs,
names: makeNameCache(),
Expand Down
140 changes: 140 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -60,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -3637,6 +3640,143 @@ func TestAmbiguousResultIsRetried(t *testing.T) {
require.NoError(t, <-selectErr)
}

// TestLeaseTableWriteFailure is used to ensure that sqlliveness heart-beating
// and extensions are disabled if the lease table becomes accessible. This
// is used to ensure that schema changes are allowed on the remaining nodes.
func TestLeaseTableWriteFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStressRace(t)

type filter = kvserverbase.ReplicaResponseFilter
var f atomic.Value
noop := filter(func(context.Context, *kvpb.BatchRequest, *kvpb.BatchResponse) *kvpb.Error {
return nil
})
f.Store(noop)
ctx := context.Background()
// Session based leases will retry with the same value, which KV will be smart
// enough to cancel out on the replica when ambigious results happen. So,
// this test breaks on this setting.
settings := cluster.MakeTestingClusterSettings()
srv := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, request *kvpb.BatchRequest, response *kvpb.BatchResponse) *kvpb.Error {
return f.Load().(filter)(ctx, request, response)
},
},
},
},
})
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer(0)
lease.LeaseDuration.Override(ctx, &s.ClusterSettings().SV, time.Second*10)
instancestorage.ReclaimLoopInterval.Override(ctx, &s.ClusterSettings().SV, 150*time.Millisecond)
slbase.DefaultTTL.Override(ctx, &s.ClusterSettings().SV, time.Second*10)
codec := s.Codec()

sqlDB := srv.ServerConn(0)
// Disable automatic stats collection to avoid un-related leases.
sqlutils.MakeSQLRunner(sqlDB).Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false")
sqlutils.MakeSQLRunner(sqlDB).Exec(t, "CREATE TABLE foo ()")
tableID := sqlutils.QueryTableID(t, sqlDB, "defaultdb", "public", "foo")

tablePrefix := codec.TablePrefix(keys.LeaseTableID)
testCtx, cancel := context.WithCancel(ctx)
defer cancel()
errorsDuringLeaseInsert := make(chan chan *kvpb.Error)
f.Store(filter(func(ctx context.Context, request *kvpb.BatchRequest, response *kvpb.BatchResponse) *kvpb.Error {
for _, req := range request.Requests {
switch r := req.GetInner().(type) {
// Detect any inserts into the system.lease table involving the table foo
// from node 1.
case *kvpb.ConditionalPutRequest:
if !bytes.HasPrefix(r.Key, tablePrefix) {
return nil
}
in, _, _, err := codec.DecodeIndexPrefix(r.Key)
if err != nil {
return kvpb.NewError(errors.WithAssertionFailure(err))
}
var a tree.DatumAlloc
_, in, err = keyside.Decode(&a, types.Bytes, in, encoding.Ascending)
if !assert.NoError(t, err) {
return kvpb.NewError(err)
}
id, _, err := keyside.Decode(
&a, types.Int, in, encoding.Ascending,
)
assert.NoError(t, err)

// Extra the node ID, which is in the tuple.
tuple, err := r.Value.GetTuple()
assert.NoError(t, err)
nodeID, _, err := valueside.Decode(
&a, types.Int, tuple,
)
assert.NoError(t, err)

if tree.MustBeDInt(id) == tree.DInt(tableID) && tree.MustBeDInt(nodeID) == tree.DInt(1) {
errCh := make(chan *kvpb.Error)
select {
case errorsDuringLeaseInsert <- errCh:
case <-testCtx.Done():
return nil
}
err := <-errCh
return err
}
}
}
return nil
}))

// Make sure that the lease gets acquired and then, upon an ambiguous
// failure, the retry happens, and there is just one lease.
selectErr := make(chan error)

done := false
firstBlock := false
stopGeneratingErrors := atomic.Bool{}
grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
_, err := sqlDB.Exec("SELECT * FROM foo")
selectErr <- err
return nil
})
for !done {
select {
// Everytime we attempt to insert a lease for foo on node 1, generate
// and ambiguous result error.
case unblock := <-errorsDuringLeaseInsert:
if !stopGeneratingErrors.Load() {
unblock <- kvpb.NewError(kvpb.NewAmbiguousResultError(errors.New("boom")))
} else {
unblock <- nil
}
// The first time we hit one of these errors lets try and get a schema
// change going which will need the lease on the old version to disappear.
if !firstBlock {
grp.GoCtx(func(ctx context.Context) error {
defer stopGeneratingErrors.Swap(true)
_, err := srv.ServerConn(1).Exec("ALTER TABLE foo ADD COLUMN j INT")
return err
})
firstBlock = true
}
case err := <-selectErr:
require.NoError(t, err)
close(errorsDuringLeaseInsert)
done = true
}
}
require.NoError(t, grp.Wait())
require.Truef(t, firstBlock, "did not block any lease writes")
}

// TestDescriptorRemovedFromCacheWhenLeaseRenewalForThisDescriptorFails makes sure that, during a lease
// periodical refresh, if the descriptor, whose lease we intend to refresh, does not exist anymore, we delete
// this descriptor from "cache" (i.e. manager.mu.descriptor).
Expand Down
60 changes: 60 additions & 0 deletions pkg/sql/catalog/lease/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slbase"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -57,6 +59,7 @@ type storage struct {
regionPrefix *atomic.Value
sessionBasedLeasingMode sessionBasedLeasingModeReader
sysDBCache *catkv.SystemDatabaseCache
livenessProvider sqlliveness.Provider

// group is used for all calls made to acquireNodeLease to prevent
// concurrent lease acquisitions from the store.
Expand Down Expand Up @@ -227,12 +230,39 @@ func (s storage) acquire(
return s.writer.insertLease(ctx, txn, lf)
}

// Compute the maximum time we will retry ambiguous replica errors before
// disabling the SQL liveness heartbeat. The time chosen will guarantee that
// the sqlliveness TTL expires once the lease duration has surpassed.
maxTimeToDisableLiveness := s.jitteredLeaseDuration()
defaultTTLForLiveness := slbase.DefaultTTL.Get(&s.settings.SV)
if maxTimeToDisableLiveness > defaultTTLForLiveness {
maxTimeToDisableLiveness -= defaultTTLForLiveness
} else {
// If the TTL time is somehow bigger than the lease duration, then immediately
// after the first retry sqlliveness will renewals will be disabled.
maxTimeToDisableLiveness = 0
}
acquireStart := timeutil.Now()
extensionsBlocked := false
defer func() {
if extensionsBlocked {
s.livenessProvider.UnpauseLivenessHeartbeat(ctx)
}
}()
// Run a retry loop to deal with AmbiguousResultErrors. All other error types
// are propagated up to the caller.
for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
err := s.db.KV().Txn(ctx, acquireInTxn)
switch {
case startup.IsRetryableReplicaError(err):
// If we keep encountering the retryable replica error for more then
// the lease expiry duration we can no longer keep updating the liveness.
// i.e. This node is no longer productive at this point since it can't
// acquire or release releases potentially blocking schema changes.
if !extensionsBlocked && timeutil.Since(acquireStart) > maxTimeToDisableLiveness {
s.livenessProvider.PauseLivenessHeartbeat(ctx)
extensionsBlocked = true
}
log.Infof(ctx, "retryable replica error occurred during lease acquisition for %v, retrying: %v", id, err)
continue
case pgerror.GetPGCode(err) == pgcode.UniqueViolation:
Expand Down Expand Up @@ -263,6 +293,25 @@ func (s storage) release(
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = stopper.ShouldQuiesce()

// Compute the maximum time we will retry ambiguous replica errors before
// disabling the SQL liveness heartbeat. The time chosen will guarantee that
// the sqlliveness TTL expires once the lease duration has surpassed.
maxTimeToDisableLiveness := s.jitteredLeaseDuration()
defaultTTLForLiveness := slbase.DefaultTTL.Get(&s.settings.SV)
if maxTimeToDisableLiveness > defaultTTLForLiveness {
maxTimeToDisableLiveness -= defaultTTLForLiveness
} else {
// If the TTL time is somehow bigger than the lease duration, then immediately
// after the first retry sqlliveness will renewals will be disabled.
maxTimeToDisableLiveness = 0
}
acquireStart := timeutil.Now()
extensionsBlocked := false
defer func() {
if extensionsBlocked {
s.livenessProvider.UnpauseLivenessHeartbeat(ctx)
}
}()
// This transaction is idempotent; the retry was put in place because of
// NodeUnavailableErrors.
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); {
Expand All @@ -285,6 +334,17 @@ func (s storage) release(
if grpcutil.IsConnectionRejected(err) {
return
}
if startup.IsRetryableReplicaError(err) {
// If we keep encountering the retryable replica error for more then
// the lease expiry duration we can no longer keep updating the liveness.
// i.e. This node is no longer productive at this point since it can't
// acquire or release releases potentially blocking schema changes across
// a cluster.
if !extensionsBlocked && timeutil.Since(acquireStart) > maxTimeToDisableLiveness {
s.livenessProvider.PauseLivenessHeartbeat(ctx)
extensionsBlocked = true
}
}
continue
}
released = true
Expand Down
32 changes: 32 additions & 0 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type Instance struct {
stopErr error
// s is the current session, if any.
s *session
// blockedExtensions indicates if extensions are disallowed because of availability
// issues on dependent tables.
blockedExtensions int
// blockCh is set when s == nil && stopErr == nil. It is used to wait on
// updates to s.
blockCh chan struct{}
Expand Down Expand Up @@ -250,6 +253,15 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error) {
exp := l.clock.Now().Add(l.ttl().Nanoseconds(), 0)

// If extensions are disallowed we are only going to heartbeat the same
// timestamp, so that we can detect if the sqlliveness row was removed.
l.mu.Lock()
extensionsBlocked := l.mu.blockedExtensions
l.mu.Unlock()
if extensionsBlocked > 0 {
exp = s.Expiration()
}

opts := retry.Options{
InitialBackoff: 10 * time.Millisecond,
MaxBackoff: 2 * time.Second,
Expand Down Expand Up @@ -447,6 +459,26 @@ func (l *Instance) Release(ctx context.Context) (sqlliveness.SessionID, error) {
return session.ID(), nil
}

func (l *Instance) PauseLivenessHeartbeat(ctx context.Context) {
l.mu.Lock()
defer l.mu.Unlock()
firstToBlock := l.mu.blockedExtensions == 0
l.mu.blockedExtensions++
if firstToBlock {
log.Infof(ctx, "disabling sqlliveness extension because of availability issue on system tables")
}
}

func (l *Instance) UnpauseLivenessHeartbeat(ctx context.Context) {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.blockedExtensions--
lastToUnblock := l.mu.blockedExtensions == 0
if lastToUnblock {
log.Infof(ctx, "enabling sqlliveness extension due to restored availability")
}
}

// Session returns a live session id. For each Sqlliveness instance the
// invariant is that there exists at most one live session at any point in time.
// If the current one has expired then a new one is created.
Expand Down
Loading

0 comments on commit 96ccd91

Please sign in to comment.