
Commit 24ef3aa

feat(stream): add support for incremental stream writer (#1722) (#1874)
This PR adds support for writing to the DB incrementally via the stream writer. It adds a new API: StreamWriter.PrepareIncremental.
1 parent f690097 commit 24ef3aa
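
As context for the diff below, a minimal usage sketch of the new call, modeled on the tests added in this commit. It is illustrative only: the v4 module path and the wrapper function are assumptions, and buf must already hold encoded pb.KV entries (the tests build it with the KVToBuffer helper).

package example

import (
	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
)

// incrementalLoad sketches how the new API is driven. Unlike Prepare, which
// drops all existing data first, PrepareIncremental keeps the DB's contents
// and writes the new tables one level above the current base level.
func incrementalLoad(db *badger.DB, buf *z.Buffer) error {
	sw := db.NewStreamWriter()
	if err := sw.PrepareIncremental(); err != nil {
		return err // fails if L0 already has data
	}
	if err := sw.Write(buf); err != nil {
		return err
	}
	return sw.Flush()
}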

File tree

4 files changed: +159 −7 lines


badger/cmd/stream.go

Lines changed: 1 addition & 0 deletions
@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
 		WithValueDir(so.outDir).
 		WithNumVersionsToKeep(so.numVersions).
 		WithCompression(options.CompressionType(so.compressionType)).
+		WithEncryptionKey(encKey).
 		WithReadOnly(false)
 	err = inDB.StreamDB(outOpt)

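For context, a sketch of the options chain the stream command builds, showing where the new WithEncryptionKey call fits. The wrapper function and module path are illustrative, not code from this repo; before this change the destination options never carried the encryption key, so the streamed copy could not be written encrypted.

package example

import (
	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/badger/v4/options"
)

// buildOutOptions mirrors the chain in stream.go above; encKey is assumed
// to have been read from the CLI's encryption-key flag.
func buildOutOptions(outDir string, numVersions, ctype int, encKey []byte) badger.Options {
	return badger.DefaultOptions(outDir).
		WithValueDir(outDir).
		WithNumVersionsToKeep(numVersions).
		WithCompression(options.CompressionType(ctype)).
		WithEncryptionKey(encKey).
		WithReadOnly(false)
}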
db.go

Lines changed: 1 addition & 1 deletion
@@ -1623,7 +1623,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 	// write it to db. Then, flush all the pending memtable. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
-		return nil, err
+		return func() {}, err
 	}
 	reqs := make([]*request, 0, 10)
 	for {
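
This one-line change matters because PrepareIncremental (in stream_writer.go below) stores the returned cleanup function even on the error path, and invoking a nil func in Go panics. A standalone sketch of the pattern, with stand-in names:

package example

import (
	"errors"
	"sync"
)

// prepareToDrop stands in for the real method: on failure it now returns a
// no-op cleanup instead of nil.
func prepareToDrop() (func(), error) {
	return func() {}, errors.New("blockWrite failed") // illustrative failure
}

func caller() error {
	var once sync.Once
	f, err := prepareToDrop()
	done := func() { once.Do(f) } // mirrors sw.done = func() { once.Do(f) }
	if err != nil {
		done() // safe now; with a nil f this would panic
		return err
	}
	return nil
}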

stream_writer.go

Lines changed: 58 additions & 6 deletions
@@ -48,6 +48,7 @@ type StreamWriter struct {
 	throttle   *y.Throttle
 	maxVersion uint64
 	writers    map[uint32]*sortedWriter
+	prevLevel  int
 }

 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -67,18 +68,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
 func (sw *StreamWriter) Prepare() error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()

 	done, err := sw.db.dropAll()
+	// Ensure that done() is never called more than once.
+	var once sync.Once
+	sw.done = func() { once.Do(done) }
+	return err
+}
+
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In incremental stream write, the tables are written at one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+	sw.writeLock.Lock()
+	defer sw.writeLock.Unlock()

 	// Ensure that done() is never called more than once.
 	var once sync.Once
+
+	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		sw.done = func() { once.Do(f) }
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
 	sw.done = func() { once.Do(done) }

-	return err
+	isEmptyDB := true
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+			isEmptyDB = false
+			break
+		}
+	}
+	if isEmptyDB {
+		// If DB is empty, we should allow doing incremental stream write.
+		return nil
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
 }

 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
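
The level-picking rule above, restated as a standalone sketch (illustrative only; in Badger the state lives in sw.prevLevel, with the empty-DB case resolved lazily in Write, shown in the next hunk):

package example

import "fmt"

// pickWriteLevel finds the base level (the first non-empty level) and
// returns the level one above it, which is where incremental tables are
// written. An empty DB writes to the bottom level; data in L0 is a hard
// error, since there is no level above L0 to write into.
func pickWriteLevel(tablesPerLevel []int) (int, error) {
	prevLevel, empty := 0, true
	for lvl, n := range tablesPerLevel {
		if n > 0 {
			prevLevel, empty = lvl, false
			break
		}
	}
	if empty {
		prevLevel = len(tablesPerLevel) // matches Write: use the last level
	} else if prevLevel == 0 {
		return 0, fmt.Errorf("unable to do incremental writes because L0 has data")
	}
	return prevLevel - 1, nil // matches newWriter: level = prevLevel - 1
}

For example, with seven levels and existing tables only in L6, the first incremental write lands in L5 and the next one in L4; each successive incremental write lands one level higher, until tables reach L0 and the next PrepareIncremental returns an error.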
@@ -110,16 +151,25 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
 		}

+		sw.writeLock.Lock()
+		if sw.maxVersion < kv.Version {
+			sw.maxVersion = kv.Version
+		}
+		if sw.prevLevel == 0 {
+			// If prevLevel is 0, that means that we have not written anything yet.
+			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+			// so we can set prevLevel to len(levels).
+			sw.prevLevel = len(sw.db.lc.levels)
+		}
+		sw.writeLock.Unlock()
+
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
 		}
 		if len(kv.UserMeta) > 0 {
 			userMeta = kv.UserMeta[0]
 		}
-		if sw.maxVersion < kv.Version {
-			sw.maxVersion = kv.Version
-		}
 		e := &Entry{
 			Key:   y.KeyWithTs(kv.Key, kv.Version),
 			Value: y.Copy(kv.Value),
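
Note that the maxVersion bookkeeping moved under writeLock together with the lazy prevLevel initialization; the lock suggests Write may be invoked concurrently, in which case an unguarded read-modify-write of maxVersion would race. A minimal sketch of the guarded-max pattern, independent of Badger:

package example

import "sync"

// versionTracker demonstrates the pattern: every goroutine that reads and
// updates the maximum must hold the same mutex.
type versionTracker struct {
	mu  sync.Mutex
	max uint64
}

func (v *versionTracker) observe(ts uint64) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.max < ts {
		v.max = ts
	}
}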
@@ -285,6 +335,7 @@ type sortedWriter struct {

 	builder  *table.Builder
 	lastKey  []byte
+	level    int
 	streamID uint32
 	reqCh    chan *request
 	// Have separate closer for each writer, as it can be closed at any time.
@@ -304,6 +355,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
 		builder: table.NewTableBuilder(bopts),
 		reqCh:   make(chan *request, 3),
 		closer:  z.NewCloser(1),
+		level:   sw.prevLevel - 1, // Write at the level just above the one we were writing to.
 	}

 	go w.handleRequests()
@@ -435,7 +487,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
 	}
 	lc := w.db.lc

-	lhandler := lc.levels[len(lc.levels)-1]
+	lhandler := lc.levels[w.level]
 	// Now that table can be opened successfully, let's add this to the MANIFEST.
 	change := &pb.ManifestChange{
 		Id: tbl.ID(),

stream_writer_test.go

Lines changed: 99 additions & 0 deletions
@@ -577,3 +577,102 @@ func TestStreamWriterEncrypted(t *testing.T) {
 	require.NoError(t, db.Close())

 }
+
+// Test that the stream writer does not crash with large values in managed mode.
+func TestStreamWriterWithLargeValue(t *testing.T) {
+	opts := DefaultOptions("")
+	opts.managedTxns = true
+	runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		val := make([]byte, 10<<20)
+		_, err := rand.Read(val)
+		require.NoError(t, err)
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key"),
+			Value:   val,
+			Version: 1,
+		}, buf)
+
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	})
+}
+
+func TestStreamWriterIncremental(t *testing.T) {
+	addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		for _, key := range keys {
+			KVToBuffer(&pb.KV{
+				Key:     key,
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+		}
+		// Now do an incremental stream write.
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	}
+
+	t.Run("incremental on non-empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			buf := z.NewBuffer(10<<20, "test")
+			defer func() { require.NoError(t, buf.Release()) }()
+			KVToBuffer(&pb.KV{
+				Key:     []byte("key-1"),
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+			sw := db.NewStreamWriter()
+			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+			require.NoError(t, sw.Write(buf), "sw.Write() failed")
+			require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+			addIncremental(t, db, [][]byte{[]byte("key-2")})
+
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("incremental on empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("key-1")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("multiple incremental", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
+			addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
+			addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("a1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a3"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c3"))
+			require.NoError(t, err)
+		})
+	})
+}
