Skip to content

Commit 4a70b3d

Browse files
committed
fix(stream): transmit whether it is a delete key
Taken from PR #1712, commit 58d0674
1 parent 0e2acc6 commit 4a70b3d

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

stream.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
129129
}
130130
kv.Version = item.Version()
131131
kv.ExpiresAt = item.ExpiresAt()
132-
kv.Meta = []byte{item.meta}
132+
// As we do full copy, we need to transmit only if it is a delete key or not.
133+
kv.Meta = []byte{item.meta & bitDelete}
133134
kv.UserMeta = a.Copy([]byte{item.UserMeta()})
134135

135136
list.Kv = append(list.Kv, kv)

stream_writer_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,3 +578,26 @@ func TestStreamWriterEncrypted(t *testing.T) {
578578
require.NoError(t, db.Close())
579579

580580
}
581+
582+
// Test that stream writer does not crashes with large values in managed mode.
583+
func TestStreamWriterWithLargeValue(t *testing.T) {
584+
opts := DefaultOptions("")
585+
opts.managedTxns = true
586+
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
587+
buf := z.NewBuffer(10<<20, "test")
588+
defer func() { require.NoError(t, buf.Release()) }()
589+
val := make([]byte, 10<<20)
590+
_, err := rand.Read(val)
591+
require.NoError(t, err)
592+
KVToBuffer(&pb.KV{
593+
Key: []byte("key"),
594+
Value: val,
595+
Version: 1,
596+
}, buf)
597+
598+
sw := db.NewStreamWriter()
599+
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
600+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
601+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
602+
})
603+
}

0 commit comments

Comments
 (0)