
Commit 6bf2898

feat(stream): add support for incremental stream writer (#1722)
This PR adds support for writing to the DB incrementally through the stream framework, adding a new API: StreamWriter.PrepareIncremental. It also includes the bug fix from PR #1723.
1 parent 1ab2de4 commit 6bf2898
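For orientation, here is a minimal sketch of the new API's intended call sequence, mirroring the TestStreamWriterIncremental test added below. The v3 module path and the exported badger.KVToBuffer helper are assumptions of this sketch, not something the commit itself shows:

package streamdemo

import (
	badger "github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/badger/v3/pb"
	"github.com/dgraph-io/ristretto/z"
)

// writeIncremental streams extra keys into an already-populated DB,
// following the call sequence exercised by TestStreamWriterIncremental.
func writeIncremental(db *badger.DB, keys [][]byte) error {
	buf := z.NewBuffer(10<<20, "example")
	defer func() { _ = buf.Release() }()
	for _, key := range keys {
		badger.KVToBuffer(&pb.KV{Key: key, Value: []byte("val"), Version: 1}, buf)
	}

	sw := db.NewStreamWriter()
	// Unlike Prepare, PrepareIncremental does not drop existing data; it
	// returns an error instead if L0 already has tables (see stream_writer.go).
	if err := sw.PrepareIncremental(); err != nil {
		return err
	}
	if err := sw.Write(buf); err != nil {
		return err
	}
	return sw.Flush()
}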

3 files changed (+127 −8): db.go, stream_writer.go, stream_writer_test.go

db.go

Lines changed: 1 addition & 1 deletion
@@ -1630,7 +1630,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 	// write it to db. Then, flush all the pending flushtask. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
-		return nil, err
+		return func() {}, err
 	}
 	reqs := make([]*request, 0, 10)
 	for {
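The one-line db.go change is the fix pulled in from PR #1723. PrepareIncremental (below) stores the function returned by prepareToDrop into sw.done even on the error path, and sync.Once.Do panics when handed a nil func, so the error path must return a no-op closure rather than nil. A standalone illustration of the failure mode, not Badger code:

package main

import "fmt"

// cleanup mimics prepareToDrop's error path: returning a no-op func
// instead of nil keeps an unconditional call by the caller safe.
func cleanup(fail bool) (func(), error) {
	if fail {
		return func() {}, fmt.Errorf("could not block writes")
	}
	return func() { fmt.Println("undo") }, nil
}

func main() {
	done, err := cleanup(true)
	if err != nil {
		fmt.Println("got error:", err)
	}
	done() // safe; with a nil return value this call would panic
}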

stream_writer.go

Lines changed: 50 additions & 7 deletions
@@ -75,18 +75,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
 func (sw *StreamWriter) Prepare() error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()
 
 	done, err := sw.db.dropAll()
+	// Ensure that done() is never called more than once.
+	var once sync.Once
+	sw.done = func() { once.Do(done) }
+	return err
+}
+
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In incremental stream write, the tables are written at one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+	sw.writeLock.Lock()
+	defer sw.writeLock.Unlock()
 
 	// Ensure that done() is never called more than once.
 	var once sync.Once
+
+	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		sw.done = func() { once.Do(f) }
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
 	sw.done = func() { once.Do(done) }
 
-	return err
+	isEmptyDB := true
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+			isEmptyDB = false
+			break
+		}
+	}
+	if isEmptyDB {
+		// If DB is empty, we should allow doing incremental stream write.
+		return nil
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
 }
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
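PrepareIncremental refuses to proceed when L0 already holds tables, because the stream writer needs a level above the current base level to itself. A caller might treat that error as a signal to fall back to ordinary batched writes; the fallback policy below, including calling Cancel after a failed prepare, is a hypothetical sketch and not part of this PR:

package streamdemo

// loadKVs prefers the new incremental mode and falls back to batched
// transactional writes when incremental streaming is unavailable.
func loadKVs(db *badger.DB, buf *z.Buffer) error {
	sw := db.NewStreamWriter()
	if err := sw.PrepareIncremental(); err != nil {
		// e.g. "Unable to do incremental writes because L0 has data".
		// Cancel restores compactions and writes, then we fall back.
		sw.Cancel()
		wb := db.NewWriteBatch()
		defer wb.Cancel()
		if err := buf.SliceIterate(func(s []byte) error {
			var kv pb.KV
			if err := kv.Unmarshal(s); err != nil {
				return err
			}
			return wb.Set(kv.Key, kv.Value)
		}); err != nil {
			return err
		}
		return wb.Flush()
	}
	if err := sw.Write(buf); err != nil {
		return err
	}
	return sw.Flush()
}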
@@ -170,11 +210,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		}
 
 		sw.processingKeys = true
-		if sw.prevLevel == 0 {
-			// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
-			// we were virtually writing to the maxLevel+1.
-			sw.prevLevel = len(sw.db.lc.levels)
-		}
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
@@ -221,6 +256,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		return err
 	}
 
+	// Moved this piece of code to within the lock.
+	if sw.prevLevel == 0 {
+		// If prevLevel is 0, that means that we have not written anything yet.
+		// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+		// so we can set prevLevel to len(levels).
+		sw.prevLevel = len(sw.db.lc.levels)
+	}
+
 	for streamID, req := range streamReqs {
 		writer, ok := sw.writers[streamID]
 		if !ok {
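The relocated block in the last hunk performs a check-then-set on sw.prevLevel; running it under the lock means two concurrent Write calls cannot both observe prevLevel == 0 and race on the assignment. A minimal standalone sketch of that guarded pattern, illustrative only and not Badger's actual types:

package main

import (
	"fmt"
	"sync"
)

type writer struct {
	mu        sync.Mutex
	prevLevel int
}

// setLevelOnce does the check-then-set under the mutex, so concurrent
// callers cannot both see prevLevel == 0 and initialize it twice.
func (w *writer) setLevelOnce(maxLevel int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.prevLevel == 0 {
		w.prevLevel = maxLevel
	}
}

func main() {
	w := &writer{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.setLevelOnce(7)
		}()
	}
	wg.Wait()
	fmt.Println(w.prevLevel) // always 7, set exactly once
}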

stream_writer_test.go

Lines changed: 76 additions & 0 deletions
@@ -600,3 +600,79 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 	})
 }
+
+func TestStreamWriterIncremental(t *testing.T) {
+	addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		for _, key := range keys {
+			KVToBuffer(&pb.KV{
+				Key:     key,
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+		}
+		// Now do an incremental stream write.
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	}
+
+	t.Run("incremental on non-empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			buf := z.NewBuffer(10<<20, "test")
+			defer func() { require.NoError(t, buf.Release()) }()
+			KVToBuffer(&pb.KV{
+				Key:     []byte("key-1"),
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+			sw := db.NewStreamWriter()
+			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+			require.NoError(t, sw.Write(buf), "sw.Write() failed")
+			require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+			addIncremental(t, db, [][]byte{[]byte("key-2")})
+
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("incremental on empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("key-1")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("multiple incremental", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
+			addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
+			addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("a1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a3"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c3"))
+			require.NoError(t, err)
+		})
+	})
+}
