support compaction filter #72

Merged · 1 commit · Dec 22, 2018
48 changes: 48 additions & 0 deletions db_test.go
@@ -1155,6 +1155,54 @@ func TestMinReadTs(t *testing.T) {
})
}

func TestCompactionFilter(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
defer os.RemoveAll(dir)
opts := getTestOptions(dir)
opts.ValueThreshold = 8 * 1024
opts.MaxTableSize = 32 * 1024
opts.NumMemtables = 2
opts.NumLevelZeroTables = 1
opts.NumLevelZeroTablesStall = 2
opts.CompactionFilter = func(key, val, userMeta []byte) (skip bool) {
// Only keep the keys with user meta.
if len(userMeta) == 0 {
return true
}
return false
}
db, err := Open(opts)
require.NoError(t, err)
val := make([]byte, 1024*4)
// Insert 100 entries to trigger some compaction.
for i := 0; i < 100; i++ {
db.Update(func(txn *Txn) error {
key := []byte(fmt.Sprintf("key%d", i))
if i%2 == 0 {
txn.Set(key, val)
} else {
txn.SetWithMetaSlice(key, val, []byte{0})
}
return nil
})
}
// The first 50 entries should have gone through compaction already, so the filter has been applied to them.
db.View(func(txn *Txn) error {
for i := 0; i < 50; i++ {
key := []byte(fmt.Sprintf("key%d", i))
item, _ := txn.Get(key)
if i%2 == 0 {
require.Nil(t, item)
} else {
require.NotNil(t, item)
require.Len(t, item.UserMeta(), 1)
}
}
return nil
})
}

func ExampleOpen() {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
11 changes: 10 additions & 1 deletion levels.go
@@ -325,6 +325,8 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
// would affect the snapshot view guarantee provided by transactions.
minReadTs := s.kv.orc.readMark.MinReadTs()

filter := s.kv.opt.CompactionFilter

var numVersions int
var lastKey, skipKey []byte
var newTables []*table.Table
@@ -334,7 +336,7 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
if start != nil {
it.Seek(start)
}
-for ; it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0); {
+for it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0) {
timeStart := time.Now()
fileID := s.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), false)
@@ -400,6 +402,13 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
}
}
}
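// Give the user-supplied compaction filter a chance to drop this entry from the output.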
if filter != nil {
skip := filter(it.Key(), vs.Value, vs.UserMeta)
if skip {
discardStats.collect(vs)
continue
}
}
numKeys++
builder.Add(it.Key(), it.Value())
}
5 changes: 5 additions & 0 deletions options.go
@@ -109,6 +109,11 @@ type Options struct {
TableBuilderOptions options.TableBuilderOptions

ValueLogWriteOptions options.ValueLogWriterOptions

// CompactionFilter is invoked by the compaction process for each key-value pair being compacted.
// Returning false preserves the pair in the output of this compaction run; returning true removes
// it from the output.
CompactionFilter func(key, val, userMeta []byte) (skip bool)
}

// DefaultOptions sets a list of recommended options for good performance.
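
As a usage note (not part of the diff): a minimal sketch of wiring the new option, assuming the v1-style badger.DefaultOptions value, an illustrative import path, and a purely hypothetical tombstone byte in the user meta:

package main

import (
	"io/ioutil"
	"log"
	"os"

	"github.com/coocood/badger" // assumed import path; use the fork this PR targets
)

func main() {
	dir, err := ioutil.TempDir("", "badger-compaction-filter")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	// Drop entries whose user meta carries the (hypothetical) tombstone byte 0x01.
	// Returning true asks the compaction to leave the entry out of the rewritten tables.
	opts.CompactionFilter = func(key, val, userMeta []byte) bool {
		return len(userMeta) > 0 && userMeta[0] == 0x01
	}

	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Writes proceed as usual; filtered entries disappear once compaction
	// rewrites the tables that contain them.
}

The filter only runs when a table is actually compacted, so entries it would drop remain visible to reads until the tables holding them are rewritten.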