From ac786cb1a38a2487a411fdd46aa4eaf6019e6d1e Mon Sep 17 00:00:00 2001 From: YangKeao Date: Mon, 15 Jul 2024 13:11:58 +0800 Subject: [PATCH] server: refine the connection counter logic (#54546) close pingcap/tidb#54428, close pingcap/tidb#54545 --- pkg/executor/simple.go | 13 +-- pkg/server/conn.go | 3 + .../internal/testserverclient/BUILD.bazel | 2 + .../testserverclient/server_client.go | 95 +++++++++++++++++++ pkg/server/server.go | 3 +- pkg/session/session.go | 2 +- pkg/sessionctx/variable/session.go | 12 ++- 7 files changed, 118 insertions(+), 12 deletions(-) diff --git a/pkg/executor/simple.go b/pkg/executor/simple.go index 62537825b06ba..4bbbb34fbf2cd 100644 --- a/pkg/executor/simple.go +++ b/pkg/executor/simple.go @@ -43,7 +43,6 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/model" @@ -2901,20 +2900,16 @@ func (e *SimpleExec) executeAdminUnsetBDRRole() error { } func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error { - originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName + var name string if s.Name.L != "" { if _, ok := e.is.ResourceGroupByName(s.Name); !ok { return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O) } - e.Ctx().GetSessionVars().ResourceGroupName = s.Name.L + name = s.Name.L } else { - e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName - } - newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName - if originalResourceGroup != newResourceGroup { - metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec() - metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc() + name = resourcegroup.DefaultResourceGroupName } + e.Ctx().GetSessionVars().SetResourceGroupName(name) return nil } diff --git a/pkg/server/conn.go b/pkg/server/conn.go index a3d139f6bdcba..caab2400d71c8 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -397,6 +397,7 @@ func closeConn(cc *clientConn) error { logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err)) } } + // Close statements and session // At first, it'll decrese the count of connections in the resource group, update the corresponding gauge. // Then it'll close the statements and session, which release advisory locks, row locks, etc. @@ -405,6 +406,8 @@ func closeConn(cc *clientConn) error { metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec() err = ctx.Close() + } else { + metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Dec() } }) return err diff --git a/pkg/server/internal/testserverclient/BUILD.bazel b/pkg/server/internal/testserverclient/BUILD.bazel index b06a3af5c1310..c370ec0dd39f9 100644 --- a/pkg/server/internal/testserverclient/BUILD.bazel +++ b/pkg/server/internal/testserverclient/BUILD.bazel @@ -14,8 +14,10 @@ go_library( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/server", + "//pkg/sessionctx/sessionstates", "//pkg/testkit", "//pkg/testkit/testenv", + "//pkg/util", "//pkg/util/versioninfo", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index 8eb564fa982bf..b8813f034903c 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -17,6 +17,7 @@ package testserverclient import ( "bytes" "context" + "crypto/x509" "database/sql" "encoding/json" "fmt" @@ -46,8 +47,10 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" tmysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" + "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testenv" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/versioninfo" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -2742,6 +2745,98 @@ func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) { resourceGroupConnCountReached(t, "default", 0.0) resourceGroupConnCountReached(t, "test", 0.0) }) + + // The connection closed before handshake will not decrease the count below 0. + cli.RunTests(t, func(config *mysql.Config) { + config.User = "randomusername" + }, func(dbt *testkit.DBTestKit) { + _, err := dbt.GetDB().Conn(context.Background()) + require.NotNil(t, err) + resourceGroupConnCountReached(t, "default", 0.0) + }) + + // The resource group set by user authantication info is tracked by the count + cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + // Create a user with resource group + _, err := dbt.GetDB().Exec("CREATE USER 'testuser'@'%' RESOURCE GROUP test;") + require.NoError(t, err) + }) + cli.RunTests(t, func(c *mysql.Config) { + c.User = "testuser" + c.DBName = "" + }, func(dbt *testkit.DBTestKit) { + // By default, the resource group is set to `test` + ctx := context.Background() + dbt.GetDB().SetMaxIdleConns(0) + + // start 100 connections + conns := make([]*sql.Conn, 100) + for i := 0; i < 100; i++ { + conn, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + conns[i] = conn + } + resourceGroupConnCountReached(t, "test", 100.0) + + // close 25 connections + for i := 75; i < 100; i++ { + err := conns[i].Close() + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "test", 75.0) + + // close the rest of them + for i := 0; i < 75; i++ { + err := conns[i].Close() + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "test", 0.0) + }) + + // The resource group set by `SET SESSION_STATE` will be tracked by the counter + // At first, create a new cert/key pair to encode session state + tempDir := t.TempDir() + certPath := filepath.Join(tempDir, "cert.pem") + keyPath := filepath.Join(tempDir, "key.pem") + err := util.CreateCertificates(certPath, keyPath, 1024, x509.RSA, x509.UnknownSignatureAlgorithm) + require.NoError(t, err) + + sessionstates.SetCertPath(certPath) + sessionstates.SetKeyPath(keyPath) + sessionstates.ReloadSigningCert() + cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + ctx := context.Background() + conn, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + resourceGroupConnCountReached(t, "default", 1.0) + // Now set the resource group to `test` + _, err = conn.ExecContext(ctx, "set resource group test") + require.NoError(t, err) + resourceGroupConnCountReached(t, "default", 0.0) + resourceGroupConnCountReached(t, "test", 1.0) + + // Encode the session state + rows, err := conn.QueryContext(ctx, "show session_states") + require.NoError(t, err) + var sessionStates, signInfo string + rows.Next() + err = rows.Scan(&sessionStates, &signInfo) + require.NoError(t, err) + require.NoError(t, rows.Close()) + + // Now reset the resource group to `default` + _, err = conn.ExecContext(ctx, "set resource group default") + require.NoError(t, err) + resourceGroupConnCountReached(t, "default", 1.0) + resourceGroupConnCountReached(t, "test", 0.0) + // Set the session state + sessionStates = strings.ReplaceAll(sessionStates, "\\", "\\\\") + sessionStates = strings.ReplaceAll(sessionStates, "'", "\\'") + _, err = conn.ExecContext(ctx, fmt.Sprintf("set session_states '%s'", sessionStates)) + require.NoError(t, err) + resourceGroupConnCountReached(t, "default", 0.0) + resourceGroupConnCountReached(t, "test", 1.0) + }) } func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 9b3f65afe22a5..b67e006b142a4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -53,6 +53,7 @@ import ( autoid "github.com/pingcap/tidb/pkg/autoid_service" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/executor/mppcoordmanager" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/kv" @@ -234,6 +235,7 @@ func (s *Server) newConn(conn net.Conn) *clientConn { } cc.setConn(conn) cc.salt = fastrand.Buf(20) + metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Inc() return cc } @@ -642,7 +644,6 @@ func (s *Server) registerConn(conn *clientConn) bool { return false } s.clients[conn.connectionID] = conn - metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc() return true } diff --git a/pkg/session/session.go b/pkg/session/session.go index 143452024aae0..64dc09c3acb85 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2785,7 +2785,7 @@ func (s *session) Auth(user *auth.UserIdentity, authentication, salt []byte, aut } if variable.EnableResourceControl.Load() && info.ResourceGroupName != "" { - s.sessionVars.ResourceGroupName = strings.ToLower(info.ResourceGroupName) + s.sessionVars.SetResourceGroupName(info.ResourceGroupName) } if info.InSandBoxMode { diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index e8ed94c9c918a..3bd19fb828e98 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -1519,6 +1519,7 @@ type SessionVars struct { // Resource group name // NOTE: all statement relate operation should use StmtCtx.ResourceGroupName instead. + // NOTE: please don't change it directly. Use `SetResourceGroupName`, because it'll need to inc/dec the metrics ResourceGroupName string // PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction @@ -2810,7 +2811,7 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues) s.FoundInPlanCache = sessionStates.FoundInPlanCache s.FoundInBinding = sessionStates.FoundInBinding - s.ResourceGroupName = sessionStates.ResourceGroupName + s.SetResourceGroupName(sessionStates.ResourceGroupName) s.HypoIndexes = sessionStates.HypoIndexes s.HypoTiFlashReplicas = sessionStates.HypoTiFlashReplicas @@ -2821,6 +2822,15 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess return } +// SetResourceGroupName changes the resource group name and inc/dec the metrics accordingly. +func (s *SessionVars) SetResourceGroupName(groupName string) { + if s.ResourceGroupName != groupName { + metrics.ConnGauge.WithLabelValues(s.ResourceGroupName).Dec() + metrics.ConnGauge.WithLabelValues(groupName).Inc() + } + s.ResourceGroupName = groupName +} + // TableDelta stands for the changed count for one table or partition. type TableDelta struct { Delta int64