Skip to content

Commit

Permalink
ccl/sqlproxyccl: ensure that EnsureTenantPod always resumes a pod
Browse files Browse the repository at this point in the history
This was a regression from the new load balancing work. Previously, the pods
list in entry.go only stores a list of running pods. That has been modified to
also store draining pods, and this breaks some of the existing logic. In
particular, there could be an issue where EnsureTenantPod returned right away
when there are only DRAINING pods left because we relied on the length of pods
rather than checking through all their states. This commit fixes that buglet
by ensuring that EnsureTenantPods will attempt to resume tenants whenever only
DRAINING pods are left.

This bug was prominent when the pod watcher restarted at the moment the
DRAINING pod was deleted, causing the deletion event to be missed. When that
happens, the cache is stuck with a DRAINING pod that will never get refreshed
because (1) EnsureTenantPod does not attempt to resume that tenant, and
(2) we don't call ReportFailure when we fail to obtain an address. The second
behavior is expected because we relied on the fact that (1) should resume the
tenant.

Release note: None
  • Loading branch information
jaylim-crl committed Jun 16, 2022
1 parent fb8ed97 commit 83e1815
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ go_test(
size = "large",
srcs = [
"directory_cache_test.go",
"entry_test.go",
"main_test.go",
],
embed = [":tenant"],
deps = [
":tenant",
"//pkg/base",
"//pkg/ccl",
"//pkg/ccl/kvccl/kvtenantccl",
Expand Down
9 changes: 1 addition & 8 deletions pkg/ccl/sqlproxyccl/tenant/directory_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,7 @@ func (d *directoryCache) LookupTenantPods(
tenantPods := entry.GetPods()

// Trigger resumption if there are no RUNNING pods.
hasRunningPod := false
for _, pod := range tenantPods {
if pod.State == RUNNING {
hasRunningPod = true
break
}
}
if !hasRunningPod {
if !hasRunningPod(tenantPods) {
// There are no known pod IP addresses, so fetch pod information from
// the directory server. Resume the tenant if it is suspended; that
// will always result in at least one pod IP address (or an error).
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func TestWatchPods(t *testing.T) {
}
return nil
})

// Trigger a deletion event, which will be missed by the pod watcher.
require.True(t, tds.RemovePod(tenantID, runningPod.Addr))

// Start the directory server again.
require.NoError(t, tds.Start(ctx))
testutils.SucceedsSoon(t, func() error {
if tds.WatchListenersCount() == 0 {
Expand All @@ -149,6 +154,20 @@ func TestWatchPods(t *testing.T) {
return nil
})

// Directory cache should still have the DRAINING pod.
pods, err = dir.TryLookupTenantPods(ctx, tenantID)
require.NoError(t, err)
require.Len(t, pods, 1)
require.Equal(t, pod, pods[0])

// Now attempt to perform a resumption. We get an error here, which shows
// that we attempted to call EnsurePod in the test directory server because
// the cache has no running pods. In the actual directory server, this
// should put the draining pod back to running.
pods, err = dir.LookupTenantPods(ctx, tenantID, "my-tenant")
require.Regexp(t, "tenant has no pods", err)
require.Empty(t, pods)

// Put the same pod back to running.
require.True(t, tds.AddPod(tenantID, runningPod))
pod = <-podWatcher
Expand Down
29 changes: 20 additions & 9 deletions pkg/ccl/sqlproxyccl/tenant/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (e *tenantEntry) GetPods() []*Pod {
return e.pods.pods
}

// EnsureTenantPod ensures that at least one SQL process exists for this tenant,
// and is ready for connection attempts to its IP address. If errorIfNoPods is
// true, then EnsureTenantPod returns an error if there are no pods available
// rather than blocking.
// EnsureTenantPod ensures that at least one RUNNING SQL process exists for this
// tenant, and is ready for connection attempts to its IP address. If
// errorIfNoPods is true, then EnsureTenantPod returns an error if there are no
// pods available rather than blocking.
func (e *tenantEntry) EnsureTenantPod(
ctx context.Context, client DirectoryClient, errorIfNoPods bool,
) (pods []*Pod, err error) {
Expand All @@ -165,11 +165,11 @@ func (e *tenantEntry) EnsureTenantPod(
e.calls.Lock()
defer e.calls.Unlock()

// If an IP address is already available, nothing more to do. Check this
// immediately after obtaining the lock so that only the first thread does
// the work to get information about the tenant.
// If an IP address for a RUNNING pod is already available, nothing more to
// do. Check this immediately after obtaining the lock so that only the
// first thread does the work to get information about the tenant.
pods = e.GetPods()
if len(pods) != 0 {
if hasRunningPod(pods) {
return pods, nil
}

Expand All @@ -193,7 +193,7 @@ func (e *tenantEntry) EnsureTenantPod(
if err != nil {
return nil, err
}
if len(pods) != 0 {
if hasRunningPod(pods) {
log.Infof(ctx, "resumed tenant %d", e.TenantID)
break
}
Expand Down Expand Up @@ -252,3 +252,14 @@ func (e *tenantEntry) canRefreshLocked() bool {
e.calls.lastRefresh = now
return true
}

// hasRunningPod returns true if there is at least one RUNNING pod, or false
// otherwise.
func hasRunningPod(pods []*Pod) bool {
for _, pod := range pods {
if pod.State == RUNNING {
return true
}
}
return false
}
56 changes: 56 additions & 0 deletions pkg/ccl/sqlproxyccl/tenant/entry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenant

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestHasRunningPod(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, tc := range []struct {
name string
pods []*Pod
expected bool
}{
{
name: "no pods",
pods: nil,
expected: false,
},
{
name: "single running pod",
pods: []*Pod{{State: RUNNING}},
expected: true,
},
{
name: "single draining pod",
pods: []*Pod{{State: DRAINING}},
expected: false,
},
{
name: "multiple pods",
pods: []*Pod{
{State: DRAINING},
{State: DRAINING},
{State: RUNNING},
{State: RUNNING},
},
expected: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, hasRunningPod(tc.pods))
})
}
}
10 changes: 7 additions & 3 deletions pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,21 @@ func (d *TestStaticDirectoryServer) AddPod(tenantID roachpb.TenantID, pod *tenan
// Emit an event that the pod has been created.
d.notifyPodUpdateLocked(pod)

// Make a copy of the pod so that any changes to the pod's state would not
// mutate the original data.
copyPod := *pod

// Check if the pod exists. This would handle pods transitioning from
// DRAINING to RUNNING.
for i, existing := range pods {
if existing.Addr == pod.Addr {
d.mu.tenants[tenantID][i] = pod
if existing.Addr == copyPod.Addr {
d.mu.tenants[tenantID][i] = &copyPod
return true
}
}

// A new pod has been added.
d.mu.tenants[tenantID] = append(d.mu.tenants[tenantID], pod)
d.mu.tenants[tenantID] = append(d.mu.tenants[tenantID], &copyPod)
return true
}

Expand Down

0 comments on commit 83e1815

Please sign in to comment.