@@ -102,7 +102,7 @@ type DB struct {
102102 lc * levelsController
103103 vlog valueLog
104104 writeCh chan * request
105- flushChan chan * memTable // For flushing memtables.
105+ flushChan chan flushTask // For flushing memtables.
106106 closeOnce sync.Once // For closing DB only once.
107107
108108 blockWrites atomic.Int32
@@ -233,7 +233,7 @@ func Open(opt Options) (*DB, error) {
233233
234234 db := & DB {
235235 imm : make ([]* memTable , 0 , opt .NumMemtables ),
236- flushChan : make (chan * memTable , opt .NumMemtables ),
236+ flushChan : make (chan flushTask , opt .NumMemtables ),
237237 writeCh : make (chan * request , kvWriteChCapacity ),
238238 opt : opt ,
239239 manifest : manifestFile ,
@@ -347,11 +347,11 @@ func Open(opt Options) (*DB, error) {
347347
348348 db .closers .memtable = z .NewCloser (1 )
349349 go func () {
350- db .flushMemtable (db .closers .memtable ) // Need levels controller to be up.
350+ _ = db .flushMemtable (db .closers .memtable ) // Need levels controller to be up.
351351 }()
352352 // Flush them to disk asap.
353353 for _ , mt := range db .imm {
354- db .flushChan <- mt
354+ db .flushChan <- flushTask { mt : mt }
355355 }
356356 }
357357 // We do increment nextTxnTs below. So, no need to do it here.
@@ -565,12 +565,12 @@ func (db *DB) close() (err error) {
565565 } else {
566566 db .opt .Debugf ("Flushing memtable" )
567567 for {
568- pushedMemTable := func () bool {
568+ pushedFlushTask := func () bool {
569569 db .lock .Lock ()
570570 defer db .lock .Unlock ()
571571 y .AssertTrue (db .mt != nil )
572572 select {
573- case db .flushChan <- db .mt :
573+ case db .flushChan <- flushTask { mt : db .mt } :
574574 db .imm = append (db .imm , db .mt ) // Flusher will attempt to remove this from s.imm.
575575 db .mt = nil // Will segfault if we try writing!
576576 db .opt .Debugf ("pushed to flush chan\n " )
@@ -583,7 +583,7 @@ func (db *DB) close() (err error) {
583583 }
584584 return false
585585 }()
586- if pushedMemTable {
586+ if pushedFlushTask {
587587 break
588588 }
589589 time .Sleep (10 * time .Millisecond )
@@ -852,7 +852,6 @@ func (db *DB) writeRequests(reqs []*request) error {
852852 }
853853 count += len (b .Entries )
854854 var i uint64
855- var err error
856855 for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
857856 i ++
858857 if i % 100 == 0 {
@@ -1019,7 +1018,7 @@ func (db *DB) ensureRoomForWrite() error {
10191018 }
10201019
10211020 select {
1022- case db .flushChan <- db .mt :
1021+ case db .flushChan <- flushTask { mt : db .mt } :
10231022 db .opt .Debugf ("Flushing memtable, mt.size=%d size of flushChan: %d\n " ,
10241023 db .mt .sl .MemSize (), len (db .flushChan ))
10251024 // We manage to push this task. Let's modify imm.
@@ -1041,12 +1040,12 @@ func arenaSize(opt Options) int64 {
10411040}
10421041
10431042// buildL0Table builds a new table from the memtable.
1044- func buildL0Table (iter y.Iterator , dropPrefixes [][]byte , bopts table.Options ) * table.Builder {
1043+ func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1044+ iter := ft .mt .sl .NewIterator ()
10451045 defer iter .Close ()
1046-
10471046 b := table .NewTableBuilder (bopts )
1048- for iter .Rewind (); iter .Valid (); iter .Next () {
1049- if len (dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), dropPrefixes ) {
1047+ for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1048+ if len (ft . dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft . dropPrefixes ) {
10501049 continue
10511050 }
10521051 vs := iter .Value ()
@@ -1056,15 +1055,23 @@ func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *
10561055 }
10571056 b .Add (iter .Key (), iter .Value (), vp .Len )
10581057 }
1059-
10601058 return b
10611059}
10621060
1063- // handleMemTableFlush must be run serially.
1064- func (db * DB ) handleMemTableFlush (mt * memTable , dropPrefixes [][]byte ) error {
1061+ type flushTask struct {
1062+ mt * memTable
1063+ dropPrefixes [][]byte
1064+ }
1065+
1066+ // handleFlushTask must be run serially.
1067+ func (db * DB ) handleFlushTask (ft flushTask ) error {
1068+ // There can be a scenario, when empty memtable is flushed.
1069+ if ft .mt .sl .Empty () {
1070+ return nil
1071+ }
1072+
10651073 bopts := buildTableOptions (db )
1066- itr := mt .sl .NewUniIterator (false )
1067- builder := buildL0Table (itr , nil , bopts )
1074+ builder := buildL0Table (ft , bopts )
10681075 defer builder .Close ()
10691076
10701077 // buildL0Table can return nil if the none of the items in the skiplist are
@@ -1093,39 +1100,39 @@ func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
10931100 return err
10941101}
10951102
1096- // flushMemtable must keep running until we send it an empty memtable . If there
1097- // are errors during handling the memtable flush, we'll retry indefinitely.
1098- func (db * DB ) flushMemtable (lc * z.Closer ) {
1103+ // flushMemtable must keep running until we send it an empty flushTask . If there
1104+ // are errors during handling the flush task , we'll retry indefinitely.
1105+ func (db * DB ) flushMemtable (lc * z.Closer ) error {
10991106 defer lc .Done ()
11001107
1101- for mt := range db .flushChan {
1102- if mt == nil {
1108+ for ft := range db .flushChan {
1109+ if ft .mt == nil {
1110+ // We close db.flushChan now, instead of sending a nil ft.mt.
11031111 continue
11041112 }
1105-
11061113 for {
1107- if err := db .handleMemTableFlush (mt , nil ); err != nil {
1108- // Encountered error. Retry indefinitely.
1109- db .opt .Errorf ("error flushing memtable to disk: %v, retrying" , err )
1110- time .Sleep (time .Second )
1111- continue
1112- }
1114+ err := db .handleFlushTask (ft )
1115+ if err == nil {
1116+ // Update s.imm. Need a lock.
1117+ db .lock .Lock ()
1118+ // This is a single-threaded operation. ft.mt corresponds to the head of
1119+ // db.imm list. Once we flush it, we advance db.imm. The next ft.mt
1120+ // which would arrive here would match db.imm[0], because we acquire a
1121+ // lock over DB when pushing to flushChan.
1122+ // TODO: This logic is dirty AF. Any change and this could easily break.
1123+ y .AssertTrue (ft .mt == db .imm [0 ])
1124+ db .imm = db .imm [1 :]
1125+ ft .mt .DecrRef () // Return memory.
1126+ db .lock .Unlock ()
11131127
1114- // Update s.imm. Need a lock.
1115- db .lock .Lock ()
1116- // This is a single-threaded operation. mt corresponds to the head of
1117- // db.imm list. Once we flush it, we advance db.imm. The next mt
1118- // which would arrive here would match db.imm[0], because we acquire a
1119- // lock over DB when pushing to flushChan.
1120- // TODO: This logic is dirty AF. Any change and this could easily break.
1121- y .AssertTrue (mt == db .imm [0 ])
1122- db .imm = db .imm [1 :]
1123- mt .DecrRef () // Return memory.
1124- // unlock
1125- db .lock .Unlock ()
1126- break
1128+ break
1129+ }
1130+ // Encountered error. Retry indefinitely.
1131+ db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
1132+ time .Sleep (time .Second )
11271133 }
11281134 }
1135+ return nil
11291136}
11301137
11311138func exists (path string ) (bool , error ) {
@@ -1545,10 +1552,10 @@ func (db *DB) startCompactions() {
15451552func (db * DB ) startMemoryFlush () {
15461553 // Start memory fluhser.
15471554 if db .closers .memtable != nil {
1548- db .flushChan = make (chan * memTable , db .opt .NumMemtables )
1555+ db .flushChan = make (chan flushTask , db .opt .NumMemtables )
15491556 db .closers .memtable = z .NewCloser (1 )
15501557 go func () {
1551- db .flushMemtable (db .closers .memtable )
1558+ _ = db .flushMemtable (db .closers .memtable )
15521559 }()
15531560 }
15541561}
@@ -1651,7 +1658,7 @@ func (db *DB) prepareToDrop() (func(), error) {
16511658 panic ("Attempting to drop data in read-only mode." )
16521659 }
16531660 // In order prepare for drop, we need to block the incoming writes and
1654- // write it to db. Then, flush all the pending memtable . So that, we
1661+ // write it to db. Then, flush all the pending flushtask . So that, we
16551662 // don't miss any entries.
16561663 if err := db .blockWrite (); err != nil {
16571664 return func () {}, err
@@ -1700,7 +1707,7 @@ func (db *DB) dropAll() (func(), error) {
17001707 if err != nil {
17011708 return f , err
17021709 }
1703- // prepareToDrop will stop all the incoming write and flushes any pending memtables .
1710+ // prepareToDrop will stop all the incomming write and flushes any pending flush tasks .
17041711 // Before we drop, we'll stop the compaction because anyways all the datas are going to
17051712 // be deleted.
17061713 db .stopCompactions ()
@@ -1782,8 +1789,13 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
17821789 memtable .DecrRef ()
17831790 continue
17841791 }
1792+ task := flushTask {
1793+ mt : memtable ,
1794+ // Ensure that the head of value log gets persisted to disk.
1795+ dropPrefixes : filtered ,
1796+ }
17851797 db .opt .Debugf ("Flushing memtable" )
1786- if err := db .handleMemTableFlush ( memtable , filtered ); err != nil {
1798+ if err := db .handleFlushTask ( task ); err != nil {
17871799 db .opt .Errorf ("While trying to flush memtable: %v" , err )
17881800 return err
17891801 }
0 commit comments