Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemastore: support create tables for partition table #351

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
support createt tables for partition table
  • Loading branch information
lidezhu committed Oct 8, 2024
commit 05e627c26caf828165ac1fd9f5db0b34a8b3bfd5
24 changes: 18 additions & 6 deletions logservice/schemastore/multi_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionTruncateTable:
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
createTable := false
for _, partition := range getAllPartitionIDs(event.TableInfo) {
if v.tableID == partition {
Expand Down Expand Up @@ -324,12 +324,24 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
case model.ActionCreateTables:
assertEmpty(v.infos, event)
for _, tableInfo := range event.MultipleTableInfos {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if v.tableID == partitionID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
} else {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}

}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
Expand Down
121 changes: 83 additions & 38 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func buildPersistedDDLEventFromJob(
event.CurrentTableID = event.TableInfo.ID
event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID)
event.CurrentTableName = getTableName(event.PrevTableID)
if isPartitionTableEvent(&event) {
if isPartitionTable(event.TableInfo) {
for id := range partitionMap[event.PrevTableID] {
event.PrevPartitions = append(event.PrevPartitions, id)
}
Expand Down Expand Up @@ -818,9 +818,9 @@ func shouldSkipDDL(
return false
}

func isPartitionTableEvent(ddlEvent *PersistedDDLEvent) bool {
// ddlEvent.TableInfo may only be nil in unit test
return ddlEvent.TableInfo != nil && ddlEvent.TableInfo.Partition != nil
func isPartitionTable(tableInfo *model.TableInfo) bool {
// tableInfo may only be nil in unit test
return tableInfo != nil && tableInfo.Partition != nil
}

func getAllPartitionIDs(tableInfo *model.TableInfo) []int64 {
Expand Down Expand Up @@ -860,7 +860,7 @@ func updateDDLHistory(
tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
// Note: for create table, this ddl event will not be sent to table dispatchers.
// add it to ddl history is just for building table info store.
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
// for partition table, we only care the ddl history of physical table ids.
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
} else {
Expand All @@ -872,13 +872,13 @@ func updateDDLHistory(
model.ActionDropIndex,
model.ActionAddForeignKey,
model.ActionDropForeignKey:
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
} else {
appendTableHistory(ddlEvent.CurrentTableID)
}
case model.ActionTruncateTable:
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
appendPartitionsHistory(ddlEvent.PrevPartitions)
} else {
Expand All @@ -887,14 +887,14 @@ func updateDDLHistory(
}
case model.ActionModifyColumn,
model.ActionRebaseAutoID:
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
} else {
appendTableHistory(ddlEvent.CurrentTableID)
}
case model.ActionRenameTable:
tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
} else {
appendTableHistory(ddlEvent.CurrentTableID)
Expand All @@ -903,7 +903,7 @@ func updateDDLHistory(
model.ActionShardRowID,
model.ActionModifyTableComment,
model.ActionRenameIndex:
if isPartitionTableEvent(ddlEvent) {
if isPartitionTable(ddlEvent.TableInfo) {
appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
} else {
appendTableHistory(ddlEvent.CurrentTableID)
Expand Down Expand Up @@ -935,7 +935,12 @@ func updateDDLHistory(
tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
// it won't be send to table dispatchers, just for build version store
for _, info := range ddlEvent.MultipleTableInfos {
appendTableHistory(info.ID)
if isPartitionTable(info) {
// for partition table, we only care the ddl history of physical table ids.
appendPartitionsHistory(getAllPartitionIDs(info))
} else {
appendTableHistory(info.ID)
}
}
case model.ActionReorganizePartition:
appendPartitionsHistory(ddlEvent.PrevPartitions)
Expand Down Expand Up @@ -1012,16 +1017,16 @@ func updateDatabaseInfoAndTableInfo(
delete(databaseMap, event.CurrentSchemaID)
case model.ActionCreateTable:
createTable(event.CurrentSchemaID, event.CurrentTableID)
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
partitionInfo := make(BasicPartitionInfo)
for _, partition := range event.TableInfo.Partition.Definitions {
partitionInfo[partition.ID] = nil
for _, id := range getAllPartitionIDs(event.TableInfo) {
partitionInfo[id] = nil
}
partitionMap[event.CurrentTableID] = partitionInfo
}
case model.ActionDropTable:
dropTable(event.CurrentSchemaID, event.CurrentTableID)
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
delete(partitionMap, event.CurrentTableID)
}
case model.ActionAddColumn,
Expand All @@ -1034,11 +1039,11 @@ func updateDatabaseInfoAndTableInfo(
case model.ActionTruncateTable:
dropTable(event.CurrentSchemaID, event.PrevTableID)
createTable(event.CurrentSchemaID, event.CurrentTableID)
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
delete(partitionMap, event.PrevTableID)
partitionInfo := make(BasicPartitionInfo)
for _, partition := range event.TableInfo.Partition.Definitions {
partitionInfo[partition.ID] = nil
for _, id := range getAllPartitionIDs(event.TableInfo) {
partitionInfo[id] = nil
}
partitionMap[event.CurrentTableID] = partitionInfo
}
Expand Down Expand Up @@ -1101,7 +1106,15 @@ func updateDatabaseInfoAndTableInfo(
SchemaID: event.CurrentSchemaID,
Name: info.Name.O,
}
if isPartitionTable(info) {
partitionInfo := make(BasicPartitionInfo)
for _, id := range getAllPartitionIDs(info) {
partitionInfo[id] = nil
}
partitionMap[info.ID] = partitionInfo
}
}

case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
Expand All @@ -1126,7 +1139,7 @@ func updateRegisteredTableInfoStore(
tableInfoStoreMap map[int64]*versionedTableInfoStore,
) error {
tryApplyDDLToStore := func() {
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
allPhysicalIDs := getAllPartitionIDs(event.TableInfo)
for _, id := range allPhysicalIDs {
if store, ok := tableInfoStoreMap[id]; ok {
Expand All @@ -1145,7 +1158,7 @@ func updateRegisteredTableInfoStore(
model.ActionDropSchema:
// ignore
case model.ActionCreateTable:
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
allPhysicalIDs := getAllPartitionIDs(event.TableInfo)
for _, id := range allPhysicalIDs {
if _, ok := tableInfoStoreMap[event.CurrentTableID]; ok {
Expand All @@ -1169,7 +1182,7 @@ func updateRegisteredTableInfoStore(
model.ActionDropForeignKey:
// ignore
case model.ActionTruncateTable:
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
for _, id := range event.PrevPartitions {
if store, ok := tableInfoStoreMap[id]; ok {
store.applyDDL(event)
Expand Down Expand Up @@ -1251,10 +1264,20 @@ func updateRegisteredTableInfoStore(
}
case model.ActionCreateTables:
for _, info := range event.MultipleTableInfos {
if _, ok := tableInfoStoreMap[info.ID]; ok {
log.Panic("newly created tables should not be registered",
zap.Int64("tableID", info.ID))
if isPartitionTable(info) {
for _, id := range getAllPartitionIDs(info) {
if _, ok := tableInfoStoreMap[id]; ok {
log.Panic("newly created tables should not be registered",
zap.Int64("tableID", id))
}
}
} else {
if _, ok := tableInfoStoreMap[info.ID]; ok {
log.Panic("newly created tables should not be registered",
zap.Int64("tableID", info.ID))
}
}

}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
Expand Down Expand Up @@ -1323,7 +1346,7 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
DropDatabaseName: rawEvent.CurrentSchemaName,
}
case model.ActionCreateTable:
if isPartitionTableEvent(rawEvent) {
if isPartitionTable(rawEvent.TableInfo) {
physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
ddlEvent.NeedAddedTables = make([]common.Table, 0, len(physicalIDs))
for _, id := range physicalIDs {
Expand All @@ -1349,7 +1372,7 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
},
}
case model.ActionDropTable:
if isPartitionTableEvent(rawEvent) {
if isPartitionTable(rawEvent.TableInfo) {
allPhysicalTableIDs := getAllPartitionIDs(rawEvent.TableInfo)
allPhysicalTableIDsAndDDLSpanID := make([]int64, 0, len(rawEvent.TableInfo.Partition.Definitions)+1)
allPhysicalTableIDsAndDDLSpanID = append(allPhysicalTableIDsAndDDLSpanID, allPhysicalTableIDs...)
Expand Down Expand Up @@ -1388,7 +1411,7 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
model.ActionDropForeignKey:
// ignore
case model.ActionTruncateTable:
if isPartitionTableEvent(rawEvent) {
if isPartitionTable(rawEvent.TableInfo) {
if len(rawEvent.PrevPartitions) > 1 {
// if more than one partitions, we need block them
ddlEvent.BlockedTables = &common.InfluencedTables{
Expand Down Expand Up @@ -1427,7 +1450,7 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
case model.ActionRenameTable:
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName)
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
if isPartitionTableEvent(rawEvent) {
if isPartitionTable(rawEvent.TableInfo) {
allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
if !ignorePrevTable {
allPhysicalIDsAndDDLSpanID := make([]int64, 0, len(allPhysicalIDs)+1)
Expand Down Expand Up @@ -1664,21 +1687,43 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
log.Fatal("should not happen")
}
case model.ActionCreateTables:
ddlEvent.NeedAddedTables = make([]common.Table, 0, len(rawEvent.MultipleTableInfos))
addName := make([]common.SchemaTableName, 0, len(rawEvent.MultipleTableInfos))
physicalTableCount := 0
logicalTableCount := 0
for _, info := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) {
continue
}
logicalTableCount += 1
if isPartitionTable(info) {
physicalTableCount += len(info.Partition.Definitions)
} else {
physicalTableCount += 1
}
}
querys := strings.Split(rawEvent.Query, ";")
resultQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos))
for i := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.MultipleTableInfos[i].Name.O) {
ddlEvent.NeedAddedTables = make([]common.Table, 0, physicalTableCount)
addName := make([]common.SchemaTableName, 0, logicalTableCount)
resultQuerys := make([]string, 0, logicalTableCount)
for i, info := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) {
continue
}
ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
SchemaID: rawEvent.CurrentSchemaID,
TableID: rawEvent.MultipleTableInfos[i].ID,
})
if isPartitionTable(info) {
for _, partitionID := range getAllPartitionIDs(info) {
ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
SchemaID: rawEvent.CurrentSchemaID,
TableID: partitionID,
})
}
} else {
ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
SchemaID: rawEvent.CurrentSchemaID,
TableID: info.ID,
})
}
addName = append(addName, common.SchemaTableName{
SchemaName: rawEvent.CurrentSchemaName,
TableName: rawEvent.MultipleTableInfos[i].Name.O,
TableName: info.Name.O,
})
resultQuerys = append(resultQuerys, querys[i])
}
Expand Down
Loading
Loading