From 61efa5f2b377ffcd3f35eb5f61cae9648e5023d0 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Fri, 17 May 2024 10:20:43 +0800 Subject: [PATCH] snapshot (ticdc): reduce list tables time consumption (#11095) close pingcap/tiflow#11124 --- cdc/entry/schema/snapshot.go | 37 +++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index daf9e3078cb..e01821ef551 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -18,6 +18,7 @@ import ( "math" "strings" "sync" + "time" "github.com/google/btree" "github.com/pingcap/errors" @@ -146,6 +147,7 @@ func NewSnapshotFromMeta( forceReplicate bool, filter filter.Filter, ) (*Snapshot, error) { + start := time.Now() snap := NewEmptySnapshot(forceReplicate) dbinfos, err := meta.ListDatabases() if err != nil { @@ -153,8 +155,6 @@ func NewSnapshotFromMeta( } // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) - // record all tables to be replicated for logging use - tables := make([]*model.TableInfo, 0, 1024) for _, dbinfo := range dbinfos { if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.String("db", dbinfo.Name.O)) @@ -167,16 +167,30 @@ func NewSnapshotFromMeta( vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema. vname.target = dbinfo.ID snap.inner.schemaNameToID.ReplaceOrInsert(vname) - - tableInfos, err := meta.ListTables(dbinfo.ID) + // get all tables Name + tableNames, err := meta.ListSimpleTables(dbinfo.ID) if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) } - for _, tableInfo := range tableInfos { - if filter.ShouldIgnoreTable(dbinfo.Name.O, tableInfo.Name.O) { - log.Debug("ignore table", zap.String("table", tableInfo.Name.O)) + tableNeeded := make([]*timodel.TableNameInfo, 0, len(tableNames)) + // filter tables + for _, table := range tableNames { + if filter.ShouldIgnoreTable(dbinfo.Name.O, table.Name.O) { + log.Debug("ignore table", zap.String("table", table.Name.O)) continue } + tableNeeded = append(tableNeeded, table) + } + tableInfos := make([]*timodel.TableInfo, 0, len(tableNeeded)) + for _, table := range tableNeeded { + tableInfo, err := meta.GetTable(dbinfo.ID, table.ID) + if err != nil { + return nil, errors.Trace(err) + } + tableInfos = append(tableInfos, tableInfo) + } + + for _, tableInfo := range tableInfos { tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) snap.inner.tables.ReplaceOrInsert(versionedID{ id: tableInfo.ID, @@ -193,8 +207,6 @@ func NewSnapshotFromMeta( ineligible := !tableInfo.IsEligible(forceReplicate) if ineligible { snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag}) - } else { - tables = append(tables, tableInfo) } if pi := tableInfo.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { @@ -209,15 +221,10 @@ func NewSnapshotFromMeta( } } snap.inner.currentTs = currentTs - var sb strings.Builder - sb.WriteString(fmt.Sprintf("%d tables to be replicated: ", len(tables))) - for _, table := range tables { - sb.WriteString(fmt.Sprintf("%s.%s, ", table.TableName.Schema, table.TableName.Table)) - } log.Info("schema snapshot created", zap.Stringer("changefeed", id), zap.Uint64("currentTs", currentTs), - zap.String("tables", sb.String())) + zap.Any("duration", time.Since(start).Seconds())) return snap, nil }