Skip to content

Commit

Permalink
stats: handle ddl event for partition table (#7903)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored Oct 16, 2018
1 parent 660191c commit 95edaf0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
46 changes: 34 additions & 12 deletions statistics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,43 @@ import (
func (h *Handle) HandleDDLEvent(t *util.Event) error {
switch t.Tp {
case model.ActionCreateTable, model.ActionTruncateTable:
return h.insertTableStats2KV(t.TableInfo)
ids := getPhysicalIDs(t.TableInfo)
for _, id := range ids {
if err := h.insertTableStats2KV(t.TableInfo, id); err != nil {
return err
}
}
case model.ActionAddColumn:
return h.insertColStats2KV(t.TableInfo.ID, t.ColumnInfo)
ids := getPhysicalIDs(t.TableInfo)
for _, id := range ids {
if err := h.insertColStats2KV(id, t.ColumnInfo); err != nil {
return err
}
}
}
return nil
}

func getPhysicalIDs(tblInfo *model.TableInfo) []int64 {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
return []int64{tblInfo.ID}
}
ids := make([]int64, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
ids = append(ids, def.ID)
}
return ids
}

// DDLEventCh returns ddl events channel in handle.
func (h *Handle) DDLEventCh() chan *util.Event {
return h.ddlEventCh
}

// insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
// new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
Expand All @@ -56,18 +78,18 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {
defer func() {
err = finishTransaction(context.Background(), exec, err)
}()
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn().StartTS(), info.ID))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn().StartTS(), physicalID))
if err != nil {
return
}
for _, col := range info.Columns {
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.mu.ctx.Txn().StartTS()))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, h.mu.ctx.Txn().StartTS()))
if err != nil {
return
}
}
for _, idx := range info.Indices {
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.mu.ctx.Txn().StartTS()))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, h.mu.ctx.Txn().StartTS()))
if err != nil {
return
}
Expand All @@ -77,7 +99,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (err error) {
func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
Expand All @@ -89,7 +111,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
err = finishTransaction(context.Background(), exec, err)
}()
// First of all, we update the version.
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), tableID))
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), physicalID))
if err != nil {
return
}
Expand All @@ -98,7 +120,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []ast.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", tableID))
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
Expand All @@ -118,13 +140,13 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, count))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn().StartTS(), physicalID, colInfo.ID, count))
if err != nil {
return
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn().StartTS(), physicalID, colInfo.ID, int64(len(value.GetBytes()))*count))
if err != nil {
return
}
Expand All @@ -133,7 +155,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
return
}
// There must be only one bucket for this new column and the value is the default value.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", tableID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
if err != nil {
return
}
Expand Down
47 changes: 47 additions & 0 deletions statistics/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,50 @@ func (s *testStatsCacheSuite) TestDDLHistogram(c *C) {
rs = testKit.MustQuery("select count(*) from mysql.stats_buckets where table_id = ? and hist_id = 1 and is_index = 1", tableInfo.ID)
rs.Check(testkit.Rows("2"))
}

func (s *testStatsCacheSuite) TestDDLPartition(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("set @@session.tidb_enable_table_partition=1")
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
createTable := `CREATE TABLE t (a int, b int, primary key(a), index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (16),
PARTITION p3 VALUES LESS THAN (21)
)`
testKit.MustExec(createTable)
do := s.do
is := do.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := tbl.Meta()
h := do.StatsHandle()
err = h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(err, IsNil)
h.Update(is)
pi := tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
}

testKit.MustExec("insert into t values (1,2),(6,2),(11,2),(16,2)")
testKit.MustExec("analyze table t")
testKit.MustExec("alter table t add column c varchar(15) DEFAULT '123'")
err = h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(err, IsNil)
is = do.InfoSchema()
h.Update(is)
tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo = tbl.Meta()
pi = tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
c.Check(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count), Equals, 3.0)
}
}

0 comments on commit 95edaf0

Please sign in to comment.