Skip to content

Commit

Permalink
db: Encapsulate the "row" class
Browse files Browse the repository at this point in the history
Reduces coupling. User's should not rely on the fact that it's an
std::map<>.  It also allows us to extend row's interface with
domain-specific methods, which are a lot easier to discover than free
functions.
  • Loading branch information
tgrabiec committed May 13, 2015
1 parent 56bea44 commit dbc40df
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 102 deletions.
15 changes: 0 additions & 15 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -601,21 +601,6 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
return 0;
}

void
merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww) {
if (def.is_atomic()) {
if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) {
// FIXME: move()?
old = neww;
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation());
}
}

future<lw_shared_ptr<query::result>>
column_family::query(const query::read_command& cmd) const {
query::result::builder builder(cmd.slice);
Expand Down
37 changes: 12 additions & 25 deletions mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mutation::mutation(partition_key key_, schema_ptr schema)
{ }

void mutation::set_static_cell(const column_definition& def, atomic_cell_or_collection value) {
update_column(_p.static_row(), def, std::move(value));
_p.static_row().apply(def, std::move(value));
}

void mutation::set_static_cell(const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl) {
Expand All @@ -28,12 +28,12 @@ void mutation::set_static_cell(const bytes& name, const boost::any& value, api::
if (!column_def->is_static()) {
throw std::runtime_error(sprint("column '%s' is not static", name));
}
update_column(_p.static_row(), *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
_p.static_row().apply(*column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
}

void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {
auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells;
update_column(row, def, std::move(value));
row.apply(def, std::move(value));
}

void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value,
Expand All @@ -47,7 +47,7 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name,

void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = _p.clustered_row(key).cells;
update_column(row, def, std::move(value));
row.apply(def, std::move(value));
}

void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value,
Expand All @@ -71,32 +71,19 @@ void mutation::set_cell(const exploded_clustering_prefix& prefix, const column_d

std::experimental::optional<atomic_cell_or_collection>
mutation::get_cell(const clustering_key& rkey, const column_definition& def) const {
auto find_cell = [&def] (const row& r) {
auto i = r.find(def.id);
if (i == r.end()) {
return std::experimental::optional<atomic_cell_or_collection>{};
}
return std::experimental::optional<atomic_cell_or_collection>{i->second};
};
if (def.is_static()) {
return find_cell(_p.static_row());
const atomic_cell_or_collection* cell = _p.static_row().find_cell(def.id);
if (!cell) {
return {};
}
return { *cell };
} else {
auto r = _p.find_row(rkey);
const row* r = _p.find_row(rkey);
if (!r) {
return {};
}
return find_cell(*r);
}
}

void mutation::update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) {
// our mutations are not yet immutable
auto id = def.id;
auto i = row.lower_bound(id);
if (i == row.end() || i->first != id) {
row.emplace_hint(i, id, std::move(value));
} else {
merge_column(def, i->second, value);
const atomic_cell_or_collection* cell = r->find_cell(def.id);
return { *cell };
}
}

Expand Down
1 change: 0 additions & 1 deletion mutation.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,5 @@ public:
bool operator==(const mutation&) const;
bool operator!=(const mutation&) const;
private:
static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value);
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};
63 changes: 48 additions & 15 deletions mutation_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,7 @@ mutation_partition::apply(const schema& schema, const mutation_partition& p) {

static auto merge_cells = [] (row& old_row, const row& new_row, auto&& find_column_def) {
for (auto&& new_column : new_row) {
auto col = new_column.first;
auto i = old_row.find(col);
if (i == old_row.end()) {
old_row.emplace_hint(i, new_column);
} else {
auto& old_column = *i;
auto& def = find_column_def(col);
merge_column(def, old_column.second, new_column.second);
}
old_row.apply(new_column.first, new_column.second, find_column_def);
}
};

Expand Down Expand Up @@ -228,22 +220,22 @@ template <typename ColumnDefResolver>
static void get_row_slice(const row& cells, const std::vector<column_id>& columns, tombstone tomb,
ColumnDefResolver&& id_to_def, query::result::row_writer& writer) {
for (auto id : columns) {
auto i = cells.find(id);
if (i == cells.end()) {
const atomic_cell_or_collection* cell = cells.find_cell(id);
if (!cell) {
writer.add_empty();
} else {
auto&& def = id_to_def(id);
if (def.is_atomic()) {
auto c = i->second.as_atomic_cell();
auto c = cell->as_atomic_cell();
if (!c.is_live(tomb)) {
writer.add_empty();
} else {
writer.add(i->second.as_atomic_cell());
writer.add(cell->as_atomic_cell());
}
} else {
auto&& cell = i->second.as_collection_mutation();
auto&& mut = cell->as_collection_mutation();
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
auto m_view = ctype->deserialize_mutation_form(cell);
auto m_view = ctype->deserialize_mutation_form(mut);
m_view.tomb.apply(tomb);
auto m_ser = ctype->serialize_mutation_form_only_live(m_view);
if (ctype->is_empty(m_ser)) {
Expand Down Expand Up @@ -414,3 +406,44 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con

return rows_equal(s, _static_row, p._static_row);
}

void
merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww) {
if (def.is_atomic()) {
if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) {
// FIXME: move()?
old = neww;
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation());
}
}

void
row::apply(const column_definition& column, atomic_cell_or_collection value) {
// our mutations are not yet immutable
auto id = column.id;
auto i = _cells.lower_bound(id);
if (i == _cells.end() || i->first != id) {
_cells.emplace_hint(i, id, std::move(value));
} else {
merge_column(column, i->second, value);
}
}

void
row::append_cell(column_id id, atomic_cell_or_collection value) {
_cells.emplace_hint(_cells.end(), id, std::move(value));
}

const atomic_cell_or_collection*
row::find_cell(column_id id) const {
auto i = _cells.find(id);
if (i == _cells.end()) {
return nullptr;
}
return &i->second;
}
43 changes: 41 additions & 2 deletions mutation_partition.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,47 @@
#include "query-result-writer.hh"
#include "mutation_partition_view.hh"

// FIXME: Encapsulate
using row = std::map<column_id, atomic_cell_or_collection>;
// Container for cells of a row. Cells are identified by column_id.
//
// Can be used as a range of std::pair<column_id, atomic_cell_or_collection>.
//
class row {
using map_type = std::map<column_id, atomic_cell_or_collection>;
map_type _cells;
public:
using value_type = map_type::value_type;
using iterator = map_type::iterator;
using const_iterator = map_type::const_iterator;
public:
iterator begin() { return _cells.begin(); }
iterator end() { return _cells.end(); }
const_iterator begin() const { return _cells.begin(); }
const_iterator end() const { return _cells.end(); }
size_t size() const { return _cells.size(); }

// Returns a reference to cell's value or throws std::out_of_range
const atomic_cell_or_collection& cell_at(column_id id) const { return _cells.at(id); }

// Returns a pointer to cell's value or nullptr if column is not set.
const atomic_cell_or_collection* find_cell(column_id id) const;
public:
// Merges cell's value into the row.
void apply(const column_definition& column, atomic_cell_or_collection cell);

// Adds cell to the row. The column must not be already set.
void append_cell(column_id id, atomic_cell_or_collection cell);

// Merges given cell into the row.
template <typename ColumnDefinitionResolver>
void apply(column_id id, atomic_cell_or_collection cell, ColumnDefinitionResolver&& resolver) {
auto i = _cells.lower_bound(id);
if (i == _cells.end() || i->first != id) {
_cells.emplace_hint(i, id, std::move(cell));
} else {
merge_column(resolver(id), i->second, std::move(cell));
}
}
};

std::ostream& operator<<(std::ostream& os, const row::value_type& rv);
std::ostream& operator<<(std::ostream& os, const row& r);
Expand Down
18 changes: 4 additions & 14 deletions mutation_partition_applier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ class mutation_partition_applier : public mutation_partition_visitor {
const schema& _schema;
mutation_partition& _p;
deletable_row* _current_row;
private:
template <typename ColumnIdResolver>
void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) {
auto i = r.lower_bound(id);
if (i == r.end() || i->first != id) {
r.emplace_hint(i, id, std::move(c));
} else {
merge_column(resolver(id), i->second, std::move(c));
}
}
public:
mutation_partition_applier(const schema& s, mutation_partition& target)
: _schema(s), _p(target) { }
Expand All @@ -32,12 +22,12 @@ public:
}

virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(cell),
_p._static_row.apply(id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}

virtual void accept_static_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(collection),
_p._static_row.apply(id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}

Expand All @@ -53,12 +43,12 @@ public:
}

virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell),
_current_row->cells.apply(id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}

virtual void accept_row_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection),
_current_row->cells.apply(id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}
};
8 changes: 4 additions & 4 deletions partition_builder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ public:

virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
row& r = _partition.static_row();
r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell));
r.append_cell(id, atomic_cell_or_collection(cell));
}

virtual void accept_static_cell(column_id id, collection_mutation::view collection) override {
row& r = _partition.static_row();
r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection));
r.append_cell(id, atomic_cell_or_collection(collection));
}

virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override {
Expand All @@ -48,11 +48,11 @@ public:

virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
row& r = _current_row->cells;
r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell));
r.append_cell(id, atomic_cell_or_collection(cell));
}

virtual void accept_row_cell(column_id id, collection_mutation::view collection) override {
row& r = _current_row->cells;
r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection));
r.append_cell(id, atomic_cell_or_collection(collection));
}
};
14 changes: 7 additions & 7 deletions tests/urchin/cql_test_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,19 @@ class in_memory_cql_env : public cql_test_env {
assert(row != nullptr);
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
assert(col_def != nullptr);
auto i = row->find(col_def->id);
if (i == row->end()) {
const atomic_cell_or_collection* cell = row->find_cell(col_def->id);
if (!cell) {
assert(((void)"column not set", 0));
}
bytes actual;
if (!col_def->type->is_multi_cell()) {
auto cell = i->second.as_atomic_cell();
assert(cell.is_live());
actual = { cell.value().begin(), cell.value().end() };
auto c = cell->as_atomic_cell();
assert(c.is_live());
actual = { c.value().begin(), c.value().end() };
} else {
auto cell = i->second.as_collection_mutation();
auto c = cell->as_collection_mutation();
auto type = dynamic_pointer_cast<const collection_type_impl>(col_def->type);
actual = type->to_value(type->deserialize_mutation_form(cell),
actual = type->to_value(type->deserialize_mutation_form(c),
serialization_format::internal());
}
assert(col_def->type->equal(actual, col_def->type->decompose(expected)));
Expand Down
Loading

0 comments on commit dbc40df

Please sign in to comment.