Merge 'Memtable reversing reader: fix computing rt slice, if there was previously emitted range tombstone.' from Michał Radwański

This PR started from the realization that in the memtable reversing reader,
the tests never exercised `do_refresh_state` with `last_row` and `last_rts`
that are not `std::nullopt`.

Changes
- fix the memtable test (`test_memtable_with_many_versions_conforms_to_mutation_source`) so that a background job forces state refreshes,
- fix the way `rt_slice` is computed in the reversing case (was `(last_rts, ck_range_snapshot.end]`, now `[ck_range_snapshot.start, last_rts)`); see the sketch below.
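
A minimal sketch of the interval flip in the second fix (plain STL ints stand in for tombstone positions; nothing below is Scylla API):

```cpp
#include <cassert>
#include <iterator>
#include <set>

// Toy model of resuming a tombstone scan after a state refresh. `last` is the
// position saved from the last emitted range tombstone. A forward reader has
// already emitted everything up to `last`, so it resumes with (last, range_end].
// A reversed reader walks the snapshot backwards: everything from `last` upward
// is already emitted, so it must resume with [range_start, last). Taking
// (last, range_end] there, as the old code did, skips the unread prefix.
int main() {
    std::set<int> tombstone_positions{1, 3, 5, 7, 9};
    int last = 5;

    // Forward resume: first position strictly after `last`.
    assert(*tombstone_positions.upper_bound(last) == 7);

    // Reversed resume: last position strictly before `last`.
    assert(*std::prev(tombstone_positions.lower_bound(last)) == 3);
}
```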

Fixes scylladb#9486

Closes scylladb#9572

* github.com:scylladb/scylla:
  partition_snapshot_reader: fix indentation in fill_buffer
  range_tombstone_list: {lower,upper,}slice share comparator implementation
  test: memtable: add full_compaction in background
  partition_snapshot_reader: fix obtaining rt_slice, if Reversing and _last_rts was set
  range_tombstone_list: add lower_slice
tgrabiec committed Nov 5, 2021
2 parents 60f7615 + ee601b7 commit 31bc1eb
Showing 5 changed files with 93 additions and 57 deletions.
49 changes: 34 additions & 15 deletions partition_snapshot_reader.hh
@@ -152,10 +152,15 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
}

range_tombstone_list::iterator_range rt_slice = [&] () {
const auto& tombstones = v.partition().row_tombstones();
if (last_rts) {
return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot));
if constexpr (Reversing) {
return tombstones.lower_slice(*_snapshot_schema, bound_view::from_range_start(ck_range_snapshot), *last_rts);
} else {
return tombstones.upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot));
}
} else {
return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range_snapshot);
return tombstones.slice(*_snapshot_schema, ck_range_snapshot);
}
}();
if (rt_slice.begin() != rt_slice.end()) {
@@ -339,6 +344,9 @@ private:
std::optional<query::clustering_range> opt_reversed_range;

std::optional<position_in_partition> _last_entry;
// When not Reversing, it's .position() of last emitted range tombstone.
// When Reversing, it's .position().reversed() of last emitted range tombstone,
// so that it is usable from functions expecting position in snapshot domain.
std::optional<position_in_partition> _last_rts;
mutation_fragment_opt _next_row;

@@ -357,6 +365,18 @@ private:
}
}

// If `Reversing`, when we pop_range_tombstone(), a reversed rt is returned (the correct
// one in query clustering order). In order to save progress of reading from range_tombstone_list,
// we need to save the end position of rt (as it was stored in the list). This corresponds to
// the start position, with reversed bound weight.
static position_in_partition rt_position_in_snapshot_order(const range_tombstone& rt) {
position_in_partition pos(rt.position());
if constexpr (Reversing) {
pos = pos.reversed();
}
return pos;
}

mutation_fragment_opt read_next() {
// We use the names ck_range_snapshot and ck_range_query to denote clustering order.
// ck_range_snapshot uses the snapshot order, while ck_range_query uses the
@@ -374,15 +394,15 @@

auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, pos_view);
if (mf) {
_last_rts = mf->as_range_tombstone().position();
_last_rts = rt_position_in_snapshot_order(mf->as_range_tombstone());
return mf;
}
return std::exchange(_next_row, {});
} else {
_no_more_rows_in_current_range = true;
auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, position_in_partition_view::for_range_end(ck_range_query));
if (mf) {
_last_rts = mf->as_range_tombstone().position();
_last_rts = rt_position_in_snapshot_order(mf->as_range_tombstone());
}
return mf;
}
@@ -450,18 +470,17 @@ public:
}

virtual future<> fill_buffer() override {
// FIXME: indentation
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
_reader.with_reserve([&] {
if (!_static_row_done) {
push_static_row();
on_new_range();
_static_row_done = true;
}
do_fill_buffer();
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
_reader.with_reserve([&] {
if (!_static_row_done) {
push_static_row();
on_new_range();
_static_row_done = true;
}
do_fill_buffer();
});
return make_ready_future<>();
});
return make_ready_future<>();
});
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
79 changes: 39 additions & 40 deletions range_tombstone_list.cc
@@ -306,59 +306,58 @@ range_tombstone_list::reverter range_tombstone_list::apply_reversibly(const sche
return rev;
}

namespace {
struct bv_order_by_end {
bound_view::compare less;
bv_order_by_end(const schema& s) : less(s) {}
bool operator()(bound_view v, const range_tombstone_entry& rt) const { return less(v, rt.end_bound()); }
bool operator()(const range_tombstone_entry& rt, bound_view v) const { return less(rt.end_bound(), v); }
};
struct bv_order_by_start {
bound_view::compare less;
bv_order_by_start(const schema& s) : less(s) {}
bool operator()(bound_view v, const range_tombstone_entry& rt) const { return less(v, rt.start_bound()); }
bool operator()(const range_tombstone_entry& rt, bound_view v) const { return less(rt.start_bound(), v); }
};

struct pos_order_by_end {
position_in_partition::less_compare less;
pos_order_by_end(const schema& s) : less(s) {}
bool operator()(position_in_partition_view v, const range_tombstone_entry& rt) const { return less(v, rt.end_position()); }
bool operator()(const range_tombstone_entry& rt, position_in_partition_view v) const { return less(rt.end_position(), v); }
};
struct pos_order_by_start {
position_in_partition::less_compare less;
pos_order_by_start(const schema& s) : less(s) {}
bool operator()(position_in_partition_view v, const range_tombstone_entry& rt) const { return less(v, rt.position()); }
bool operator()(const range_tombstone_entry& rt, position_in_partition_view v) const { return less(rt.position(), v); }
};
} // namespace

range_tombstone_list::iterator_range
range_tombstone_list::slice(const schema& s, const query::clustering_range& r) const {
auto bv_range = bound_view::from_range(r);
struct order_by_end {
bound_view::compare less;
order_by_end(const schema& s) : less(s) {}
bool operator()(bound_view v, const range_tombstone_entry& rt) const { return less(v, rt.end_bound()); }
bool operator()(const range_tombstone_entry& rt, bound_view v) const { return less(rt.end_bound(), v); }
};
struct order_by_start {
bound_view::compare less;
order_by_start(const schema& s) : less(s) {}
bool operator()(bound_view v, const range_tombstone_entry& rt) const { return less(v, rt.start_bound()); }
bool operator()(const range_tombstone_entry& rt, bound_view v) const { return less(rt.start_bound(), v); }
};
return boost::make_iterator_range(
_tombstones.lower_bound(bv_range.first, order_by_end{s}),
_tombstones.upper_bound(bv_range.second, order_by_start{s}));
_tombstones.lower_bound(bv_range.first, bv_order_by_end{s}),
_tombstones.upper_bound(bv_range.second, bv_order_by_start{s}));
}

range_tombstone_list::iterator_range
range_tombstone_list::slice(const schema& s, position_in_partition_view start, position_in_partition_view end) const {
struct order_by_end {
position_in_partition::less_compare less;
order_by_end(const schema& s) : less(s) {}
bool operator()(position_in_partition_view v, const range_tombstone_entry& rt) const { return less(v, rt.end_position()); }
bool operator()(const range_tombstone_entry& rt, position_in_partition_view v) const { return less(rt.end_position(), v); }
};
struct order_by_start {
position_in_partition::less_compare less;
order_by_start(const schema& s) : less(s) {}
bool operator()(position_in_partition_view v, const range_tombstone_entry& rt) const { return less(v, rt.position()); }
bool operator()(const range_tombstone_entry& rt, position_in_partition_view v) const { return less(rt.position(), v); }
};
return boost::make_iterator_range(
_tombstones.upper_bound(start, order_by_end{s}), // end_position() is exclusive, hence upper_bound()
_tombstones.lower_bound(end, order_by_start{s}));
_tombstones.upper_bound(start, pos_order_by_end{s}), // end_position() is exclusive, hence upper_bound()
_tombstones.lower_bound(end, pos_order_by_start{s}));
}

range_tombstone_list::iterator_range
range_tombstone_list::lower_slice(const schema& s, bound_view start, position_in_partition_view before) const {
return boost::make_iterator_range(
_tombstones.lower_bound(start, bv_order_by_end{s}),
_tombstones.lower_bound(before, pos_order_by_end{s}));
}

range_tombstone_list::iterator_range
range_tombstone_list::upper_slice(const schema& s, position_in_partition_view after, bound_view end) const {
struct pos_order_by_start {
position_in_partition::less_compare less;
pos_order_by_start(const schema& s) : less(s) {}
bool operator()(position_in_partition_view v, const range_tombstone_entry& rt) const { return less(v, rt.position()); }
bool operator()(const range_tombstone_entry& rt, position_in_partition_view v) const { return less(rt.position(), v); }
};
struct bv_order_by_start {
bound_view::compare less;
bv_order_by_start(const schema& s) : less(s) {}
bool operator()(bound_view v, const range_tombstone_entry& rt) const { return less(v, rt.start_bound()); }
bool operator()(const range_tombstone_entry& rt, bound_view v) const { return less(rt.start_bound(), v); }
};
return boost::make_iterator_range(
_tombstones.upper_bound(after, pos_order_by_start{s}),
_tombstones.upper_bound(end, bv_order_by_start{s}));
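
A minimal sketch of the `lower_slice`/`upper_slice` semantics on toy intervals (plain STL; `interval` and the lambdas below are illustrative stand-ins for `range_tombstone_entry` and the `*_order_by_{start,end}` functors, not Scylla API):

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

struct interval { int start, end; };  // end exclusive; vector sorted by start

using slice_t = std::pair<const interval*, const interval*>;

// Entries whose end falls inside [start, before): both bounds compare ends.
slice_t lower_slice(const std::vector<interval>& v, int start, int before) {
    auto by_end = [](const interval& i, int x) { return i.end < x; };
    return {std::lower_bound(v.data(), v.data() + v.size(), start, by_end),
            std::lower_bound(v.data(), v.data() + v.size(), before, by_end)};
}

// Entries whose start falls inside (after, end]: both bounds compare starts.
slice_t upper_slice(const std::vector<interval>& v, int after, int end) {
    auto by_start = [](int x, const interval& i) { return x < i.start; };
    return {std::upper_bound(v.data(), v.data() + v.size(), after, by_start),
            std::upper_bound(v.data(), v.data() + v.size(), end, by_start)};
}

int main() {
    std::vector<interval> ts{{1, 3}, {4, 6}, {7, 9}};
    auto [lb, le] = lower_slice(ts, 0, 6);  // ends in [0, 6): only {1,3}
    assert(le - lb == 1 && lb->end == 3);
    auto [ub, ue] = upper_slice(ts, 3, 9);  // starts in (3, 9]: {4,6}, {7,9}
    assert(ue - ub == 2 && ub->start == 4);
}
```

Because both bounds of `lower_slice` compare against entry ends and both bounds of `upper_slice` against entry starts, the comparators can be shared, which is what motivated hoisting them into the anonymous namespace above.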
9 changes: 7 additions & 2 deletions range_tombstone_list.hh
@@ -230,11 +230,16 @@ public:
tombstone search_tombstone_covering(const schema& s, const clustering_key_prefix& key) const;

using iterator_range = boost::iterator_range<const_iterator>;
// Returns range of tombstones which overlap with given range
// Returns range tombstones which overlap with given range
iterator_range slice(const schema& s, const query::clustering_range&) const;
// Returns range tombstones which overlap with [start, end)
iterator_range slice(const schema& s, position_in_partition_view start, position_in_partition_view end) const;
iterator_range upper_slice(const schema& s, position_in_partition_view start, bound_view end) const;

// Returns range tombstones with ends inside [start, before).
iterator_range lower_slice(const schema& s, bound_view start, position_in_partition_view before) const;
// Returns range tombstones with starts inside (after, end].
iterator_range upper_slice(const schema& s, position_in_partition_view after, bound_view end) const;

iterator erase(const_iterator, const_iterator);

// Pops the first element and bans (in theory) further additions
12 changes: 12 additions & 0 deletions test/boost/memtable_test.cc
@@ -100,6 +100,16 @@ SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source)
};
auto cleanup_readers = defer([&] { clear_readers(); });
std::deque<dht::partition_range> ranges_storage;
lw_shared_ptr<bool> finished = make_lw_shared(false);
auto full_compaction_in_background = seastar::do_until([finished] {return *finished;}, [] {
// do_refresh_state is called when we detect a new partition snapshot version.
// If snapshot version changes in process of reading mutation fragments from a
// clustering range, the partition_snapshot_reader state is refreshed with saved
// last position of emitted row and range tombstone. full_compaction increases the
// change mark.
logalloc::shard_tracker().full_compaction();
return seastar::sleep(100us);
});
run_mutation_source_tests([&] (schema_ptr s, const std::vector<mutation>& muts) {
clear_readers();
mt = make_lw_shared<memtable>(s);
@@ -115,6 +125,8 @@ SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source)

return mt->as_data_source();
});
*finished = true;
full_compaction_in_background.get();
});
}

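
The test change above follows a common Seastar pattern: a perturbing fiber runs alongside the test body. A stripped-down sketch (`run_test()` is a hypothetical stand-in for `run_mutation_source_tests(...)`; the other calls are the same ones used in the diff):

```cpp
auto finished = make_lw_shared<bool>(false);
auto bg = seastar::do_until([finished] { return *finished; }, [] {
    // Bump the change mark so readers see a new snapshot version and take
    // the do_refresh_state() path with saved last_row / last_rts.
    logalloc::shard_tracker().full_compaction();
    return seastar::sleep(std::chrono::microseconds(100));
});
run_test();        // hypothetical: test body runs under continuous perturbation
*finished = true;  // signal the fiber to stop...
bg.get();          // ...and wait for it before the scope unwinds
```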
1 change: 1 addition & 0 deletions utils/immutable-collection.hh
@@ -64,6 +64,7 @@ public:
WRAP_METHOD(lower_bound)
WRAP_METHOD(upper_bound)
WRAP_METHOD(slice)
WRAP_METHOD(lower_slice)
WRAP_METHOD(upper_slice)

WRAP_CONST_METHOD(empty)