Skip to content

Commit

Permalink
*: support partition table in tiflash (pingcap#14735)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Feb 13, 2020
1 parent 7cd8ba3 commit 9ffea35
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 43 deletions.
64 changes: 63 additions & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3718,11 +3718,73 @@ func (s *testDBSuite1) TestSetTableFlashReplica(c *C) {
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, NotNil)
c.Assert(t.Meta().TiFlashReplica.Count, Equals, uint64(2))
c.Assert(strings.Join(t.Meta().TiFlashReplica.LocationLabels, ","), Equals, strings.Join([]string{"a", "b"}, ","))
c.Assert(strings.Join(t.Meta().TiFlashReplica.LocationLabels, ","), Equals, "a,b")

s.tk.MustExec("alter table t_flash set tiflash replica 0")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, IsNil)

// Test set tiflash replica for partition table.
s.mustExec(c, "drop table if exists t_flash;")
s.tk.MustExec("create table t_flash(a int, b int) partition by hash(a) partitions 3")
s.tk.MustExec("alter table t_flash set tiflash replica 2 location labels 'a','b';")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, NotNil)
c.Assert(t.Meta().TiFlashReplica.Count, Equals, uint64(2))
c.Assert(strings.Join(t.Meta().TiFlashReplica.LocationLabels, ","), Equals, "a,b")

// Use table ID as physical ID, mock for partition feature was not enabled.
err := domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, t.Meta().ID, true)
c.Assert(err, IsNil)
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, NotNil)
c.Assert(t.Meta().TiFlashReplica.Available, Equals, true)
c.Assert(len(t.Meta().TiFlashReplica.AvailablePartitionIDs), Equals, 0)

err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, t.Meta().ID, false)
c.Assert(err, IsNil)
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica.Available, Equals, false)

// Mock for partition 0 replica was available.
partition := t.Meta().Partition
c.Assert(len(partition.Definitions), Equals, 3)
err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, partition.Definitions[0].ID, true)
c.Assert(err, IsNil)
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica.Available, Equals, false)
c.Assert(t.Meta().TiFlashReplica.AvailablePartitionIDs, DeepEquals, []int64{partition.Definitions[0].ID})

// Mock for partition 1,2 replica was available.
err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, partition.Definitions[1].ID, true)
c.Assert(err, IsNil)
err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, partition.Definitions[2].ID, true)
c.Assert(err, IsNil)
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica.Available, Equals, true)
c.Assert(t.Meta().TiFlashReplica.AvailablePartitionIDs, DeepEquals, []int64{partition.Definitions[0].ID, partition.Definitions[1].ID, partition.Definitions[2].ID})

// Mock for partition 1 replica was unavailable.
err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, partition.Definitions[1].ID, false)
c.Assert(err, IsNil)
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica.Available, Equals, false)
c.Assert(t.Meta().TiFlashReplica.AvailablePartitionIDs, DeepEquals, []int64{partition.Definitions[0].ID, partition.Definitions[2].ID})

// Test for update table replica with unknown table ID.
err = domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, math.MaxInt64, false)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:1146]Table which ID = 9223372036854775807 does not exist.")

// Test for FindTableByPartitionID.
is := domain.GetDomain(s.tk.Se).InfoSchema()
t, dbInfo := is.FindTableByPartitionID(partition.Definitions[0].ID)
c.Assert(t, NotNil)
c.Assert(dbInfo, NotNil)
c.Assert(t.Meta().Name.L, Equals, "t_flash")
t, dbInfo = is.FindTableByPartitionID(t.Meta().ID)
c.Assert(t, IsNil)
c.Assert(dbInfo, IsNil)
}

func (s *testSerialDBSuite) TestAlterShardRowIDBits(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type DDL interface {
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error
UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error
RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)
Expand Down
18 changes: 11 additions & 7 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,18 +3201,22 @@ func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Iden
}

// UpdateTableReplicaInfo updates the table flash replica infos.
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error {
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error {
is := d.infoHandle.Get()
tb, ok := is.TableByID(tid)
tb, ok := is.TableByID(physicalID)
if !ok {
return infoschema.ErrTableNotExists.GenWithStack("Table which ID = %d does not exist.", tid)
tb, _ = is.FindTableByPartitionID(physicalID)
if tb == nil {
return infoschema.ErrTableNotExists.GenWithStack("Table which ID = %d does not exist.", physicalID)
}
}

if tb.Meta().TiFlashReplica == nil || (tb.Meta().TiFlashReplica.Available == available) {
tbInfo := tb.Meta()
if tbInfo.TiFlashReplica == nil || (tbInfo.TiFlashReplica.Available == available) ||
(tbInfo.ID != physicalID && available == tbInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) {
return nil
}

db, ok := is.SchemaByTable(tb.Meta())
db, ok := is.SchemaByTable(tbInfo)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStack("Database of table `%s` does not exist.", tb.Meta().Name)
}
Expand All @@ -3223,7 +3227,7 @@ func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, availabl
SchemaName: db.Name.L,
Type: model.ActionUpdateTiFlashReplicaStatus,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{available},
Args: []interface{}{available, physicalID},
}
err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
34 changes: 31 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ func onSetTableFlashReplica(t *meta.Meta, job *model.Job) (ver int64, _ error) {

func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var available bool
if err := job.DecodeArgs(&available); err != nil {
var physicalID int64
if err := job.DecodeArgs(&available, &physicalID); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -738,12 +739,39 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro
if err != nil {
return ver, errors.Trace(err)
}
if tblInfo.TiFlashReplica == nil || (tblInfo.TiFlashReplica.Available == available) {
if tblInfo.TiFlashReplica == nil || (tblInfo.TiFlashReplica.Available == available) ||
(tblInfo.ID != physicalID && available == tblInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) {
return ver, nil
}

if tblInfo.TiFlashReplica != nil {
if tblInfo.ID == physicalID {
tblInfo.TiFlashReplica.Available = available
} else if pi := tblInfo.GetPartitionInfo(); pi != nil {
// Partition replica become available.
if available {
allAvailable := true
for _, p := range pi.Definitions {
if p.ID == physicalID {
tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, physicalID)
}
allAvailable = allAvailable && tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID)
}
tblInfo.TiFlashReplica.Available = allAvailable
} else {
// Partition replica become unavailable.
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == physicalID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
tblInfo.TiFlashReplica.Available = false
break
}
}
}
} else {
job.State = model.JobStateCancelled
return ver, errors.Errorf("unknown physical ID %v in table %v", physicalID, tblInfo.Name.O)
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f
github.com/pingcap/parser v0.0.0-20200212063918-0829643f461c
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f h1:uUrZ94J2/tsmCXHjF7pItG2tMqwP4P4vMojAbI8NMRY=
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/pingcap/parser v0.0.0-20200212063918-0829643f461c h1:QbFj6Ng/PvHeQNN7aPWpulXIzoo+j/J8odEM7ERUt7g=
github.com/pingcap/parser v0.0.0-20200212063918-0829643f461c/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=

github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497 h1:FzLErYtcXnSxtC469OuVDlgBbh0trJZzNxw0mNKzyls=
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497/go.mod h1:cfT/xu4Zz+Tkq95QrLgEBZ9ikRcgzy4alHqqoaTftqI=
github.com/pingcap/sysutil v0.0.0-20191216090214-5f9620d22b3b h1:EEyo/SCRswLGuSk+7SB86Ak1p8bS6HL1Mi4Dhyuv6zg=
Expand Down
20 changes: 20 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type InfoSchema interface {
SchemaMetaVersion() int64
// TableIsView indicates whether the schema.table is a view.
TableIsView(schema, table model.CIStr) bool
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo)
}

type sortedTables []table.Table
Expand Down Expand Up @@ -278,6 +279,25 @@ func (is *infoSchema) SchemaTables(schema model.CIStr) (tables []table.Table) {
return
}

// FindTableByPartitionID finds the partition-table info by the partitionID.
// FindTableByPartitionID will traverse all the tables to find the partitionID partition in which partition-table.
func (is *infoSchema) FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo) {
for _, v := range is.schemaMap {
for _, tbl := range v.tables {
pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
continue
}
for _, p := range pi.Definitions {
if p.ID == partitionID {
return tbl, v.dbInfo
}
}
}
}
return nil, nil
}

func (is *infoSchema) Clone() (result []*model.DBInfo) {
for _, v := range is.schemaMap {
result = append(result, v.dbInfo.Clone())
Expand Down
7 changes: 5 additions & 2 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2329,8 +2329,11 @@ func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo)
if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 {
progress = 0
for _, p := range pi.Definitions {
// TODO: need check partition replica available.
progress += progressMap[p.ID]
if tbl.TiFlashReplica.IsPartitionAvailable(p.ID) {
progress += 1
} else {
progress += progressMap[p.ID]
}
}
progress = progress / float64(len(pi.Definitions))
} else {
Expand Down
41 changes: 15 additions & 26 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,17 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
if tblInfo.TiFlashReplica == nil {
continue
}
if pi := tblInfo.GetPartitionInfo(); pi != nil {
for _, p := range pi.Definitions {
replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
ID: p.ID,
ReplicaCount: tblInfo.TiFlashReplica.Count,
LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
Available: tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID),
})
}
continue
}
replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
ID: tblInfo.ID,
ReplicaCount: tblInfo.TiFlashReplica.Count,
Expand Down Expand Up @@ -1665,34 +1676,12 @@ func (h dbTableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
// The physicalID maybe a partition ID of the partition-table.
dbTblInfo.TableInfo, dbTblInfo.DBInfo = findTableByPartitionID(schema, int64(physicalID))
if dbTblInfo.TableInfo == nil {
tbl, dbInfo := schema.FindTableByPartitionID(int64(physicalID))
if tbl == nil {
writeError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
return
}
dbTblInfo.TableInfo = tbl.Meta()
dbTblInfo.DBInfo = dbInfo
writeData(w, dbTblInfo)
}

// findTableByPartitionID finds the partition-table info by the partitionID.
// This function will traverse all the tables to find the partitionID partition in which partition-table.
func findTableByPartitionID(schema infoschema.InfoSchema, partitionID int64) (*model.TableInfo, *model.DBInfo) {
allDBs := schema.AllSchemas()
for _, db := range allDBs {
allTables := schema.SchemaTables(db.Name)
for _, tbl := range allTables {
if tbl.Meta().ID > partitionID || tbl.Meta().GetPartitionInfo() == nil {
continue
}
info := tbl.Meta().GetPartitionInfo()
tb := tbl.(table.PartitionedTable)
for _, def := range info.Definitions {
pid := def.ID
partition := tb.GetPartition(pid)
if partition.GetPhysicalID() == partitionID {
return tbl.Meta(), db
}
}
}
}
return nil, nil
}
56 changes: 56 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) {
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
Expand All @@ -565,10 +566,65 @@ func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) {
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Available, Equals, true) // The status should be true now.

// Test for partition table.
dbt.mustExec("alter table pt set tiflash replica 2 location labels 'a','b';")
dbt.mustExec("alter table test set tiflash replica 0;")
resp, err = ts.fetchStatus("/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(len(data), Equals, 3)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Available, Equals, false)

pid0 := data[0].ID
pid1 := data[1].ID
pid2 := data[2].ID

// Mock for partition 1 replica was available.
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid1)
resp, err = ts.postStatus("/tiflash/replica", "application/json", bytes.NewBuffer([]byte(req)))
c.Assert(err, IsNil)
resp.Body.Close()
resp, err = ts.fetchStatus("/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(len(data), Equals, 3)
c.Assert(data[0].Available, Equals, false)
c.Assert(data[1].Available, Equals, true)
c.Assert(data[2].Available, Equals, false)

// Mock for partition 0,2 replica was available.
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid0)
resp, err = ts.postStatus("/tiflash/replica", "application/json", bytes.NewBuffer([]byte(req)))
c.Assert(err, IsNil)
resp.Body.Close()
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid2)
resp, err = ts.postStatus("/tiflash/replica", "application/json", bytes.NewBuffer([]byte(req)))
c.Assert(err, IsNil)
resp.Body.Close()
resp, err = ts.fetchStatus("/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(len(data), Equals, 3)
c.Assert(data[0].Available, Equals, true)
c.Assert(data[1].Available, Equals, true)
c.Assert(data[2].Available, Equals, true)
}

func (ts *HTTPHandlerTestSuite) TestDecodeColumnValue(c *C) {
Expand Down

0 comments on commit 9ffea35

Please sign in to comment.