Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,9 +685,9 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
t.Fatal(err)
}
header := storage.SnapshotRequest_Header{
CanDecline: true,
RangeSize: 100,
RangeDescriptor: *rep.Desc(),
CanDecline: true,
RangeSize: 100,
State: storagebase.ReplicaState{Desc: rep.Desc()},
}
// Cause this stream to return an error as soon as we ask it for something.
// This injects an error into HandleSnapshotStream when we try to send the
Expand Down
36 changes: 25 additions & 11 deletions pkg/storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ func mvccKey(k interface{}) MVCCKey {
}
}

func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch) error) {
stopper := stop.NewStopper()
defer stopper.Stop()
e := NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(e)

b := e.NewBatch()
var b Batch
if writeOnly {
b = e.NewWriteOnlyBatch()
} else {
b = e.NewBatch()
}
defer b.Close()

if err := b.Put(mvccKey("a"), []byte("value")); err != nil {
Expand Down Expand Up @@ -95,13 +100,15 @@ func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
{Key: mvccKey("a"), Value: []byte("value")},
{Key: mvccKey("c"), Value: appender("foobar")},
}
// Scan values from batch directly.
kvs, err = Scan(b, mvccKey(roachpb.RKeyMin), mvccKey(roachpb.RKeyMax), 0)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(expValues, kvs) {
t.Errorf("%v != %v", kvs, expValues)
if !writeOnly {
// Scan values from batch directly.
kvs, err = Scan(b, mvccKey(roachpb.RKeyMin), mvccKey(roachpb.RKeyMax), 0)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(expValues, kvs) {
t.Errorf("%v != %v", kvs, expValues)
}
}

// Commit batch and verify direct engine scan yields correct values.
Expand All @@ -121,14 +128,14 @@ func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
// visible until commit, and then are all visible after commit.
func TestBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
})
}

func TestBatchRepr(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, func(e Engine, b Batch) error {
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
repr := b.Repr()

// Simple sanity checks about the format of the batch representation. This
Expand Down Expand Up @@ -228,6 +235,13 @@ func TestBatchRepr(t *testing.T) {
})
}

func TestWriteBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
})
}

// Regression test for flush issue which caused
// b2.ApplyBatchRepr(b1.Repr()).Repr() to not equal a noop.
func TestApplyBatchRepr(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/engine/bench_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ func BenchmarkMVCCDeleteRange1Version256Bytes_RocksDB(b *testing.B) {
runMVCCDeleteRange(setupMVCCRocksDB, 256, b)
}

func BenchmarkBatchApplyBatchRepr(b *testing.B) {
runBatchApplyBatchRepr(setupMVCCInMemRocksDB, false /* writeOnly */, 10, 1000000, b)
}

func BenchmarkWriteOnlyBatchApplyBatchRepr(b *testing.B) {
runBatchApplyBatchRepr(setupMVCCInMemRocksDB, true /* writeOnly */, 10, 1000000, b)
}

func BenchmarkBatchBuilderPut(b *testing.B) {
value := make([]byte, 10)
for i := range value {
Expand Down
43 changes: 43 additions & 0 deletions pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,49 @@ func runMVCCComputeStats(emk engineMaker, valueBytes int, b *testing.B) {
log.Infof(context.Background(), "live_bytes: %d", stats.LiveBytes)
}

func runBatchApplyBatchRepr(
emk engineMaker, writeOnly bool, valueSize, batchSize int, b *testing.B,
) {
rng, _ := randutil.NewPseudoRand()
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueSize))
keyBuf := append(make([]byte, 0, 64), []byte("key-")...)

eng := emk(b, fmt.Sprintf("batch_apply_batch_repr_%d_%d", valueSize, batchSize))
defer eng.Close()

var repr []byte
{
batch := eng.NewBatch()
for i := 0; i < batchSize; i++ {
key := roachpb.Key(encoding.EncodeUvarintAscending(keyBuf[:4], uint64(i)))
ts := makeTS(timeutil.Now().UnixNano(), 0)
if err := MVCCPut(context.Background(), batch, nil, key, ts, value, nil); err != nil {
b.Fatal(err)
}
}
repr = batch.Repr()
batch.Close()
}

b.SetBytes(int64(len(repr)))
b.ResetTimer()

for i := 0; i < b.N; i++ {
var batch Batch
if writeOnly {
batch = eng.NewWriteOnlyBatch()
} else {
batch = eng.NewBatch()
}
if err := batch.ApplyBatchRepr(repr); err != nil {
b.Fatal(err)
}
batch.Close()
}

b.StopTimer()
}

func BenchmarkMVCCPutDelete_RocksDB(b *testing.B) {
rocksdb := setupMVCCInMemRocksDB(b, "put_delete")
defer rocksdb.Close()
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ type Engine interface {
// this engine. Batched engines accumulate all mutations and apply
// them atomically on a call to Commit().
NewBatch() Batch
// NewWriteOnlyBatch returns a new instance of a batched engine which wraps
// this engine. A write-only batch accumulates all mutations and applies them
// atomically on a call to Commit(). Read operations return an error.
//
// TODO(peter): This should return a WriteBatch interface, but there are mild
// complications in both defining that interface and implementing it. In
// particular, Batch.Close would no longer come from Reader and we'd need to
// refactor a bunch of code in rocksDBBatch.
NewWriteOnlyBatch() Batch
// NewSnapshot returns a new instance of a read-only snapshot
// engine. Snapshots are instantaneous and, as long as they're
// released relatively quickly, inexpensive. Snapshots are released
Expand Down
28 changes: 24 additions & 4 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,13 @@ func (r *RocksDB) NewSnapshot() Reader {

// NewBatch returns a new batch wrapping this rocksdb engine.
func (r *RocksDB) NewBatch() Batch {
return newRocksDBBatch(r)
return newRocksDBBatch(r, false /* writeOnly */)
}

// NewWriteOnlyBatch returns a new write-only batch wrapping this rocksdb
// engine.
func (r *RocksDB) NewWriteOnlyBatch() Batch {
return newRocksDBBatch(r, true /* writeOnly */)
}

// GetSSTables retrieves metadata about this engine's live sstables.
Expand Down Expand Up @@ -861,12 +867,14 @@ type rocksDBBatch struct {
distinct distinctBatch
distinctOpen bool
distinctNeedsFlush bool
writeOnly bool
}

func newRocksDBBatch(parent *RocksDB) *rocksDBBatch {
func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
r := &rocksDBBatch{
parent: parent,
batch: C.DBNewBatch(parent.rdb),
parent: parent,
batch: C.DBNewBatch(parent.rdb, C.bool(writeOnly)),
writeOnly: writeOnly,
}
r.distinct.rocksDBBatch = r
return r
Expand Down Expand Up @@ -920,6 +928,9 @@ func (r *rocksDBBatch) ApplyBatchRepr(repr []byte) error {
}

func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
if r.writeOnly {
panic("write-only batch")
}
if r.distinctOpen {
panic("distinct batch open")
}
Expand All @@ -930,6 +941,9 @@ func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
func (r *rocksDBBatch) GetProto(
key MVCCKey, msg proto.Message,
) (ok bool, keyBytes, valBytes int64, err error) {
if r.writeOnly {
panic("write-only batch")
}
if r.distinctOpen {
panic("distinct batch open")
}
Expand All @@ -938,6 +952,9 @@ func (r *rocksDBBatch) GetProto(
}

func (r *rocksDBBatch) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (bool, error)) error {
if r.writeOnly {
panic("write-only batch")
}
if r.distinctOpen {
panic("distinct batch open")
}
Expand All @@ -959,6 +976,9 @@ func (r *rocksDBBatch) Clear(key MVCCKey) error {
// batch. A panic will be thrown if multiple prefix or normal (non-prefix)
// iterators are used simultaneously on the same batch.
func (r *rocksDBBatch) NewIterator(prefix bool) Iterator {
if r.writeOnly {
panic("write-only batch")
}
if r.distinctOpen {
panic("distinct batch open")
}
Expand Down
Loading