Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: write index kv pairs and data kv pairs to separate engine #132

Merged
merged 11 commits into from
Mar 8, 2019
11 changes: 9 additions & 2 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (c *Config) String() string {
type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
Expand Down Expand Up @@ -229,11 +230,17 @@ func (cfg *Config) Load() error {
}
}

// If the level 1 compact configuration not found, default to true
// If the level 1 compact configuration not found, default to false
lonng marked this conversation as resolved.
Show resolved Hide resolved
if cfg.PostRestore.Level1Compact == nil {
cfg.PostRestore.Level1Compact = new(bool)
*cfg.PostRestore.Level1Compact = true
*cfg.PostRestore.Level1Compact = false
}

if cfg.App.TableConcurrency < 2 {
cfg.App.TableConcurrency = 2
}
if cfg.App.IndexConcurrency < 2 {
cfg.App.IndexConcurrency = 2
}
lonng marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
55 changes: 33 additions & 22 deletions lightning/restore/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
CheckpointStatusAllWritten CheckpointStatus = 60
CheckpointStatusClosed CheckpointStatus = 90
CheckpointStatusImported CheckpointStatus = 120
CheckpointStatusIndexImported CheckpointStatus = 140
CheckpointStatusAlteredAutoInc CheckpointStatus = 150
CheckpointStatusChecksumSkipped CheckpointStatus = 170
CheckpointStatusChecksummed CheckpointStatus = 180
Expand All @@ -54,7 +55,7 @@ const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
checkpointTableNameTable = "table_v4"
checkpointTableNameEngine = "engine_v4"
checkpointTableNameEngine = "engine_v5"
checkpointTableNameChunk = "chunk_v4"
)

Expand Down Expand Up @@ -108,8 +109,9 @@ type ChunkCheckpoint struct {
}

type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
EngineID int
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
}

type TableCheckpoint struct {
lonng marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -192,7 +194,7 @@ func (merger *StatusCheckpointMerger) SetInvalid() {
}

func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
if merger.EngineID == -1 || merger.Status <= CheckpointStatusMaxInvalid {
if merger.EngineID == invalidEngineID || merger.Status <= CheckpointStatusMaxInvalid {
lonng marked this conversation as resolved.
Show resolved Hide resolved
cpd.status = merger.Status
cpd.hasStatus = true
}
Expand Down Expand Up @@ -318,7 +320,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (
err = common.ExecWithRetry(ctx, db, "(create engine checkpoints table)", fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s (
table_name varchar(261) NOT NULL,
engine_id int unsigned NOT NULL,
engine_id int NOT NULL,
status tinyint unsigned DEFAULT 30,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -432,10 +434,18 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
if err := engineRows.Scan(&engineID, &status); err != nil {
return errors.Trace(err)
}
for len(cp.Engines) <= engineID {
cp.Engines = append(cp.Engines, new(EngineCheckpoint))
var found bool
for _, engine := range cp.Engines {
lonng marked this conversation as resolved.
Show resolved Hide resolved
if engineID == engine.EngineID {
engine.Status = CheckpointStatus(status)
found = true
break
}
}
if !found {
checkpoint := &EngineCheckpoint{EngineID: engineID, Status: CheckpointStatus(status)}
cp.Engines = append(cp.Engines, checkpoint)
}
cp.Engines[engineID].Status = CheckpointStatus(status)
}
if err := engineRows.Err(); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -472,6 +482,8 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
return errors.Trace(err)
}
value.Checksum = verify.MakeKVChecksum(kvcBytes, kvcKVs, kvcChecksum)
// It is ok to use engineID here, because index engine file will not
lonng marked this conversation as resolved.
Show resolved Hide resolved
// contains any chunks
cp.Engines[engineID].Chunks = append(cp.Engines[engineID].Chunks, value)
}
if err := chunkRows.Err(); err != nil {
Expand Down Expand Up @@ -527,14 +539,14 @@ func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tab
}
defer chunkStmt.Close()

for engineID, engine := range checkpoints {
_, err = engineStmt.ExecContext(c, tableName, engineID, engine.Status)
for _, engine := range checkpoints {
_, err = engineStmt.ExecContext(c, tableName, engine.EngineID, engine.Status)
if err != nil {
return errors.Trace(err)
}
for _, value := range engine.Chunks {
_, err = chunkStmt.ExecContext(
c, tableName, engineID,
c, tableName, engine.EngineID,
value.Key.Path, value.Key.Offset, value.Columns, value.ShouldIncludeRowID,
value.Chunk.Offset, value.Chunk.EndOffset, value.Chunk.PrevRowIDMax, value.Chunk.RowIDMax,
value.Checksum.SumSize(), value.Checksum.SumKVS(), value.Checksum.Sum(),
Expand Down Expand Up @@ -700,8 +712,9 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC

for _, engineModel := range tableModel.Engines {
engine := &EngineCheckpoint{
Status: CheckpointStatus(engineModel.Status),
Chunks: make([]*ChunkCheckpoint, 0, len(engineModel.Chunks)),
EngineID: int(engineModel.EngineId),
Status: CheckpointStatus(engineModel.Status),
Chunks: make([]*ChunkCheckpoint, 0, len(engineModel.Chunks)),
}

for _, chunkModel := range engineModel.Chunks {
Expand Down Expand Up @@ -737,15 +750,12 @@ func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableN
defer cpdb.lock.Unlock()

tableModel := cpdb.checkpoints.Checkpoints[tableName]
for len(tableModel.Engines) < len(checkpoints) {
tableModel.Engines = append(tableModel.Engines, &EngineCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Chunks: make(map[string]*ChunkCheckpointModel),
})
}

for engineID, engine := range checkpoints {
engineModel := tableModel.Engines[engineID]
for _, engine := range checkpoints {
engineModel := &EngineCheckpointModel{
EngineId: int32(engine.EngineID),
Status: uint32(CheckpointStatusLoaded),
Chunks: make(map[string]*ChunkCheckpointModel),
}
for _, value := range engine.Chunks {
key := value.Key.String()
chunk, ok := engineModel.Chunks[key]
Expand All @@ -766,6 +776,7 @@ func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableN
chunk.KvcKvs = value.Checksum.SumKVS()
chunk.KvcChecksum = value.Checksum.Sum()
}
tableModel.Engines = append(tableModel.Engines, engineModel)
}

return errors.Trace(cpdb.save())
Expand Down
Loading