diff --git a/levels.go b/levels.go index aeabec232..84fd29070 100644 --- a/levels.go +++ b/levels.go @@ -309,7 +309,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error { } for _, table := range l.tables { - if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + if containsAnyPrefixes(table, prefixes) { tableGroup = append(tableGroup, table) } else { finishGroup() @@ -940,24 +940,43 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } -func containsPrefix(smallValue, largeValue, prefix []byte) bool { +func containsPrefix(table *table.Table, prefix []byte) bool { + smallValue := table.Smallest() + largeValue := table.Biggest() if bytes.HasPrefix(smallValue, prefix) { return true } if bytes.HasPrefix(largeValue, prefix) { return true } + isPresent := func() bool { + ti := table.NewIterator(0) + defer ti.Close() + // In table iterator's Seek, we assume that key has version in last 8 bytes. We set + // version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix. + ti.Seek(y.KeyWithTs(prefix, math.MaxUint64)) + if bytes.HasPrefix(ti.Key(), prefix) { + return true + } + return false + } + if bytes.Compare(prefix, smallValue) > 0 && bytes.Compare(prefix, largeValue) < 0 { + // There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for + // k=0x0011, we should not directly infer that k is present. It may not be present. + if !isPresent() { + return false + } return true } return false } -func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { +func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if containsPrefix(smallValue, largeValue, prefix) { + if containsPrefix(table, prefix) { return true } } diff --git a/levels_test.go b/levels_test.go index f928df28b..76746ff05 100644 --- a/levels_test.go +++ b/levels_test.go @@ -19,6 +19,9 @@ package badger import ( "fmt" "math" + "math/rand" + "os" + "sort" "testing" "time" @@ -1049,3 +1052,42 @@ func TestKeyVersions(t *testing.T) { }) }) } + +func TestTableContainsPrefix(t *testing.T) { + opts := table.Options{ + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + } + + buildTable := func(keys []string) *table.Table { + filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32()) + b := table.NewTableBuilder(opts) + defer b.Close() + + v := []byte("value") + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + for _, k := range keys { + b.Add(y.KeyWithTs([]byte(k), 1), y.ValueStruct{Value: v}, 0) + b.Add(y.KeyWithTs([]byte(k), 2), y.ValueStruct{Value: v}, 0) + } + tbl, err := table.CreateTable(filename, b) + require.NoError(t, err) + return tbl + } + + tbl := buildTable([]string{"key1", "key3", "key31", "key32", "key4"}) + defer tbl.DecrRef() + + require.True(t, containsPrefix(tbl, []byte("key"))) + require.True(t, containsPrefix(tbl, []byte("key1"))) + require.True(t, containsPrefix(tbl, []byte("key3"))) + require.True(t, containsPrefix(tbl, []byte("key32"))) + require.True(t, containsPrefix(tbl, []byte("key4"))) + + require.False(t, containsPrefix(tbl, []byte("key0"))) + require.False(t, containsPrefix(tbl, []byte("key2"))) + require.False(t, containsPrefix(tbl, []byte("key323"))) + require.False(t, containsPrefix(tbl, []byte("key5"))) +}