
Commit 899ca8c

feat(stream): add support for incremental stream writer (#1722)
This PR adds support for writing to the DB incrementally through the stream writer, via a new API: StreamWriter.PrepareIncremental. It also includes the bug fix from PR #1723.
1 parent 4a70b3d commit 899ca8c

3 files changed: +127 −8 lines changed
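Before the diffs, a minimal usage sketch of the new API. This is an illustrative example, not code from this commit; it assumes the badger v3 import paths and the package's exported KVToBuffer helper for encoding pb.KV entries into the buffer format StreamWriter consumes.

package main

import (
	badger "github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/badger/v3/pb"
	"github.com/dgraph-io/ristretto/z"
)

// incrementalWrite streams one KV into an existing DB without dropping its data.
func incrementalWrite(db *badger.DB) error {
	buf := z.NewBuffer(1<<20, "incremental-example")
	defer func() { _ = buf.Release() }()

	// Encode the entry in the format StreamWriter consumes.
	badger.KVToBuffer(&pb.KV{
		Key:     []byte("new-key"),
		Value:   []byte("new-val"),
		Version: 1,
	}, buf)

	sw := db.NewStreamWriter()
	// Unlike Prepare, PrepareIncremental keeps existing data intact and
	// arranges for new tables to land one level above the current base level.
	if err := sw.PrepareIncremental(); err != nil {
		return err
	}
	if err := sw.Write(buf); err != nil {
		return err
	}
	return sw.Flush()
}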

db.go

Lines changed: 1 addition & 1 deletion
@@ -1670,7 +1670,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 	// write it to db. Then, flush all the pending flushtask. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
-		return nil, err
+		return func() {}, err
 	}
 	reqs := make([]*request, 0, 10)
 	for {
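The db.go change is the bug fix carried over from PR #1723: prepareToDrop now returns a no-op closure instead of nil on the error path. PrepareIncremental (below) stores whatever prepareToDrop returns in sw.done, so a nil return would make a later done() call panic. A standalone sketch of the failure mode, using hypothetical names:

package main

import "errors"

var errBlocked = errors.New("writes blocked")

// prepareOld mimics the previous behavior: a nil cleanup func on error.
func prepareOld() (func(), error) { return nil, errBlocked }

// prepareNew mimics the fixed behavior: a safe no-op cleanup func on error.
func prepareNew() (func(), error) { return func() {}, errBlocked }

func main() {
	cleanup, err := prepareNew()
	if err != nil {
		// With prepareOld, cleanup would be nil and this call would panic.
		cleanup()
	}
}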

stream_writer.go

Lines changed: 50 additions & 7 deletions
@@ -75,18 +75,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
 func (sw *StreamWriter) Prepare() error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()

 	done, err := sw.db.dropAll()
+	// Ensure that done() is never called more than once.
+	var once sync.Once
+	sw.done = func() { once.Do(done) }
+	return err
+}
+
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In incremental stream write, the tables are written at one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+	sw.writeLock.Lock()
+	defer sw.writeLock.Unlock()

 	// Ensure that done() is never called more than once.
 	var once sync.Once
+
+	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		sw.done = func() { once.Do(f) }
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
 	sw.done = func() { once.Do(done) }

-	return err
+	isEmptyDB := true
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+			isEmptyDB = false
+			break
+		}
+	}
+	if isEmptyDB {
+		// If DB is empty, we should allow doing incremental stream write.
+		return nil
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
 }

 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
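Both Prepare and PrepareIncremental wrap their cleanup in a sync.Once so that sw.done stays idempotent however many times Flush or Cancel ends up invoking it. The pattern in isolation, as a runnable sketch:

package main

import (
	"fmt"
	"sync"
)

func main() {
	cleanup := func() { fmt.Println("restart compactions, unblock writes") }

	var once sync.Once
	done := func() { once.Do(cleanup) }

	done() // runs cleanup
	done() // no-op: sync.Once guarantees exactly one execution
}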
@@ -170,11 +210,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 	}

 	sw.processingKeys = true
-	if sw.prevLevel == 0 {
-		// If prevLevel is 0, that means that we have not written anything yet. Equivalently,
-		// we were virtually writing to the maxLevel+1.
-		sw.prevLevel = len(sw.db.lc.levels)
-	}
 	var meta, userMeta byte
 	if len(kv.Meta) > 0 {
 		meta = kv.Meta[0]
@@ -221,6 +256,14 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		return err
 	}

+	// Moved this piece of code to within the lock.
+	if sw.prevLevel == 0 {
+		// If prevLevel is 0, that means that we have not written anything yet.
+		// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+		// so we can set prevLevel to len(levels).
+		sw.prevLevel = len(sw.db.lc.levels)
+	}
+
 	for streamID, req := range streamReqs {
 		writer, ok := sw.writers[streamID]
 		if !ok {
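The last two hunks move the lazy prevLevel default out of the per-KV loop and into the section of Write that runs under sw.writeLock, so concurrent Write calls cannot race on reading and setting sw.prevLevel. Reduced to a standalone sketch with hypothetical names:

package main

import "sync"

type writer struct {
	writeLock sync.Mutex
	prevLevel int
	numLevels int
}

func (w *writer) write() {
	w.writeLock.Lock()
	defer w.writeLock.Unlock()
	// Setting the default under the lock avoids a data race between
	// concurrent writers. Tables are built at prevLevel-1, so an
	// untouched DB writes to the bottommost level.
	if w.prevLevel == 0 {
		w.prevLevel = w.numLevels
	}
	// ... hand requests to per-stream sorted writers at prevLevel-1 ...
}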

stream_writer_test.go

Lines changed: 76 additions & 0 deletions
@@ -601,3 +601,79 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
 		require.NoError(t, sw.Flush(), "sw.Flush() failed")
 	})
 }
+
+func TestStreamWriterIncremental(t *testing.T) {
+	addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		for _, key := range keys {
+			KVToBuffer(&pb.KV{
+				Key:     key,
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+		}
+		// Now do an incremental stream write.
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	}
+
+	t.Run("incremental on non-empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			buf := z.NewBuffer(10<<20, "test")
+			defer func() { require.NoError(t, buf.Release()) }()
+			KVToBuffer(&pb.KV{
+				Key:     []byte("key-1"),
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+			sw := db.NewStreamWriter()
+			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+			require.NoError(t, sw.Write(buf), "sw.Write() failed")
+			require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+			addIncremental(t, db, [][]byte{[]byte("key-2")})
+
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("incremental on empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("key-1")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("multiple incremental", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
+			addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
+			addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("a1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a3"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c3"))
+			require.NoError(t, err)
+		})
+	})
+}
