Skip to content

Commit

Permalink
ddl, planner, executor: implement CREATE TABLE ... SELECT
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 committed Sep 28, 2018
1 parent 6a1e94f commit 5f72440
Show file tree
Hide file tree
Showing 20 changed files with 933 additions and 165 deletions.
7 changes: 4 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,16 @@ var (
type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt, withSelect bool) (int64, error)
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) (int64, error)
DropTable(ctx sessionctx.Context, tableIdent ast.Ident, tableId int64) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr) error
AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident) error
RevealTable(ctx sessionctx.Context, schemaName model.CIStr, tableInfo *model.TableInfo) error

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
102 changes: 63 additions & 39 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType) error {
return nil
}

// buildColumnAndConstraint builds table.Column from ast.ColumnDef and ast.Constraint
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint) (*table.Column, []*ast.Constraint, error) {
Expand Down Expand Up @@ -805,26 +806,26 @@ func buildTableInfo(ctx sessionctx.Context, d *ddl, tableName model.CIStr, cols
return
}

func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error {
func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) (int64, error) {
is := d.GetInformationSchema(ctx)
_, ok := is.SchemaByName(referIdent.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
return 0, infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
}
referTbl, err := is.TableByName(referIdent.Schema, referIdent.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
return 0, infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
}
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
if is.TableExists(ident.Schema, ident.Name) {
if ifNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(infoschema.ErrTableExists.GenWithStackByArgs(ident))
return nil
return 0, nil
}
return infoschema.ErrTableExists.GenWithStackByArgs(ident)
return 0, infoschema.ErrTableExists.GenWithStackByArgs(ident)
}

tblInfo := *referTbl.Meta()
Expand All @@ -833,22 +834,22 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
tblInfo.ForeignKeys = nil
tblInfo.ID, err = d.genGlobalID()
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tblInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tblInfo},
Args: []interface{}{tblInfo, false /*withSelect*/},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
return tblInfo.ID, errors.Trace(err)
}

func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err error) {
func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt, withSelect bool) (id int64, err error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if s.ReferTable != nil {
referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name}
Expand All @@ -858,81 +859,81 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
if is.TableExists(ident.Schema, ident.Name) {
if s.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(infoschema.ErrTableExists.GenWithStackByArgs(ident))
return nil
return 0, nil
}
return infoschema.ErrTableExists.GenWithStackByArgs(ident)
return 0, infoschema.ErrTableExists.GenWithStackByArgs(ident)
}
if err = checkTooLongTable(ident.Name); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkDuplicateColumn(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkGeneratedColumn(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkTooLongColumn(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if err = checkTooManyColumns(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkColumnsAttributes(colDefs); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

cols, newConstraints, err := buildColumnsAndConstraints(ctx, colDefs, s.Constraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

err = checkConstraintNames(newConstraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

tbInfo, err := buildTableInfo(ctx, d, ident.Name, cols, newConstraints)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

pi, err := buildTablePartitionInfo(ctx, d, s)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if pi != nil && pi.Type == model.PartitionTypeRange {
// Only range type partition is now supported.
// Range columns partition only implements the parser, so it will not be checked.
if s.Partition.ColumnNames == nil {
if err = checkPartitionNameUnique(tbInfo, pi); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkCreatePartitionValue(ctx, tbInfo, pi, cols); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkAddPartitionTooManyPartitions(len(pi.Definitions)); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkPartitionFuncValid(ctx, tbInfo, s.Partition.Expr); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkPartitionFuncType(ctx, s, cols, tbInfo); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

if err = checkRangePartitioningKeysConstraints(ctx, s, tbInfo, newConstraints); err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
}
tbInfo.Partition = pi
Expand All @@ -943,16 +944,16 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
TableID: tbInfo.ID,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo},
Args: []interface{}{tbInfo, withSelect},
}

err = handleTableOptions(s.Options, tbInfo)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
err = checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
err = d.doDDLJob(ctx, job)
if err == nil {
Expand All @@ -965,10 +966,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

// table exists, but if_not_exists flags is true, so we ignore this error.
if infoschema.ErrTableExists.Equal(err) && s.IfNotExists {
return nil
return tbInfo.ID, nil
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
return tbInfo.ID, errors.Trace(err)
}

func checkCharsetAndCollation(cs string, co string) error {
Expand Down Expand Up @@ -1830,21 +1831,24 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
}

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident, tableId int64) (err error) {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}

tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
if tableId == 0 {
tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tableId = tb.Meta().ID
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
TableID: tableId,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
Expand Down Expand Up @@ -2160,3 +2164,23 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
}
return part, nil
}

func (d *ddl) RevealTable(ctx sessionctx.Context, schemaName model.CIStr, tableInfo *model.TableInfo) error {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(schemaName)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schemaName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tableInfo.ID,
Type: model.ActionRevealTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tableInfo},
}

err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onModifyTableComment(t, job)
case model.ActionAddTablePartition:
ver, err = onAddTablePartition(t, job)
case model.ActionRevealTable:
ver, err = onRevealTable(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
10 changes: 6 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ func (s *testDDLSuite) TestTableError(c *C) {
c.Assert(err, IsNil)

// Args is wrong, so creating table is failed.
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1}, ctx, d)
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1, false}, ctx, d)
doDDLJobErr(c, 1, 1, model.ActionCreateTable, []interface{}{1, true}, ctx, d)
// Schema ID is wrong, so creating table is failed.
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, -1, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
// Table exists, so creating table is failed.
tblInfo.ID = tblInfo.ID + 1
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo}, ctx, d)

doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, false}, ctx, d)
doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionCreateTable, []interface{}{tblInfo, true}, ctx, d)
}

func (s *testDDLSuite) TestForeignKeyError(c *C) {
Expand Down
Loading

0 comments on commit 5f72440

Please sign in to comment.