Skip to content

Commit da5f789

Browse files
feat(dropPrefix): add DropPrefixNonBlocking API (#1698)
This PR adds the DropPrefixNonBlocking and DropPrefixBlocking APIs, which can be used to logically delete the data for the specified prefixes. DropPrefix now chooses between them based on the Badger option AllowStopTheWorld, which defaults to true (i.e. DropPrefixBlocking is used). With DropPrefixNonBlocking the data is not cleared from the LSM tree immediately; it is deleted eventually through compactions. Co-authored-by: Rohan Prasad <prasad.rohan93@gmail.com>
1 parent e4002b7 commit da5f789

File tree

6 files changed

+249
-8
lines changed

6 files changed

+249
-8
lines changed

db.go

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,8 @@ var requestPool = sync.Pool{
763763
}
764764

765765
func (db *DB) writeToLSM(b *request) error {
766+
db.lock.RLock()
767+
defer db.lock.RUnlock()
766768
for i, entry := range b.Entries {
767769
var err error
768770
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
@@ -1036,10 +1038,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
10361038

10371039
// Iterate over the skiplist and send the entries to the publisher.
10381040
it := skl.NewIterator()
1039-
it.SeekToFirst()
10401041

10411042
var entries []*Entry
1042-
for it.Valid() {
1043+
for it.SeekToFirst(); it.Valid(); it.Next() {
10431044
v := it.Value()
10441045
e := &Entry{
10451046
Key: it.Key(),
@@ -1048,7 +1049,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
10481049
UserMeta: v.UserMeta,
10491050
}
10501051
entries = append(entries, e)
1051-
it.Next()
10521052
}
10531053
req := &request{
10541054
Entries: entries,
@@ -1836,6 +1836,122 @@ func (db *DB) dropAll() (func(), error) {
18361836
return resume, nil
18371837
}
18381838

1839+
// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
1840+
// not be cleared from LSM tree immediately. It would be deleted eventually through compactions.
1841+
// This operation is useful when we don't want to block writes while we delete the prefixes.
1842+
// It does this in the following way:
1843+
// - Stream the given prefixes at a given ts.
1844+
// - Write them to skiplist at the specified ts and handover that skiplist to DB.
1845+
func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
1846+
if db.opt.ReadOnly {
1847+
return errors.New("Attempting to drop data in read-only mode.")
1848+
}
1849+
1850+
if len(prefixes) == 0 {
1851+
return nil
1852+
}
1853+
db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)
1854+
1855+
cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking")
1856+
defer cbuf.Release()
1857+
1858+
var wg sync.WaitGroup
1859+
handover := func(force bool) error {
1860+
if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize {
1861+
return nil
1862+
}
1863+
1864+
// Sort the kvs, add them to the builder, and hand it over to DB.
1865+
cbuf.SortSlice(func(left, right []byte) bool {
1866+
return y.CompareKeys(left, right) < 0
1867+
})
1868+
1869+
b := skl.NewBuilder(db.opt.MemTableSize)
1870+
err := cbuf.SliceIterate(func(s []byte) error {
1871+
b.Add(s, y.ValueStruct{Meta: bitDelete})
1872+
return nil
1873+
})
1874+
if err != nil {
1875+
return err
1876+
}
1877+
cbuf.Reset()
1878+
wg.Add(1)
1879+
return db.HandoverSkiplist(b.Skiplist(), wg.Done)
1880+
}
1881+
1882+
dropPrefix := func(prefix []byte) error {
1883+
stream := db.NewStreamAt(math.MaxUint64)
1884+
stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
1885+
stream.Prefix = prefix
1886+
// We don't need anything except key and version.
1887+
stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
1888+
if !itr.Valid() {
1889+
return nil, nil
1890+
}
1891+
item := itr.Item()
1892+
if item.IsDeletedOrExpired() {
1893+
return nil, nil
1894+
}
1895+
if !bytes.Equal(key, item.Key()) {
1896+
// Return on the encounter with another key.
1897+
return nil, nil
1898+
}
1899+
1900+
a := itr.Alloc
1901+
ka := a.Copy(key)
1902+
list := &pb.KVList{}
1903+
// We need to generate only a single delete marker per key. All the versions for this
1904+
// key will be considered deleted, if we delete the one at highest version.
1905+
kv := y.NewKV(a)
1906+
kv.Key = y.KeyWithTs(ka, item.Version())
1907+
list.Kv = append(list.Kv, kv)
1908+
itr.Next()
1909+
return list, nil
1910+
}
1911+
1912+
stream.Send = func(buf *z.Buffer) error {
1913+
kv := pb.KV{}
1914+
err := buf.SliceIterate(func(s []byte) error {
1915+
kv.Reset()
1916+
if err := kv.Unmarshal(s); err != nil {
1917+
return err
1918+
}
1919+
cbuf.WriteSlice(kv.Key)
1920+
return nil
1921+
})
1922+
if err != nil {
1923+
return err
1924+
}
1925+
return handover(false)
1926+
}
1927+
if err := stream.Orchestrate(context.Background()); err != nil {
1928+
return err
1929+
}
1930+
// Flush the remaining skiplists if any.
1931+
return handover(true)
1932+
}
1933+
1934+
// Iterate over all the prefixes and logically drop them.
1935+
for _, prefix := range prefixes {
1936+
if err := dropPrefix(prefix); err != nil {
1937+
return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
1938+
}
1939+
}
1940+
1941+
wg.Wait()
1942+
return nil
1943+
}
1944+
1945+
// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either drops
1946+
// the prefixes by blocking the writes or doing a logical drop.
1947+
// See DropPrefixBlocking and DropPrefixNonBlocking for more information.
1948+
func (db *DB) DropPrefix(prefixes ...[]byte) error {
1949+
if db.opt.AllowStopTheWorld {
1950+
return db.DropPrefixBlocking(prefixes...)
1951+
}
1952+
return db.DropPrefixNonBlocking(prefixes...)
1953+
}
1954+
18391955
// DropPrefixBlocking would drop all the keys with the provided prefix. It does this in the following way:
18401956
// - Stop accepting new writes.
18411957
// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
@@ -1847,7 +1963,7 @@ func (db *DB) dropAll() (func(), error) {
18471963
// - Compact L0->L1, skipping over Kp.
18481964
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
18491965
// - Resume memtable flushes, compactions and writes.
1850-
func (db *DB) DropPrefix(prefixes ...[]byte) error {
1966+
func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error {
18511967
if len(prefixes) == 0 {
18521968
return nil
18531969
}

db2_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"regexp"
3232
"runtime"
3333
"sync"
34+
"sync/atomic"
3435
"testing"
3536
"time"
3637

@@ -1055,3 +1056,97 @@ func TestKeyCount(t *testing.T) {
10551056
require.NoError(t, stream.Orchestrate(context.Background()))
10561057
require.Equal(t, N, uint64(count))
10571058
}
1059+
1060+
func TestDropPrefixNonBlocking(t *testing.T) {
1061+
dir, err := ioutil.TempDir("", "badger-test")
1062+
require.NoError(t, err)
1063+
defer removeDir(dir)
1064+
1065+
db, err := OpenManaged(DefaultOptions(dir).WithAllowStopTheWorld(false))
1066+
require.NoError(t, err)
1067+
defer db.Close()
1068+
1069+
val := []byte("value")
1070+
1071+
// Insert key-values
1072+
write := func() {
1073+
txn := db.NewTransactionAt(1, true)
1074+
defer txn.Discard()
1075+
require.NoError(t, txn.Set([]byte("aaa"), val))
1076+
require.NoError(t, txn.Set([]byte("aab"), val))
1077+
require.NoError(t, txn.Set([]byte("aba"), val))
1078+
require.NoError(t, txn.Set([]byte("aca"), val))
1079+
require.NoError(t, txn.CommitAt(2, nil))
1080+
}
1081+
1082+
read := func() {
1083+
txn := db.NewTransactionAt(6, false)
1084+
defer txn.Discard()
1085+
iterOpts := DefaultIteratorOptions
1086+
iterOpts.Prefix = []byte("aa")
1087+
it := txn.NewIterator(iterOpts)
1088+
defer it.Close()
1089+
1090+
cnt := 0
1091+
for it.Rewind(); it.Valid(); it.Next() {
1092+
fmt.Printf("%+v", it.Item())
1093+
cnt++
1094+
}
1095+
1096+
require.Equal(t, 0, cnt)
1097+
}
1098+
1099+
write()
1100+
prefixes := [][]byte{[]byte("aa")}
1101+
require.NoError(t, db.DropPrefix(prefixes...))
1102+
read()
1103+
}
1104+
1105+
func TestDropPrefixNonBlockingNoError(t *testing.T) {
1106+
dir, err := ioutil.TempDir("", "badger-test")
1107+
require.NoError(t, err)
1108+
defer removeDir(dir)
1109+
1110+
opt := DefaultOptions(dir)
1111+
db, err := OpenManaged(opt)
1112+
require.NoError(t, err)
1113+
defer db.Close()
1114+
1115+
clock := uint64(1)
1116+
1117+
writer := func(db *DB, shouldFail bool, closer *z.Closer) {
1118+
val := []byte("value")
1119+
defer closer.Done()
1120+
// Insert key-values
1121+
for {
1122+
select {
1123+
case <-closer.HasBeenClosed():
1124+
return
1125+
default:
1126+
txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true)
1127+
require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val)))
1128+
1129+
err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil)
1130+
if shouldFail && err != nil {
1131+
require.Error(t, err, ErrBlockedWrites)
1132+
} else if !shouldFail {
1133+
require.NoError(t, err)
1134+
}
1135+
txn.Discard()
1136+
}
1137+
}
1138+
}
1139+
1140+
closer := z.NewCloser(1)
1141+
go writer(db, true, closer)
1142+
time.Sleep(time.Millisecond * 100)
1143+
require.NoError(t, db.DropPrefixBlocking([]byte("aa")))
1144+
closer.SignalAndWait()
1145+
1146+
closer2 := z.NewCloser(1)
1147+
go writer(db, false, closer2)
1148+
time.Sleep(time.Millisecond * 50)
1149+
prefixes := [][]byte{[]byte("aa")}
1150+
require.NoError(t, db.DropPrefixNonBlocking(prefixes...))
1151+
closer2.SignalAndWait()
1152+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ go 1.12
77
require (
88
github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba
99
github.com/cespare/xxhash v1.1.0
10-
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a
10+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3
1111
github.com/dustin/go-humanize v1.0.0
1212
github.com/gogo/protobuf v1.3.2
1313
github.com/golang/protobuf v1.3.1

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
77
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
88
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
99
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
10+
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
11+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
1012
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
1113
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
1214
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
@@ -15,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
1517
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1618
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1719
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
18-
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg=
19-
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
20+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
21+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
2022
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
2123
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
2224
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=

options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ type Options struct {
104104
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
105105
ChecksumVerificationMode options.ChecksumVerificationMode
106106

107+
// AllowStopTheWorld determines whether the DropPrefix will be blocking/non-blocking.
108+
AllowStopTheWorld bool
109+
107110
// DetectConflicts determines whether the transactions would be checked for
108111
// conflicts. The transactions can be processed at a higher rate when
109112
// conflict detection is disabled.
@@ -140,6 +143,7 @@ func DefaultOptions(path string) Options {
140143
MaxLevels: 7,
141144
NumGoroutines: 8,
142145
MetricsEnabled: true,
146+
AllowStopTheWorld: true,
143147

144148
NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
145149
NumLevelZeroTables: 5,
@@ -674,6 +678,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
674678
return opt
675679
}
676680

681+
// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value.
682+
//
683+
// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not.
684+
// When set to false, the DropPrefix will do a logical delete and will not block
685+
// the writes. Although, this will not immediately clear up the LSM tree.
686+
// When set to true, the DropPrefix will block the writes and will clear up the LSM
687+
// tree.
688+
//
689+
// The default value of AllowStopTheWorld is true.
690+
func (opt Options) WithAllowStopTheWorld(b bool) Options {
691+
opt.AllowStopTheWorld = b
692+
return opt
693+
}
694+
677695
// WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value.
678696
//
679697
// This value specifies how much data cache should hold in memory. A small size

table/builder.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder {
153153
return b
154154
}
155155

156+
func maxEncodedLen(ctype options.CompressionType, sz int) int {
157+
switch ctype {
158+
case options.Snappy:
159+
return snappy.MaxEncodedLen(sz)
160+
case options.ZSTD:
161+
return y.ZSTDCompressBound(sz)
162+
}
163+
return sz
164+
}
165+
156166
func (b *Builder) handleBlock() {
157167
defer b.wg.Done()
158168

@@ -175,7 +185,7 @@ func (b *Builder) handleBlock() {
175185
// BlockBuf should always be less than or equal to allocated space. If the blockBuf is greater
176186
// than allocated space that means the data from this block cannot be stored in its
177187
// existing location.
178-
allocatedSpace := (item.end) + padding + 1
188+
allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
179189
y.AssertTrue(len(blockBuf) <= allocatedSpace)
180190

181191
// blockBuf was allocated on allocator. So, we don't need to copy it over.

0 commit comments

Comments
 (0)