Skip to content

Commit

Permalink
snapshot (ticdc): reduce list tables time consumption (#11095)
Browse files Browse the repository at this point in the history
close #11124
  • Loading branch information
asddongmen authored May 17, 2024
1 parent 2d8cc97 commit 61efa5f
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/google/btree"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -146,15 +147,14 @@ func NewSnapshotFromMeta(
forceReplicate bool,
filter filter.Filter,
) (*Snapshot, error) {
start := time.Now()
snap := NewEmptySnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
}
// `tag` is used to reverse sort all versions in the generated snapshot.
tag := negative(currentTs)
// record all tables to be replicated for logging use
tables := make([]*model.TableInfo, 0, 1024)
for _, dbinfo := range dbinfos {
if filter.ShouldIgnoreSchema(dbinfo.Name.O) {
log.Debug("ignore database", zap.String("db", dbinfo.Name.O))
Expand All @@ -167,16 +167,30 @@ func NewSnapshotFromMeta(
vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema.
vname.target = dbinfo.ID
snap.inner.schemaNameToID.ReplaceOrInsert(vname)

tableInfos, err := meta.ListTables(dbinfo.ID)
// get all tables Name
tableNames, err := meta.ListSimpleTables(dbinfo.ID)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
}
for _, tableInfo := range tableInfos {
if filter.ShouldIgnoreTable(dbinfo.Name.O, tableInfo.Name.O) {
log.Debug("ignore table", zap.String("table", tableInfo.Name.O))
tableNeeded := make([]*timodel.TableNameInfo, 0, len(tableNames))
// filter tables
for _, table := range tableNames {
if filter.ShouldIgnoreTable(dbinfo.Name.O, table.Name.O) {
log.Debug("ignore table", zap.String("table", table.Name.O))
continue
}
tableNeeded = append(tableNeeded, table)
}
tableInfos := make([]*timodel.TableInfo, 0, len(tableNeeded))
for _, table := range tableNeeded {
tableInfo, err := meta.GetTable(dbinfo.ID, table.ID)
if err != nil {
return nil, errors.Trace(err)
}
tableInfos = append(tableInfos, tableInfo)
}

for _, tableInfo := range tableInfos {
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.inner.tables.ReplaceOrInsert(versionedID{
id: tableInfo.ID,
Expand All @@ -193,8 +207,6 @@ func NewSnapshotFromMeta(
ineligible := !tableInfo.IsEligible(forceReplicate)
if ineligible {
snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag})
} else {
tables = append(tables, tableInfo)
}
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
Expand All @@ -209,15 +221,10 @@ func NewSnapshotFromMeta(
}
}
snap.inner.currentTs = currentTs
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%d tables to be replicated: ", len(tables)))
for _, table := range tables {
sb.WriteString(fmt.Sprintf("%s.%s, ", table.TableName.Schema, table.TableName.Table))
}
log.Info("schema snapshot created",
zap.Stringer("changefeed", id),
zap.Uint64("currentTs", currentTs),
zap.String("tables", sb.String()))
zap.Any("duration", time.Since(start).Seconds()))
return snap, nil
}

Expand Down

0 comments on commit 61efa5f

Please sign in to comment.