Skip to content

Commit

Permalink
session, util: update session to use new APIs (#22652) (#22805)
Browse files Browse the repository at this point in the history
* cherry pick #22652 to release-5.0-rc

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix merge conflicts

* fix merge

* remove needless check

Co-authored-by: Morgan Tocker <tocker@gmail.com>
Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 24, 2021
1 parent 760c7d1 commit 64994e9
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 117 deletions.
145 changes: 71 additions & 74 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ var (

func checkBootstrapped(s Session) (bool, error) {
// Check if system db exists.
_, err := s.Execute(context.Background(), fmt.Sprintf("USE %s;", mysql.SystemDB))
_, err := s.ExecuteInternal(context.Background(), "USE %n", mysql.SystemDB)
if err != nil && infoschema.ErrDatabaseNotExists.NotEqual(err) {
logutil.BgLogger().Fatal("check bootstrap error",
zap.Error(err))
Expand All @@ -552,20 +552,18 @@ func checkBootstrapped(s Session) (bool, error) {
// getTiDBVar gets variable value from mysql.tidb table.
// Those variables are used by TiDB server.
func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
sql := fmt.Sprintf(`SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s"`,
mysql.SystemDB, mysql.TiDBTable, name)
ctx := context.Background()
rs, err := s.Execute(ctx, sql)
rs, err := s.ExecuteInternal(ctx, `SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME= %?`,
mysql.SystemDB,
mysql.TiDBTable,
name,
)
if err != nil {
return "", true, errors.Trace(err)
}
if len(rs) != 1 {
return "", true, errors.New("Wrong number of Recordset")
}
r := rs[0]
defer terror.Call(r.Close)
req := r.NewChunk()
err = r.Next(ctx, req)
defer terror.Call(rs.Close)
req := rs.NewChunk()
err = rs.Next(ctx, req)
if err != nil || req.NumRows() == 0 {
return "", true, errors.Trace(err)
}
Expand All @@ -591,7 +589,7 @@ func upgrade(s Session) {
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")
_, err = s.ExecuteInternal(context.Background(), "COMMIT")

if err != nil {
sleepTime := 1 * time.Second
Expand Down Expand Up @@ -638,18 +636,15 @@ func upgradeToVer3(s Session, ver int64) {
return
}
// Version 3 fix tx_read_only variable value.
sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %s.%s SET variable_value = '0' WHERE variable_name = 'tx_read_only';",
mysql.SystemDB, mysql.GlobalVariablesTable)
mustExecute(s, sql)
mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET variable_value = '0' WHERE variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable)
}

// upgradeToVer4 updates to version 4.
func upgradeToVer4(s Session, ver int64) {
if ver >= version4 {
return
}
sql := CreateStatsMetaTable
mustExecute(s, sql)
mustExecute(s, CreateStatsMetaTable)
}

func upgradeToVer5(s Session, ver int64) {
Expand Down Expand Up @@ -683,7 +678,7 @@ func upgradeToVer8(s Session, ver int64) {
return
}
// This is a dummy upgrade, it checks whether upgradeToVer7 success, if not, do it again.
if _, err := s.Execute(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil {
if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil {
return
}
upgradeToVer7(s, ver)
Expand All @@ -699,7 +694,7 @@ func upgradeToVer9(s Session, ver int64) {
}

func doReentrantDDL(s Session, sql string, ignorableErrs ...error) {
_, err := s.Execute(context.Background(), sql)
_, err := s.ExecuteInternal(context.Background(), sql)
for _, ignorableErr := range ignorableErrs {
if terror.ErrorEqual(err, ignorableErr) {
return
Expand All @@ -725,7 +720,7 @@ func upgradeToVer11(s Session, ver int64) {
if ver >= version11 {
return
}
_, err := s.Execute(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`")
_, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`")
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
return
Expand All @@ -740,21 +735,20 @@ func upgradeToVer12(s Session, ver int64) {
return
}
ctx := context.Background()
_, err := s.Execute(ctx, "BEGIN")
_, err := s.ExecuteInternal(ctx, "BEGIN")
terror.MustNil(err)
sql := "SELECT HIGH_PRIORITY user, host, password FROM mysql.user WHERE password != ''"
rs, err := s.Execute(ctx, sql)
rs, err := s.ExecuteInternal(ctx, sql)
if terror.ErrorEqual(err, core.ErrUnknownColumn) {
sql := "SELECT HIGH_PRIORITY user, host, authentication_string FROM mysql.user WHERE authentication_string != ''"
rs, err = s.Execute(ctx, sql)
rs, err = s.ExecuteInternal(ctx, sql)
}
terror.MustNil(err)
r := rs[0]
sqls := make([]string, 0, 1)
defer terror.Call(r.Close)
req := r.NewChunk()
defer terror.Call(rs.Close)
req := rs.NewChunk()
it := chunk.NewIterator4Chunk(req)
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
for err == nil && req.NumRows() != 0 {
for row := it.Begin(); row != it.End(); row = it.Next() {
user := row.GetString(0)
Expand All @@ -766,7 +760,7 @@ func upgradeToVer12(s Session, ver int64) {
updateSQL := fmt.Sprintf(`UPDATE HIGH_PRIORITY mysql.user SET password = "%s" WHERE user="%s" AND host="%s"`, newPass, user, host)
sqls = append(sqls, updateSQL)
}
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
}
terror.MustNil(err)

Expand Down Expand Up @@ -796,7 +790,7 @@ func upgradeToVer13(s Session, ver int64) {
}
ctx := context.Background()
for _, sql := range sqls {
_, err := s.Execute(ctx, sql)
_, err := s.ExecuteInternal(ctx, sql)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
continue
Expand Down Expand Up @@ -825,7 +819,7 @@ func upgradeToVer14(s Session, ver int64) {
}
ctx := context.Background()
for _, sql := range sqls {
_, err := s.Execute(ctx, sql)
_, err := s.ExecuteInternal(ctx, sql)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
continue
Expand All @@ -840,7 +834,7 @@ func upgradeToVer15(s Session, ver int64) {
return
}
var err error
_, err = s.Execute(context.Background(), CreateGCDeleteRangeTable)
_, err = s.ExecuteInternal(context.Background(), CreateGCDeleteRangeTable)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer15 error", zap.Error(err))
}
Expand Down Expand Up @@ -910,9 +904,13 @@ func upgradeToVer23(s Session, ver int64) {

// writeSystemTZ writes system timezone info into mysql.tidb
func writeSystemTZ(s Session) {
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%s", "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`,
mysql.SystemDB, mysql.TiDBTable, tidbSystemTZ, timeutil.InferSystemTZ(), timeutil.InferSystemTZ())
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
mysql.SystemDB,
mysql.TiDBTable,
tidbSystemTZ,
timeutil.InferSystemTZ(),
timeutil.InferSystemTZ(),
)
}

// upgradeToVer24 initializes `System` timezone according to docs/design/2018-09-10-adding-tz-env.md
Expand Down Expand Up @@ -1041,7 +1039,7 @@ func upgradeToVer38(s Session, ver int64) {
return
}
var err error
_, err = s.Execute(context.Background(), CreateGlobalPrivTable)
_, err = s.ExecuteInternal(context.Background(), CreateGlobalPrivTable)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer38 error", zap.Error(err))
}
Expand All @@ -1053,9 +1051,9 @@ func writeNewCollationParameter(s Session, flag bool) {
if flag {
b = varTrue
}
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`,
mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b,
)
}

func upgradeToVer40(s Session, ver int64) {
Expand Down Expand Up @@ -1091,14 +1089,14 @@ func upgradeToVer42(s Session, ver int64) {

// Convert statement summary global variables to non-empty values.
func writeStmtSummaryVars(s Session) {
sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s' AND variable_value=''", mysql.SystemDB, mysql.GlobalVariablesTable)
sql := "UPDATE %n.%n SET variable_value= %? WHERE variable_name= %? AND variable_value=''"
stmtSummaryConfig := config.GetGlobalConfig().StmtSummary
mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary))
mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery))
mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval))
mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize))
mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount))
mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength))
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength)
}

func upgradeToVer43(s Session, ver int64) {
Expand Down Expand Up @@ -1206,13 +1204,12 @@ func upgradeToVer55(s Session, ver int64) {

selectSQL := "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(names, quoteCommaQuote) + "')"
ctx := context.Background()
rs, err := s.Execute(ctx, selectSQL)
rs, err := s.ExecuteInternal(ctx, selectSQL)
terror.MustNil(err)
r := rs[0]
defer terror.Call(r.Close)
req := r.NewChunk()
defer terror.Call(rs.Close)
req := rs.NewChunk()
it := chunk.NewIterator4Chunk(req)
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
for err == nil && req.NumRows() != 0 {
for row := it.Begin(); row != it.End(); row = it.Next() {
n := strings.ToLower(row.GetString(0))
Expand All @@ -1221,7 +1218,7 @@ func upgradeToVer55(s Session, ver int64) {
return
}
}
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
}
terror.MustNil(err)

Expand Down Expand Up @@ -1257,9 +1254,9 @@ func initBindInfoTable(s Session) {
}

func insertBuiltinBindInfoRow(s Session) {
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`,
bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES (%?, %?, "mysql", %?, "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", %?)`,
bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin,
)
}

func upgradeToVer59(s Session, ver int64) {
Expand All @@ -1279,9 +1276,9 @@ func upgradeToVer59(s Session, ver int64) {

func writeMemoryQuotaQuery(s Session) {
comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x+"
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`,
mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30,
)
}

func upgradeToVer60(s Session, ver int64) {
Expand All @@ -1303,17 +1300,17 @@ func upgradeToVer61(s Session, ver int64) {

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`,
mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog,
)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%d", "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%d"`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion,
)
}

// getBootstrapVersion gets bootstrap version from mysql.tidb table;
Expand All @@ -1333,7 +1330,7 @@ func doDDLWorks(s Session) {
// Create a test database.
mustExecute(s, "CREATE DATABASE IF NOT EXISTS test")
// Create system db.
mustExecute(s, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", mysql.SystemDB))
mustExecute(s, "CREATE DATABASE IF NOT EXISTS %n", mysql.SystemDB)
// Create user table.
mustExecute(s, CreateUserTable)
// Create privilege tables.
Expand Down Expand Up @@ -1379,6 +1376,7 @@ func doDDLWorks(s Session) {

// doDMLWorks executes DML statements in bootstrap stage.
// All the statements run in a single transaction.
// TODO: sanitize.
func doDMLWorks(s Session) {
mustExecute(s, "BEGIN")

Expand Down Expand Up @@ -1420,14 +1418,13 @@ func doDMLWorks(s Session) {
strings.Join(values, ", "))
mustExecute(s, sql)

sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%s", "Bootstrap flag. Do not delete.")
ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap flag. Do not delete.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue,
)

sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%d", "Bootstrap version. Do not delete.")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion,
)

writeSystemTZ(s)

Expand All @@ -1437,7 +1434,7 @@ func doDMLWorks(s Session) {

writeStmtSummaryVars(s)

_, err := s.Execute(context.Background(), "COMMIT")
_, err := s.ExecuteInternal(context.Background(), "COMMIT")
if err != nil {
sleepTime := 1 * time.Second
logutil.BgLogger().Info("doDMLWorks failed", zap.Error(err), zap.Duration("sleeping time", sleepTime))
Expand All @@ -1454,8 +1451,8 @@ func doDMLWorks(s Session) {
}
}

func mustExecute(s Session, sql string) {
_, err := s.Execute(context.Background(), sql)
func mustExecute(s Session, sql string, args ...interface{}) {
_, err := s.ExecuteInternal(context.Background(), sql, args...)
if err != nil {
debug.PrintStack()
logutil.BgLogger().Fatal("mustExecute error", zap.Error(err))
Expand Down
Loading

0 comments on commit 64994e9

Please sign in to comment.