Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
restore: write index kv pairs and data kv pairs to separate engine (#132
Browse files Browse the repository at this point in the history
)

* restore: write index kvs and data kvs to seperate engine

* restore: rename  to

* restore: use single engine file to store index

* restore: make index engine limited by table concurrency

* restore: revert some changes

* restore: modify checkpoint proto

* restore: implement checkpoint for index engine file

* tests: add failpoint for CheckpointStatusIndexImported

* address comment

* address comment

* address comment
  • Loading branch information
lonng authored and kennytm committed Mar 8, 2019
1 parent b345961 commit 9b71f1c
Show file tree
Hide file tree
Showing 13 changed files with 624 additions and 298 deletions.
4 changes: 2 additions & 2 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s

for _, table := range targetTables {
for engineID := 0; engineID < table.EnginesCount; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, int32(engineID))
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, int32(engineID))
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
Expand Down
17 changes: 6 additions & 11 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (c *Config) String() string {
type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
Expand All @@ -91,10 +92,10 @@ type Lightning struct {

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Level1Compact *bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
}

type MydumperRuntime struct {
Expand Down Expand Up @@ -144,6 +145,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
IndexConcurrency: 2,
IOConcurrency: 5,
CheckRequirements: true,
},
Expand Down Expand Up @@ -228,12 +230,5 @@ func (cfg *Config) Load() error {
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
}

// If the level 1 compact configuration not found, default to true
if cfg.PostRestore.Level1Compact == nil {
cfg.PostRestore.Level1Compact = new(bool)
*cfg.PostRestore.Level1Compact = true
}

return nil
}
6 changes: 3 additions & 3 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func isIgnorableOpenCloseEngineError(err error) bool {
return err == nil || strings.Contains(err.Error(), "FileExists")
}

func makeTag(tableName string, engineID int) string {
func makeTag(tableName string, engineID int32) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}

Expand All @@ -166,7 +166,7 @@ var engineNamespace = uuid.Must(uuid.FromString("d68d6abe-c59e-45d6-ade8-e2b0ceb
func (importer *Importer) OpenEngine(
ctx context.Context,
tableName string,
engineID int,
engineID int32,
) (*OpenedEngine, error) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewV5(engineNamespace, tag)
Expand Down Expand Up @@ -312,7 +312,7 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int) (*ClosedEngine, error) {
func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int32) (*ClosedEngine, error) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewV5(engineNamespace, tag)
return importer.unsafeCloseEngine(ctx, tag, engineUUID)
Expand Down
Loading

0 comments on commit 9b71f1c

Please sign in to comment.