Skip to content

Commit

Permalink
ddl: disallow alter table on view (pingcap#8890)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewDi authored and ngaut committed Jan 16, 2019
1 parent f522de2 commit 59c7b69
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 64 deletions.
5 changes: 5 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
log "github.com/sirupsen/logrus"
)

var _ = Suite(&testStateChangeSuite{})
Expand Down Expand Up @@ -917,6 +918,8 @@ func (s *testStateChangeSuite) TestParallelDDLBeforeRunDDLJob(c *C) {
info = is
break
}
// Print log to notify if TestParallelDDLBeforeRunDDLJob hang up
log.Infof("time.Sleep(%v) in TestParallelDDLBeforeRunDDLJob", interval)
time.Sleep(interval)
}

Expand All @@ -928,6 +931,8 @@ func (s *testStateChangeSuite) TestParallelDDLBeforeRunDDLJob(c *C) {
if currID == firstConnID || seCnt == finishedCnt {
break
}
// Print log to notify if TestParallelDDLBeforeRunDDLJob hang up
log.Infof("time.Sleep(%v) in TestParallelDDLBeforeRunDDLJob", interval)
time.Sleep(interval)
}

Expand Down
11 changes: 7 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ var (
ErrCoalesceOnlyOnHashPartition = terror.ClassDDL.New(codeCoalesceOnlyOnHashPartition, mysql.MySQLErrName[mysql.ErrCoalesceOnlyOnHashPartition])
// ErrViewWrongList returns create view must include all columns in the select clause
ErrViewWrongList = terror.ClassDDL.New(codeViewWrongList, mysql.MySQLErrName[mysql.ErrViewWrongList])

// ErrTableIsNotView returns for table is not base table.
// ErrTableIsNotView returns for table is not view.
ErrTableIsNotView = terror.ClassDDL.New(codeErrWrongObject, "'%s.%s' is not VIEW")
// ErrTableIsNotBaseTable returns for table is not base table.
ErrTableIsNotBaseTable = terror.ClassDDL.New(codeErrWrongObject, "'%s.%s' is not BASE TABLE")
)

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
Expand Down Expand Up @@ -459,8 +460,10 @@ func (d *ddl) GetLease() time.Duration {
return lease
}

// GetInformationSchema gets the infoschema binding to d. It's expoted for testing.
func (d *ddl) GetInformationSchema(ctx sessionctx.Context) infoschema.InfoSchema {
// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing.
// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead.
// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever.
func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema {
is := d.infoHandle.Get()

d.mu.RLock()
Expand Down
88 changes: 28 additions & 60 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt) (err error) {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
_, ok := is.SchemaByName(schema)
if ok {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(schema)
Expand Down Expand Up @@ -86,7 +86,7 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
old, ok := is.SchemaByName(schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -903,7 +903,7 @@ func buildTableInfo(ctx sessionctx.Context, d *ddl, tableName model.CIStr, cols
}

func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
_, ok := is.SchemaByName(referIdent.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name)
Expand Down Expand Up @@ -1024,7 +1024,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name}
return d.CreateTableWithLike(ctx, ident, referIdent, s.IfNotExists)
}
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -1080,7 +1080,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
}

func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand Down Expand Up @@ -1108,7 +1108,7 @@ func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche

func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err error) {
ident := ast.Ident{Name: s.ViewName.Name, Schema: s.ViewName.Schema}
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -1371,6 +1371,11 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return errRunMultiSchemaChanges
}

is := d.infoHandle.Get()
if is.TableIsView(ident.Schema, ident.Name) {
return ErrTableIsNotBaseTable.GenWithStackByArgs(ident.Schema, ident.Name)
}

for _, spec := range validSpecs {
var handledCharsetOrCollate bool
switch spec.Tp {
Expand Down Expand Up @@ -1462,14 +1467,9 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
}

func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64) error {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
t, err := is.TableByName(ident.Schema, ident.Name)
schema, t, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
return errors.Trace(err)
}
autoIncID, err := t.Allocator(ctx).NextGlobalAutoID(t.Meta().ID)
if err != nil {
Expand Down Expand Up @@ -1510,7 +1510,7 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
}

func (d *ddl) getSchemaAndTableByIdent(ctx sessionctx.Context, tableIdent ast.Ident) (dbInfo *model.DBInfo, t table.Table, err error) {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
schema, ok := is.SchemaByName(tableIdent.Schema)
if !ok {
return nil, nil, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(tableIdent.Schema)
Expand Down Expand Up @@ -1551,14 +1551,9 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
return errors.Trace(err)
}

is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
t, err := is.TableByName(ti.Schema, ti.Name)
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
return errors.Trace(err)
}
if err = checkAddColumnTooManyColumns(len(t.Cols()) + 1); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1773,14 +1768,9 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *

// DropColumn will drop a column from the table, now we don't support drop the column with index covered.
func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIStr) error {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
t, err := is.TableByName(ti.Schema, ti.Name)
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
return errors.Trace(err)
}

// Check whether dropped column has existed.
Expand Down Expand Up @@ -2316,15 +2306,9 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}

tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil || tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
Expand All @@ -2341,15 +2325,9 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {

// DropView will proceed even if some view in the list does not exists.
func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}

tb, err := is.TableByName(ti.Schema, ti.Name)
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
return errors.Trace(err)
}

if !tb.Meta().IsView() {
Expand All @@ -2369,14 +2347,9 @@ func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
}

func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}
tb, err := is.TableByName(ti.Schema, ti.Name)
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
return errors.Trace(err)
}
newTableID, err := d.genGlobalID()
if err != nil {
Expand All @@ -2395,7 +2368,7 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
}

func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error {
is := d.GetInformationSchema(ctx)
is := d.GetInfoSchemaWithInterceptor(ctx)
oldSchema, ok := is.SchemaByName(oldIdent.Schema)
if !ok {
if isAlterTable {
Expand Down Expand Up @@ -2457,14 +2430,9 @@ func getAnonymousIndex(t table.Table, colName model.CIStr) model.CIStr {

func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, indexName model.CIStr,
idxColNames []*ast.IndexColName, indexOption *ast.IndexOption) error {
is := d.infoHandle.Get()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}
t, err := is.TableByName(ti.Schema, ti.Name)
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
return errors.Trace(err)
}

// Deal with anonymous index.
Expand Down
8 changes: 8 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ func (s *testSuite3) TestAlterTableAddColumn(c *C) {
r.Close()
tk.MustExec("alter table alter_test add column c3 varchar(50) default 'CURRENT_TIMESTAMP'")
tk.MustQuery("select c3 from alter_test").Check(testkit.Rows("CURRENT_TIMESTAMP"))
tk.MustExec("create or replace view alter_view as select c1,c2 from alter_test")
_, err = tk.Exec("alter table alter_view add column c4 varchar(50)")
c.Assert(err.Error(), Equals, ddl.ErrTableIsNotBaseTable.GenWithStackByArgs("test", "alter_view").Error())
tk.MustExec("drop view alter_view")
}

func (s *testSuite3) TestAddNotNullColumnNoDefault(c *C) {
Expand Down Expand Up @@ -305,6 +309,10 @@ func (s *testSuite3) TestAlterTableModifyColumn(c *C) {
createSQL := result.Rows()[0][1]
expected := "CREATE TABLE `mc` (\n `c1` bigint(20) DEFAULT NULL,\n `c2` text CHARSET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
c.Assert(createSQL, Equals, expected)
tk.MustExec("create or replace view alter_view as select c1,c2 from mc")
_, err = tk.Exec("alter table alter_view modify column c2 text")
c.Assert(err.Error(), Equals, ddl.ErrTableIsNotBaseTable.GenWithStackByArgs("test", "alter_view").Error())
tk.MustExec("drop view alter_view")
}

func (s *testSuite3) TestDefaultDBAfterDropCurDB(c *C) {
Expand Down
11 changes: 11 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type InfoSchema interface {
Clone() (result []*model.DBInfo)
SchemaTables(schema model.CIStr) []table.Table
SchemaMetaVersion() int64
// TableIsView indicates whether the schema.table is a view.
TableIsView(schema, table model.CIStr) bool
}

// Information Schema Name.
Expand Down Expand Up @@ -177,6 +179,15 @@ func (is *infoSchema) TableByName(schema, table model.CIStr) (t table.Table, err
return nil, ErrTableNotExists.GenWithStackByArgs(schema, table)
}

func (is *infoSchema) TableIsView(schema, table model.CIStr) bool {
if tbNames, ok := is.schemaMap[schema.L]; ok {
if t, ok := tbNames.tables[table.L]; ok {
return t.Meta().IsView()
}
}
return false
}

func (is *infoSchema) TableExists(schema, table model.CIStr) bool {
if tbNames, ok := is.schemaMap[schema.L]; ok {
if _, ok = tbNames.tables[table.L]; ok {
Expand Down
1 change: 1 addition & 0 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (*testSuite) TestT(c *C) {

c.Assert(is.TableExists(dbName, tbName), IsTrue)
c.Assert(is.TableExists(dbName, noexist), IsFalse)
c.Assert(is.TableIsView(dbName, tbName), IsFalse)

tb, ok := is.TableByID(tbID)
c.Assert(ok, IsTrue)
Expand Down

0 comments on commit 59c7b69

Please sign in to comment.