Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: Add SERVER_CONNECT_ATTRS table #44340

Merged
merged 14 commits into from
Jun 13, 2023
2 changes: 1 addition & 1 deletion infoschema/perfschema/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":perfschema"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//kv",
"//parser/terror",
Expand Down
8 changes: 8 additions & 0 deletions infoschema/perfschema/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package perfschema
// perfSchemaTables is a shortcut to involve all table names.
var perfSchemaTables = []string{
tableGlobalStatus,
tableSessionConnectAttrs,
tableSessionStatus,
tableSetupActors,
tableSetupObjects,
Expand Down Expand Up @@ -549,3 +550,10 @@ const tablePDProfileGoroutines = "CREATE TABLE IF NOT EXISTS " + tableNamePDProf
const tableSessionVariables = "CREATE TABLE IF NOT EXISTS " + tableNameSessionVariables + " (" +
"VARIABLE_NAME VARCHAR(64) NOT NULL," +
"VARIABLE_VALUE VARCHAR(1024) NOT NULL);"

// tableSessionConnectAttrs contains the column name definitions for the table session_connect_attrs
const tableSessionConnectAttrs = "CREATE TABLE IF NOT EXISTS " + tableNameSessionConnectAttrs + " (" +
"PROCESSLIST_ID bigint unsigned NOT NULL," +
"ATTR_NAME varchar(32) COLLATE utf8mb4_bin NOT NULL," +
"ATTR_VALUE varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL," +
"ORDINAL_POSITION int DEFAULT NULL);"
4 changes: 4 additions & 0 deletions infoschema/perfschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
tableNamePDProfileAllocs = "pd_profile_allocs"
tableNamePDProfileBlock = "pd_profile_block"
tableNamePDProfileGoroutines = "pd_profile_goroutines"
tableNameSessionConnectAttrs = "session_connect_attrs"
tableNameSessionVariables = "session_variables"
)

Expand Down Expand Up @@ -104,6 +105,7 @@ var tableIDMap = map[string]int64{
tableNamePDProfileBlock: autoid.PerformanceSchemaDBID + 29,
tableNamePDProfileGoroutines: autoid.PerformanceSchemaDBID + 30,
tableNameSessionVariables: autoid.PerformanceSchemaDBID + 31,
tableNameSessionConnectAttrs: autoid.PerformanceSchemaDBID + 32,
}

// perfSchemaTable stands for the fake table all its data is in the memory.
Expand Down Expand Up @@ -257,6 +259,8 @@ func (vt *perfSchemaTable) getRows(ctx context.Context, sctx sessionctx.Context,
fullRows, err = dataForRemoteProfile(sctx, "pd", "/pd/api/v1/debug/pprof/goroutine?debug=2", true)
case tableNameSessionVariables:
fullRows, err = infoschema.GetDataFromSessionVariables(ctx, sctx)
case tableNameSessionConnectAttrs:
fullRows, err = infoschema.GetDataFromSessionConnectAttrs(sctx)
}
if err != nil {
return
Expand Down
14 changes: 14 additions & 0 deletions infoschema/perfschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,20 @@ func TestTiKVProfileCPU(t *testing.T) {
require.Lenf(t, accessed, 5, "expect all HTTP API had been accessed, but found: %v", accessed)
}

// TestSessionConnectAttrs tests the `SESSION_CONNECT_ATTRS` table
func TestSessionConnectAttrs(t *testing.T) {
sm := &testkit.MockSessionManager{}
sm.ConAttrs = map[uint64]map[string]string{
123456: {
"_client_name": "libmysql",
},
}
store := newMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.Session().SetSessionManager(sm)
tk.MustQuery("SELECT PROCESSLIST_ID,ATTR_NAME,ATTR_VALUE,ORDINAL_POSITION FROM performance_schema.SESSION_CONNECT_ATTRS").Check(testkit.Rows("123456 _client_name libmysql 0"))
}

func newMockStore(t *testing.T) kv.Storage {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down
32 changes: 32 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -2003,6 +2004,37 @@ func GetDataFromSessionVariables(ctx context.Context, sctx sessionctx.Context) (
return rows, nil
}

// GetDataFromSessionConnectAttrs produces the rows for the session_connect_attrs table.
func GetDataFromSessionConnectAttrs(sctx sessionctx.Context) ([][]types.Datum, error) {
sm := sctx.GetSessionManager()
if sm == nil {
return nil, nil
}
allAttrs := sm.GetConAttrs()
rows := make([][]types.Datum, 0, len(allAttrs)*10) // 10 Attributes per connection
for pid, attrs := range allAttrs { // Note: PID is not ordered.
// Sorts the attributes by key and gives ORDINAL_POSITION based on this. This is needed as we didn't store the
// ORDINAL_POSITION and a map doesn't have a guaranteed sort order. This is needed to keep the ORDINAL_POSITION
// stable over multiple queries.
attrnames := make([]string, 0, len(attrs))
for attrname := range attrs {
attrnames = append(attrnames, attrname)
}
sort.Strings(attrnames)

for ord, attrkey := range attrnames {
row := types.MakeDatums(
pid,
attrkey,
attrs[attrkey],
ord,
)
rows = append(rows, row)
}
}
return rows, nil
}

var tableNameToColumns = map[string][]columnInfo{
TableSchemata: schemataCols,
TableTables: tablesCols,
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type clientConn struct {
sync.RWMutex
*TiDBContext // an interface to execute sql statements.
}
attrs map[string]string // attributes parsed from client handshake response, not used for now.
attrs map[string]string // attributes parsed from client handshake response.
serverHost string // server host
peerHost string // peer host
peerPort string // peer port
Expand Down
13 changes: 13 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,19 @@ func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
return conn.ctx.ShowProcess(), ok
}

// GetConAttrs returns the connection attributes
func (s *Server) GetConAttrs() map[uint64]map[string]string {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
rs := make(map[uint64]map[string]string)
for _, client := range s.clients {
if pi := client.ctx.ShowProcess(); pi != nil {
rs[pi.ID] = client.attrs
}
}
return rs
}

// Kill implements the SessionManager interface.
func (s *Server) Kill(connectionID uint64, query bool, maxExecutionTime bool) {
logutil.BgLogger().Info("kill", zap.Uint64("conn", connectionID), zap.Bool("query", query))
Expand Down
20 changes: 13 additions & 7 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (

// MockSessionManager is a mocked session manager which is used for test.
type MockSessionManager struct {
PS []*util.ProcessInfo
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
Dom *domain.Domain
Conn map[uint64]session.Session
mu sync.Mutex
PS []*util.ProcessInfo
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
Dom *domain.Domain
Conn map[uint64]session.Session
mu sync.Mutex
ConAttrs map[uint64]map[string]string

internalSessions map[interface{}]struct{}
}
Expand Down Expand Up @@ -103,6 +104,11 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo
return &util.ProcessInfo{}, false
}

// GetConAttrs returns the connection attributes of all connections
func (msm *MockSessionManager) GetConAttrs() map[uint64]map[string]string {
return msm.ConAttrs
}

// Kill implements the SessionManager.Kill interface.
func (*MockSessionManager) Kill(uint64, bool, bool) {
}
Expand Down
2 changes: 2 additions & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ type SessionManager interface {
CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string)
// KillNonFlashbackClusterConn kill all non flashback cluster connections.
KillNonFlashbackClusterConn()
// GetConAttrs gets the connection attributes
GetConAttrs() map[uint64]map[string]string
}

// GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.
Expand Down