Skip to content

Commit

Permalink
mutation: Encapsulate fields
Browse files Browse the repository at this point in the history
  • Loading branch information
tgrabiec committed Apr 24, 2015
1 parent 2b157a5 commit 1c3275c
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 63 deletions.
6 changes: 3 additions & 3 deletions cql3/lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& p

auto idx = net::ntoh(int32_t(*unaligned_cast<int32_t>(index->begin())));

auto existing_list_opt = params.get_prefetched_list(m.key, row_key, column);
auto existing_list_opt = params.get_prefetched_list(m.key(), row_key, column);
if (!existing_list_opt) {
throw exceptions::invalid_request_exception(sprint("List index %d out of bound, list has size 0", idx));
}
Expand Down Expand Up @@ -319,7 +319,7 @@ void
lists::discarder::execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
assert(column.type->is_multi_cell()); // "Attempted to delete from a frozen list";
auto&& row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
auto&& existing_list = params.get_prefetched_list(m.key, row_key, column);
auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
// We want to call bind before possibly returning to reject queries where the value provided is not a list.
auto&& value = _t->bind(params._options);

Expand Down Expand Up @@ -380,7 +380,7 @@ lists::discarder_by_index::execute(mutation& m, const exploded_clustering_prefix
assert(cvalue);

auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
auto&& existing_list = params.get_prefetched_list(m.key, row_key, column);
auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
int32_t idx = read_simple_exactly<int32_t>(*cvalue->_bytes);
if (!existing_list) {
throw exceptions::invalid_request_exception("List does not exist");
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/batch_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public:
if (size > warn_threshold) {
std::unordered_set<sstring> ks_cf_pairs;
for (auto&& m : mutations) {
ks_cf_pairs.insert(m.schema->ks_name() + "." + m.schema->cf_name());
ks_cf_pairs.insert(m.schema()->ks_name() + "." + m.schema()->cf_name());
}
const char* format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
if (size > fail_threshold) {
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/delete_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace statements {

void delete_statement::add_update_for_key(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
if (_column_operations.empty()) {
m.p.apply_delete(s, prefix, params.make_tombstone());
m.partition().apply_delete(s, prefix, params.make_tombstone());
return;
}

Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/update_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering
#endif
} else {
if (type == statement_type::INSERT && prefix) {
auto& row = m.p.clustered_row(clustering_key::from_clustering_prefix(*s, prefix));
auto& row = m.partition().clustered_row(clustering_key::from_clustering_prefix(*s, prefix));
row.created_at = params.timestamp();
}
}
Expand Down
14 changes: 10 additions & 4 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ database::shard_of(const dht::token& t) {
return uint8_t(t._data[0]) % smp::count;
}

unsigned
database::shard_of(const mutation& m) {
auto dk = dht::global_partitioner().decorate_key(m.key());
return shard_of(dk._token);
}

keyspace& database::add_keyspace(sstring name, keyspace k) {
if (_keyspaces.count(name) != 0) {
throw std::invalid_argument("Keyspace " + name + " already exists");
Expand Down Expand Up @@ -419,8 +425,8 @@ database::find_or_create_keyspace(const sstring& name) {

void
column_family::apply(const mutation& m) {
mutation_partition& p = find_or_create_partition(m.key);
p.apply(_schema, m.p);
mutation_partition& p = find_or_create_partition(m.key());
p.apply(_schema, m.partition());
}

// Based on org.apache.cassandra.db.AbstractCell#reconcile()
Expand Down Expand Up @@ -531,8 +537,8 @@ void print_partition(std::ostream& out, const schema& s, const mutation_partitio
}

std::ostream& operator<<(std::ostream& os, const mutation& m) {
fprint(os, "{mutation: schema %p key %s data ", m.schema.get(), static_cast<bytes_view>(m.key));
print_partition(os, *m.schema, m.p);
fprint(os, "{mutation: schema %p key %s data ", m.schema().get(), m.key());
print_partition(os, *m.schema(), m.partition());
os << "}";
return os;
}
Expand Down
1 change: 1 addition & 0 deletions database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public:
future<> stop();
void assign(database&& db);
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd);
friend std::ostream& operator<<(std::ostream& out, const database& db);
};
Expand Down
20 changes: 10 additions & 10 deletions db/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,26 @@ db::serializer<mutation>::serializer(const context& ctxt, const mutation & m)
: _ctxt(ctxt), _item(m) {
size_t s = 0;

s += bytes_view_serializer(ctxt, m.key).size();
s += bytes_view_serializer(ctxt, m.key()).size();
s += sizeof(bool); // bool

// schema == null cannot happen (yet). But why not.
if (_item.schema) {
s += uuid_serializer(ctxt, _item.schema->id()).size(); // cf UUID
s += mutation_partition_serializer(ctxt, _item.p).size();
if (_item.schema()) {
s += uuid_serializer(ctxt, _item.schema()->id()).size(); // cf UUID
s += mutation_partition_serializer(ctxt, _item.partition()).size();
}
_size = s;
}

template<>
void db::serializer<mutation>::write(const context& ctxt, output& out,
const type& t) {
bytes_view_serializer::write(ctxt, out, t.key);
out.write(bool(t.schema));
bytes_view_serializer::write(ctxt, out, t.key());
out.write(bool(t.schema()));

if (t.schema) {
uuid_serializer::write(ctxt, out, ctxt.find_uuid(t.schema->ks_name(), t.schema->cf_name()));
mutation_partition_serializer::write(ctxt, out, t.p);
if (t.schema()) {
uuid_serializer::write(ctxt, out, ctxt.find_uuid(t.schema()->ks_name(), t.schema()->cf_name()));
mutation_partition_serializer::write(ctxt, out, t.partition());
}
}

Expand All @@ -254,7 +254,7 @@ mutation db::serializer<mutation>::read(const context& ctxt, input& in) {
if (in.read<bool>()) {
auto sp = ctxt.find_schema(uuid_serializer::read(ctxt, in));
mutation m(key, sp);
mutation_partition_serializer::read(ctxt, m.p, in);
mutation_partition_serializer::read(ctxt, m.partition(), in);
return std::move(m);
}
throw std::runtime_error("Should not reach here (yet)");
Expand Down
12 changes: 6 additions & 6 deletions mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@


void mutation::set_static_cell(const column_definition& def, atomic_cell_or_collection value) {
update_column(p.static_row(), def, std::move(value));
update_column(_p.static_row(), def, std::move(value));
}

void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(clustering_key::from_clustering_prefix(*schema, prefix)).cells;
auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells;
update_column(row, def, std::move(value));
}

void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(key).cells;
auto& row = _p.clustered_row(key).cells;
update_column(row, def, std::move(value));
}

void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value,
api::timestamp_type timestamp, ttl_opt ttl) {
auto column_def = schema->get_column_definition(name);
auto column_def = _schema->get_column_definition(name);
if (!column_def) {
throw std::runtime_error(sprint("no column definition found for '%s'", name));
}
Expand All @@ -48,9 +48,9 @@ mutation::get_cell(const clustering_key& rkey, const column_definition& def) {
return std::experimental::optional<atomic_cell_or_collection>{i->second};
};
if (def.is_static()) {
return find_cell(p.static_row());
return find_cell(_p.static_row());
} else {
auto r = p.find_row(rkey);
auto r = _p.find_row(rkey);
if (!r) {
return {};
}
Expand Down
20 changes: 12 additions & 8 deletions mutation.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
#include "schema.hh"

class mutation final {
private:
schema_ptr _schema;
const partition_key _key;
mutation_partition _p;
public:
schema_ptr schema;
partition_key key;
mutation_partition p;
public:
mutation(partition_key key_, schema_ptr schema_)
: schema(std::move(schema_))
, key(std::move(key_))
, p(schema)
mutation(partition_key key, schema_ptr schema)
: _schema(std::move(schema))
, _key(std::move(key))
, _p(schema)
{ }
mutation(mutation&&) = default;
mutation(const mutation&) = default;
Expand All @@ -29,6 +29,10 @@ public:
void set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl = {});
void set_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value);
std::experimental::optional<atomic_cell_or_collection> get_cell(const clustering_key& rkey, const column_definition& def);
const partition_key& key() const { return _key; };
const schema_ptr& schema() const { return _schema; }
const mutation_partition& partition() const { return _p; }
mutation_partition& partition() { return _p; }
private:
static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value);
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
Expand Down
12 changes: 6 additions & 6 deletions service/storage_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,11 @@ namespace service {


future<>
storage_proxy::mutate_locally(const mutation& m, dht::decorated_key& dk) {
auto shard = _db.local().shard_of(dk._token);
storage_proxy::mutate_locally(const mutation& m) {
auto shard = _db.local().shard_of(m);
return _db.invoke_on(shard, [&m] (database& db) -> void {
try {
auto& cf = db.find_column_family(m.schema->id());
auto& cf = db.find_column_family(m.schema()->id());
cf.apply(m);
} catch (no_such_column_family&) {
// TODO: log a warning
Expand All @@ -484,7 +484,7 @@ future<>
storage_proxy::mutate_locally(std::vector<mutation> mutations) {
auto pmut = make_lw_shared(std::move(mutations));
return parallel_for_each(pmut->begin(), pmut->end(), [this, pmut] (const mutation& m) {
auto dk = dht::global_partitioner().decorate_key(m.key);
auto dk = dht::global_partitioner().decorate_key(m.key());
return mutate_locally(m, dk);
}).finally([pmut]{});
}
Expand All @@ -502,9 +502,9 @@ future<>
storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
auto pmut = make_lw_shared(std::move(mutations));
return parallel_for_each(pmut->begin(), pmut->end(), [this, pmut] (const mutation& m) {
auto dk = dht::global_partitioner().decorate_key(m.key);
auto dk = dht::global_partitioner().decorate_key(m.key());
try {
keyspace& ks = _db.local().find_keyspace(m.schema->ks_name());
keyspace& ks = _db.local().find_keyspace(m.schema()->ks_name());
std::vector<gms::inet_address> natural_endpoints = ks.get_replication_strategy().get_natural_endpoints(dk._token);
// FIXME: send it to replicas instead of applying locally
return mutate_locally(m, dk);
Expand Down
36 changes: 18 additions & 18 deletions tests/urchin/mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,21 @@ BOOST_AUTO_TEST_CASE(test_multi_level_row_tombstones) {
return clustering_key::from_deeply_exploded(*s, v);
};

m.p.apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 2, 3})), tombstone(9, ttl));
m.partition().apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 3})), tombstone(9, ttl));

m.p.apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(8, ttl));
m.partition().apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(8, ttl));

m.p.apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
m.partition().apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));

m.p.apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, make_key({1, 4, 0})), tombstone(11, ttl));
m.partition().apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 4, 0})), tombstone(11, ttl));
}

BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
Expand All @@ -88,14 +88,14 @@ BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
auto ttl = gc_clock::now() + std::chrono::seconds(1);

mutation m(key, s);
m.p.apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl));
m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl));
m.partition().apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl));
m.partition().apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl));

BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, c_key1), tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, c_key2), tombstone(0, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key1), tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(0, ttl));

m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(*s, c_key2), tombstone(1, ttl));
m.partition().apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(1, ttl));
}

BOOST_AUTO_TEST_CASE(test_map_mutations) {
Expand Down
6 changes: 3 additions & 3 deletions tests/urchin/serializer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ inline bool operator==(const mutation_partition& cp1, const mutation_partition&
}

inline bool operator==(const mutation& m1, const mutation& m2) {
return m1.schema.get() == m2.schema.get()
&& m1.key == m2.key
&& m1.p == m2.p
return m1.schema().get() == m2.schema().get()
&& m1.key() == m2.key()
&& m1.partition() == m2.partition()
;
}

Expand Down
3 changes: 1 addition & 2 deletions thrift/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ class CassandraAsyncHandler : public CassandraCobSvIf {
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
}
}
auto dk = dht::global_partitioner().decorate_key(m_to_apply.key);
auto shard = _db.local().shard_of(dk._token);
auto shard = _db.local().shard_of(m_to_apply);
return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) {
auto& cf = db.find_column_family(_ks_name, cf_name);
cf.apply(m_to_apply);
Expand Down

0 comments on commit 1c3275c

Please sign in to comment.