Skip to content

Commit

Permalink
types: move collection_type_impl::mutation(_view) out of collection_t…
Browse files Browse the repository at this point in the history
…ype_impl.

collection_type_impl::mutation became collection_mutation_description.
collection_type_impl::mutation_view became collection_mutation_view_description.
These classes now reside inside collection_mutation.hh.

Additional documentation has been written for these classes.

Related function implementations were moved to collection_mutation.cc.

This makes it easier to generalize these classes to non-frozen UDTs in future commits.
The new names (together with documentation) better describe their purpose.
  • Loading branch information
kbr-scylla committed Oct 25, 2019
1 parent c0d3e6c commit b1d16c1
Show file tree
Hide file tree
Showing 28 changed files with 185 additions and 172 deletions.
5 changes: 3 additions & 2 deletions alternator/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "utils/big_decimal.hh"
#include "seastar/json/json_elements.hh"
#include <boost/algorithm/cxx11/any_of.hpp>
#include "collection_mutation.hh"

#include <boost/range/adaptors.hpp>

Expand Down Expand Up @@ -606,8 +607,8 @@ class attribute_collector {
void del(bytes&& name, api::timestamp_type ts) {
add(std::move(name), atomic_cell::make_dead(ts, gc_clock::now()));
}
collection_type_impl::mutation to_mut() {
collection_type_impl::mutation ret;
collection_mutation_description to_mut() {
collection_mutation_description ret;
for (auto&& e : collected) {
ret.cells.emplace_back(e.first, std::move(e.second));
}
Expand Down
95 changes: 90 additions & 5 deletions collection_mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,67 @@ api::timestamp_type collection_type_impl::last_update(collection_mutation_view c
});
}

collection_mutation_description
collection_mutation_view_description::materialize(const collection_type_impl& ctype) const {
collection_mutation_description m;
m.tomb = tomb;
m.cells.reserve(cells.size());
for (auto&& e : cells) {
m.cells.emplace_back(bytes(e.first.begin(), e.first.end()), atomic_cell(*ctype.value_comparator(), e.second));
}
return m;
}

bool collection_mutation_description::compact_and_expire(column_id id, row_tombstone base_tomb, gc_clock::time_point query_time,
can_gc_fn& can_gc, gc_clock::time_point gc_before, compaction_garbage_collector* collector)
{
bool any_live = false;
auto t = tomb;
tombstone purged_tomb;
if (tomb <= base_tomb.regular()) {
tomb = tombstone();
} else if (tomb.deletion_time < gc_before && can_gc(tomb)) {
purged_tomb = tomb;
tomb = tombstone();
}
t.apply(base_tomb.regular());
utils::chunked_vector<std::pair<bytes, atomic_cell>> survivors;
utils::chunked_vector<std::pair<bytes, atomic_cell>> losers;
for (auto&& name_and_cell : cells) {
atomic_cell& cell = name_and_cell.second;
auto cannot_erase_cell = [&] {
return cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
};

if (cell.is_covered_by(t, false) || cell.is_covered_by(base_tomb.shadowable().tomb(), false)) {
continue;
}
if (cell.has_expired(query_time)) {
if (cannot_erase_cell()) {
survivors.emplace_back(std::make_pair(
std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time())));
} else if (collector) {
losers.emplace_back(std::pair(
std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time())));
}
} else if (!cell.is_live()) {
if (cannot_erase_cell()) {
survivors.emplace_back(std::move(name_and_cell));
} else if (collector) {
losers.emplace_back(std::move(name_and_cell));
}
} else {
any_live |= true;
survivors.emplace_back(std::move(name_and_cell));
}
}
if (collector) {
collector->collect(id, collection_mutation_description{purged_tomb, std::move(losers)});
}
cells = std::move(survivors);
return any_live;
}

template <typename Iterator>
collection_mutation do_serialize_mutation_form(
const collection_type_impl& ctype,
Expand Down Expand Up @@ -134,16 +195,16 @@ collection_mutation do_serialize_mutation_form(
}

collection_mutation
collection_type_impl::serialize_mutation_form(const mutation& mut) const {
collection_type_impl::serialize_mutation_form(const collection_mutation_description& mut) const {
return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
}

collection_mutation
collection_type_impl::serialize_mutation_form(mutation_view mut) const {
collection_type_impl::serialize_mutation_form(collection_mutation_view_description mut) const {
return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
}

collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const {
collection_type_impl::serialize_mutation_form_only_live(collection_mutation_view_description mut, gc_clock::time_point now) const {
return do_serialize_mutation_form(*this, mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) {
return e.second.is_live(t, now, false);
}));
Expand All @@ -155,7 +216,7 @@ collection_type_impl::merge(collection_mutation_view a, collection_mutation_view
return b.data.with_linearized([&] (bytes_view b_in) {
auto aa = deserialize_mutation_form(a_in);
auto bb = deserialize_mutation_form(b_in);
mutation_view merged;
collection_mutation_view_description merged;
merged.cells.reserve(aa.cells.size() + bb.cells.size());
using element_type = std::pair<bytes_view, atomic_cell_view>;
auto key_type = name_comparator();
Expand Down Expand Up @@ -199,7 +260,7 @@ collection_type_impl::difference(collection_mutation_view a, collection_mutation
return b.data.with_linearized([&] (bytes_view b_in) {
auto aa = deserialize_mutation_form(a_in);
auto bb = deserialize_mutation_form(b_in);
mutation_view diff;
collection_mutation_view_description diff;
diff.cells.reserve(std::max(aa.cells.size(), bb.cells.size()));
auto key_type = name_comparator();
auto it = bb.cells.begin();
Expand All @@ -221,3 +282,27 @@ collection_type_impl::difference(collection_mutation_view a, collection_mutation
});
});
}

collection_mutation_view_description
collection_type_impl::deserialize_mutation_form(bytes_view in) const {
collection_mutation_view_description ret;
auto has_tomb = read_simple<bool>(in);
if (has_tomb) {
auto ts = read_simple<api::timestamp_type>(in);
auto ttl = read_simple<gc_clock::duration::rep>(in);
ret.tomb = tombstone{ts, gc_clock::time_point(gc_clock::duration(ttl))};
}
auto nr = read_simple<uint32_t>(in);
ret.cells.reserve(nr);
for (uint32_t i = 0; i != nr; ++i) {
// FIXME: we could probably avoid the need for size
auto ksize = read_simple<uint32_t>(in);
auto key = read_simple_bytes(in, ksize);
auto vsize = read_simple<uint32_t>(in);
// value_comparator(), ugh
auto value = atomic_cell_view::from_bytes(value_comparator()->imr_state().type_info(), read_simple_bytes(in, vsize));
ret.cells.emplace_back(key, value);
}
assert(in.empty());
return ret;
}
27 changes: 27 additions & 0 deletions collection_mutation.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,36 @@

#pragma once

#include "utils/chunked_vector.hh"
#include "schema_fwd.hh"
#include "gc_clock.hh"
#include "atomic_cell.hh"

class collection_type_impl;
class compaction_garbage_collector;
class row_tombstone;

// An auxiliary struct used to (de)construct collection_mutations.
// Unlike collection_mutation which is a serialized blob, this struct allows to inspect logical units of information
// (tombstone and cells) inside the mutation easily.
struct collection_mutation_description {
tombstone tomb;
utils::chunked_vector<std::pair<bytes, atomic_cell>> cells;

// Expires cells based on query_time. Expires tombstones based on max_purgeable and gc_before.
// Removes cells covered by tomb or this->tomb.
bool compact_and_expire(column_id id, row_tombstone tomb, gc_clock::time_point query_time,
can_gc_fn&, gc_clock::time_point gc_before, compaction_garbage_collector* collector = nullptr);
};

// Similar to collection_mutation_description, except that it doesn't store the cells' data, only observes it.
struct collection_mutation_view_description {
tombstone tomb;
utils::chunked_vector<std::pair<bytes_view, atomic_cell_view>> cells;

// Copies the observed data, storing it in a collection_mutation_description.
collection_mutation_description materialize(const collection_type_impl&) const;
};

class collection_mutation_view {
public:
Expand Down
4 changes: 2 additions & 2 deletions compaction_garbage_collector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#pragma once

#include "schema.hh"
#include "types/collection.hh"
#include "collection_mutation.hh"

class atomic_cell;
class row_marker;
Expand All @@ -31,6 +31,6 @@ class compaction_garbage_collector {
public:
virtual ~compaction_garbage_collector() = default;
virtual void collect(column_id id, atomic_cell) = 0;
virtual void collect(column_id id, collection_type_impl::mutation) = 0;
virtual void collect(column_id id, collection_mutation_description) = 0;
virtual void collect(row_marker) = 0;
};
2 changes: 1 addition & 1 deletion converting_mutation_partition_applier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private:
auto old_ctype = static_pointer_cast<const collection_type_impl>(old_type);
auto old_view = old_ctype->deserialize_mutation_form(cell_bv);

collection_type_impl::mutation new_view;
collection_mutation_description new_view;
if (old_view.tomb.timestamp > new_def.dropped_at()) {
new_view.tomb = old_view.tomb;
}
Expand Down
2 changes: 1 addition & 1 deletion cql3/constants.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ constants::literal::prepare(database& db, const sstring& keyspace, ::shared_ptr<

void constants::deleter::execute(mutation& m, const clustering_key_prefix& prefix, const update_parameters& params) {
if (column.type->is_multi_cell()) {
collection_type_impl::mutation coll_m;
collection_mutation_description coll_m;
coll_m.tomb = params.make_tombstone();
auto ctype = static_pointer_cast<const collection_type_impl>(column.type);
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(ctype->serialize_mutation_form(coll_m)));
Expand Down
17 changes: 9 additions & 8 deletions cql3/lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ lists::setter::execute(mutation& m, const clustering_key_prefix& prefix, const u
return;
}
if (column.type->is_multi_cell()) {
// delete + append
collection_type_impl::mutation mut;
// Delete all cells first, then append new ones
collection_mutation_view_description mut;
mut.tomb = params.make_tombstone_just_before();
auto ctype = static_pointer_cast<const list_type_impl>(column.type);
auto col_mut = ctype->serialize_mutation_form(std::move(mut));
Expand Down Expand Up @@ -322,9 +322,10 @@ lists::setter_by_index::execute(mutation& m, const clustering_key_prefix& prefix
throw exceptions::invalid_request_exception(format("List index {:d} out of bound, list has size {:d}",
idx, existing_list.size()));
}

const data_value& eidx_dv = existing_list[idx].first;
bytes eidx = eidx_dv.type()->decompose(eidx_dv);
list_type_impl::mutation mut;
collection_mutation_description mut;
mut.cells.reserve(1);
if (!value) {
mut.cells.emplace_back(std::move(eidx), params.make_dead_cell());
Expand Down Expand Up @@ -355,7 +356,7 @@ lists::setter_by_uuid::execute(mutation& m, const clustering_key_prefix& prefix,

auto ltype = dynamic_pointer_cast<const list_type_impl>(column.type);

list_type_impl::mutation mut;
collection_mutation_description mut;
mut.cells.reserve(1);
mut.cells.emplace_back(to_bytes(*index), params.make_cell(*ltype->value_comparator(), *value, atomic_cell::collection_member::yes));
auto smut = ltype->serialize_mutation_form(mut);
Expand Down Expand Up @@ -390,7 +391,7 @@ lists::do_append(shared_ptr<term> value,
}

auto&& to_add = list_value->_elements;
collection_type_impl::mutation appended;
collection_mutation_description appended;
appended.cells.reserve(to_add.size());
for (auto&& e : to_add) {
auto uuid1 = utils::UUID_gen::get_time_UUID_bytes();
Expand Down Expand Up @@ -422,7 +423,7 @@ lists::prepender::execute(mutation& m, const clustering_key_prefix& prefix, cons
assert(lvalue);
auto time = precision_time::REFERENCE_TIME - (db_clock::now() - precision_time::REFERENCE_TIME);

collection_type_impl::mutation mut;
collection_mutation_description mut;
mut.cells.reserve(lvalue->get_elements().size());
// We reverse the order of insertion, so that the last element gets the lastest time
// (lists are sorted by time)
Expand Down Expand Up @@ -474,7 +475,7 @@ lists::discarder::execute(mutation& m, const clustering_key_prefix& prefix, cons
// the read-before-write this operation requires limits its usefulness on big lists, so in practice
// toDiscard will be small and keeping a list will be more efficient.
auto&& to_discard = lvalue->_elements;
collection_type_impl::mutation mnew;
collection_mutation_description mnew;
for (auto&& cell : elist) {
auto has_value = [&] (bytes_view value) {
return std::find_if(to_discard.begin(), to_discard.end(),
Expand Down Expand Up @@ -520,7 +521,7 @@ lists::discarder_by_index::execute(mutation& m, const clustering_key_prefix& pre
if (idx < 0 || size_t(idx) >= existing_list.size()) {
throw exceptions::invalid_request_exception(format("List index {:d} out of bound, list has size {:d}", idx, existing_list.size()));
}
collection_type_impl::mutation mut;
collection_mutation_description mut;
const data_value& eidx_dv = existing_list[idx].first;
bytes eidx = eidx_dv.type()->decompose(eidx_dv);
mut.cells.emplace_back(std::move(eidx), params.make_dead_cell());
Expand Down
12 changes: 6 additions & 6 deletions cql3/maps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ maps::setter::execute(mutation& m, const clustering_key_prefix& row_key, const u
return;
}
if (column.type->is_multi_cell()) {
// delete + put
collection_type_impl::mutation mut;
// Delete all cells first, then put new ones
collection_mutation_description mut;
mut.tomb = params.make_tombstone_just_before();
auto ctype = static_pointer_cast<const map_type_impl>(column.type);
auto col_mut = ctype->serialize_mutation_form(std::move(mut));
Expand All @@ -312,7 +312,7 @@ maps::setter_by_key::execute(mutation& m, const clustering_key_prefix& prefix, c
}
auto ctype = static_pointer_cast<const map_type_impl>(column.type);
auto avalue = value ? params.make_cell(*ctype->get_values_type(), *value, atomic_cell::collection_member::yes) : params.make_dead_cell();
map_type_impl::mutation update;
collection_mutation_description update;
update.cells.emplace_back(std::move(to_bytes(*key)), std::move(avalue));
// should have been verified as map earlier?
auto col_mut = ctype->serialize_mutation_form(std::move(update));
Expand All @@ -333,12 +333,12 @@ maps::do_put(mutation& m, const clustering_key_prefix& prefix, const update_para
shared_ptr<term> value, const column_definition& column) {
auto map_value = dynamic_pointer_cast<maps::value>(value);
if (column.type->is_multi_cell()) {
collection_type_impl::mutation mut;

if (!value) {
return;
}

collection_mutation_description mut;

auto ctype = static_pointer_cast<const map_type_impl>(column.type);
for (auto&& e : map_value->map) {
mut.cells.emplace_back(e.first, params.make_cell(*ctype->get_values_type(), fragmented_temporary_buffer::view(e.second), atomic_cell::collection_member::yes));
Expand Down Expand Up @@ -367,7 +367,7 @@ maps::discarder_by_key::execute(mutation& m, const clustering_key_prefix& prefix
if (key == constants::UNSET_VALUE) {
throw exceptions::invalid_request_exception("Invalid unset map key");
}
collection_type_impl::mutation mut;
collection_mutation_description mut;
mut.cells.emplace_back(*key->get(params._options), params.make_dead_cell());
auto mtype = static_cast<const map_type_impl*>(column.type.get());
m.set_cell(prefix, column, mtype->serialize_mutation_form(mut));
Expand Down
14 changes: 7 additions & 7 deletions cql3/sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ sets::setter::execute(mutation& m, const clustering_key_prefix& row_key, const u
return;
}
if (column.type->is_multi_cell()) {
// delete + add
collection_type_impl::mutation mut;
// Delete all cells first, then add new ones
collection_mutation_description mut;
mut.tomb = params.make_tombstone_just_before();
auto ctype = static_pointer_cast<const set_type_impl>(column.type);
auto col_mut = ctype->serialize_mutation_form(std::move(mut));
Expand All @@ -287,13 +287,13 @@ sets::adder::do_add(mutation& m, const clustering_key_prefix& row_key, const upd
auto set_value = dynamic_pointer_cast<sets::value>(std::move(value));
auto set_type = dynamic_pointer_cast<const set_type_impl>(column.type);
if (column.type->is_multi_cell()) {
// FIXME: mutation_view? not compatible with params.make_cell().
collection_type_impl::mutation mut;

if (!set_value || set_value->_elements.empty()) {
return;
}

// FIXME: collection_mutation_view_description? not compatible with params.make_cell().
collection_mutation_description mut;

for (auto&& e : set_value->_elements) {
mut.cells.emplace_back(e, params.make_cell(*set_type->value_comparator(), bytes_view(), atomic_cell::collection_member::yes));
}
Expand All @@ -320,7 +320,7 @@ sets::discarder::execute(mutation& m, const clustering_key_prefix& row_key, cons
return;
}

collection_type_impl::mutation mut;
collection_mutation_description mut;
auto kill = [&] (bytes idx) {
mut.cells.push_back({std::move(idx), params.make_dead_cell()});
};
Expand All @@ -343,7 +343,7 @@ void sets::element_discarder::execute(mutation& m, const clustering_key_prefix&
if (!elt) {
throw exceptions::invalid_request_exception("Invalid null set element");
}
collection_type_impl::mutation mut;
collection_mutation_description mut;
mut.cells.emplace_back(*elt->get(params._options), params.make_dead_cell());
auto ctype = static_pointer_cast<const collection_type_impl>(column.type);
m.set_cell(row_key, column, ctype->serialize_mutation_form(mut));
Expand Down
Loading

0 comments on commit b1d16c1

Please sign in to comment.