
Commit

lightning: fix check disk quota routine block when some engine is imp…
ti-chi-bot authored Jun 29, 2023
1 parent 21345c2 commit d68abe5
Showing 4 changed files with 81 additions and 38 deletions.
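
The core of the fix, visible in engine.go below, is that Engine's `db *pebble.DB` field becomes an `atomic.Pointer[pebble.DB]` read through a new `getDB()` helper, so callers can reach the DB handle without waiting on the engine mutex that an import holds. A minimal sketch of that pattern, using a stand-in type instead of `*pebble.DB` (the `engine` and `fakeDB` names here are illustrative, not from the diff):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeDB stands in for *pebble.DB so the sketch has no external dependency.
type fakeDB struct{ path string }

// engine mirrors the relevant slice of local.Engine after this commit:
// the DB handle lives behind an atomic pointer instead of a plain field.
type engine struct {
	db atomic.Pointer[fakeDB]
}

// getDB loads the handle without taking any lock, like Engine.getDB in the diff.
func (e *engine) getDB() *fakeDB {
	return e.db.Load()
}

// close mirrors Engine.Close: load once, nil-check, then clear the pointer.
func (e *engine) close() error {
	db := e.getDB()
	if db == nil {
		return nil
	}
	// the real code closes the pebble DB here and traces the error
	e.db.Store(nil)
	return nil
}

func main() {
	e := &engine{}
	e.db.Store(&fakeDB{path: "/tmp/engine"})
	fmt.Println(e.getDB().path)   // /tmp/engine
	_ = e.close()
	fmt.Println(e.getDB() == nil) // true
}
```
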
32 changes: 18 additions & 14 deletions br/pkg/lightning/backend/local/engine.go
@@ -97,7 +97,7 @@ func (r *syncedRanges) reset() {
type Engine struct {
engineMeta
closed atomic.Bool
db *pebble.DB
db atomic.Pointer[pebble.DB]
UUID uuid.UUID
localWriters sync.Map

@@ -148,14 +148,19 @@ func (e *Engine) setError(err error) {
}
}

func (e *Engine) getDB() *pebble.DB {
return e.db.Load()
}

// Close closes the engine and release all resources.
func (e *Engine) Close() error {
e.logger.Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack"))
if e.db == nil {
db := e.getDB()
if db == nil {
return nil
}
err := errors.Trace(e.db.Close())
e.db = nil
err := errors.Trace(db.Close())
e.db.Store(nil)
return err
}

@@ -455,9 +460,7 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter)
}

func (e *Engine) getEngineFileSize() backend.EngineFileSize {
e.mutex.RLock()
db := e.db
e.mutex.RUnlock()
db := e.getDB()

var total pebble.LevelMetrics
if db != nil {
@@ -869,7 +872,7 @@ func (e *Engine) flushEngineWithoutLock(ctx context.Context) error {
return err
}

flushFinishedCh, err := e.db.AsyncFlush()
flushFinishedCh, err := e.getDB().AsyncFlush()
if err != nil {
return errors.Trace(err)
}
@@ -897,11 +900,11 @@ func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error {
func (e *Engine) saveEngineMeta() error {
e.logger.Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()),
zap.Int64("size", e.TotalSize.Load()))
return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.db))
return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB()))
}

func (e *Engine) loadEngineMeta() error {
jsonBytes, closer, err := e.db.Get(engineMetaKey)
jsonBytes, closer, err := e.getDB().Get(engineMetaKey)
if err != nil {
if err == pebble.ErrNotFound {
e.logger.Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err))
@@ -928,13 +931,13 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
opts = &newOpts
}
if !e.duplicateDetection {
return pebbleIter{Iterator: e.db.NewIter(opts)}
return pebbleIter{Iterator: e.getDB().NewIter(opts)}
}
logger := log.FromContext(ctx).With(
zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
zap.Int64("tableID", e.tableInfo.ID),
zap.Stringer("engineUUID", e.UUID))
return newDupDetectIter(e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
}

// getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
@@ -1509,8 +1512,9 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error {
for _, m := range metas {
paths = append(paths, m.path)
}
if i.e.db == nil {
db := i.e.getDB()
if db == nil {
return errorEngineClosed
}
return i.e.db.Ingest(paths)
return db.Ingest(paths)
}
43 changes: 41 additions & 2 deletions br/pkg/lightning/backend/local/engine_test.go
@@ -21,6 +21,7 @@ import (
"os"
"path"
"path/filepath"
"sync"
"testing"

"github.com/cockroachdb/pebble"
@@ -41,6 +42,44 @@ func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
return db, tmpPath
}

func TestGetEngineSizeWhenImport(t *testing.T) {
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction
L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction
DisableWAL: true,
ReadOnly: false,
}
db, tmpPath := makePebbleDB(t, opt)

_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
// simulate import
f.lock(importMutexStateImport)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
engineFileSize := f.getEngineFileSize()
require.Equal(t, f.UUID, engineFileSize.UUID)
require.True(t, engineFileSize.IsImporting)
}()
wg.Wait()
f.unlock()
require.NoError(t, f.Close())
}

func TestIngestSSTWithClosedEngine(t *testing.T) {
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
@@ -55,7 +94,6 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -64,6 +102,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
file, err := os.Create(sstPath)
@@ -93,9 +132,9 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
func TestGetFirstAndLastKey(t *testing.T) {
db, tmpPath := makePebbleDB(t, nil)
f := &Engine{
db: db,
sstDir: tmpPath,
}
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = db.Set([]byte("c"), []byte("c"), nil)
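
Most of the churn in this test file (and in local_test.go below) follows mechanically from the field change: atomic.Pointer[pebble.DB] has no exported fields, so the handle can no longer be set inside the Engine{...} composite literal, and each test now calls f.db.Store(db) after constructing the struct. A small sketch of that setup pattern, again with stand-in types rather than the real Engine:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type fakeDB struct{}

type engine struct {
	db   atomic.Pointer[fakeDB]
	name string
}

func main() {
	db := &fakeDB{}

	// Before this commit the tests set the handle in the literal:
	//   f := &engine{db: db, name: "ww"}
	// atomic.Pointer has no exported fields, so that no longer compiles;
	// the handle is stored after construction, as the updated tests do.
	f := &engine{name: "ww"}
	f.db.Store(db)

	fmt.Println(f.db.Load() != nil) // true
}
```
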
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local.go
@@ -892,7 +892,7 @@ func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig,
logger: log.FromContext(ctx),
})
engine := e.(*Engine)
engine.db = db
engine.db.Store(db)
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return errors.Trace(err)
@@ -931,7 +931,6 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
}
engine := &Engine{
UUID: engineUUID,
db: db,
sstMetasChan: make(chan metaOrFlush),
tableInfo: cfg.TableInfo,
keyAdapter: local.keyAdapter,
@@ -940,6 +939,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
duplicateDB: local.duplicateDB,
logger: log.FromContext(ctx),
}
engine.db.Store(db)
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return err
@@ -1035,7 +1035,7 @@ func (local *Backend) readAndSplitIntoRange(
}

logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))
sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter)
sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
}
@@ -1124,7 +1124,7 @@ func (local *Backend) generateAndSendJob(
// when use dynamic region feature, the region may be very big, we need
// to split to smaller ranges to increase the concurrency.
if regionSplitSize > 2*int64(config.SplitRegionSize) {
sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter)
sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
if err != nil {
return errors.Trace(err)
}
@@ -1594,7 +1594,7 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
}
db, err := local.openEngineDB(engineUUID, false)
if err == nil {
localEngine.db = db
localEngine.db.Store(db)
localEngine.engineMeta = engineMeta{}
if !common.IsDirExists(localEngine.sstDir) {
if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil {
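
In local.go the writer side follows the same discipline: OpenEngine, CloseEngine and ResetEngine publish a freshly opened pebble DB with engine.db.Store(db), so a concurrent reader such as the disk-quota check always observes either the previous handle or the new one, never a half-initialized value. A hedged sketch of that publish step, with a stand-in DB type and an assumed openDB helper in place of local.openEngineDB:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type fakeDB struct{ generation int }

type engine struct {
	db atomic.Pointer[fakeDB]
}

// resetEngine loosely follows Backend.ResetEngine after this change:
// open a fresh DB, then publish it atomically for concurrent readers.
func resetEngine(e *engine, openDB func() (*fakeDB, error)) error {
	db, err := openDB()
	if err == nil {
		e.db.Store(db) // readers see the old or the new handle, never a torn write
	}
	return err
}

func main() {
	e := &engine{}
	e.db.Store(&fakeDB{generation: 1})

	_ = resetEngine(e, func() (*fakeDB, error) {
		return &fakeDB{generation: 2}, nil
	})
	fmt.Println(e.db.Load().generation) // 2
}
```
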
34 changes: 17 additions & 17 deletions br/pkg/lightning/backend/local/local_test.go
@@ -328,7 +328,6 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -337,6 +336,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
f.wg.Add(1)
go f.ingestSSTLoop()
@@ -480,7 +480,6 @@ func TestLocalIngestLoop(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -493,6 +492,7 @@ func TestLocalIngestLoop(t *testing.T) {
},
logger: log.L(),
}
f.db.Store(db)
f.sstIngester = testIngester{}
f.wg.Add(1)
go f.ingestSSTLoop()
@@ -570,7 +570,6 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
engineCtx, cancel := context.WithCancel(context.Background())

f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -583,6 +582,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
},
logger: log.L(),
}
f.db.Store(db)

createSSTWriter := func() (*sstWriter, error) {
path := filepath.Join(f.sstDir, uuid.New().String()+".sst")
@@ -1152,7 +1152,6 @@ func TestCheckPeersBusy(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel2 := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -1161,9 +1160,10 @@ func TestCheckPeersBusy(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
err := f.db.Set([]byte("a"), []byte("a"), nil)
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = f.db.Set([]byte("b"), []byte("b"), nil)
err = db.Set([]byte("b"), []byte("b"), nil)
require.NoError(t, err)

jobCh := make(chan *regionJob, 10)
@@ -1288,7 +1288,6 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel2 := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -1297,7 +1296,8 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
err := f.db.Set([]byte("a"), []byte("a"), nil)
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)

jobCh := make(chan *regionJob, 10)
@@ -1395,7 +1395,6 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel2 := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -1404,9 +1403,10 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
err := f.db.Set([]byte("a"), []byte("a"), nil)
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = f.db.Set([]byte("a2"), []byte("a2"), nil)
err = db.Set([]byte("a2"), []byte("a2"), nil)
require.NoError(t, err)

jobCh := make(chan *regionJob, 10)
@@ -1502,7 +1502,6 @@ func TestPartialWriteIngestBusy(t *testing.T) {
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel2 := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -1511,9 +1510,10 @@ func TestPartialWriteIngestBusy(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
err := f.db.Set([]byte("a"), []byte("a"), nil)
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = f.db.Set([]byte("a2"), []byte("a2"), nil)
err = db.Set([]byte("a2"), []byte("a2"), nil)
require.NoError(t, err)

jobCh := make(chan *regionJob, 10)
@@ -1641,7 +1641,6 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
ctx := context.Background()
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
@@ -1650,11 +1649,12 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
// keys starts with 0 is meta keys, so we start with 1.
for i := byte(1); i <= 10; i++ {
err := f.db.Set([]byte{i}, []byte{i}, nil)
err := db.Set([]byte{i}, []byte{i}, nil)
require.NoError(t, err)
err = f.db.Set([]byte{i, 1}, []byte{i, 1}, nil)
err = db.Set([]byte{i, 1}, []byte{i, 1}, nil)
require.NoError(t, err)
}

