@@ -109,7 +109,7 @@ type DB struct {
109109 lc * levelsController
110110 vlog valueLog
111111 writeCh chan * request
112- flushChan chan flushTask // For flushing memtables.
112+ flushChan chan * memTable // For flushing memtables.
113113 closeOnce sync.Once // For closing DB only once.
114114
115115 blockWrites int32
@@ -240,7 +240,7 @@ func Open(opt Options) (*DB, error) {
240240
241241 db := & DB {
242242 imm : make ([]* memTable , 0 , opt .NumMemtables ),
243- flushChan : make (chan flushTask , opt .NumMemtables ),
243+ flushChan : make (chan * memTable , opt .NumMemtables ),
244244 writeCh : make (chan * request , kvWriteChCapacity ),
245245 opt : opt ,
246246 manifest : manifestFile ,
@@ -351,11 +351,11 @@ func Open(opt Options) (*DB, error) {
351351
352352 db .closers .memtable = z .NewCloser (1 )
353353 go func () {
354- _ = db .flushMemtable (db .closers .memtable ) // Need levels controller to be up.
354+ db .flushMemtable (db .closers .memtable ) // Need levels controller to be up.
355355 }()
356356 // Flush them to disk asap.
357357 for _ , mt := range db .imm {
358- db .flushChan <- flushTask { mt : mt }
358+ db .flushChan <- mt
359359 }
360360 }
361361 // We do increment nextTxnTs below. So, no need to do it here.
@@ -568,12 +568,12 @@ func (db *DB) close() (err error) {
568568 } else {
569569 db .opt .Debugf ("Flushing memtable" )
570570 for {
571- pushedFlushTask := func () bool {
571+ pushedMemTable := func () bool {
572572 db .lock .Lock ()
573573 defer db .lock .Unlock ()
574574 y .AssertTrue (db .mt != nil )
575575 select {
576- case db .flushChan <- flushTask { mt : db .mt } :
576+ case db .flushChan <- db .mt :
577577 db .imm = append (db .imm , db .mt ) // Flusher will attempt to remove this from s.imm.
578578 db .mt = nil // Will segfault if we try writing!
579579 db .opt .Debugf ("pushed to flush chan\n " )
@@ -586,7 +586,7 @@ func (db *DB) close() (err error) {
586586 }
587587 return false
588588 }()
589- if pushedFlushTask {
589+ if pushedMemTable {
590590 break
591591 }
592592 time .Sleep (10 * time .Millisecond )
@@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826826 }
827827 count += len (b .Entries )
828828 var i uint64
829+ var err error
829830 for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830831 i ++
831832 if i % 100 == 0 {
@@ -987,7 +988,7 @@ func (db *DB) ensureRoomForWrite() error {
987988 }
988989
989990 select {
990- case db .flushChan <- flushTask { mt : db .mt } :
991+ case db .flushChan <- db .mt :
991992 db .opt .Debugf ("Flushing memtable, mt.size=%d size of flushChan: %d\n " ,
992993 db .mt .sl .MemSize (), len (db .flushChan ))
993994 // We manage to push this task. Let's modify imm.
@@ -1009,12 +1010,12 @@ func arenaSize(opt Options) int64 {
10091010}
10101011
10111012// buildL0Table builds a new table from the memtable.
1012- func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013- iter := ft .mt .sl .NewIterator ()
1013+ func buildL0Table (iter y.Iterator , dropPrefixes [][]byte , bopts table.Options ) * table.Builder {
10141014 defer iter .Close ()
1015+
10151016 b := table .NewTableBuilder (bopts )
1016- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1017- if len (ft . dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft . dropPrefixes ) {
1017+ for iter .Rewind (); iter .Valid (); iter .Next () {
1018+ if len (dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), dropPrefixes ) {
10181019 continue
10191020 }
10201021 vs := iter .Value ()
@@ -1024,23 +1025,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10241025 }
10251026 b .Add (iter .Key (), iter .Value (), vp .Len )
10261027 }
1027- return b
1028- }
10291028
1030- type flushTask struct {
1031- mt * memTable
1032- dropPrefixes [][]byte
1029+ return b
10331030}
10341031
1035- // handleFlushTask must be run serially.
1036- func (db * DB ) handleFlushTask (ft flushTask ) error {
1037- // There can be a scenario, when empty memtable is flushed.
1038- if ft .mt .sl .Empty () {
1039- return nil
1040- }
1041-
1032+ // handleMemTableFlush must be run serially.
1033+ func (db * DB ) handleMemTableFlush (itr y.Iterator , dropPrefixes [][]byte ) error {
10421034 bopts := buildTableOptions (db )
1043- builder := buildL0Table (ft , bopts )
1035+ builder := buildL0Table (itr , nil , bopts )
10441036 defer builder .Close ()
10451037
10461038 // buildL0Table can return nil if the none of the items in the skiplist are
@@ -1069,39 +1061,36 @@ func (db *DB) handleFlushTask(ft flushTask) error {
10691061 return err
10701062}
10711063
1072- // flushMemtable must keep running until we send it an empty flushTask . If there
1073- // are errors during handling the flush task , we'll retry indefinitely.
1074- func (db * DB ) flushMemtable (lc * z.Closer ) error {
1064+ // flushMemtable must keep running until we send it an empty memtable . If there
1065+ // are errors during handling the memtable flush , we'll retry indefinitely.
1066+ func (db * DB ) flushMemtable (lc * z.Closer ) {
10751067 defer lc .Done ()
10761068
1077- for ft := range db .flushChan {
1078- if ft .mt == nil {
1079- // We close db.flushChan now, instead of sending a nil ft.mt.
1080- continue
1081- }
1069+ for mt := range db .flushChan {
1070+ itr := mt .sl .NewUniIterator (false )
10821071 for {
1083- err := db .handleFlushTask (ft )
1084- if err == nil {
1085- // Update s.imm. Need a lock.
1086- db .lock .Lock ()
1087- // This is a single-threaded operation. ft.mt corresponds to the head of
1088- // db.imm list. Once we flush it, we advance db.imm. The next ft.mt
1089- // which would arrive here would match db.imm[0], because we acquire a
1090- // lock over DB when pushing to flushChan.
1091- // TODO: This logic is dirty AF. Any change and this could easily break.
1092- y .AssertTrue (ft .mt == db .imm [0 ])
1093- db .imm = db .imm [1 :]
1094- ft .mt .DecrRef () // Return memory.
1095- db .lock .Unlock ()
1096-
1097- break
1072+ if err := db .handleMemTableFlush (itr , nil ); err != nil {
1073+ // Encountered error. Retry indefinitely.
1074+ db .opt .Errorf ("error flushing memtable to disk: %v, retrying" , err )
1075+ time .Sleep (time .Second )
1076+ continue
10981077 }
1099- // Encountered error. Retry indefinitely.
1100- db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
1101- time .Sleep (time .Second )
1078+
1079+ // Update s.imm. Need a lock.
1080+ db .lock .Lock ()
1081+ // This is a single-threaded operation. mt corresponds to the head of
1082+ // db.imm list. Once we flush it, we advance db.imm. The next mt
1083+ // which would arrive here would match db.imm[0], because we acquire a
1084+ // lock over DB when pushing to flushChan.
1085+ // TODO: This logic is dirty AF. Any change and this could easily break.
1086+ y .AssertTrue (mt == db .imm [0 ])
1087+ db .imm = db .imm [1 :]
1088+ mt .DecrRef () // Return memory.
1089+ // unlock
1090+ db .lock .Unlock ()
1091+ break
11021092 }
11031093 }
1104- return nil
11051094}
11061095
11071096func exists (path string ) (bool , error ) {
@@ -1521,10 +1510,10 @@ func (db *DB) startCompactions() {
15211510func (db * DB ) startMemoryFlush () {
15221511 // Start memory fluhser.
15231512 if db .closers .memtable != nil {
1524- db .flushChan = make (chan flushTask , db .opt .NumMemtables )
1513+ db .flushChan = make (chan * memTable , db .opt .NumMemtables )
15251514 db .closers .memtable = z .NewCloser (1 )
15261515 go func () {
1527- _ = db .flushMemtable (db .closers .memtable )
1516+ db .flushMemtable (db .closers .memtable )
15281517 }()
15291518 }
15301519}
@@ -1627,7 +1616,7 @@ func (db *DB) prepareToDrop() (func(), error) {
16271616 panic ("Attempting to drop data in read-only mode." )
16281617 }
16291618 // In order prepare for drop, we need to block the incoming writes and
1630- // write it to db. Then, flush all the pending flushtask . So that, we
1619+ // write it to db. Then, flush all the pending memtable . So that, we
16311620 // don't miss any entries.
16321621 if err := db .blockWrite (); err != nil {
16331622 return nil , err
@@ -1676,7 +1665,7 @@ func (db *DB) dropAll() (func(), error) {
16761665 if err != nil {
16771666 return f , err
16781667 }
1679- // prepareToDrop will stop all the incomming write and flushes any pending flush tasks .
1668+ // prepareToDrop will stop all the incomming write and flushes any pending memtables .
16801669 // Before we drop, we'll stop the compaction because anyways all the datas are going to
16811670 // be deleted.
16821671 db .stopCompactions ()
@@ -1758,13 +1747,9 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
17581747 memtable .DecrRef ()
17591748 continue
17601749 }
1761- task := flushTask {
1762- mt : memtable ,
1763- // Ensure that the head of value log gets persisted to disk.
1764- dropPrefixes : filtered ,
1765- }
1750+ itr := memtable .sl .NewUniIterator (false )
17661751 db .opt .Debugf ("Flushing memtable" )
1767- if err := db .handleFlushTask ( task ); err != nil {
1752+ if err := db .handleMemTableFlush ( itr , filtered ); err != nil {
17681753 db .opt .Errorf ("While trying to flush memtable: %v" , err )
17691754 return err
17701755 }
0 commit comments