Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: refine the connection counter logic #54546

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions pkg/executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -2851,20 +2850,16 @@ func (e *SimpleExec) executeAdminUnsetBDRRole() error {
}

func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
var name string
if s.Name.L != "" {
if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
}
e.Ctx().GetSessionVars().ResourceGroupName = s.Name.L
name = s.Name.L
} else {
e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
}
newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if originalResourceGroup != newResourceGroup {
metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
name = resourcegroup.DefaultResourceGroupName
}
e.Ctx().GetSessionVars().SetResourceGroupName(name)
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func closeConn(cc *clientConn) error {
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
}
}

// Close statements and session
// At first, it'll decrese the count of connections in the resource group, update the corresponding gauge.
// Then it'll close the statements and session, which release advisory locks, row locks, etc.
Expand All @@ -398,6 +399,8 @@ func closeConn(cc *clientConn) error {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()

err = ctx.Close()
} else {
metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Dec()
}
})
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/sessionctx/sessionstates",
"//pkg/testkit",
"//pkg/testkit/testenv",
"//pkg/util",
"//pkg/util/versioninfo",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
95 changes: 95 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package testserverclient
import (
"bytes"
"context"
"crypto/x509"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -45,8 +46,10 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2729,6 +2732,98 @@ func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 0.0)
})

// The connection closed before handshake will not decrease the count below 0.
cli.RunTests(t, func(config *mysql.Config) {
config.User = "randomusername"
}, func(dbt *testkit.DBTestKit) {
_, err := dbt.GetDB().Conn(context.Background())
require.NotNil(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
})

// The resource group set by user authantication info is tracked by the count
cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
// Create a user with resource group
_, err := dbt.GetDB().Exec("CREATE USER 'testuser'@'%' RESOURCE GROUP test;")
require.NoError(t, err)
})
cli.RunTests(t, func(c *mysql.Config) {
c.User = "testuser"
c.DBName = ""
}, func(dbt *testkit.DBTestKit) {
// By default, the resource group is set to `test`
ctx := context.Background()
dbt.GetDB().SetMaxIdleConns(0)

// start 100 connections
conns := make([]*sql.Conn, 100)
for i := 0; i < 100; i++ {
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
conns[i] = conn
}
resourceGroupConnCountReached(t, "test", 100.0)

// close 25 connections
for i := 75; i < 100; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "test", 75.0)

// close the rest of them
for i := 0; i < 75; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "test", 0.0)
})

// The resource group set by `SET SESSION_STATE` will be tracked by the counter
// At first, create a new cert/key pair to encode session state
tempDir := t.TempDir()
certPath := filepath.Join(tempDir, "cert.pem")
keyPath := filepath.Join(tempDir, "key.pem")
err := util.CreateCertificates(certPath, keyPath, 1024, x509.RSA, x509.UnknownSignatureAlgorithm)
require.NoError(t, err)

sessionstates.SetCertPath(certPath)
sessionstates.SetKeyPath(keyPath)
sessionstates.ReloadSigningCert()
cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
ctx := context.Background()
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 1.0)
// Now set the resource group to `test`
_, err = conn.ExecContext(ctx, "set resource group test")
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 1.0)

// Encode the session state
rows, err := conn.QueryContext(ctx, "show session_states")
require.NoError(t, err)
var sessionStates, signInfo string
rows.Next()
err = rows.Scan(&sessionStates, &signInfo)
require.NoError(t, err)
require.NoError(t, rows.Close())

// Now reset the resource group to `default`
_, err = conn.ExecContext(ctx, "set resource group default")
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 1.0)
resourceGroupConnCountReached(t, "test", 0.0)
// Set the session state
sessionStates = strings.ReplaceAll(sessionStates, "\\", "\\\\")
sessionStates = strings.ReplaceAll(sessionStates, "'", "\\'")
_, err = conn.ExecContext(ctx, fmt.Sprintf("set session_states '%s'", sessionStates))
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 1.0)
})
}

func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
autoid "github.com/pingcap/tidb/pkg/autoid_service"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/executor/mppcoordmanager"
"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -234,6 +235,7 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
}
cc.setConn(conn)
cc.salt = fastrand.Buf(20)
metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Inc()
return cc
}

Expand Down Expand Up @@ -642,7 +644,6 @@ func (s *Server) registerConn(conn *clientConn) bool {
return false
}
s.clients[conn.connectionID] = conn
metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2784,7 +2784,7 @@ func (s *session) Auth(user *auth.UserIdentity, authentication, salt []byte, aut
}

if variable.EnableResourceControl.Load() && info.ResourceGroupName != "" {
s.sessionVars.ResourceGroupName = strings.ToLower(info.ResourceGroupName)
s.sessionVars.SetResourceGroupName(info.ResourceGroupName)
}

if info.InSandBoxMode {
Expand Down
12 changes: 11 additions & 1 deletion pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,7 @@ type SessionVars struct {

// Resource group name
// NOTE: all statement relate operation should use StmtCtx.ResourceGroupName instead.
// NOTE: please don't change it directly. Use `SetResourceGroupName`, because it'll need to inc/dec the metrics
ResourceGroupName string

// PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction
Expand Down Expand Up @@ -2754,7 +2755,7 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess
s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues)
s.FoundInPlanCache = sessionStates.FoundInPlanCache
s.FoundInBinding = sessionStates.FoundInBinding
s.ResourceGroupName = sessionStates.ResourceGroupName
s.SetResourceGroupName(sessionStates.ResourceGroupName)
s.HypoIndexes = sessionStates.HypoIndexes
s.HypoTiFlashReplicas = sessionStates.HypoTiFlashReplicas

Expand All @@ -2765,6 +2766,15 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess
return
}

// SetResourceGroupName changes the resource group name and inc/dec the metrics accordingly.
func (s *SessionVars) SetResourceGroupName(groupName string) {
if s.ResourceGroupName != groupName {
metrics.ConnGauge.WithLabelValues(s.ResourceGroupName).Dec()
metrics.ConnGauge.WithLabelValues(groupName).Inc()
}
s.ResourceGroupName = groupName
}

// TableDelta stands for the changed count for one table or partition.
type TableDelta struct {
Delta int64
Expand Down