From 83e1815fef9f760715c41fbac3c03fa6d93ffd73 Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 16 Jun 2022 12:24:37 -0400 Subject: [PATCH] ccl/sqlproxyccl: ensure that EnsureTenantPod always resumes a pod This was a regression from the new load balancing work. Previously, the pods list in entry.go only stored a list of running pods. That has been modified to also store draining pods, and this breaks some of the existing logic. In particular, there could be an issue where EnsureTenantPod returned right away when there were only DRAINING pods left because we relied on the length of pods rather than checking through all their states. This commit fixes that buglet by ensuring that EnsureTenantPod will attempt to resume tenants whenever only DRAINING pods are left. This bug was prominent when the pod watcher restarted at the moment the DRAINING pod was deleted, causing the deletion event to be missed. When that happens, the cache is stuck with a DRAINING pod that will never get refreshed because (1) EnsureTenantPod does not attempt to resume that tenant, and (2) we don't call ReportFailure when we fail to obtain an address. The second behavior is expected because we rely on (1) to resume the tenant. 
Release note: None --- pkg/ccl/sqlproxyccl/tenant/BUILD.bazel | 3 +- pkg/ccl/sqlproxyccl/tenant/directory_cache.go | 9 +-- .../tenant/directory_cache_test.go | 19 +++++++ pkg/ccl/sqlproxyccl/tenant/entry.go | 29 +++++++--- pkg/ccl/sqlproxyccl/tenant/entry_test.go | 56 +++++++++++++++++++ .../tenantdirsvr/test_static_directory_svr.go | 10 +++- 6 files changed, 105 insertions(+), 21 deletions(-) create mode 100644 pkg/ccl/sqlproxyccl/tenant/entry_test.go diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index 0f3da2561c31..d7cb76b2a762 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -49,10 +49,11 @@ go_test( size = "large", srcs = [ "directory_cache_test.go", + "entry_test.go", "main_test.go", ], + embed = [":tenant"], deps = [ - ":tenant", "//pkg/base", "//pkg/ccl", "//pkg/ccl/kvccl/kvtenantccl", diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_cache.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go index 5514d2d0ef46..be183fba35d6 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_cache.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go @@ -194,14 +194,7 @@ func (d *directoryCache) LookupTenantPods( tenantPods := entry.GetPods() // Trigger resumption if there are no RUNNING pods. - hasRunningPod := false - for _, pod := range tenantPods { - if pod.State == RUNNING { - hasRunningPod = true - break - } - } - if !hasRunningPod { + if !hasRunningPod(tenantPods) { // There are no known pod IP addresses, so fetch pod information from // the directory server. Resume the tenant if it is suspended; that // will always result in at least one pod IP address (or an error). 
diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go index 45f929c9f9c2..25ed45fe75d8 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go @@ -141,6 +141,11 @@ func TestWatchPods(t *testing.T) { } return nil }) + + // Trigger a deletion event, which will be missed by the pod watcher. + require.True(t, tds.RemovePod(tenantID, runningPod.Addr)) + + // Start the directory server again. require.NoError(t, tds.Start(ctx)) testutils.SucceedsSoon(t, func() error { if tds.WatchListenersCount() == 0 { @@ -149,6 +154,20 @@ func TestWatchPods(t *testing.T) { return nil }) + // Directory cache should still have the DRAINING pod. + pods, err = dir.TryLookupTenantPods(ctx, tenantID) + require.NoError(t, err) + require.Len(t, pods, 1) + require.Equal(t, pod, pods[0]) + + // Now attempt to perform a resumption. We get an error here, which shows + // that we attempted to call EnsurePod in the test directory server because + // the cache has no running pods. In the actual directory server, this + // should put the draining pod back to running. + pods, err = dir.LookupTenantPods(ctx, tenantID, "my-tenant") + require.Regexp(t, "tenant has no pods", err) + require.Empty(t, pods) + // Put the same pod back to running. require.True(t, tds.AddPod(tenantID, runningPod)) pod = <-podWatcher diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 523b87e5c733..c30861e701cc 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -153,10 +153,10 @@ func (e *tenantEntry) GetPods() []*Pod { return e.pods.pods } -// EnsureTenantPod ensures that at least one SQL process exists for this tenant, -// and is ready for connection attempts to its IP address. If errorIfNoPods is -// true, then EnsureTenantPod returns an error if there are no pods available -// rather than blocking. 
+// EnsureTenantPod ensures that at least one RUNNING SQL process exists for this +// tenant, and is ready for connection attempts to its IP address. If +// errorIfNoPods is true, then EnsureTenantPod returns an error if there are no +// pods available rather than blocking. func (e *tenantEntry) EnsureTenantPod( ctx context.Context, client DirectoryClient, errorIfNoPods bool, ) (pods []*Pod, err error) { @@ -165,11 +165,11 @@ func (e *tenantEntry) EnsureTenantPod( e.calls.Lock() defer e.calls.Unlock() - // If an IP address is already available, nothing more to do. Check this - // immediately after obtaining the lock so that only the first thread does - // the work to get information about the tenant. + // If an IP address for a RUNNING pod is already available, nothing more to + // do. Check this immediately after obtaining the lock so that only the + // first thread does the work to get information about the tenant. pods = e.GetPods() - if len(pods) != 0 { + if hasRunningPod(pods) { return pods, nil } @@ -193,7 +193,7 @@ func (e *tenantEntry) EnsureTenantPod( if err != nil { return nil, err } - if len(pods) != 0 { + if hasRunningPod(pods) { log.Infof(ctx, "resumed tenant %d", e.TenantID) break } @@ -252,3 +252,14 @@ func (e *tenantEntry) canRefreshLocked() bool { e.calls.lastRefresh = now return true } + +// hasRunningPod returns true if there is at least one RUNNING pod, or false +// otherwise. +func hasRunningPod(pods []*Pod) bool { + for _, pod := range pods { + if pod.State == RUNNING { + return true + } + } + return false +} diff --git a/pkg/ccl/sqlproxyccl/tenant/entry_test.go b/pkg/ccl/sqlproxyccl/tenant/entry_test.go new file mode 100644 index 000000000000..86874e76f487 --- /dev/null +++ b/pkg/ccl/sqlproxyccl/tenant/entry_test.go @@ -0,0 +1,56 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package tenant + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestHasRunningPod(t *testing.T) { + defer leaktest.AfterTest(t)() + + for _, tc := range []struct { + name string + pods []*Pod + expected bool + }{ + { + name: "no pods", + pods: nil, + expected: false, + }, + { + name: "single running pod", + pods: []*Pod{{State: RUNNING}}, + expected: true, + }, + { + name: "single draining pod", + pods: []*Pod{{State: DRAINING}}, + expected: false, + }, + { + name: "multiple pods", + pods: []*Pod{ + {State: DRAINING}, + {State: DRAINING}, + {State: RUNNING}, + {State: RUNNING}, + }, + expected: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, hasRunningPod(tc.pods)) + }) + } +} diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go index ed40d2113435..a2df19ca21dc 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go @@ -270,17 +270,21 @@ func (d *TestStaticDirectoryServer) AddPod(tenantID roachpb.TenantID, pod *tenan // Emit an event that the pod has been created. d.notifyPodUpdateLocked(pod) + // Make a copy of the pod so that any changes to the pod's state would not + // mutate the original data. + copyPod := *pod + // Check if the pod exists. This would handle pods transitioning from // DRAINING to RUNNING. 
for i, existing := range pods { - if existing.Addr == pod.Addr { - d.mu.tenants[tenantID][i] = pod + if existing.Addr == copyPod.Addr { + d.mu.tenants[tenantID][i] = &copyPod return true } } // A new pod has been added. - d.mu.tenants[tenantID] = append(d.mu.tenants[tenantID], pod) + d.mu.tenants[tenantID] = append(d.mu.tenants[tenantID], &copyPod) return true }