Skip to content

Commit

Permalink
Merge "Add v2 versions of make_forwadable() and make_flat_mutation_re…
Browse files Browse the repository at this point in the history
…ader_from_fragments()" from Botond

"
These two readers are crucial for writing tests for any composable
reader so we need v2 versions of them before we can convert and test the
combined reader (for example). As these two readers are often used in
situations where the payload they deliver is specially crafted for the
test at hand, we keep their v1 versions too to avoid conversion meddling
with the tests.

Tests: unit(dev)
"

* 'forwarding-and-fragment-reader-v2/v1' of https://github.com/denesb/scylla:
  flat_mutation_reader_v2: add make_flat_mutation_reader_from_fragments()
  test/lib/mutation_source_test: don't force v1 reader in reverse run
  mutation_source: add native_version() getter
  flat_mutation_reader_v2: add make_forwardable()
  position_in_partition: add after_key(position_in_partition_view)
  flat_mutation_reader: make_forwardable(): fix indentation
  flat_mutation_reader: make_forwardable(): coroutinize reader
  • Loading branch information
avikivity committed Dec 14, 2021
2 parents be6cfa4 + 39426b1 commit 3ac622b
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 30 deletions.
293 changes: 263 additions & 30 deletions flat_mutation_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,64 +275,154 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
// When resolves, _next is engaged or _end_of_stream is set.
future<> ensure_next() {
if (_next) {
return make_ready_future<>();
co_return;
}
_next = co_await _underlying();
if (!_next) {
_end_of_stream = true;
}
return _underlying().then([this] (auto&& mfo) {
_next = std::move(mfo);
if (!_next) {
}
public:
reader(flat_mutation_reader r) : impl(r.schema(), r.permit()), _underlying(std::move(r)), _current({
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
}) { }
virtual future<> fill_buffer() override {
while (!is_buffer_full()) {
co_await ensure_next();
if (is_end_of_stream()) {
break;
}
position_in_partition::less_compare cmp(*_schema);
if (!cmp(_next->position(), _current.end())) {
_end_of_stream = true;
// keep _next, it may be relevant for next range
break;
}
});
if (_next->relevant_for_range(*_schema, _current.start())) {
push_mutation_fragment(std::move(*_next));
}
_next = {};
}
}
virtual future<> fast_forward_to(position_range pr) override {
_current = std::move(pr);
_end_of_stream = false;
forward_buffer_to(_current.start());
return make_ready_future<>();
}
virtual future<> next_partition() override {
_end_of_stream = false;
if (!_next || !_next->is_partition_start()) {
co_await _underlying.next_partition();
_next = {};
}
clear_buffer_to_next_partition();
_current = {
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
};
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
_end_of_stream = false;
clear_buffer();
_next = {};
_current = {
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
};
return _underlying.fast_forward_to(pr);
}
virtual future<> close() noexcept override {
return _underlying.close();
}
};
return make_flat_mutation_reader<reader>(std::move(m));
}

flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
class reader : public flat_mutation_reader_v2::impl {
flat_mutation_reader_v2 _underlying;
position_range _current;
mutation_fragment_v2_opt _next;
tombstone _active_tombstone;
bool _current_has_content = false;
// When resolves, _next is engaged or _end_of_stream is set.
future<> ensure_next() {
if (_next) {
co_return;
}
_next = co_await _underlying();
if (!_next) {
maybe_emit_end_tombstone();
_end_of_stream = true;
}
}
void maybe_emit_start_tombstone() {
if (!_current_has_content && _active_tombstone) {
_current_has_content = true;
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::before_key(_current.start()), _active_tombstone));
}
}
void maybe_emit_end_tombstone() {
if (_active_tombstone) {
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::after_key(_current.end()), {}));
}
}
public:
reader(flat_mutation_reader r) : impl(r.schema(), r.permit()), _underlying(std::move(r)), _current({
reader(flat_mutation_reader_v2 r) : impl(r.schema(), r.permit()), _underlying(std::move(r)), _current({
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
}) { }
virtual future<> fill_buffer() override {
return repeat([this] {
if (is_buffer_full()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
while (!is_buffer_full()) {
co_await ensure_next();
if (is_end_of_stream()) {
break;
}
return ensure_next().then([this] {
if (is_end_of_stream()) {
return stop_iteration::yes;
}
position_in_partition::less_compare cmp(*_schema);
if (!cmp(_next->position(), _current.end())) {
_end_of_stream = true;
// keep _next, it may be relevant for next range
return stop_iteration::yes;
position_in_partition::tri_compare cmp(*_schema);
if (cmp(_next->position(), _current.end()) >= 0) {
maybe_emit_start_tombstone();
maybe_emit_end_tombstone();
_end_of_stream = true;
// keep _next, it may be relevant for next range
break;
}
if (_next->relevant_for_range(*_schema, _current.start())) {
if (!_current_has_content && (!_next->is_range_tombstone_change() || cmp(_next->position(), _current.start()) != 0)) {
maybe_emit_start_tombstone();
}
if (_next->relevant_for_range(*_schema, _current.start())) {
push_mutation_fragment(std::move(*_next));
if (_next->is_range_tombstone_change()) {
_active_tombstone = _next->as_range_tombstone_change().tombstone();
}
_next = {};
return stop_iteration::no;
});
});
_current_has_content = true;
push_mutation_fragment(std::move(*_next));
} else if (_next->is_range_tombstone_change()) {
_active_tombstone = _next->as_range_tombstone_change().tombstone();
}
_next = {};
}
}
virtual future<> fast_forward_to(position_range pr) override {
_current = std::move(pr);
_end_of_stream = false;
_current_has_content = false;
forward_buffer_to(_current.start());
return make_ready_future<>();
}
virtual future<> next_partition() override {
_end_of_stream = false;
auto maybe_next_partition = make_ready_future<>();
if (!_next || !_next->is_partition_start()) {
maybe_next_partition = _underlying.next_partition().then([this] {
co_await _underlying.next_partition();
_next = {};
});
}
return maybe_next_partition.then([this] {
clear_buffer_to_next_partition();
_current = {
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
};
});
_active_tombstone = {};
_current_has_content = false;
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
_end_of_stream = false;
Expand All @@ -342,13 +432,15 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
};
_active_tombstone = {};
_current_has_content = false;
return _underlying.fast_forward_to(pr);
}
virtual future<> close() noexcept override {
return _underlying.close();
}
};
return make_flat_mutation_reader<reader>(std::move(m));
return make_flat_mutation_reader_v2<reader>(std::move(m));
}

flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_partition) {
Expand Down Expand Up @@ -1008,6 +1100,147 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
return rd;
}

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), query::full_partition_range);
}

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr) {
class reader : public flat_mutation_reader_v2::impl {
std::deque<mutation_fragment_v2> _fragments;
const dht::partition_range* _pr;
dht::ring_position_comparator _cmp;

private:
bool end_of_range() const {
return _fragments.empty() ||
(_fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp));
}

void do_fast_forward_to(const dht::partition_range& pr) {
clear_buffer();
_pr = &pr;
_fragments.erase(_fragments.begin(), std::find_if(_fragments.begin(), _fragments.end(), [this] (const mutation_fragment_v2& mf) {
return mf.is_partition_start() && !_pr->before(mf.as_partition_start().key(), _cmp);
}));
_end_of_stream = end_of_range();
}

public:
reader(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr)
: flat_mutation_reader_v2::impl(std::move(schema), std::move(permit))
, _fragments(std::move(fragments))
, _pr(&pr)
, _cmp(*_schema) {
do_fast_forward_to(*_pr);
}
virtual future<> fill_buffer() override {
while (!(_end_of_stream = end_of_range()) && !is_buffer_full()) {
push_mutation_fragment(std::move(_fragments.front()));
_fragments.pop_front();
}
return make_ready_future<>();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
while (!(_end_of_stream = end_of_range()) && !_fragments.front().is_partition_start()) {
_fragments.pop_front();
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range pr) override {
throw std::runtime_error("This reader can't be fast forwarded to another range.");
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
do_fast_forward_to(pr);
return make_ready_future<>();
}
virtual future<> close() noexcept override {
return make_ready_future<>();
}
};
return make_flat_mutation_reader_v2<reader>(std::move(schema), std::move(permit), std::move(fragments), pr);
}

std::deque<mutation_fragment_v2> reverse_fragments(const schema& schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
std::deque<mutation_fragment_v2> reversed_fragments;

auto it = fragments.begin();
auto end = fragments.end();

while (it != end) {
while (it != end && it->position().region() != partition_region::clustered) {
reversed_fragments.push_back(std::move(*it++));
}
// We need to find a partition-end but let's be flexible (tests will sometime use incorrect streams).
auto partition_end_it = std::find_if(it, end, [] (auto& mf) { return mf.position().region() != partition_region::clustered; });
auto rit = std::make_reverse_iterator(partition_end_it);
const auto rend = std::make_reverse_iterator(it);
for (; rit != rend; ++rit) {
if (rit->is_range_tombstone_change()) {
auto next_rtc_it = std::find_if(rit + 1, rend, std::mem_fn(&mutation_fragment_v2::is_range_tombstone_change));
if (next_rtc_it == rend) {
reversed_fragments.emplace_back(schema, permit, range_tombstone_change(rit->position().reversed(), {}));
} else {
reversed_fragments.emplace_back(schema, permit, range_tombstone_change(rit->position().reversed(), next_rtc_it->as_range_tombstone_change().tombstone()));
}
} else {
reversed_fragments.push_back(std::move(*rit));
}
}
it = partition_end_it;
}

return reversed_fragments;
}

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments,
const dht::partition_range& pr, const query::partition_slice& query_slice) {
const auto reversed = query_slice.is_reversed();
if (reversed) {
fragments = reverse_fragments(*schema, permit, std::move(fragments));
}
auto slice = reversed ? query::legacy_reverse_slice_to_native_reverse_slice(*schema, query_slice) : query_slice;

std::optional<clustering_ranges_walker> ranges_walker;
std::deque<mutation_fragment_v2> filtered;
std::optional<range_tombstone_change> activa_tombstone;
for (auto&& mf : fragments) {
clustering_ranges_walker::progress p{.contained = true};

switch (mf.mutation_fragment_kind()) {
case mutation_fragment_v2::kind::partition_start:
ranges_walker.emplace(*schema, slice.row_ranges(*schema, mf.as_partition_start().key().key()), false);
[[fallthrough]];
case mutation_fragment_v2::kind::static_row:
break;
case mutation_fragment_v2::kind::clustering_row:
p = ranges_walker->advance_to(mf.position(), ranges_walker->current_tombstone());
break;
case mutation_fragment_v2::kind::range_tombstone_change:
p = ranges_walker->advance_to(mf.position(), mf.as_range_tombstone_change().tombstone());
p.contained = false;
break;
case mutation_fragment_v2::kind::partition_end:
p = ranges_walker->advance_to(mf.position(), ranges_walker->current_tombstone());
p.contained = true;
break;
}

for (auto&& rt : p.rts) {
filtered.emplace_back(*schema, permit, std::move(rt));
}
if (p.contained) {
filtered.push_back(std::move(mf));
}
}
return make_flat_mutation_reader_from_fragments(std::move(schema), permit, std::move(filtered), pr);
}

flat_mutation_reader
make_slicing_filtering_reader(flat_mutation_reader rd, const dht::partition_range& pr, const query::partition_slice& slice) {
class reader : public flat_mutation_reader::impl {
Expand Down
11 changes: 11 additions & 0 deletions flat_mutation_reader_v2.hh
Original file line number Diff line number Diff line change
Expand Up @@ -780,3 +780,14 @@ flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader);

// Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read.
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader_v2&);

flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m);

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<mutation_fragment_v2>);

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<mutation_fragment_v2>, const dht::partition_range& pr);

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<mutation_fragment_v2>, const dht::partition_range& pr, const query::partition_slice& slice);
3 changes: 3 additions & 0 deletions mutation_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ public:
partition_presence_checker make_partition_presence_checker() {
return (*_presence_checker_factory)();
}

enum class version { v1, v2 };
version native_version() const { return _fn ? version::v1 : version::v2; }
};

// Returns a mutation_source which is the sum of given mutation_sources.
Expand Down
5 changes: 5 additions & 0 deletions position_in_partition.hh
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public:
return {partition_region::clustered, bound_weight::after_all_prefixed, &ck};
}

// Returns a view to after_key(pos._ck) if pos.is_clustering_row() else returns pos as-is.
static position_in_partition_view after_key(position_in_partition_view pos) {
return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? bound_weight::after_all_prefixed : pos._bound_weight, pos._ck};
}

static position_in_partition_view before_key(const clustering_key& ck) {
return {partition_region::clustered, bound_weight::before_all_prefixed, &ck};
}
Expand Down
Loading

0 comments on commit 3ac622b

Please sign in to comment.