Skip to content

Commit

Permalink
ddl: resolve the charset by the order: table->database->server (#9105)
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored and crazycs520 committed Jan 21, 2019
1 parent 89f4e64 commit 63936cc
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 54 deletions.
2 changes: 1 addition & 1 deletion ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ func (s *testColumnSuite) colDefStrToFieldType(c *C, str string) *types.FieldTyp
stmt, err := parser.New().ParseOneStmt(sqlA, "", "")
c.Assert(err, IsNil)
colDef := stmt.(*ast.AlterTableStmt).Specs[0].NewColumns[0]
col, _, err := buildColumnAndConstraint(nil, 0, colDef, nil)
col, _, err := buildColumnAndConstraint(nil, 0, colDef, nil, mysql.DefaultCharset, mysql.DefaultCharset)
c.Assert(err, IsNil)
return &col.FieldType
}
Expand Down
30 changes: 30 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,36 @@ func (s *testIntegrationSuite) TestAddIndexAfterAddColumn(c *C) {
s.testErrorCode(c, s.tk, sql, tmysql.ErrTooManyKeyParts)
}

func (s *testIntegrationSuite) TestResolveCharset(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")
s.tk.MustExec("drop table if exists resolve_charset")
s.tk.MustExec(`CREATE TABLE resolve_charset (a varchar(255) DEFAULT NULL) DEFAULT CHARSET=latin1`)
ctx := s.tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("resolve_charset"))
c.Assert(err, IsNil)
c.Assert(tbl.Cols()[0].Charset, Equals, "latin1")
s.tk.MustExec("INSERT INTO resolve_charset VALUES('鰈')")

s.tk.MustExec("create database resolve_charset charset binary")
s.tk.MustExec("use resolve_charset")
s.tk.MustExec(`CREATE TABLE resolve_charset (a varchar(255) DEFAULT NULL) DEFAULT CHARSET=latin1`)

is = domain.GetDomain(ctx).InfoSchema()
tbl, err = is.TableByName(model.NewCIStr("resolve_charset"), model.NewCIStr("resolve_charset"))
c.Assert(err, IsNil)
c.Assert(tbl.Cols()[0].Charset, Equals, "latin1")
c.Assert(tbl.Meta().Charset, Equals, "latin1")

s.tk.MustExec(`CREATE TABLE resolve_charset1 (a varchar(255) DEFAULT NULL)`)
is = domain.GetDomain(ctx).InfoSchema()
tbl, err = is.TableByName(model.NewCIStr("resolve_charset"), model.NewCIStr("resolve_charset1"))
c.Assert(err, IsNil)
c.Assert(tbl.Cols()[0].Charset, Equals, "binary")
c.Assert(tbl.Meta().Charset, Equals, "binary")
}

func (s *testIntegrationSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")
Expand Down
145 changes: 97 additions & 48 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra
}

func buildColumnsAndConstraints(ctx sessionctx.Context, colDefs []*ast.ColumnDef,
constraints []*ast.Constraint) ([]*table.Column, []*ast.Constraint, error) {
constraints []*ast.Constraint, tblCharset, dbCharset string) ([]*table.Column, []*ast.Constraint, error) {
var cols []*table.Column
colMap := map[string]*table.Column{}
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
Expand All @@ -180,7 +180,7 @@ func buildColumnsAndConstraints(ctx sessionctx.Context, colDefs []*ast.ColumnDef
}
}
for i, colDef := range colDefs {
col, cts, err := buildColumnAndConstraint(ctx, i, colDef, outPriKeyConstraint)
col, cts, err := buildColumnAndConstraint(ctx, i, colDef, outPriKeyConstraint, tblCharset, dbCharset)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -196,13 +196,40 @@ func buildColumnsAndConstraints(ctx sessionctx.Context, colDefs []*ast.ColumnDef
return cols, constraints, nil
}

func setCharsetCollationFlenDecimal(tp *types.FieldType) error {
// ResolveCharsetCollation will resolve the charset by the order: table charset > database charset > server default charset.
func ResolveCharsetCollation(tblCharset, dbCharset string) (string, string, error) {
if len(tblCharset) != 0 {
defCollate, err := charset.GetDefaultCollation(tblCharset)
if err != nil {
// return terror is better.
return "", "", ErrUnknownCharacterSet.GenWithStackByArgs(tblCharset)
}
return tblCharset, defCollate, nil
}

if len(dbCharset) != 0 {
defCollate, err := charset.GetDefaultCollation(dbCharset)
if err != nil {
return "", "", ErrUnknownCharacterSet.GenWithStackByArgs(dbCharset)
}
return dbCharset, defCollate, errors.Trace(err)
}

charset, collate := charset.GetDefaultCharsetAndCollate()
return charset, collate, nil
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
switch tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeEnum, mysql.TypeSet:
tp.Charset, tp.Collate = charset.GetDefaultCharsetAndCollate()
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
default:
tp.Charset = charset.CharsetBin
tp.Collate = charset.CharsetBin
Expand All @@ -219,6 +246,7 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) error {
}
}
}

// Use default value for flen or decimal when they are unspecified.
defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(tp.Tp)
if tp.Flen == types.UnspecifiedLength {
Expand All @@ -237,9 +265,8 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) error {

// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint) (*table.Column, []*ast.Constraint, error) {
err := setCharsetCollationFlenDecimal(colDef.Tp)
if err != nil {
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint, tblCharset, dbCharset string) (*table.Column, []*ast.Constraint, error) {
if err := setCharsetCollationFlenDecimal(colDef.Tp, tblCharset, dbCharset); err != nil {
return nil, nil, errors.Trace(err)
}
col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint)
Expand Down Expand Up @@ -673,25 +700,23 @@ func checkColumnsAttributes(colDefs []*ast.ColumnDef) error {
return nil
}

// checkColumnFieldLength check the maximum length limit for different character set varchar type columns.
func checkColumnFieldLength(schema *model.DBInfo, colDefs []*ast.ColumnDef, tbInfo *model.TableInfo) error {
for _, colDef := range colDefs {
if colDef.Tp.Tp == mysql.TypeVarchar {
var setCharset string
setCharset = mysql.DefaultCharset
if len(schema.Charset) != 0 {
setCharset = schema.Charset
}
if len(tbInfo.Charset) != 0 {
setCharset = tbInfo.Charset
}
// checkColumnsFieldLength check the maximum length limit for different character set varchar type columns.
func checkColumnsFieldLength(cols []*table.Column) error {
for _, col := range cols {
if err := checkColumnFieldLength(col); err != nil {
return errors.Trace(err)
}
}
return nil
}

err := IsTooBigFieldLength(colDef.Tp.Flen, colDef.Name.Name.O, setCharset)
if err != nil {
return errors.Trace(err)
}
func checkColumnFieldLength(col *table.Column) error {
if col.Tp == mysql.TypeVarchar {
if err := IsTooBigFieldLength(col.Flen, col.Name.O, col.Charset); err != nil {
return errors.Trace(err)
}
}

return nil
}

Expand Down Expand Up @@ -949,10 +974,10 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
// The SQL string should be a create table statement.
// Don't use this function to build a partitioned table.
func BuildTableInfoFromAST(s *ast.CreateTableStmt) (*model.TableInfo, error) {
return buildTableInfoWithCheck(mock.NewContext(), nil, s)
return buildTableInfoWithCheck(mock.NewContext(), nil, s, mysql.DefaultCharset)
}

func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableStmt) (*model.TableInfo, error) {
func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableStmt, dbCharset string) (*model.TableInfo, error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
colDefs := s.Cols
var colObjects []interface{}
Expand All @@ -979,11 +1004,17 @@ func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableS
return nil, errors.Trace(err)
}

cols, newConstraints, err := buildColumnsAndConstraints(ctx, colDefs, s.Constraints)
tableCharset := findTableOptionCharset(s.Options)
// The column charset haven't been resolved here.
cols, newConstraints, err := buildColumnsAndConstraints(ctx, colDefs, s.Constraints, tableCharset, dbCharset)
if err != nil {
return nil, errors.Trace(err)
}

if err = checkColumnsFieldLength(cols); err != nil {
return nil, errors.Trace(err)
}

err = checkConstraintNames(newConstraints)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1015,6 +1046,20 @@ func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableS
}
tbInfo.Partition = pi
}

// The specified charset will be handled in handleTableOptions
if err = handleTableOptions(s.Options, tbInfo); err != nil {
return nil, errors.Trace(err)
}

if err = resolveDefaultTableCharsetAndCollation(tbInfo, dbCharset); err != nil {
return nil, errors.Trace(err)
}

if err = checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate); err != nil {
return nil, errors.Trace(err)
}

return tbInfo, nil
}

Expand All @@ -1037,7 +1082,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return infoschema.ErrTableExists.GenWithStackByArgs(ident)
}

tbInfo, err := buildTableInfoWithCheck(ctx, d, s)
tbInfo, err := buildTableInfoWithCheck(ctx, d, s, schema.Charset)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1050,18 +1095,6 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
Args: []interface{}{tbInfo},
}

err = handleTableOptions(s.Options, tbInfo)
if err != nil {
return errors.Trace(err)
}
err = checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate)
if err != nil {
return errors.Trace(err)
}
if err = checkColumnFieldLength(schema, s.Cols, tbInfo); err != nil {
return errors.Trace(err)
}

err = d.doDDLJob(ctx, job)
if err == nil {
if tbInfo.AutoIncID > 1 {
Expand Down Expand Up @@ -1269,17 +1302,35 @@ func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64) error {
return nil
}

func setDefaultTableCharsetAndCollation(tbInfo *model.TableInfo) (err error) {
func resolveDefaultTableCharsetAndCollation(tbInfo *model.TableInfo, dbCharset string) (err error) {
chr, collate, err := ResolveCharsetCollation(tbInfo.Charset, dbCharset)
if err != nil {
return errors.Trace(err)
}
if len(tbInfo.Charset) == 0 {
tbInfo.Charset = mysql.DefaultCharset
tbInfo.Charset = chr
}

if len(tbInfo.Collate) == 0 {
tbInfo.Collate, err = charset.GetDefaultCollation(tbInfo.Charset)
tbInfo.Collate = collate
}
return
}

func findTableOptionCharset(options []*ast.TableOption) string {
var tableCharset string
for i := len(options) - 1; i >= 0; i-- {
op := options[i]
if op.Tp == ast.TableOptionCharset {
// find the last one.
tableCharset = op.StrValue
break
}
}

return tableCharset
}

// handleTableOptions updates tableInfo according to table options.
func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) error {
for _, op := range options {
Expand All @@ -1305,9 +1356,6 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
}
}

if err := setDefaultTableCharsetAndCollation(tbInfo); err != nil {
log.Error(errors.ErrorStack(err))
}
return nil
}

Expand Down Expand Up @@ -1588,10 +1636,11 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
// Ingore table constraints now, maybe return error later.
// We use length(t.Cols()) as the default offset firstly, we will change the
// column's offset later.
col, _, err = buildColumnAndConstraint(ctx, len(t.Cols()), specNewColumn, nil)
col, _, err = buildColumnAndConstraint(ctx, len(t.Cols()), specNewColumn, nil, t.Meta().Charset, schema.Charset)
if err != nil {
return errors.Trace(err)
}

col.OriginDefaultValue, err = generateOriginDefaultValue(col.ToInfo())
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2031,7 +2080,7 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
Name: newColName,
})

err = setCharsetCollationFlenDecimal(&newCol.FieldType)
err = setCharsetCollationFlenDecimal(&newCol.FieldType, t.Meta().Charset, schema.Charset)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2066,7 +2115,7 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
modifyColumnTp = mysql.TypeNull
}

if err = checkColumnFieldLength(schema, spec.NewColumns, t.Meta()); err != nil {
if err = checkColumnFieldLength(newCol); err != nil {
return nil, errors.Trace(err)
}

Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,9 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
Tp: &types.FieldType{Tp: mysql.TypeLonglong},
Options: []*ast.ColumnOption{},
}
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef, nil)
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef, nil, mysql.DefaultCharset, mysql.DefaultCharset)
c.Assert(err, IsNil)

addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
Expand Down
12 changes: 11 additions & 1 deletion ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (dr *mockDelRange) clear() {}

// MockTableInfo mocks a table info by create table stmt ast and a specified table id.
func MockTableInfo(ctx sessionctx.Context, stmt *ast.CreateTableStmt, tableID int64) (*model.TableInfo, error) {
cols, newConstraints, err := buildColumnsAndConstraints(ctx, stmt.Cols, stmt.Constraints)
cols, newConstraints, err := buildColumnsAndConstraints(ctx, stmt.Cols, stmt.Constraints, "", "")
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -148,5 +148,15 @@ func MockTableInfo(ctx sessionctx.Context, stmt *ast.CreateTableStmt, tableID in
return nil, errors.Trace(err)
}
tbl.ID = tableID

// The specified charset will be handled in handleTableOptions
if err = handleTableOptions(stmt.Options, tbl); err != nil {
return nil, errors.Trace(err)
}

if err = resolveDefaultTableCharsetAndCollation(tbl, ""); err != nil {
return nil, errors.Trace(err)
}

return tbl, nil
}
2 changes: 1 addition & 1 deletion executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *seqTestSuite) TestShow(c *C) {
row := result.Rows()[0]
// For issue https://github.com/pingcap/tidb/issues/1061
expectedRow := []interface{}{
"SHOW_test", "CREATE TABLE `SHOW_test` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `c1` int(11) DEFAULT NULL COMMENT 'c1_comment',\n `c2` int(11) DEFAULT NULL,\n `c3` int(11) DEFAULT '1',\n `c4` text CHARSET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,\n `c5` tinyint(1) DEFAULT NULL,\n PRIMARY KEY (`id`),\n KEY `idx_wide_c4` (`c3`,`c4`(10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=28934 COMMENT='table_comment'"}
"SHOW_test", "CREATE TABLE `SHOW_test` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `c1` int(11) DEFAULT NULL COMMENT 'c1_comment',\n `c2` int(11) DEFAULT NULL,\n `c3` int(11) DEFAULT '1',\n `c4` text CHARSET utf8 COLLATE utf8_bin DEFAULT NULL,\n `c5` tinyint(1) DEFAULT NULL,\n PRIMARY KEY (`id`),\n KEY `idx_wide_c4` (`c3`,`c4`(10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=28934 COMMENT='table_comment'"}
for i, r := range row {
c.Check(r, Equals, expectedRow[i])
}
Expand Down
9 changes: 7 additions & 2 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -427,10 +428,14 @@ func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) {
// * gb2312, the multiple is 2
// * Utf-8, the multiple is 3
// * utf8mb4, the multiple is 4
// So the large enough multiple is 4 in here.
// We used to check non-string types to avoid the truncation problem in some MySQL
// client such as Navicat. Now we only allow string type enter this branch.
ci.ColumnLength = ci.ColumnLength * mysql.MaxBytesOfCharacter
charsetDesc, err := charset.GetCharsetDesc(fld.Column.Charset)
if err != nil {
ci.ColumnLength = ci.ColumnLength * 4
} else {
ci.ColumnLength = ci.ColumnLength * uint32(charsetDesc.Maxlen)
}
}

if fld.Column.Decimal == types.UnspecifiedLength {
Expand Down

0 comments on commit 63936cc

Please sign in to comment.