Skip to content

Commit

Permalink
Prefix-based iterating only shows keys in prefix
Browse files Browse the repository at this point in the history
Summary:
MyRocks testing found that when a prefix-based iterator moves past the
keys of its starting prefix, it sometimes returns wrong results for keys
outside that prefix. We now tighten the range of keys seen with a new
read option called prefix_same_as_start. The iterator remembers the
prefix it started at and, on each Next or Prev, compares the current
key's prefix against it. If the prefixes differ, it sets valid to false.

Test Plan: PrefixTest.PrefixValid

Reviewers: IslamAbdelRahman, sdong, yhchiang, anthony

Reviewed By: anthony

Subscribers: spetrunia, hermanlee4, yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D50211
  • Loading branch information
rven1 committed Nov 5, 2015
1 parent 14c6e1a commit 9d50afc
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 21 deletions.
11 changes: 6 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3573,10 +3573,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
#else
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
return NewDBIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
read_options.iterate_upper_bound, read_options.prefix_same_as_start);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
Expand Down Expand Up @@ -3631,9 +3632,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(),
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound, read_options.prefix_same_as_start);

InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
Expand Down
44 changes: 34 additions & 10 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class DBIter: public Iterator {
DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr)
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false)
: arena_mode_(arena_mode),
env_(env),
logger_(ioptions.info_log),
Expand All @@ -74,7 +75,8 @@ class DBIter: public Iterator {
valid_(false),
current_entry_is_merged_(false),
statistics_(ioptions.statistics),
iterate_upper_bound_(iterate_upper_bound) {
iterate_upper_bound_(iterate_upper_bound),
prefix_same_as_start_(prefix_same_as_start) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = ioptions.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
Expand Down Expand Up @@ -155,6 +157,8 @@ class DBIter: public Iterator {
Statistics* statistics_;
uint64_t max_skip_;
const Slice* iterate_upper_bound_;
Slice prefix_start_;
bool prefix_same_as_start_;

// No copying allowed
DBIter(const DBIter&);
Expand Down Expand Up @@ -197,6 +201,11 @@ void DBIter::Next() {
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
prefix_extractor_->Transform(saved_key_.GetKey())
.compare(prefix_start_) != 0) {
valid_ = false;
}
}

// PRE: saved_key_ has the current user key if skipping
Expand Down Expand Up @@ -367,6 +376,11 @@ void DBIter::Prev() {
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
prefix_extractor_->Transform(saved_key_.GetKey())
.compare(prefix_start_) != 0) {
valid_ = false;
}
}

void DBIter::ReverseToBackward() {
Expand Down Expand Up @@ -668,6 +682,9 @@ void DBIter::Seek(const Slice& target) {
} else {
valid_ = false;
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
prefix_start_ = prefix_extractor_->Transform(target);
}
}

void DBIter::SeekToFirst() {
Expand Down Expand Up @@ -696,6 +713,9 @@ void DBIter::SeekToFirst() {
} else {
valid_ = false;
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey());
}
}

void DBIter::SeekToLast() {
Expand Down Expand Up @@ -741,17 +761,21 @@ void DBIter::SeekToLast() {
RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
}
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey());
}
}

Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound) {
const Slice* iterate_upper_bound,
bool prefix_same_as_start) {
return new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations,
iterate_upper_bound);
iterate_upper_bound, prefix_same_as_start);
}

ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
Expand Down Expand Up @@ -780,16 +804,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,

ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ImmutableCFOptions& ioptions,
const Comparator* user_key_comparator,
const SequenceNumber& sequence,
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound) {
const Slice* iterate_upper_bound, bool prefix_same_as_start) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem) DBIter(env, ioptions, user_key_comparator,
nullptr, sequence, true, max_sequential_skip_in_iterations,
iterate_upper_bound);
DBIter* db_iter =
new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
true, max_sequential_skip_in_iterations,
iterate_upper_bound, prefix_same_as_start);

iter->SetDBIter(db_iter);

Expand Down
10 changes: 6 additions & 4 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ extern Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& options,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr);
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false);

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -71,8 +72,9 @@ class ArenaWrappedDBIter : public Iterator {
// Generate the arena wrapped iterator class.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ImmutableCFOptions& options,
const Comparator* user_key_comparator,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr);
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false);

} // namespace rocksdb
57 changes: 57 additions & 0 deletions db/prefix_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ int main() {
#include <gflags/gflags.h>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "util/histogram.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -163,6 +165,12 @@ class PrefixTest : public testing::Test {
options.memtable_prefix_bloom_huge_page_tlb_size =
FLAGS_memtable_prefix_bloom_huge_page_tlb_size;

options.prefix_extractor.reset(NewFixedPrefixTransform(8));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));

Status s = DB::Open(options, kDbName, &db);
EXPECT_OK(s);
return std::shared_ptr<DB>(db);
Expand Down Expand Up @@ -393,6 +401,55 @@ TEST_F(PrefixTest, TestResult) {
}
}

// With ReadOptions::prefix_same_as_start set, an iterator positioned by Seek
// must yield only the keys that share the seek target's prefix and must become
// invalid as soon as it would step outside of that prefix.
TEST_F(PrefixTest, PrefixValid) {
  for (int bucket_count = 1; bucket_count <= 2; ++bucket_count) {
    FirstOption();
    while (NextOptions(bucket_count)) {
      std::cout << "*** Mem table: " << options.memtable_factory->Name()
                << " number of buckets: " << bucket_count << std::endl;
      DestroyDB(kDbName, Options());
      auto db = OpenDb();
      WriteOptions write_opts;
      ReadOptions read_opts;
      read_opts.prefix_same_as_start = true;

      // Four keys under prefix 12345 (suffixes 6..9), in insertion order.
      const Slice values[] = {Slice("v16"), Slice("v17"), Slice("v18"),
                              Slice("v19")};
      const int kNumValues = 4;
      for (int i = 0; i < kNumValues; ++i) {
        PutKey(db.get(), write_opts, 12345, 6 + i, values[i]);
      }

      // One key under a different prefix, flushed and then deleted so that
      // both the value and its tombstone end up in flushed files.
      PutKey(db.get(), write_opts, 12346, 8, values[0]);
      db->Flush(FlushOptions());
      db->Delete(write_opts, TestKeyToSlice(TestKey(12346, 8)));
      db->Flush(FlushOptions());

      std::unique_ptr<Iterator> iter(db->NewIterator(read_opts));
      SeekIterator(iter.get(), 12345, 6);

      // Walk the prefix: the first entry comes from the Seek itself, every
      // following one from a Next.
      for (int i = 0; i < kNumValues; ++i) {
        if (i > 0) {
          iter->Next();
        }
        ASSERT_TRUE(iter->Valid());
        ASSERT_TRUE(values[i] == iter->value());
      }

      // Stepping past the last key of the prefix invalidates the iterator.
      iter->Next();
      ASSERT_FALSE(iter->Valid());

      // The deleted key from the other prefix is not visible to Get either.
      ASSERT_EQ(kNotFoundResult, Get(db.get(), read_opts, 12346, 8));
    }
  }
}

TEST_F(PrefixTest, DynamicPrefixIterator) {
while (NextOptions(FLAGS_bucket_count)) {
std::cout << "*** Mem table: " << options.memtable_factory->Name()
Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,14 @@ struct ReadOptions {
// this option.
bool total_order_seek;

// Enforce that the iterator only iterates over the same prefix as the seek.
// This option is effective only for prefix seeks, i.e. prefix_extractor is
// non-null for the column family and total_order_seek is false. Unlike
// iterate_upper_bound, prefix_same_as_start only works within a prefix
// but in both directions.
// Default: false
bool prefix_same_as_start;

ReadOptions();
ReadOptions(bool cksum, bool cache);
};
Expand Down
6 changes: 4 additions & 2 deletions util/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ ReadOptions::ReadOptions()
read_tier(kReadAllTier),
tailing(false),
managed(false),
total_order_seek(false) {
total_order_seek(false),
prefix_same_as_start(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
Expand All @@ -730,7 +731,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
read_tier(kReadAllTier),
tailing(false),
managed(false),
total_order_seek(false) {
total_order_seek(false),
prefix_same_as_start(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
Expand Down

0 comments on commit 9d50afc

Please sign in to comment.