Skip to content

Commit

Permalink
sstables: return a mutation_opt instead of a pointer
Browse files Browse the repository at this point in the history
As suggested by Avi, we can return an actual mutation by moving it out of our
consumer. We will encapsulate it within an optional, to handle the cases where
the mutation cannot be found.

Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
  • Loading branch information
glommer committed May 13, 2015
1 parent fa344ea commit f190bea
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
2 changes: 2 additions & 0 deletions mutation.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,5 @@ public:
private:
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};

using mutation_opt = std::experimental::optional<mutation>;
25 changes: 12 additions & 13 deletions sstables/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ class mp_row_consumer : public row_consumer {
}
};
public:
lw_shared_ptr<mutation> mut;
mutation mut;

mp_row_consumer(const key& key, const schema_ptr _schema)
: _schema(_schema)
, _key(key)
, mut(make_lw_shared<mutation>(partition_key::from_exploded(*_schema, key.explode(*_schema)), _schema))
, mut(partition_key::from_exploded(*_schema, key.explode(*_schema)), _schema)
{ }

void validate_row_marker() {
Expand Down Expand Up @@ -175,20 +175,20 @@ class mp_row_consumer : public row_consumer {
auto ac = make_atomic_cell(timestamp, value, ttl, expiration);

if (col.is_static) {
mut->set_static_cell(*(col.cdef), ac);
mut.set_static_cell(*(col.cdef), ac);
return;
}

auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));

if (col.cell.size() == 0) {
auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix);
auto& dr = mut->partition().clustered_row(clustering_key);
auto& dr = mut.partition().clustered_row(clustering_key);
dr.apply(timestamp);
return;
}

mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
mut.set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
}

virtual void consume_deleted_cell(bytes_view col_name, sstables::deletion_time deltime) override {
Expand All @@ -202,10 +202,10 @@ class mp_row_consumer : public row_consumer {
auto ac = atomic_cell::make_dead(timestamp, ttl);

if (col.is_static) {
mut->set_static_cell(*(col.cdef), atomic_cell_or_collection(ac));
mut.set_static_cell(*(col.cdef), atomic_cell_or_collection(ac));
} else {
auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));
mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
mut.set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
}
}
virtual void consume_row_end() override {
Expand All @@ -218,8 +218,7 @@ class mp_row_consumer : public row_consumer {
}
};


future<lw_shared_ptr<mutation>>
future<mutation_opt>
sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {

assert(schema);
Expand All @@ -228,7 +227,7 @@ sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {
auto token = partitioner.get_token(key_view(key));

if (!filter_has_key(token)) {
return make_ready_future<lw_shared_ptr<mutation>>();
return make_ready_future<mutation_opt>();
}

auto& summary = _summary;
Expand All @@ -240,14 +239,14 @@ sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {
summary_idx = gt - 1;
}
if (summary_idx < 0) {
return make_ready_future<lw_shared_ptr<mutation>>();
return make_ready_future<mutation_opt>();
}

auto position = _summary.entries[summary_idx].position;
return read_indexes(position).then([this, schema, &key, token] (auto index_list) {
auto index_idx = this->binary_search(index_list, key, token);
if (index_idx < 0) {
return make_ready_future<lw_shared_ptr<mutation>>();
return make_ready_future<mutation_opt>();
}

auto position = index_list[index_idx].position;
Expand All @@ -260,7 +259,7 @@ sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {

return do_with(mp_row_consumer(key, schema), [this, position, end] (auto& c) {
return this->data_consume_rows_at_once(c, position, end).then([&c] {
return make_ready_future<lw_shared_ptr<mutation>>(c.mut);
return make_ready_future<mutation_opt>(std::move(c.mut));
});
});
});
Expand Down
3 changes: 2 additions & 1 deletion sstables/sstables.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "database.hh"
#include "dht/i_partitioner.hh"
#include "schema.hh"
#include "mutation.hh"

namespace sstables {

Expand Down Expand Up @@ -90,7 +91,7 @@ public:
_generation = generation;
}

future<lw_shared_ptr<mutation>> convert_row(schema_ptr schema, const key& k);
future<mutation_opt> convert_row(schema_ptr schema, const key& k);
private:
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
Expand Down

0 comments on commit f190bea

Please sign in to comment.