Skip to content

Commit

Permalink
support compaction filter (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Dec 22, 2018
1 parent 48654df commit 547274d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
48 changes: 48 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,54 @@ func TestMinReadTs(t *testing.T) {
})
}

func TestCompactionFilter(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
defer os.RemoveAll(dir)
opts := getTestOptions(dir)
opts.ValueThreshold = 8 * 1024
opts.MaxTableSize = 32 * 1024
opts.NumMemtables = 2
opts.NumLevelZeroTables = 1
opts.NumLevelZeroTablesStall = 2
opts.CompactionFilter = func(key, val, userMeta []byte) (skip bool) {
// Only keep the keys with user meta.
if len(userMeta) == 0 {
return true
}
return false
}
db, err := Open(opts)
require.NoError(t, err)
val := make([]byte, 1024*4)
// Insert 100 entries to trigger some compaction.
for i := 0; i < 100; i++ {
db.Update(func(txn *Txn) error {
key := []byte(fmt.Sprintf("key%d", i))
if i%2 == 0 {
txn.Set(key, val)
} else {
txn.SetWithMetaSlice(key, val, []byte{0})
}
return nil
})
}
// The first 50 entries must have been compacted already.
db.View(func(txn *Txn) error {
for i := 0; i < 50; i++ {
key := []byte(fmt.Sprintf("key%d", i))
item, _ := txn.Get(key)
if i%2 == 0 {
require.Nil(t, item)
} else {
require.NotNil(t, item)
require.Len(t, item.UserMeta(), 1)
}
}
return nil
})
}

func ExampleOpen() {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
// would affect the snapshot view guarantee provided by transactions.
minReadTs := s.kv.orc.readMark.MinReadTs()

filter := s.kv.opt.CompactionFilter

var numVersions int
var lastKey, skipKey []byte
var newTables []*table.Table
Expand All @@ -334,7 +336,7 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
if start != nil {
it.Seek(start)
}
for ; it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0); {
for it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0) {
timeStart := time.Now()
fileID := s.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), false)
Expand Down Expand Up @@ -400,6 +402,13 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
}
}
}
if filter != nil {
skip := filter(it.Key(), vs.Value, vs.UserMeta)
if skip {
discardStats.collect(vs)
continue
}
}
numKeys++
builder.Add(it.Key(), it.Value())
}
Expand Down
5 changes: 5 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ type Options struct {
TableBuilderOptions options.TableBuilderOptions

ValueLogWriteOptions options.ValueLogWriterOptions

// The compaction process invokes this method for kv that is being compacted. A return value of false
// indicates that the kv should be preserved in the output of this compaction run and a return value
// of true indicates that this key-value should be removed from the output of the compaction.
CompactionFilter func(key, val, userMeta []byte) (skip bool)
}

// DefaultOptions sets a list of recommended options for good performance.
Expand Down

0 comments on commit 547274d

Please sign in to comment.