Skip to content

Commit

Permalink
WritePrepared Txn: Iterator
Browse files Browse the repository at this point in the history
Summary:
On iterator create, take a snapshot, create a ReadCallback and pass the ReadCallback to the underlying DBIter to check if key is committed.
Closes facebook#2981

Differential Revision: D6001471

Pulled By: yiwu-arbug

fbshipit-source-id: 3565c4cdaf25370ba47008b0e0cb65b31dfe79fe
  • Loading branch information
Yi Wu authored and facebook-github-bot committed Oct 10, 2017
1 parent 5a38e18 commit 8c392a3
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 225 deletions.
6 changes: 3 additions & 3 deletions db/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class DBBlobIndexTest : public DBTestBase {
}

ArenaWrappedDBIter* GetBlobIterator() {
return dbfull()->NewIteratorImpl(ReadOptions(), cfd(),
dbfull()->GetLatestSequenceNumber(),
true /*allow_blob*/);
return dbfull()->NewIteratorImpl(
ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
}

Options GetTestOptions() {
Expand Down
37 changes: 18 additions & 19 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.managed) {
#ifdef ROCKSDB_LITE
// not supported in lite version
Expand All @@ -1437,16 +1438,14 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return NewDBIterator(
env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations);
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
auto snapshot =
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_
: latest_snapshot;
return NewIteratorImpl(read_options, cfd, snapshot);
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
return NewIteratorImpl(read_options, cfd, snapshot, read_callback);
}
// To stop compiler from complaining
return nullptr;
Expand All @@ -1455,6 +1454,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

Expand Down Expand Up @@ -1503,8 +1503,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this),
cfd, allow_blob);
sv->version_number, read_callback,
((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob);

InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
Expand All @@ -1522,6 +1522,7 @@ Status DBImpl::NewIterators(
return Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
if (read_options.managed) {
Expand Down Expand Up @@ -1552,21 +1553,19 @@ Status DBImpl::NewIterators(
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations));
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback));
}
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
auto snapshot =
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_
: latest_snapshot;

auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (size_t i = 0; i < column_families.size(); ++i) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
column_families[i])->cfd();
iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot));
iterators->push_back(
NewIteratorImpl(read_options, cfd, snapshot, read_callback));
}
}

Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class DBImpl : public DB {
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob = false);

virtual const Snapshot* GetSnapshot() override;
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
SequenceNumber latest_snapshot = versions_->LastSequence();
ReadCallback* read_callback = nullptr; // No read callback provided.
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(),
(read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_
: latest_snapshot),
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number);
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
Expand All @@ -76,6 +77,7 @@ Status DBImplReadOnly::NewIterators(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
ReadCallback* read_callback = nullptr; // No read callback provided.
if (iterators == nullptr) {
return Status::InvalidArgument("iterators not allowed to be nullptr");
}
Expand All @@ -93,7 +95,7 @@ Status DBImplReadOnly::NewIterators(
->number_
: latest_snapshot),
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number);
sv->version_number, read_callback);
auto* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
Expand Down
47 changes: 31 additions & 16 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class DBIter final: public Iterator {
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations, bool allow_blob)
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, bool allow_blob)
: arena_mode_(arena_mode),
env_(_env),
logger_(cf_options.info_log),
Expand All @@ -120,6 +121,7 @@ class DBIter final: public Iterator {
total_order_seek_(read_options.total_order_seek),
range_del_agg_(cf_options.internal_comparator, s,
true /* collapse_deletions */),
read_callback_(read_callback),
allow_blob_(allow_blob) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = cf_options.prefix_extractor;
Expand Down Expand Up @@ -226,6 +228,7 @@ class DBIter final: public Iterator {
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence);

// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
Expand Down Expand Up @@ -293,6 +296,7 @@ class DBIter final: public Iterator {
RangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
ReadCallback* read_callback_;
bool allow_blob_;
bool is_blob_;

Expand Down Expand Up @@ -408,7 +412,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
return;
}

if (ikey_.sequence <= sequence_) {
if (IsVisible(ikey_.sequence)) {
if (skipping && user_comparator_->Compare(ikey_.user_key,
saved_key_.GetUserKey()) <= 0) {
num_skipped++; // skip this entry
Expand Down Expand Up @@ -674,7 +678,7 @@ void DBIter::ReverseToBackward() {
user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >
0) {
assert(ikey.sequence != kMaxSequenceNumber);
if (ikey.sequence > sequence_) {
if (!IsVisible(ikey.sequence)) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
Expand Down Expand Up @@ -762,7 +766,7 @@ bool DBIter::FindValueForCurrentKey() {
ReleaseTempPinnedData();
TempPinData();
size_t num_skipped = 0;
while (iter_->Valid() && ikey.sequence <= sequence_ &&
while (iter_->Valid() && IsVisible(ikey.sequence) &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
if (TooManyInternalKeysSkipped()) {
return false;
Expand Down Expand Up @@ -1001,7 +1005,7 @@ void DBIter::FindPrevUserKey() {
while (iter_->Valid() &&
((cmp = user_comparator_->Compare(ikey.user_key,
saved_key_.GetUserKey())) == 0 ||
(cmp > 0 && ikey.sequence > sequence_))) {
(cmp > 0 && !IsVisible(ikey.sequence)))) {
if (TooManyInternalKeysSkipped()) {
return;
}
Expand All @@ -1019,7 +1023,7 @@ void DBIter::FindPrevUserKey() {
}
}
assert(ikey.sequence != kMaxSequenceNumber);
if (ikey.sequence > sequence_) {
if (!IsVisible(ikey.sequence)) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
Expand All @@ -1041,6 +1045,11 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
return false;
}

bool DBIter::IsVisible(SequenceNumber sequence) {
return sequence <= sequence_ &&
(read_callback_ == nullptr || read_callback_->IsCommitted(sequence));
}

// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
while (iter_->Valid() && !ParseKey(ikey)) {
Expand Down Expand Up @@ -1225,10 +1234,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
bool allow_blob) {
DBIter* db_iter = new DBIter(
env, read_options, cf_options, user_key_comparator, internal_iter,
sequence, false, max_sequential_skip_in_iterations, allow_blob);
ReadCallback* read_callback, bool allow_blob) {
DBIter* db_iter =
new DBIter(env, read_options, cf_options, user_key_comparator,
internal_iter, sequence, false,
max_sequential_skip_in_iterations, read_callback, allow_blob);
return db_iter;
}

Expand Down Expand Up @@ -1273,11 +1283,13 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, bool allow_blob) {
uint64_t version_number,
ReadCallback* read_callback, bool allow_blob) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem)
DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr,
sequence, true, max_sequential_skip_in_iteration, allow_blob);
sequence, true, max_sequential_skip_in_iteration, read_callback,
allow_blob);
sv_number_ = version_number;
}

Expand All @@ -1297,7 +1309,7 @@ Status ArenaWrappedDBIter::Refresh() {
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
Init(env, read_options_, *(cfd_->ioptions()), latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, allow_blob_);
cur_sv_number, read_callback_, allow_blob_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
Expand All @@ -1313,12 +1325,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) {
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, sequence,
max_sequential_skip_in_iterations, version_number, allow_blob);
max_sequential_skip_in_iterations, version_number, read_callback,
allow_blob);
if (db_impl != nullptr && cfd != nullptr) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob);
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob);
}

return iter;
Expand Down
12 changes: 8 additions & 4 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback,
bool allow_blob = false);

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
Expand Down Expand Up @@ -72,13 +73,15 @@ class ArenaWrappedDBIter : public Iterator {
const ImmutableCFOptions& cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
bool allow_blob);
ReadCallback* read_callback, bool allow_blob);

void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob) {
ColumnFamilyData* cfd, ReadCallback* read_callback,
bool allow_blob) {
read_options_ = read_options;
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
}

Expand All @@ -89,6 +92,7 @@ class ArenaWrappedDBIter : public Iterator {
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
};

Expand All @@ -99,7 +103,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool allow_blob = false);
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false);

} // namespace rocksdb
Loading

0 comments on commit 8c392a3

Please sign in to comment.