diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 22743f618fd86..399939fcd8e15 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -97,7 +97,7 @@ func (r *syncedRanges) reset() { type Engine struct { engineMeta closed atomic.Bool - db *pebble.DB + db atomic.Pointer[pebble.DB] UUID uuid.UUID localWriters sync.Map @@ -148,14 +148,19 @@ func (e *Engine) setError(err error) { } } +func (e *Engine) getDB() *pebble.DB { + return e.db.Load() +} + // Close closes the engine and release all resources. func (e *Engine) Close() error { e.logger.Debug("closing local engine", zap.Stringer("engine", e.UUID), zap.Stack("stack")) - if e.db == nil { + db := e.getDB() + if db == nil { return nil } - err := errors.Trace(e.db.Close()) - e.db = nil + err := errors.Trace(db.Close()) + e.db.Store(nil) return err } @@ -455,9 +460,7 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter) } func (e *Engine) getEngineFileSize() backend.EngineFileSize { - e.mutex.RLock() - db := e.db - e.mutex.RUnlock() + db := e.getDB() var total pebble.LevelMetrics if db != nil { @@ -869,7 +872,7 @@ func (e *Engine) flushEngineWithoutLock(ctx context.Context) error { return err } - flushFinishedCh, err := e.db.AsyncFlush() + flushFinishedCh, err := e.getDB().AsyncFlush() if err != nil { return errors.Trace(err) } @@ -897,11 +900,11 @@ func saveEngineMetaToDB(meta *engineMeta, db *pebble.DB) error { func (e *Engine) saveEngineMeta() error { e.logger.Debug("save engine meta", zap.Stringer("uuid", e.UUID), zap.Int64("count", e.Length.Load()), zap.Int64("size", e.TotalSize.Load())) - return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.db)) + return errors.Trace(saveEngineMetaToDB(&e.engineMeta, e.getDB())) } func (e *Engine) loadEngineMeta() error { - jsonBytes, closer, err := e.db.Get(engineMetaKey) + jsonBytes, closer, err := e.getDB().Get(engineMetaKey) if err != nil { if err == pebble.ErrNotFound { e.logger.Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err)) @@ -928,13 +931,13 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter { opts = &newOpts } if !e.duplicateDetection { - return pebbleIter{Iterator: e.db.NewIter(opts)} + return pebbleIter{Iterator: e.getDB().NewIter(opts)} } logger := log.FromContext(ctx).With( zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)), zap.Int64("tableID", e.tableInfo.ID), zap.Stringer("engineUUID", e.UUID)) - return newDupDetectIter(e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt) + return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt) } // getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound) @@ -1509,8 +1512,9 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error { for _, m := range metas { paths = append(paths, m.path) } - if i.e.db == nil { + db := i.e.getDB() + if db == nil { return errorEngineClosed } - return i.e.db.Ingest(paths) + return db.Ingest(paths) } diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index 46f96a09484c5..93a007f867016 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -21,6 +21,7 @@ import ( "os" "path" "path/filepath" + "sync" "testing" "github.com/cockroachdb/pebble" @@ -41,6 +42,44 @@ func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) { return db, tmpPath } +func TestGetEngineSizeWhenImport(t *testing.T) { + opt := &pebble.Options{ + MemTableSize: 1024 * 1024, + MaxConcurrentCompactions: 16, + L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction + L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction + DisableWAL: true, + ReadOnly: false, + } + db, tmpPath := makePebbleDB(t, opt) + + _, engineUUID := backend.MakeUUID("ww", 0) + engineCtx, cancel := context.WithCancel(context.Background()) + f := &Engine{ + UUID: engineUUID, + sstDir: tmpPath, + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + keyAdapter: noopKeyAdapter{}, + logger: log.L(), + } + f.db.Store(db) + // simulate import + f.lock(importMutexStateImport) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + engineFileSize := f.getEngineFileSize() + require.Equal(t, f.UUID, engineFileSize.UUID) + require.True(t, engineFileSize.IsImporting) + }() + wg.Wait() + f.unlock() + require.NoError(t, f.Close()) +} + func TestIngestSSTWithClosedEngine(t *testing.T) { opt := &pebble.Options{ MemTableSize: 1024 * 1024, @@ -55,7 +94,6 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -64,6 +102,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } + f.db.Store(db) f.sstIngester = dbSSTIngester{e: f} sstPath := path.Join(tmpPath, uuid.New().String()+".sst") file, err := os.Create(sstPath) @@ -93,9 +132,9 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { func TestGetFirstAndLastKey(t *testing.T) { db, tmpPath := makePebbleDB(t, nil) f := &Engine{ - db: db, sstDir: tmpPath, } + f.db.Store(db) err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) err = db.Set([]byte("c"), []byte("c"), nil) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 941a555afbb29..8ff1afeddc594 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -892,7 +892,7 @@ func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, logger: log.FromContext(ctx), }) engine := e.(*Engine) - engine.db = db + engine.db.Store(db) engine.sstIngester = dbSSTIngester{e: engine} if err = engine.loadEngineMeta(); err != nil { return errors.Trace(err) @@ -931,7 +931,6 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig } engine := &Engine{ UUID: engineUUID, - db: db, sstMetasChan: make(chan metaOrFlush), tableInfo: cfg.TableInfo, keyAdapter: local.keyAdapter, @@ -940,6 +939,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig duplicateDB: local.duplicateDB, logger: log.FromContext(ctx), } + engine.db.Store(db) engine.sstIngester = dbSSTIngester{e: engine} if err = engine.loadEngineMeta(); err != nil { return err @@ -1035,7 +1035,7 @@ func (local *Backend) readAndSplitIntoRange( } logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID)) - sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter) + sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) if err != nil { return nil, errors.Trace(err) } @@ -1124,7 +1124,7 @@ func (local *Backend) generateAndSendJob( // when use dynamic region feature, the region may be very big, we need // to split to smaller ranges to increase the concurrency. if regionSplitSize > 2*int64(config.SplitRegionSize) { - sizeProps, err := getSizePropertiesFn(logger, engine.db, local.keyAdapter) + sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) if err != nil { return errors.Trace(err) } @@ -1594,7 +1594,7 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err } db, err := local.openEngineDB(engineUUID, false) if err == nil { - localEngine.db = db + localEngine.db.Store(db) localEngine.engineMeta = engineMeta{} if !common.IsDirExists(localEngine.sstDir) { if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil { diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index ddd1c17ff2d34..ed98115371f43 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -328,7 +328,6 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -337,6 +336,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } + f.db.Store(db) f.sstIngester = dbSSTIngester{e: f} f.wg.Add(1) go f.ingestSSTLoop() @@ -480,7 +480,6 @@ func TestLocalIngestLoop(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -493,6 +492,7 @@ func TestLocalIngestLoop(t *testing.T) { }, logger: log.L(), } + f.db.Store(db) f.sstIngester = testIngester{} f.wg.Add(1) go f.ingestSSTLoop() @@ -570,7 +570,6 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -583,6 +582,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { }, logger: log.L(), } + f.db.Store(db) createSSTWriter := func() (*sstWriter, error) { path := filepath.Join(f.sstDir, uuid.New().String()+".sst") @@ -1152,7 +1152,6 @@ func TestCheckPeersBusy(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel2 := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -1161,9 +1160,10 @@ func TestCheckPeersBusy(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } - err := f.db.Set([]byte("a"), []byte("a"), nil) + f.db.Store(db) + err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) - err = f.db.Set([]byte("b"), []byte("b"), nil) + err = db.Set([]byte("b"), []byte("b"), nil) require.NoError(t, err) jobCh := make(chan *regionJob, 10) @@ -1288,7 +1288,6 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel2 := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -1297,7 +1296,8 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } - err := f.db.Set([]byte("a"), []byte("a"), nil) + f.db.Store(db) + err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) jobCh := make(chan *regionJob, 10) @@ -1395,7 +1395,6 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel2 := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -1404,9 +1403,10 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } - err := f.db.Set([]byte("a"), []byte("a"), nil) + f.db.Store(db) + err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) - err = f.db.Set([]byte("a2"), []byte("a2"), nil) + err = db.Set([]byte("a2"), []byte("a2"), nil) require.NoError(t, err) jobCh := make(chan *regionJob, 10) @@ -1502,7 +1502,6 @@ func TestPartialWriteIngestBusy(t *testing.T) { _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel2 := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -1511,9 +1510,10 @@ func TestPartialWriteIngestBusy(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } - err := f.db.Set([]byte("a"), []byte("a"), nil) + f.db.Store(db) + err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) - err = f.db.Set([]byte("a2"), []byte("a2"), nil) + err = db.Set([]byte("a2"), []byte("a2"), nil) require.NoError(t, err) jobCh := make(chan *regionJob, 10) @@ -1641,7 +1641,6 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { ctx := context.Background() engineCtx, cancel := context.WithCancel(context.Background()) f := &Engine{ - db: db, UUID: engineUUID, sstDir: tmpPath, ctx: engineCtx, @@ -1650,11 +1649,12 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { keyAdapter: noopKeyAdapter{}, logger: log.L(), } + f.db.Store(db) // keys starts with 0 is meta keys, so we start with 1. for i := byte(1); i <= 10; i++ { - err := f.db.Set([]byte{i}, []byte{i}, nil) + err := db.Set([]byte{i}, []byte{i}, nil) require.NoError(t, err) - err = f.db.Set([]byte{i, 1}, []byte{i, 1}, nil) + err = db.Set([]byte{i, 1}, []byte{i, 1}, nil) require.NoError(t, err) }