Skip to content

Commit 725913b

Browse files
feat(stream): add support for incremental stream writer (#1722)
This PR adds support for stream writing incrementally to the DB. It adds an API: StreamWriter.PrepareIncremental. Co-authored-by: Manish R Jain <manish@dgraph.io>
1 parent aaab253 commit 725913b

File tree

3 files changed

+101
-8
lines changed

3 files changed

+101
-8
lines changed

db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1741,7 +1741,7 @@ func (db *DB) prepareToDrop() (func(), error) {
17411741
// write it to db. Then, flush all the pending flushtask. So that, we
17421742
// don't miss any entries.
17431743
if err := db.blockWrite(); err != nil {
1744-
return nil, err
1744+
return func() {}, err
17451745
}
17461746
reqs := make([]*request, 0, 10)
17471747
for {

stream_writer.go

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,57 @@ func (db *DB) NewStreamWriter() *StreamWriter {
7474
// Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
7575
// existing DB, stops compactions and any writes being done by other means. Be very careful when
7676
// calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
77-
// in a corrupt Badger instance.
77+
// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
7878
func (sw *StreamWriter) Prepare() error {
7979
sw.writeLock.Lock()
8080
defer sw.writeLock.Unlock()
8181

8282
done, err := sw.db.dropAll()
83+
// Ensure that done() is never called more than once.
84+
var once sync.Once
85+
sw.done = func() { once.Do(done) }
86+
return err
87+
}
88+
89+
// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
90+
// In incremental stream write, the tables are written at one level above the current base level.
91+
func (sw *StreamWriter) PrepareIncremental() error {
92+
sw.writeLock.Lock()
93+
defer sw.writeLock.Unlock()
8394

8495
// Ensure that done() is never called more than once.
8596
var once sync.Once
97+
98+
// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
99+
// Before we start writing, we'll stop the compactions because no one else should be writing to
100+
// the same level as the stream writer is writing to.
101+
f, err := sw.db.prepareToDrop()
102+
if err != nil {
103+
sw.done = func() { once.Do(f) }
104+
return err
105+
}
106+
sw.db.stopCompactions()
107+
done := func() {
108+
sw.db.startCompactions()
109+
f()
110+
}
86111
sw.done = func() { once.Do(done) }
87112

88-
return err
113+
isEmptyDB := true
114+
for _, level := range sw.db.Levels() {
115+
if level.NumTables > 0 {
116+
sw.prevLevel = level.Level
117+
isEmptyDB = false
118+
}
119+
}
120+
if isEmptyDB {
121+
// If DB is empty, we should allow doing incremental stream write.
122+
return nil
123+
}
124+
if sw.prevLevel == 0 {
125+
return fmt.Errorf("Unable to do incremental writes because L0 has data")
126+
}
127+
return nil
89128
}
90129

91130
// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -169,11 +208,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
169208
}
170209

171210
sw.processingKeys = true
172-
if sw.prevLevel == 0 {
173-
// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
174-
// we were virtually writing to the maxLevel+1.
175-
sw.prevLevel = len(sw.db.lc.levels)
176-
}
177211
var meta, userMeta byte
178212
if len(kv.Meta) > 0 {
179213
meta = kv.Meta[0]
@@ -220,6 +254,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
220254
return err
221255
}
222256

257+
// Moved this piece of code to within the lock.
258+
if sw.prevLevel == 0 {
259+
// If prevLevel is 0, that means that we have not written anything yet.
260+
// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
261+
// so we can set prevLevel to len(levels).
262+
sw.prevLevel = len(sw.db.lc.levels)
263+
}
264+
223265
for streamID, req := range streamReqs {
224266
writer, ok := sw.writers[streamID]
225267
if !ok {

stream_writer_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,3 +602,54 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
602602
require.NoError(t, sw.Flush(), "sw.Flush() failed")
603603
})
604604
}
605+
606+
func TestStreamWriterIncremental(t *testing.T) {
607+
addIncremtal := func(t *testing.T, db *DB) {
608+
buf := z.NewBuffer(10<<20, "test")
609+
defer buf.Release()
610+
KVToBuffer(&pb.KV{
611+
Key: []byte("key-2"),
612+
Value: []byte("val"),
613+
Version: 1,
614+
}, buf)
615+
// Now do an incremental stream write.
616+
sw := db.NewStreamWriter()
617+
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
618+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
619+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
620+
}
621+
622+
t.Run("incremental on non-empty DB", func(t *testing.T) {
623+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
624+
buf := z.NewBuffer(10<<20, "test")
625+
defer buf.Release()
626+
KVToBuffer(&pb.KV{
627+
Key: []byte("key-1"),
628+
Value: []byte("val"),
629+
Version: 1,
630+
}, buf)
631+
sw := db.NewStreamWriter()
632+
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
633+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
634+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
635+
636+
addIncremtal(t, db)
637+
638+
txn := db.NewTransaction(false)
639+
defer txn.Discard()
640+
_, err := txn.Get([]byte("key-1"))
641+
require.NoError(t, err)
642+
_, err = txn.Get([]byte("key-2"))
643+
require.NoError(t, err)
644+
})
645+
})
646+
t.Run("incremental on empty DB", func(t *testing.T) {
647+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
648+
addIncremtal(t, db)
649+
txn := db.NewTransaction(false)
650+
defer txn.Discard()
651+
_, err := txn.Get([]byte("key-2"))
652+
require.NoError(t, err)
653+
})
654+
})
655+
}

0 commit comments

Comments (0)