Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

optimize memory in hundreds-of tables scenario #52

Merged
merged 16 commits into from
Aug 6, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
one table on TableKvEncoder and share it to multiple region worker
  • Loading branch information
holys committed Jul 19, 2018
commit a5ca1d8eead45d53ed04f31ed3e9ccac9f1d67d2
26 changes: 14 additions & 12 deletions lightning/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,26 @@ type TableKVEncoder struct {
}

func NewTableKVEncoder(
kvEncoder kvec.KvEncoder,
idAlloc *kvec.Allocator,
table string, tableID int64,
dbName string,
table string, tableSchema string, tableID int64,
columns int, sqlMode string) (*TableKVEncoder, error) {

idAlloc := kvec.NewAllocator()
encoder, err := kvec.New(dbName, idAlloc)
if err != nil {
common.AppLogger.Errorf("err %s", errors.ErrorStack(err))
return nil, errors.Trace(err)
}

kvcodec := &TableKVEncoder{
table: table,
tableID: tableID,
encoder: kvEncoder,
encoder: encoder,
idAllocator: idAlloc,
columns: columns,
}

if err := kvcodec.init(); err != nil {
if err := kvcodec.init(tableSchema, sqlMode); err != nil {
kvcodec.Close()
return nil, errors.Trace(err)
}
Expand All @@ -58,21 +64,17 @@ func NewTableKVEncoder(
return kvcodec, nil
}

func InitialEncoder(encoder kvec.KvEncoder, tableSchema string, sqlMode string) error {
if err := encoder.ExecDDLSQL(tableSchema); err != nil {
func (kvcodec *TableKVEncoder) init(tableSchema string, sqlMode string) error {
if err := kvcodec.encoder.ExecDDLSQL(tableSchema); err != nil {
common.AppLogger.Errorf("[sql2kv] tableSchema execute failed : %v", errors.ErrorStack(err))
return errors.Trace(err)
}

err := encoder.SetSystemVariable("sql_mode", sqlMode)
err := kvcodec.encoder.SetSystemVariable("sql_mode", sqlMode)
if err != nil {
return errors.Trace(err)
}
common.AppLogger.Debugf("set sql_mode=%s", sqlMode)
return nil
}

func (kvcodec *TableKVEncoder) init() error {
if PrepareStmtMode {
reserve := (encodeBatchRows * kvcodec.columns) << 1 // TODO : rows x ( cols + indices )
kvcodec.bufValues = make([]interface{}, 0, reserve)
Expand Down
59 changes: 20 additions & 39 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/tidb-lightning/lightning/mydump"
verify "github.com/pingcap/tidb-lightning/lightning/verification"
tidbcfg "github.com/pingcap/tidb/config"
kvec "github.com/pingcap/tidb/util/kvencoder"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -437,8 +436,7 @@ type regionRestoreTask struct {
status string
region *mydump.TableRegion
executor *RegionRestoreExectuor
encoder kvec.KvEncoder
idAlloc *kvec.Allocator
encoder *kv.TableKVEncoder
delivers *kv.KVDeliverKeeper
// TODO : progress ...
callback restoreCallback
Expand All @@ -450,8 +448,7 @@ type regionRestoreTask struct {
func newRegionRestoreTask(
region *mydump.TableRegion,
executor *RegionRestoreExectuor,
encoder kvec.KvEncoder,
idAlloc *kvec.Allocator,
encoder *kv.TableKVEncoder,
dbInfo *TidbDBInfo,
tableInfo *TidbTableInfo,
cfg *config.Config,
Expand All @@ -468,7 +465,6 @@ func newRegionRestoreTask(
tableInfo: tableInfo,
cfg: cfg,
encoder: encoder,
idAlloc: idAlloc,
callback: callback,
}
}
Expand Down Expand Up @@ -496,34 +492,23 @@ func (t *regionRestoreTask) Run(ctx context.Context) error {
}

func (t *regionRestoreTask) run(ctx context.Context) (int64, uint64, *verify.KVChecksum, error) {
kvEncoder, err := kv.NewTableKVEncoder(
t.encoder, t.idAlloc, t.tableInfo.Name, t.tableInfo.ID,
t.tableInfo.Columns, t.cfg.TiDB.SQLMode)
if err != nil {
common.AppLogger.Errorf("failed to new kv encoder (%s) : %s", t.dbInfo.Name, err.Error())
return 0, 0, nil, errors.Trace(err)
}
defer kvEncoder.Close()

kvDeliver := t.delivers.AcquireClient(t.executor.tableMeta.DB, t.executor.tableMeta.Name)
// cause bug here.
defer t.delivers.RecycleClient(kvDeliver)

nextRowID, affectedRows, checksum, err := t.executor.Run(ctx, t.region, kvEncoder, kvDeliver)
nextRowID, affectedRows, checksum, err := t.executor.Run(ctx, t.region, t.encoder, kvDeliver)
return nextRowID, affectedRows, checksum, errors.Trace(err)
}

type TableRestore struct {
mux sync.Mutex
ctx context.Context

cfg *config.Config
dbInfo *TidbDBInfo
tableInfo *TidbTableInfo
tableMeta *mydump.MDTableMeta
// encoders *kvEncoderPool
encoder kvec.KvEncoder
idAlloc *kvec.Allocator
cfg *config.Config
dbInfo *TidbDBInfo
tableInfo *TidbTableInfo
tableMeta *mydump.MDTableMeta
encoder *kv.TableKVEncoder
deliversMgr *kv.KVDeliverKeeper

regions []*mydump.TableRegion
Expand All @@ -550,27 +535,22 @@ func NewTableRestore(
deliverMgr *kv.KVDeliverKeeper,
) *TableRestore {

idAlloc := kvec.NewAllocator()
encoder, err := kvec.New(dbInfo.Name, idAlloc)
encoder, err := kv.NewTableKVEncoder(
dbInfo.Name, tableInfo.Name, tableInfo.CreateTableStmt, tableInfo.ID,
tableInfo.Columns, cfg.TiDB.SQLMode)
if err != nil {
common.AppLogger.Errorf("err %s", errors.ErrorStack(err))
return nil
}

err = kv.InitialEncoder(encoder, tableInfo.CreateTableStmt, cfg.TiDB.SQLMode)
if err != nil {
common.AppLogger.Errorf("err %s", errors.ErrorStack(err))
common.AppLogger.Errorf("failed to new kv encoder (%s) : %s", dbInfo.Name, err.Error())
//TODO return error
return nil
}

tr := &TableRestore{
ctx: ctx,
cfg: cfg,
dbInfo: dbInfo,
tableInfo: tableInfo,
tableMeta: tableMeta,
ctx: ctx,
cfg: cfg,
dbInfo: dbInfo,
tableInfo: tableInfo,
tableMeta: tableMeta,
encoder: encoder,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems no need to save it? because we do not use it anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to save it because we need to close the encoder in the last when all the regions(of the table) finished.

Why we can't close the encoder early? The reason is that it may close the global dom.
ref: https://github.com/pingcap/tidb/blob/master/util/kvencoder/kv_encoder.go#L118

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😳 fine

idAlloc: idAlloc,
deliversMgr: deliverMgr,
handledRegions: make(map[int]*regionStat),
}
Expand All @@ -580,6 +560,7 @@ func NewTableRestore(
}

func (tr *TableRestore) Close() {
tr.encoder.Close()
common.AppLogger.Infof("[%s] restore done", common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name))
}

Expand All @@ -594,7 +575,7 @@ func (tr *TableRestore) loadRegions() {
tasks := make([]*regionRestoreTask, 0, len(regions))
for _, region := range regions {
executor := NewRegionRestoreExectuor(tr.cfg, tr.tableMeta)
task := newRegionRestoreTask(region, executor, tr.encoder, tr.idAlloc, tr.dbInfo, tr.tableInfo, tr.cfg, tr.deliversMgr, tr.onRegionFinished)
task := newRegionRestoreTask(region, executor, tr.encoder, tr.dbInfo, tr.tableInfo, tr.cfg, tr.deliversMgr, tr.onRegionFinished)
tasks = append(tasks, task)
common.AppLogger.Debugf("[%s] region - %s", table, region.Name())
}
Expand Down