tests: fix range tombstone checking and deal with the fallout

flat_reader_assertions::produces_range_tombstone() does not actually
validate range tombstones beyond checking that the fragments read are
indeed range tombstones (unless a non-empty ck_ranges is passed).

Fixing the immediate problem reveals that:

* The assertion logic is not flexible enough to deal with
  creatively-split or creatively-overlapping range tombstones (see
  the sketch after this list).

* Some existing tests involving range tombstones are in fact wrong:
  some assertions may (at least with some readers) refer to the wrong
  tombstones entirely, while others assert wrong things about the
  right tombstones.

* Range tombstones in pre-made sstables (such as those read by
  sstable_3_x_test) have deletion time drift, and that now has to be
  somehow dealt with.
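
To see why a fragment-by-fragment comparison is too rigid, here is a
minimal, self-contained sketch (not ScyllaDB code: the rt struct and
normalize() helper are invented for illustration). A reader may
legitimately emit one logical tombstone split into several abutting
pieces, so only normalized representations can be compared:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    // A range tombstone reduced to a half-open clustering interval
    // plus a deletion timestamp.
    struct rt {
        int start, end;  // covers [start, end)
        long timestamp;
        bool operator==(const rt&) const = default;
    };

    // Sort, then merge adjacent/overlapping tombstones carrying the
    // same timestamp -- the moral equivalent of accumulating into a
    // range_tombstone_list.
    std::vector<rt> normalize(std::vector<rt> v) {
        std::sort(v.begin(), v.end(),
                  [] (const rt& a, const rt& b) { return a.start < b.start; });
        std::vector<rt> out;
        for (const auto& t : v) {
            if (!out.empty() && out.back().timestamp == t.timestamp
                             && out.back().end >= t.start) {
                out.back().end = std::max(out.back().end, t.end);
            } else {
                out.push_back(t);
            }
        }
        return out;
    }

    int main() {
        std::vector<rt> expected{{0, 10, 42}};            // one tombstone...
        std::vector<rt> actual{{0, 4, 42}, {4, 10, 42}};  // ...emitted as two pieces
        // Comparing piece-by-piece would fail; comparing the
        // normalized lists succeeds.
        assert(normalize(actual) == normalize(expected));
    }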

This patch (which is not split into smaller ones because that would
either generate an unreasonable amount of work towards ensuring
bisectability or entail "temporarily" disabling problematic tests,
which is cheating) contains the following changes:

* flat_reader_assertions checks range tombstones more carefully, by
  accumulating both expected and actually-read range tombstones into
  lists and comparing those lists when a partition ends (or when the
  assertion object is destroyed).

* flat_reader_assertions::may_produce_tombstones() can take
  constraining ck_ranges.

* Both flat_reader_assertions and flat_reader_assertions_v2 can be
  instructed to ignore tombstone deletion times, to help with tests
  that read pre-made sstables (a usage sketch follows this list).

* Affected tests are changed to reflect reality.  Most changes to
  tests make sense; the only one I am not completely sure about is in
  test_uncompressed_filtering_and_forwarding_range_tombstones_read.
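
As a usage sketch only (a hypothetical test body: make_sstable_reader(),
pk, rt1 and rt2 are placeholders standing in for the fixtures the real
tests use), the reworked API reads like this:

    assert_that(make_sstable_reader())
        .ignore_deletion_time()        // tolerate deletion-time drift in pre-made sstables
        .produces_partition_start(pk)
        .produces_range_tombstone(rt1) // accumulated into the expected list
        .produces_range_tombstone(rt2)
        .produces_partition_end();     // normalized lists are compared here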

Fixes scylladb#9470

Signed-off-by: Michael Livshin <michael.livshin@scylladb.com>
Michael Livshin committed Nov 7, 2021
1 parent e991604 commit 4941e2e
Showing 3 changed files with 148 additions and 75 deletions.
41 changes: 28 additions & 13 deletions test/boost/sstable_3_x_test.cc
@@ -737,12 +737,12 @@ SEASTAR_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombstones_re
     auto make_tombstone = [] (int64_t ts, int32_t tp) {
         return tombstone{api::timestamp_type{ts}, gc_clock::time_point(gc_clock::duration(tp))};
     };
-    auto make_range_tombstone = [] (clustering_key_prefix start, clustering_key_prefix end, tombstone t) {
+    auto make_range_tombstone = [] (clustering_key_prefix start, clustering_key_prefix end, tombstone t, bound_kind end_bound = bound_kind::incl_end) {
         return range_tombstone {
             std::move(start),
             bound_kind::incl_start,
             std::move(end),
-            bound_kind::incl_end,
+            end_bound,
             t};
     };

@@ -754,7 +754,7 @@ SEASTAR_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombstones_re
 
     auto make_assertions = [] (flat_mutation_reader rd) {
         rd.set_max_buffer_size(1);
-        return assert_that(std::move(rd));
+        return std::move(assert_that(std::move(rd)).ignore_deletion_time());
     };
 
     std::array<int32_t, 2> static_row_values {777, 999};
@@ -802,9 +802,14 @@ SEASTAR_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombstones_re
         // and step over it so that it doesn't get emitted
         r.fast_forward_to(to_full_ck(4471, 4471), to_full_ck(4653, 4653));
         for (const auto idx : boost::irange(4471, 4652, 3)) {
-            r.produces_row(to_full_ck(idx, idx), to_expected(idx))
-                .produces_range_tombstone(
-                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            r.produces_row(to_full_ck(idx, idx), to_expected(idx));
+            if (idx == 4651) {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_full_ck(idx + 2, idx + 2), tomb, bound_kind::excl_end));
+            } else {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            }
         }
         r.produces_end_of_stream();

@@ -877,9 +882,14 @@ SEASTAR_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombstones_re
         r.produces_partition_start(to_pkey(pkey))
             .produces_static_row({{st_cdef, int32_type->decompose(static_row_values[pkey - 1])}});
         for (const auto idx : boost::irange(4471, 4652, 3)) {
-            r.produces_row(to_full_ck(idx, idx), to_expected(idx))
-                .produces_range_tombstone(
-                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            r.produces_row(to_full_ck(idx, idx), to_expected(idx));
+            if (idx == 4651) {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_full_ck(idx + 2, idx + 2), tomb, bound_kind::excl_end));
+            } else {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            }
         }
 
         {
@@ -940,9 +950,14 @@ SEASTAR_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombstones_re
 
         r.fast_forward_to(to_non_full_ck(3000), to_non_full_ck(6000));
         for (const auto idx : boost::irange(4471, 4652, 3)) {
-            r.produces_row(to_full_ck(idx, idx), to_expected(idx))
-                .produces_range_tombstone(
-                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            r.produces_row(to_full_ck(idx, idx), to_expected(idx));
+            if (idx == 4651) {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_full_ck(idx + 2, idx + 2), tomb, bound_kind::excl_end));
+            } else {
+                r.produces_range_tombstone(
+                    make_range_tombstone(to_non_full_ck(idx + 1), to_non_full_ck(idx + 2), tomb));
+            }
         }
         r.produces_end_of_stream();

@@ -1089,7 +1104,7 @@ SEASTAR_TEST_CASE(test_uncompressed_slicing_interleaved_rows_and_rts_read) {
 
     auto make_assertions = [] (flat_mutation_reader rd) {
         rd.set_max_buffer_size(1);
-        return assert_that(std::move(rd));
+        return std::move(assert_that(std::move(rd)).ignore_deletion_time());
     };
 
     auto st_cdef = UNCOMPRESSED_SLICING_INTERLEAVED_ROWS_AND_RTS_SCHEMA->get_column_definition(to_bytes("st"));
140 changes: 94 additions & 46 deletions test/lib/flat_mutation_reader_assertions.hh
@@ -33,26 +33,67 @@ class flat_reader_assertions {
     flat_mutation_reader _reader;
     dht::partition_range _pr;
     range_tombstone_list _tombstones;
+    range_tombstone_list _expected_tombstones;
+    bool _check_rts = false;
+    query::clustering_row_ranges _rt_ck_ranges = {};
+    bool _ignore_deletion_time = false;
 private:
     mutation_fragment_opt read_next() {
         return _reader().get0();
     }
 
-    static bool are_tombstones_mergeable(const schema& s, const range_tombstone& a, const range_tombstone& b) {
-        const auto range_a = position_range(position_in_partition(a.position()), position_in_partition(a.end_position()));
-        const auto tri_cmp = position_in_partition::tri_compare(s);
-        return (a.tomb <=> b.tomb) == 0 && (range_a.overlaps(s, b.position(), b.end_position()) ||
-                tri_cmp(a.end_position(), b.position()) == 0 ||
-                tri_cmp(b.end_position(), a.position()) == 0);
+    range_tombstone maybe_drop_deletion_time(const range_tombstone& rt) const {
+        if (!_ignore_deletion_time) {
+            return rt;
+        } else {
+            return {rt.start, rt.start_kind, rt.end, rt.end_kind, {rt.tomb.timestamp, {}}};
+        }
     }
+    void check_rts() {
+        if (_check_rts) {
+            // If any split ends of range tombstones remain, consume
+            // them. For finer range restriction the test should call
+            // may_produce_tombstones() before this happens.
+            may_produce_tombstones(position_range::full(), _rt_ck_ranges);
+            testlog.trace("Comparing normalized range tombstones");
+            if (!_tombstones.equal(*_reader.schema(), _expected_tombstones)) {
+                BOOST_FAIL(format("Expected {}, but got {}", _expected_tombstones, _tombstones));
+            }
+        }
+        _tombstones.clear();
+        _expected_tombstones.clear();
+        _rt_ck_ranges.clear();
+        _check_rts = false;
+    }
+    range_tombstone trim(const range_tombstone& rt, const query::clustering_row_ranges& ck_ranges) const {
+        bound_view::compare less(*_reader.schema());
+        auto ret{rt};
+        for (auto& range : ck_ranges) {
+            ret.trim(*_reader.schema(), position_in_partition::for_range_start(range), position_in_partition::for_range_end(range));
+        }
+        return ret;
+    }
+    void apply_rt_unchecked(const range_tombstone& rt_) {
+        auto rt = maybe_drop_deletion_time(rt_);
+        _tombstones.apply(*_reader.schema(), rt);
+        _expected_tombstones.apply(*_reader.schema(), rt);
+    }
 
 public:
     flat_reader_assertions(flat_mutation_reader reader)
         : _reader(std::move(reader))
         , _tombstones(*_reader.schema())
+        , _expected_tombstones(*_reader.schema())
     { }
 
     ~flat_reader_assertions() {
+        // make sure to close the reader no matter what, to prevent
+        // spurious logs about it not being closed if check_rts()
+        // throws.
+        try {
+            check_rts();
+        } catch (...) {
+            _reader.close().get();
+            throw;
+        }
         _reader.close().get();
     }

@@ -65,10 +106,18 @@ public:
             _reader = std::move(o._reader);
             _pr = std::move(o._pr);
             _tombstones = std::move(o._tombstones);
+            _expected_tombstones = std::move(o._expected_tombstones);
+            _check_rts = std::move(o._check_rts);
+            _ignore_deletion_time = std::move(o._ignore_deletion_time);
         }
         return *this;
     }
 
+    flat_reader_assertions& ignore_deletion_time(bool ignore = true) {
+        _ignore_deletion_time = ignore;
+        return *this;
+    }
+
     flat_reader_assertions& produces_partition_start(const dht::decorated_key& dk,
                                                      std::optional<tombstone> tomb = std::nullopt) {
         testlog.trace("Expecting partition start with key {}", dk);
@@ -86,6 +135,8 @@ public:
             BOOST_FAIL(format("Expected: partition start with tombstone {}, got: {}", *tomb, mutation_fragment::printer(*_reader.schema(), *mfopt)));
         }
         _tombstones.clear();
+        _expected_tombstones.clear();
+        _check_rts = false;
         return *this;
     }
 
@@ -120,15 +171,16 @@ public:
         return *this;
     }
 
-    flat_reader_assertions& may_produce_tombstones(position_range range) {
+    flat_reader_assertions& may_produce_tombstones(position_range range, const query::clustering_row_ranges& ck_ranges = {}) {
         while (mutation_fragment* next = _reader.peek().get0()) {
             if (next->is_range_tombstone()) {
                 if (!range.overlaps(*_reader.schema(), next->as_range_tombstone().position(), next->as_range_tombstone().end_position())) {
                     break;
                 }
                 testlog.trace("Received range tombstone: {}", mutation_fragment::printer(*_reader.schema(), *next));
                 range = position_range(position_in_partition(next->position()), range.end());
-                _tombstones.apply(*_reader.schema(), _reader().get0()->as_range_tombstone());
+                auto rt = trim(maybe_drop_deletion_time(_reader().get0()->as_range_tombstone()), ck_ranges);
+                _tombstones.apply(*_reader.schema(), rt);
             } else if (next->is_clustering_row() && next->as_clustering_row().empty()) {
                 if (!range.contains(*_reader.schema(), next->position())) {
                     break;
@@ -255,38 +307,18 @@ public:
     }
 
     // If ck_ranges is passed, verifies only that information relevant for ck_ranges matches.
-    flat_reader_assertions& produces_range_tombstone(const range_tombstone& rt, const query::clustering_row_ranges& ck_ranges = {}) {
-        testlog.trace("Expect {}", rt);
-        auto mfo = read_next();
-        if (!mfo) {
-            BOOST_FAIL(format("Expected range tombstone {}, but got end of stream", rt));
-        }
-        if (!mfo->is_range_tombstone()) {
-            BOOST_FAIL(format("Expected range tombstone {}, but got {}", rt, mutation_fragment::printer(*_reader.schema(), *mfo)));
-        }
-        const schema& s = *_reader.schema();
-        range_tombstone_list actual_list(s);
-        actual_list.apply(s, mfo->as_range_tombstone());
-        _tombstones.apply(s, mfo->as_range_tombstone());
-        position_in_partition::equal_compare eq(s);
-        while (mutation_fragment* next = _reader.peek().get0()) {
-            if (!next->is_range_tombstone() || !are_tombstones_mergeable(s, actual_list.begin()->tombstone(), next->as_range_tombstone())) {
-                break;
-            }
-            auto rt = _reader().get0()->as_range_tombstone();
-            actual_list.apply(s, rt);
-            assert(actual_list.size() == 1);
-            _tombstones.apply(s, rt);
-        }
-        {
-            range_tombstone_list expected_list(s);
-            expected_list.apply(s, rt);
-            actual_list.trim(s, ck_ranges);
-            expected_list.trim(s, ck_ranges);
-            if (!actual_list.equal(s, expected_list)) {
-                BOOST_FAIL(format("Expected {}, but got {}", expected_list, actual_list));
-            }
-        }
+    flat_reader_assertions& produces_range_tombstone(const range_tombstone& rt_, const query::clustering_row_ranges& ck_ranges = {}) {
+        testlog.trace("Expect {}", rt_);
+        auto rt = trim(maybe_drop_deletion_time(rt_), ck_ranges);
+        _check_rts = true;
+        // If looking at any tombstones (which is likely), read them.
+        // For finer range restriction the test should call
+        // may_produce_tombstones() before the corresponding
+        // produces_range_tombstone()
+        may_produce_tombstones({position_in_partition(rt.position()), position_in_partition(rt.end_position())}, ck_ranges);
+        testlog.trace("Applying {} to expected range tombstone list", rt);
+        _expected_tombstones.apply(*_reader.schema(), rt);
+        _rt_ck_ranges = ck_ranges;
         return *this;
     }
 
@@ -299,7 +331,7 @@ public:
         if (!mfopt->is_end_of_partition()) {
             BOOST_FAIL(format("Expected partition end but got {}", mutation_fragment::printer(*_reader.schema(), *mfopt)));
         }
-        _tombstones.clear();
+        check_rts();
         return *this;
     }
 
@@ -313,7 +345,7 @@ public:
             BOOST_FAIL(format("Expected {}, but got {}", mutation_fragment::printer(*_reader.schema(), mf), mutation_fragment::printer(*_reader.schema(), *mfopt)));
         }
         if (mf.is_range_tombstone()) {
-            _tombstones.apply(*_reader.schema(), mf.as_range_tombstone());
+            apply_rt_unchecked(mf.as_range_tombstone());
         }
         return *this;
     }
@@ -344,12 +376,13 @@ public:
         if (mfopt->mutation_fragment_kind() != k) {
             BOOST_FAIL(format("Expected mutation fragment kind {}, got: {}", k, mfopt->mutation_fragment_kind()));
         }
+        testlog.trace("Received: {}", mutation_fragment::printer(*_reader.schema(), *mfopt));
         clustering_key::equality ck_eq(*_reader.schema());
         if (!ck_eq(mfopt->key(), ck)) {
             BOOST_FAIL(format("Expected key {}, got: {}", ck, mfopt->key()));
         }
         if (mfopt->is_range_tombstone()) {
-            _tombstones.apply(*_reader.schema(), mfopt->as_range_tombstone());
+            apply_rt_unchecked(mfopt->as_range_tombstone());
         }
         return *this;
     }
@@ -440,6 +473,7 @@ public:
     flat_reader_assertions& next_partition() {
         testlog.trace("Skip to next partition");
         _reader.next_partition().get();
+        check_rts();
        return *this;
    }
 
@@ -500,10 +534,18 @@ flat_reader_assertions assert_that(flat_mutation_reader r) {
 class flat_reader_assertions_v2 {
     flat_mutation_reader_v2 _reader;
     dht::partition_range _pr;
+    bool _ignore_deletion_time = false;
 private:
     mutation_fragment_v2_opt read_next() {
         return _reader().get0();
     }
+    range_tombstone_change maybe_drop_deletion_time(const range_tombstone_change& rt) const {
+        if (!_ignore_deletion_time) {
+            return rt;
+        } else {
+            return {rt.position(), {rt.tombstone().timestamp, {}}};
+        }
+    }
 public:
     flat_reader_assertions_v2(flat_mutation_reader_v2 reader)
         : _reader(std::move(reader))
@@ -521,10 +563,16 @@ public:
             _reader.close().get();
             _reader = std::move(o._reader);
             _pr = std::move(o._pr);
+            _ignore_deletion_time = std::move(o._ignore_deletion_time);
         }
         return *this;
     }
 
+    flat_reader_assertions_v2&& ignore_deletion_time(bool ignore = true) {
+        _ignore_deletion_time = ignore;
+        return std::move(*this);
+    }
+
     flat_reader_assertions_v2& produces_partition_start(const dht::decorated_key& dk,
                                                         std::optional<tombstone> tomb = std::nullopt) {
         testlog.trace("Expecting partition start with key {}", dk);
@@ -689,7 +737,7 @@ public:
         if (!mfo->is_range_tombstone_change()) {
             BOOST_FAIL(format("Expected range tombstone change {}, but got {}", rt, mutation_fragment_v2::printer(*_reader.schema(), *mfo)));
         }
-        if (!mfo->as_range_tombstone_change().equal(*_reader.schema(), rt)) {
+        if (!maybe_drop_deletion_time(mfo->as_range_tombstone_change()).equal(*_reader.schema(), maybe_drop_deletion_time(rt))) {
             BOOST_FAIL(format("Expected {}, but got {}", rt, mutation_fragment_v2::printer(*_reader.schema(), *mfo)));
         }
         return *this;