Skip to content

Commit

Permalink
ddl, planner, executor: implement CREATE TABLE ... SELECT
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 committed Dec 23, 2018
1 parent ef7082d commit ac61d95
Show file tree
Hide file tree
Showing 17 changed files with 927 additions and 165 deletions.
7 changes: 4 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,17 @@ var (
type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt, withSelect bool) (int64, error)
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) (int64, error)
DropTable(ctx sessionctx.Context, tableIdent ast.Ident, tableId int64) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr) error
AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error
RevealTable(ctx sessionctx.Context, schemaName model.CIStr, tableInfo *model.TableInfo) error

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
86 changes: 55 additions & 31 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) error {
return nil
}

// buildColumnAndConstraint builds table.Column from ast.ColumnDef and ast.Constraint
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint) (*table.Column, []*ast.Constraint, error) {
Expand Down Expand Up @@ -844,26 +845,26 @@ func buildTableInfo(ctx sessionctx.Context, d *ddl, tableName model.CIStr, cols
return
}

func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error {
func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) (int64, error) {
is := d.GetInformationSchema(ctx)
_, ok := is.SchemaByName(referIdent.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
return 0, infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
}
referTbl, err := is.TableByName(referIdent.Schema, referIdent.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
return 0, infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
}
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
if is.TableExists(ident.Schema, ident.Name) {
if ifNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(infoschema.ErrTableExists.GenWithStackByArgs(ident))
return nil
return 0, nil
}
return infoschema.ErrTableExists.GenWithStackByArgs(ident)
return 0, infoschema.ErrTableExists.GenWithStackByArgs(ident)
}

tblInfo := *referTbl.Meta()
Expand All @@ -872,22 +873,22 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
tblInfo.ForeignKeys = nil
tblInfo.ID, err = d.genGlobalID()
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tblInfo},
Args: []interface{}{tblInfo, false /*withSelect*/},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
return tblInfo.ID, errors.Trace(err)
}

func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err error) {
func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt, withSelect bool) (id int64, err error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if s.ReferTable != nil {
referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name}
Expand All @@ -897,58 +898,58 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
if is.TableExists(ident.Schema, ident.Name) {
if s.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(infoschema.ErrTableExists.GenWithStackByArgs(ident))
return nil
return 0, nil
}
return infoschema.ErrTableExists.GenWithStackByArgs(ident)
return 0, infoschema.ErrTableExists.GenWithStackByArgs(ident)
}

var colObjects []interface{}
for _, col := range colDefs {
colObjects = append(colObjects, col)
}
if err = checkTooLongTable(ident.Name); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkDuplicateColumn(colObjects); err != nil {
return errors.Trace(err)
}
if err = checkGeneratedColumn(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkTooLongColumn(colObjects); err != nil {
return errors.Trace(err)
}
if err = checkTooManyColumns(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkColumnsAttributes(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

cols, newConstraints, err := buildColumnsAndConstraints(ctx, colDefs, s.Constraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

err = checkConstraintNames(newConstraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

tbInfo, err := buildTableInfo(ctx, d, ident.Name, cols, newConstraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

pi, err := buildTablePartitionInfo(ctx, d, s)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if pi != nil {
Expand All @@ -972,16 +973,16 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
TableID: tbInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo},
Args: []interface{}{tbInfo, withSelect},
}

err = handleTableOptions(s.Options, tbInfo)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
err = checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if len(tbInfo.Charset) != 0 && strings.ToLower(tbInfo.Charset) != mysql.DefaultCharset {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf(`TiDB only supports the "utf8mb4" character set, so "%s" does not take effect.`, tbInfo.Charset))
Expand All @@ -998,10 +999,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

// table exists, but if_not_exists flags is true, so we ignore this error.
if infoschema.ErrTableExists.Equal(err) && s.IfNotExists {
return nil
return tbInfo.ID, nil
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
return tbInfo.ID, errors.Trace(err)
}

func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err error) {
Expand Down Expand Up @@ -2188,21 +2189,24 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
}

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident, tableId int64) (err error) {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}

tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
if tableId == 0 {
tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tableId = tb.Meta().ID
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
TableID: tableId,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
Expand Down Expand Up @@ -2534,3 +2538,23 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
}
return part, nil
}

func (d *ddl) RevealTable(ctx sessionctx.Context, schemaName model.CIStr, tableInfo *model.TableInfo) error {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(schemaName)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schemaName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tableInfo.ID,
Type: model.ActionRevealTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tableInfo},
}

err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
10 changes: 6 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ func (s *testDDLSuite) TestTableError(c *C) {
c.Assert(err, IsNil)

// Args is wrong, so creating table is failed.
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1}, ctx, d)
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1, false}, ctx, d)
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1, true}, ctx, d)
// Schema ID is wrong, so creating table is failed.
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
// Table exists, so creating table is failed.
tblInfo.ID = tblInfo.ID + 1
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)

doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
}

func (s *testDDLSuite) TestViewError(c *C) {
Expand Down
55 changes: 47 additions & 8 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
var withSelect bool
if err := job.DecodeArgs(tbInfo, &withSelect); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -43,32 +44,69 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
tbInfo.State = model.StateNone
err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
}

originalState := job.SchemaState
switch tbInfo.State {
case model.StateNone:
// none -> public
tbInfo.State = model.StatePublic
if withSelect {
tbInfo.State = model.StateWriteOnly
} else {
tbInfo.State = model.StatePublic
}
tbInfo.UpdateTS = t.StartTS
err = t.CreateTable(schemaID, tbInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
// TODO: Add restrictions to this operation.
go splitTableRegion(d.store, tbInfo.ID)
}
ver, err := updateVersionAndTableInfo(t, job, tbInfo, originalState != tbInfo.State)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
job.FinishTableJob(model.JobStateDone, tbInfo.State, ver, tbInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo})
return ver, nil
default:
job.State = model.JobStateCancelled
return ver, ErrInvalidTableState.GenWithStack("invalid table state %v", tbInfo.State)
}
}

func onRevealTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

originalState := job.SchemaState
switch tbInfo.State {
case model.StateWriteOnly:
// write_only -> public
tbInfo.State = model.StatePublic
tbInfo.UpdateTS = t.StartTS
// Finish this job.
ver, err := updateVersionAndTableInfo(t, job, tbInfo, originalState != tbInfo.State)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionRevealTable, TableInfo: tbInfo})
return ver, err
default:
job.State = model.JobStateCancelled
return ver, ErrInvalidTableState.GenWithStack("invalid table state %v", tbInfo.State)
}
}
Expand Down Expand Up @@ -477,6 +515,7 @@ func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.Tabl
return ver, t.UpdateTable(job.SchemaID, tblInfo)
}

// onAddTablePartition handle ActionAddTablePartition DDL job
// TODO: It may have the issue when two clients concurrently add partitions to a table.
func onAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
partInfo := &model.PartitionInfo{}
Expand Down
Loading

0 comments on commit ac61d95

Please sign in to comment.