Skip to content

Commit b56e3c7

Browse files
committed
storage: send ReplicaState in streaming snapshots
This removes the need to read ReplicaState from the batch when applying a snapshot, allowing the use of a write-only RocksDB batch. The write-only RocksDB batch does not index keys on insertion/deletion which is only necessary if those keys are read. This change requires a stop-the-world upgrade: new nodes will not be able to received snapshots generated by old nodes. name time/op BatchApplyBatchRepr-8 200ms ± 1% WriteOnlyBatchApplyBatchRepr-8 99.1ms ± 1% name speed BatchApplyBatchRepr-8 180MB/s ± 1% WriteOnlyBatchApplyBatchRepr-8 363MB/s ± 1% Fixes #10783
1 parent e465692 commit b56e3c7

File tree

13 files changed

+357
-112
lines changed

13 files changed

+357
-112
lines changed

pkg/storage/engine/batch_test.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,18 @@ func mvccKey(k interface{}) MVCCKey {
4949
}
5050
}
5151

52-
func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
52+
func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch) error) {
5353
stopper := stop.NewStopper()
5454
defer stopper.Stop()
5555
e := NewInMem(roachpb.Attributes{}, 1<<20)
5656
stopper.AddCloser(e)
5757

58-
b := e.NewBatch()
58+
var b Batch
59+
if writeOnly {
60+
b = e.NewWriteOnlyBatch()
61+
} else {
62+
b = e.NewBatch()
63+
}
5964
defer b.Close()
6065

6166
if err := b.Put(mvccKey("a"), []byte("value")); err != nil {
@@ -95,13 +100,15 @@ func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
95100
{Key: mvccKey("a"), Value: []byte("value")},
96101
{Key: mvccKey("c"), Value: appender("foobar")},
97102
}
98-
// Scan values from batch directly.
99-
kvs, err = Scan(b, mvccKey(roachpb.RKeyMin), mvccKey(roachpb.RKeyMax), 0)
100-
if err != nil {
101-
t.Fatal(err)
102-
}
103-
if !reflect.DeepEqual(expValues, kvs) {
104-
t.Errorf("%v != %v", kvs, expValues)
103+
if !writeOnly {
104+
// Scan values from batch directly.
105+
kvs, err = Scan(b, mvccKey(roachpb.RKeyMin), mvccKey(roachpb.RKeyMax), 0)
106+
if err != nil {
107+
t.Fatal(err)
108+
}
109+
if !reflect.DeepEqual(expValues, kvs) {
110+
t.Errorf("%v != %v", kvs, expValues)
111+
}
105112
}
106113

107114
// Commit batch and verify direct engine scan yields correct values.
@@ -121,14 +128,14 @@ func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
121128
// visible until commit, and then are all visible after commit.
122129
func TestBatchBasics(t *testing.T) {
123130
defer leaktest.AfterTest(t)()
124-
testBatchBasics(t, func(e Engine, b Batch) error {
131+
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
125132
return b.Commit()
126133
})
127134
}
128135

129136
func TestBatchRepr(t *testing.T) {
130137
defer leaktest.AfterTest(t)()
131-
testBatchBasics(t, func(e Engine, b Batch) error {
138+
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
132139
repr := b.Repr()
133140

134141
// Simple sanity checks about the format of the batch representation. This
@@ -228,6 +235,13 @@ func TestBatchRepr(t *testing.T) {
228235
})
229236
}
230237

238+
func TestWriteBatchBasics(t *testing.T) {
239+
defer leaktest.AfterTest(t)()
240+
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
241+
return b.Commit()
242+
})
243+
}
244+
231245
// Regression test for flush issue which caused
232246
// b2.ApplyBatchRepr(b1.Repr()).Repr() to not equal a noop.
233247
func TestApplyBatchRepr(t *testing.T) {

pkg/storage/engine/bench_rocksdb_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,14 @@ func BenchmarkMVCCDeleteRange1Version256Bytes_RocksDB(b *testing.B) {
261261
runMVCCDeleteRange(setupMVCCRocksDB, 256, b)
262262
}
263263

264+
func BenchmarkBatchApplyBatchRepr(b *testing.B) {
265+
runBatchApplyBatchRepr(setupMVCCInMemRocksDB, false /* writeOnly */, 10, 1000000, b)
266+
}
267+
268+
func BenchmarkWriteOnlyBatchApplyBatchRepr(b *testing.B) {
269+
runBatchApplyBatchRepr(setupMVCCInMemRocksDB, true /* writeOnly */, 10, 1000000, b)
270+
}
271+
264272
func BenchmarkBatchBuilderPut(b *testing.B) {
265273
value := make([]byte, 10)
266274
for i := range value {

pkg/storage/engine/bench_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,49 @@ func runMVCCComputeStats(emk engineMaker, valueBytes int, b *testing.B) {
528528
log.Infof(context.Background(), "live_bytes: %d", stats.LiveBytes)
529529
}
530530

531+
func runBatchApplyBatchRepr(
532+
emk engineMaker, writeOnly bool, valueSize, batchSize int, b *testing.B,
533+
) {
534+
rng, _ := randutil.NewPseudoRand()
535+
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueSize))
536+
keyBuf := append(make([]byte, 0, 64), []byte("key-")...)
537+
538+
eng := emk(b, fmt.Sprintf("batch_apply_batch_repr_%d_%d", valueSize, batchSize))
539+
defer eng.Close()
540+
541+
var repr []byte
542+
{
543+
batch := eng.NewBatch()
544+
for i := 0; i < batchSize; i++ {
545+
key := roachpb.Key(encoding.EncodeUvarintAscending(keyBuf[:4], uint64(i)))
546+
ts := makeTS(timeutil.Now().UnixNano(), 0)
547+
if err := MVCCPut(context.Background(), batch, nil, key, ts, value, nil); err != nil {
548+
b.Fatalf("failed put: %s", err)
549+
}
550+
}
551+
repr = batch.Repr()
552+
batch.Close()
553+
}
554+
555+
b.SetBytes(int64(len(repr)))
556+
b.ResetTimer()
557+
558+
for i := 0; i < b.N; i++ {
559+
var batch Batch
560+
if writeOnly {
561+
batch = eng.NewWriteOnlyBatch()
562+
} else {
563+
batch = eng.NewBatch()
564+
}
565+
if err := batch.ApplyBatchRepr(repr); err != nil {
566+
b.Fatal(err)
567+
}
568+
batch.Close()
569+
}
570+
571+
b.StopTimer()
572+
}
573+
531574
func BenchmarkMVCCPutDelete_RocksDB(b *testing.B) {
532575
rocksdb := setupMVCCInMemRocksDB(b, "put_delete")
533576
defer rocksdb.Close()

pkg/storage/engine/engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ type Engine interface {
171171
// this engine. Batched engines accumulate all mutations and apply
172172
// them atomically on a call to Commit().
173173
NewBatch() Batch
174+
// NewWriteOnlyBatch returns a new instance of a batched engine which wraps
175+
// this engine. A write-only batch accumulates all mutations and applies them
176+
// atomically on a call to Commit(). Read operations return an error.
177+
NewWriteOnlyBatch() Batch
174178
// NewSnapshot returns a new instance of a read-only snapshot
175179
// engine. Snapshots are instantaneous and, as long as they're
176180
// released relatively quickly, inexpensive. Snapshots are released

pkg/storage/engine/rocksdb.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,13 @@ func (r *RocksDB) NewSnapshot() Reader {
561561

562562
// NewBatch returns a new batch wrapping this rocksdb engine.
563563
func (r *RocksDB) NewBatch() Batch {
564-
return newRocksDBBatch(r)
564+
return newRocksDBBatch(r, false /* writeOnly */)
565+
}
566+
567+
// NewWriteOnlyBatch returns a new write-only batch wrapping this rocksdb
568+
// engine.
569+
func (r *RocksDB) NewWriteOnlyBatch() Batch {
570+
return newRocksDBBatch(r, true /* writeOnly */)
565571
}
566572

567573
// GetSSTables retrieves metadata about this engine's live sstables.
@@ -861,12 +867,14 @@ type rocksDBBatch struct {
861867
distinct distinctBatch
862868
distinctOpen bool
863869
distinctNeedsFlush bool
870+
writeOnly bool
864871
}
865872

866-
func newRocksDBBatch(parent *RocksDB) *rocksDBBatch {
873+
func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
867874
r := &rocksDBBatch{
868-
parent: parent,
869-
batch: C.DBNewBatch(parent.rdb),
875+
parent: parent,
876+
batch: C.DBNewBatch(parent.rdb, C.bool(writeOnly)),
877+
writeOnly: writeOnly,
870878
}
871879
r.distinct.rocksDBBatch = r
872880
return r
@@ -920,6 +928,9 @@ func (r *rocksDBBatch) ApplyBatchRepr(repr []byte) error {
920928
}
921929

922930
func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
931+
if r.writeOnly {
932+
panic("write-only batch")
933+
}
923934
if r.distinctOpen {
924935
panic("distinct batch open")
925936
}
@@ -930,6 +941,9 @@ func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
930941
func (r *rocksDBBatch) GetProto(
931942
key MVCCKey, msg proto.Message,
932943
) (ok bool, keyBytes, valBytes int64, err error) {
944+
if r.writeOnly {
945+
panic("write-only batch")
946+
}
933947
if r.distinctOpen {
934948
panic("distinct batch open")
935949
}
@@ -938,6 +952,9 @@ func (r *rocksDBBatch) GetProto(
938952
}
939953

940954
func (r *rocksDBBatch) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (bool, error)) error {
955+
if r.writeOnly {
956+
panic("write-only batch")
957+
}
941958
if r.distinctOpen {
942959
panic("distinct batch open")
943960
}
@@ -959,6 +976,9 @@ func (r *rocksDBBatch) Clear(key MVCCKey) error {
959976
// batch. A panic will be thrown if multiple prefix or normal (non-prefix)
960977
// iterators are used simultaneously on the same batch.
961978
func (r *rocksDBBatch) NewIterator(prefix bool) Iterator {
979+
if r.writeOnly {
980+
panic("write-only batch")
981+
}
962982
if r.distinctOpen {
963983
panic("distinct batch open")
964984
}

0 commit comments

Comments
 (0)