Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
dump: always split TiDB v3.* tables through tidb rowid to save TiDB's…
Browse files Browse the repository at this point in the history
… memory (#301) (#306)
  • Loading branch information
ti-chi-bot authored Jul 15, 2021
1 parent 4ad1c86 commit d28f469
Show file tree
Hide file tree
Showing 9 changed files with 1,663 additions and 42 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ require (
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/tidb v1.1.0-beta.0.20210715025933-fb96fe79e72a // indirect
github.com/pingcap/parser v0.0.0-20210421190550-451a84cf120a
github.com/pingcap/tidb v1.1.0-beta.0.20210715025933-fb96fe79e72a
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/codahale/hdrhistogram v0.9.0 h1:9GjrtRI+mLEFPtTfR/AZhcxp+Ii8NZYWq5104FbZQY0=
github.com/codahale/hdrhistogram v0.9.0/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
Expand Down Expand Up @@ -152,7 +150,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw=
github.com/fsouza/fake-gcs-server v1.19.0 h1:XyaGOlqo+R5sjT03x2ymk0xepaQlgwhRLTT2IopW0zA=
github.com/fsouza/fake-gcs-server v1.19.0/go.mod h1:JtXHY/QzHhtyIxsNfIuQ+XgHtRb5B/w8nqbL5O8zqo0=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
Expand Down Expand Up @@ -385,11 +382,9 @@ github.com/oleiade/reflections v1.0.0/go.mod h1:RbATFBbKYkVdqmSFtx13Bb/tVhR0lgOB
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34=
github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
Expand Down Expand Up @@ -562,11 +557,9 @@ github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaW
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.5-pre/go.mod h1:FwP/aQVg39TXzItUBMwnWp9T9gPQnXw4Poh4/oBQZ/0=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181022190402-e5e69e061d4f/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/unrolled/render v1.0.1/go.mod h1:gN9T0NhL4Bfbwu8ann7Ry/TGHYfosul+J0obPf6NBdM=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down
5 changes: 3 additions & 2 deletions v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,9 @@ const (
)

var (
gcSafePointVersion = semver.New("4.0.0")
tableSampleVersion = semver.New("5.0.0-nightly")
decodeRegionVersion = semver.New("3.0.0")
gcSafePointVersion = semver.New("4.0.0")
tableSampleVersion = semver.New("5.0.0-nightly")
)

// ServerInfo is the combination of ServerType and ServerInfo
Expand Down
120 changes: 111 additions & 9 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"bytes"
"context"
"database/sql"
"encoding/hex"
"fmt"
"math/big"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -22,11 +25,16 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pclog "github.com/pingcap/log"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var openDBFunc = sql.Open

// Dumper is the dump progress structure
type Dumper struct {
tctx *tcontext.Context
Expand All @@ -36,16 +44,18 @@ type Dumper struct {
extStore storage.ExternalStorage
dbHandle *sql.DB

tidbPDClientForGC pd.Client
tidbPDClientForGC pd.Client
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error)
}

// NewDumper returns a new Dumper
func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
tctx, cancelFn := tcontext.Background().WithContext(ctx).WithCancel()
d := &Dumper{
tctx: tctx,
conf: conf,
cancelCtx: cancelFn,
tctx: tctx,
conf: conf,
cancelCtx: cancelFn,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
}
err := adjustConfig(conf,
registerTLSConfig,
Expand Down Expand Up @@ -139,6 +149,9 @@ func (d *Dumper) Dump() (dumpErr error) {
return err
}
}
if err = d.renewSelectTableRegionFuncForLowerTiDB(tctx); err != nil {
tctx.L().Error("fail to update select table region info for TiDB", zap.Error(err))
}

rebuildConn := func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
Expand Down Expand Up @@ -214,8 +227,7 @@ func (d *Dumper) Dump() (dumpErr error) {
})

// get estimate total count
err = d.getEstimateTotalRowsCount(tctx, metaConn)
if err != nil {
if err = d.getEstimateTotalRowsCount(tctx, metaConn); err != nil {
tctx.L().Error("fail to get estimate total count", zap.Error(err))
}

Expand Down Expand Up @@ -442,7 +454,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
if conf.ServerInfo.ServerType == ServerTypeTiDB &&
conf.ServerInfo.ServerVersion != nil &&
(conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 ||
(conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0)) {
(conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*decodeRegionVersion) >= 0)) {
return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan)
}
field, err := pickupPossibleField(db, tbl, conn, conf)
Expand Down Expand Up @@ -578,18 +590,22 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn
handleVals [][]string
err error
)
// for TiDB v5.0+, we can use table sample directly
if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 {
tctx.L().Debug("dumping TiDB tables with TABLESAMPLE",
zap.String("database", db), zap.String("table", tbl))
handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl)
} else {
// for TiDB v3.0+, we can use table region decode in TiDB directly
tctx.L().Debug("dumping TiDB tables with TABLE REGIONS",
zap.String("database", db), zap.String("table", tbl))
var partitions []string
partitions, err = GetPartitionNames(conn, db, tbl)
if d.conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0 {
partitions, err = GetPartitionNames(conn, db, tbl)
}
if err == nil {
if len(partitions) == 0 {
handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl)
handleColNames, handleVals, err = d.selectTiDBTableRegionFunc(tctx, conn, db, tbl)
} else {
return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions)
}
Expand Down Expand Up @@ -1153,3 +1169,89 @@ func setSessionParam(d *Dumper) error {
}
return nil
}

func (d *Dumper) renewSelectTableRegionFuncForLowerTiDB(tctx *tcontext.Context) error {
conf := d.conf
if !(conf.ServerInfo.ServerType == ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && conf.ServerInfo.HasTiKV &&
conf.ServerInfo.ServerVersion.Compare(*decodeRegionVersion) >= 0 &&
conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) < 0) {
tctx.L().Debug("no need to build region info because database is not TiDB 3.x")
return nil
}
dbHandle, err := openDBFunc("mysql", conf.GetDSN(""))
if err != nil {
return errors.Trace(err)
}
defer dbHandle.Close()
conn, err := dbHandle.Conn(tctx)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
dbInfos, err := GetDBInfo(conn, DatabaseTablesToMap(conf.Tables))
if err != nil {
return errors.Trace(err)
}
regionsInfo, err := GetRegionInfos(conn)
if err != nil {
return errors.Trace(err)
}
tikvHelper := &helper.Helper{}
tableInfos := tikvHelper.GetRegionsTableInfo(regionsInfo, dbInfos)

tableInfoMap := make(map[string]map[string][]int64, len(conf.Tables))
for _, region := range regionsInfo.Regions {
tableList := tableInfos[region.ID]
for _, table := range tableList {
db, tbl := table.DB.Name.O, table.Table.Name.O
if _, ok := tableInfoMap[db]; !ok {
tableInfoMap[db] = make(map[string][]int64, len(conf.Tables[db]))
}

key, err := hex.DecodeString(region.StartKey)
if err != nil {
d.L().Debug("invalid region start key", zap.Error(err), zap.String("key", region.StartKey))
continue
}
// Auto decode byte if needed.
_, bs, err := codec.DecodeBytes(key, nil)
if err == nil {
key = bs
}
// Try to decode it as a record key.
tableID, handle, err := tablecodec.DecodeRecordKey(key)
if err != nil {
d.L().Debug("fail to decode region start key", zap.Error(err), zap.String("key", region.StartKey), zap.Int64("tableID", tableID))
continue
}
tableInfoMap[db][tbl] = append(tableInfoMap[db][tbl], handle)
}
}
for _, tbInfos := range tableInfoMap {
for _, tbInfoLoop := range tbInfos {
// make sure tbInfo is only used in this loop
tbInfo := tbInfoLoop
sort.Slice(tbInfo, func(i, j int) bool {
return tbInfo[i] < tbInfo[j]
})
}
}

d.selectTiDBTableRegionFunc = func(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) {
pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields)
if err != nil {
return
}
if tbInfos, ok := tableInfoMap[dbName]; ok {
if tbInfo, ok := tbInfos[tableName]; ok {
pkVals = make([][]string, len(tbInfo))
for i, val := range tbInfo {
pkVals[i] = []string{strconv.FormatInt(val, 10)}
}
}
}
return
}

return nil
}
14 changes: 14 additions & 0 deletions v4/export/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,17 @@ func (d DatabaseTables) Literal() string {

return b.String()
}

// DatabaseTablesToMap transfers DatabaseTables to Map
func DatabaseTablesToMap(d DatabaseTables) map[string]map[string]struct{} {
mp := make(map[string]map[string]struct{}, len(d))
for name, infos := range d {
mp[name] = make(map[string]struct{}, len(infos))
for _, info := range infos {
if info.Type == TableTypeBase {
mp[name][info.Name] = struct{}{}
}
}
}
return mp
}
Loading

0 comments on commit d28f469

Please sign in to comment.