From 5b409baa9940065b8b7236377821ccdb7cbec852 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 2 Apr 2015 09:30:36 +0300 Subject: [PATCH 01/13] db: Return a vector of mutations for 'create keyspace' Origin supports chaining multiple mutations but we don't. Therefore, return a vector of mutations for 'create keyspace'. Signed-off-by: Pekka Enberg --- db/legacy_schema_tables.cc | 11 ++++++----- db/legacy_schema_tables.hh | 4 ++-- service/migration_manager.hh | 8 +++++++- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index a1c65f5d764c..9dbac421f11a 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -765,14 +765,16 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE * Keyspace metadata serialization/deserialization. */ - mutation make_create_keyspace_mutation(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions) + std::vector make_create_keyspace_mutations(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions) { + std::vector mutations; schema_ptr s = keyspaces(); auto pkey = partition_key::from_exploded(*s, {utf8_type->decompose(keyspace->name)}); mutation m(pkey, s); exploded_clustering_prefix ckey; m.set_cell(ckey, "durable_writes", keyspace->durable_writes, timestamp); m.set_cell(ckey, "strategy_class", keyspace->strategy_name, timestamp); + mutations.emplace_back(std::move(m)); #if 0 adder.add("strategy_options", json(keyspace.strategyOptions)); #endif @@ -783,11 +785,10 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE addTypeToSchemaMutation(type, timestamp, mutation); #endif for (auto&& kv : keyspace->cf_meta_data()) { - add_table_to_schema_mutation(kv.second, timestamp, true, m); + add_table_to_schema_mutation(kv.second, timestamp, true, pkey, mutations); } } - - return m; + return mutations; } #if 0 @@ -925,7 +926,7 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE } #endif - void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, mutation& m) + void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations) { throw std::runtime_error("not implemented"); #if 0 diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index 38f52ff468d8..eb7a822bcc94 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -50,9 +50,9 @@ future<> merge_schema(service::storage_proxy& proxy, std::vector mutat future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush); -mutation make_create_keyspace_mutation(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true); +std::vector make_create_keyspace_mutations(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true); -void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, mutation& m); +void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations); } // namespace legacy_schema_tables } // namespace db diff --git a/service/migration_manager.hh b/service/migration_manager.hh index dfd74ae5f124..324d1b1a3ae7 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -287,7 +287,8 @@ public: logger.info(String.format("Create new Keyspace: %s", ksm)); #endif - return announce(proxy, db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally); + auto mutations = db::legacy_schema_tables::make_create_keyspace_mutations(ksm, timestamp); + return announce(proxy, std::move(mutations), announce_locally); } #if 0 @@ -442,6 +443,11 @@ public: { std::vector mutations; mutations.emplace_back(std::move(schema)); + return announce(proxy, std::move(mutations), announce_locally); + } + + static future<> announce(service::storage_proxy& proxy, std::vector mutations, bool announce_locally) + { if (announce_locally) { return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false); } else { From e2685f1fec83ea1e05574a3b46437608d936c11f Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 7 Apr 2015 15:37:20 +0300 Subject: [PATCH 02/13] service: add db accessor to storage proxy Signed-off-by: Pekka Enberg --- service/storage_proxy.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 27eda27216a4..d19d2cb6c677 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -38,6 +38,10 @@ private: public: storage_proxy(distributed& db) : _db(db) {} + distributed& get_db() { + return _db; + } + future<> mutate_locally(const mutation& m); future<> mutate_locally(std::vector mutations); From 00510d610b22d2f06ff8c862bcf8d8343d479af0 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 2 Apr 2015 10:44:06 +0300 Subject: [PATCH 03/13] database: Add set_clustered_cell() variant Signed-off-by: Pekka Enberg --- mutation.cc | 9 +++++++++ mutation.hh | 1 + 2 files changed, 10 insertions(+) diff --git a/mutation.cc b/mutation.cc index ceca0813f221..162470403868 100644 --- a/mutation.cc +++ b/mutation.cc @@ -23,6 +23,15 @@ void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, cons update_column(row, def, std::move(value)); } +void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value, + api::timestamp_type timestamp, ttl_opt ttl) { + auto column_def = _schema->get_column_definition(name); + if (!column_def) { + throw std::runtime_error(sprint("no column definition found for '%s'", name)); + } + return set_clustered_cell(key, *column_def, atomic_cell::make_live(timestamp, ttl, column_def->type->decompose(value))); +} + void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = _p.clustered_row(key).cells; update_column(row, def, std::move(value)); diff --git a/mutation.hh b/mutation.hh index f3750f839fc1..ac72020d50f3 100644 --- a/mutation.hh +++ b/mutation.hh @@ -23,6 +23,7 @@ public: mutation(const mutation&) = default; void set_static_cell(const column_definition& def, atomic_cell_or_collection value); void set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value); + void set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl = {}); void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value); void set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl = {}); void set_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value); From cf1d6197d6c497300be1f5f713d456bb91800480 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 9 Apr 2015 10:14:31 +0300 Subject: [PATCH 04/13] database: add database::update_keyspace() stub Signed-off-by: Pekka Enberg --- database.cc | 4 ++++ database.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/database.cc b/database.cc index 880e82cb2e4a..9eb9314331ce 100644 --- a/database.cc +++ b/database.cc @@ -303,6 +303,10 @@ keyspace& database::add_keyspace(sstring name, keyspace k) { return _keyspaces.emplace(std::move(name), std::move(k)).first->second; } +void database::update_keyspace(const sstring& name) { + throw std::runtime_error("not implemented"); +} + void database::add_column_family(const utils::UUID& uuid, column_family&& cf) { if (_keyspaces.count(cf._schema->ks_name()) == 0) { throw std::invalid_argument("Keyspace " + cf._schema->ks_name() + " not defined"); diff --git a/database.hh b/database.hh index 9c179d0d61f7..1460fa36d8ce 100644 --- a/database.hh +++ b/database.hh @@ -118,6 +118,7 @@ public: keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace); const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace); bool has_keyspace(const sstring& name) const; + void update_keyspace(const sstring& name); column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family); const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family); column_family& find_column_family(const utils::UUID&) throw (no_such_column_family); From 33ceac5643bc357f43546657d629eb10b94369e5 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Apr 2015 15:45:43 +0300 Subject: [PATCH 05/13] database: add database::delete_keyspace() stub Signed-off-by: Pekka Enberg --- database.cc | 4 ++++ database.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/database.cc b/database.cc index 9eb9314331ce..9251318bc447 100644 --- a/database.cc +++ b/database.cc @@ -307,6 +307,10 @@ void database::update_keyspace(const sstring& name) { throw std::runtime_error("not implemented"); } +void database::drop_keyspace(const sstring& name) { + throw std::runtime_error("not implemented"); +} + void database::add_column_family(const utils::UUID& uuid, column_family&& cf) { if (_keyspaces.count(cf._schema->ks_name()) == 0) { throw std::invalid_argument("Keyspace " + cf._schema->ks_name() + " not defined"); diff --git a/database.hh b/database.hh index 1460fa36d8ce..0eb44a802aad 100644 --- a/database.hh +++ b/database.hh @@ -119,6 +119,7 @@ public: const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace); bool has_keyspace(const sstring& name) const; void update_keyspace(const sstring& name); + void drop_keyspace(const sstring& name); column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family); const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family); column_family& find_column_family(const utils::UUID&) throw (no_such_column_family); From e5457637fbc7dde295d1ab5afd20cf07716b9ba4 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 15 Apr 2015 12:25:01 +0300 Subject: [PATCH 06/13] config: Convert ks_meta_data constructors to C++ Signed-off-by: Pekka Enberg --- config/ks_meta_data.hh | 44 ++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/config/ks_meta_data.hh b/config/ks_meta_data.hh index a4a900ef630e..f1baf97825d4 100644 --- a/config/ks_meta_data.hh +++ b/config/ks_meta_data.hh @@ -45,35 +45,41 @@ public: const ::shared_ptr user_types; -#if 0 - public KSMetaData(String name, - Class strategyClass, - Map strategyOptions, - boolean durableWrites) - { - this(name, strategyClass, strategyOptions, durableWrites, Collections.emptyList(), new UTMetaData()); - } + ks_meta_data(sstring name_, + sstring strategy_name_, + std::unordered_map strategy_options_, + bool durable_writes_) + : ks_meta_data{std::move(name_), + std::move(strategy_name_), + std::move(strategy_options_), + durable_writes_, + {}, ::make_shared()} + { } + + ks_meta_data(sstring name_, + sstring strategy_name_, + std::unordered_map strategy_options_, + bool durable_writes_, + std::vector cf_defs) + : ks_meta_data{std::move(name_), + std::move(strategy_name_), + std::move(strategy_options_), + durable_writes_, + std::move(cf_defs), + ::make_shared()} + { } - public KSMetaData(String name, - Class strategyClass, - Map strategyOptions, - boolean durableWrites, - Iterable cfDefs) - { - this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData()); - } -#endif ks_meta_data(sstring name_, sstring strategy_name_, std::unordered_map strategy_options_, bool durable_writes_, std::vector cf_defs, shared_ptr user_types_) - : name{name_} + : name{std::move(name_)} , strategy_name{strategy_name_.empty() ? "NetworkTopologyStrategy" : strategy_name_} , strategy_options{std::move(strategy_options_)} , durable_writes{durable_writes_} - , user_types{user_types_} + , user_types{std::move(user_types_)} { for (auto&& s : cf_defs) { _cf_meta_data.emplace(s->cf_name(), s); From ce51d476c26e708247282339cdf83d6a45de6d6b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 16 Apr 2015 08:41:00 +0300 Subject: [PATCH 07/13] schema.hh: Add schema::comment() accessor Signed-off-by: Pekka Enberg --- schema.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/schema.hh b/schema.hh index 70c1ff01960d..c645c0df565e 100644 --- a/schema.hh +++ b/schema.hh @@ -122,6 +122,9 @@ public: const utils::UUID& id() const { return _raw._id; } + const sstring& comment() const { + return _raw._comment; + } void set_comment(const sstring& comment) { _raw._comment = comment; } From 3afceeea0982e760cb55266fa45b3f915b36d35d Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 16 Apr 2015 13:22:54 +0300 Subject: [PATCH 08/13] bytes_ostream.hh: bytes_ostream::empty() Signed-off-by: Pekka Enberg --- bytes_ostream.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bytes_ostream.hh b/bytes_ostream.hh index 1fa3fbc7dfa5..8c55c7d8fea3 100644 --- a/bytes_ostream.hh +++ b/bytes_ostream.hh @@ -211,6 +211,10 @@ public: return _size; } + bool empty() const { + return _size == 0; + } + void reserve(size_t size) { // FIXME: implement } From bba798c1abb3300e06190dddd96830ce0fc6c341 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 23 Apr 2015 13:41:49 +0300 Subject: [PATCH 09/13] keys: Add partition_key::get_component() helper Signed-off-by: Pekka Enberg --- keys.hh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/keys.hh b/keys.hh index 9bd77d0f2f27..1452728c2b39 100644 --- a/keys.hh +++ b/keys.hh @@ -274,6 +274,12 @@ public: public: using tuple = lw_shared_ptr>; + bytes_view get_component(const schema& s, size_t idx) const { + auto it = begin(s); + std::advance(it, idx); + return *it; + } + static partition_key from_bytes(bytes b) { return partition_key(std::move(b)); } From f17a8a7a92d10e46de3152c48b5774e4b3337fe2 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 21 Apr 2015 11:53:13 +0300 Subject: [PATCH 10/13] query: Add support for result sets Add a query::result_set class that contains per-row cells that can be accessed by column name. Partition keys, clustering keys, and static values are duplicated for every row for convenience. Signed-off-by: Pekka Enberg --- configure.py | 1 + query-result-set.cc | 106 ++++++++++++++++++++++++++++++++++++++++++++ query-result-set.hh | 102 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 query-result-set.cc create mode 100644 query-result-set.hh diff --git a/configure.py b/configure.py index e91202fcbc33..afc18057bc97 100755 --- a/configure.py +++ b/configure.py @@ -407,6 +407,7 @@ def have_xen(): 'dht/murmur3_partitioner.cc', 'unimplemented.cc', 'query.cc', + 'query-result-set.cc', 'locator/abstract_replication_strategy.cc', 'locator/simple_strategy.cc', 'locator/token_metadata.cc', diff --git a/query-result-set.cc b/query-result-set.cc new file mode 100644 index 000000000000..b5213a44f962 --- /dev/null +++ b/query-result-set.cc @@ -0,0 +1,106 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "query-result-set.hh" + +namespace query { + +result_set_builder::result_set_builder(schema_ptr schema) + : _schema{schema} +{ } + +lw_shared_ptr result_set_builder::build() const { + return make_lw_shared(_rows); +} + +void result_set_builder::accept_new_partition(const partition_key& key, uint32_t row_count) +{ + _pkey_cells = deserialize(key); +} + +void result_set_builder::accept_new_partition(uint32_t row_count) +{ +} + +void result_set_builder::accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row) +{ + auto ckey_cells = deserialize(key); + auto static_cells = deserialize(static_row, true); + auto regular_cells = deserialize(row, false); + + std::unordered_map cells; + cells.insert(_pkey_cells.begin(), _pkey_cells.end()); + cells.insert(ckey_cells.begin(), ckey_cells.end()); + cells.insert(static_cells.begin(), static_cells.end()); + cells.insert(regular_cells.begin(), regular_cells.end()); + _rows.emplace_back(_schema, std::move(cells)); +} + +void result_set_builder::accept_new_row(const query::result_row_view &static_row, const query::result_row_view &row) +{ + auto static_cells = deserialize(static_row, true); + auto regular_cells = deserialize(row, false); + + std::unordered_map cells; + cells.insert(_pkey_cells.begin(), _pkey_cells.end()); + cells.insert(static_cells.begin(), static_cells.end()); + cells.insert(regular_cells.begin(), regular_cells.end()); + _rows.emplace_back(_schema, std::move(cells)); +} + +void result_set_builder::accept_partition_end(const result_row_view& static_row) +{ + _pkey_cells.clear(); +} + +std::unordered_map +result_set_builder::deserialize(const partition_key& key) +{ + std::unordered_map cells; + auto i = key.begin(*_schema); + for (auto&& col : _schema->partition_key_columns()) { + cells.emplace(col.name_as_text(), col.type->deserialize(*i)); + ++i; + } + return cells; +} + +std::unordered_map +result_set_builder::deserialize(const clustering_key& key) +{ + std::unordered_map cells; + auto i = key.begin(*_schema); + for (auto&& col : _schema->clustering_key_columns()) { + cells.emplace(col.name_as_text(), col.type->deserialize(*i)); + ++i; + } + return cells; +} + +std::unordered_map +result_set_builder::deserialize(const result_row_view& row, bool is_static) +{ + std::unordered_map cells; + auto i = row.iterator(); + auto columns = is_static ? _schema->static_columns() : _schema->regular_columns(); + for (auto &&col : columns) { + if (col.is_atomic()) { + auto cell = i.next_atomic_cell(); + if (cell) { + auto view = cell.value(); + cells.emplace(col.name_as_text(), col.type->deserialize(view.value())); + } + } else { + auto cell = i.next_collection_cell(); + if (cell) { + auto ctype = static_pointer_cast(col.type); + auto view = cell.value(); + cells.emplace(col.name_as_text(), ctype->deserialize(view.data, serialization_format::internal())); + } + } + } + return cells; +} + +} diff --git a/query-result-set.hh b/query-result-set.hh new file mode 100644 index 000000000000..605aa762aafd --- /dev/null +++ b/query-result-set.hh @@ -0,0 +1,102 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include "query-result-reader.hh" + +#include "core/shared_ptr.hh" + +#include +#include + +#include + +namespace query { + +class no_such_column : public std::runtime_error { +public: + using runtime_error::runtime_error; +}; + +class null_column_value : public std::runtime_error { +public: + using runtime_error::runtime_error; +}; + +// Result set row is a set of cells that are associated with a row +// including regular column cells, partition keys, as well as static values. +class result_set_row { + schema_ptr _schema; + std::unordered_map _cells; +public: + result_set_row(schema_ptr schema, std::unordered_map&& cells) + : _schema{schema} + , _cells{std::move(cells)} + { } + // Look up a deserialized row cell value by column name. + template + std::experimental::optional + get(const sstring& column_name) const throw (no_such_column) { + auto it = _cells.find(column_name); + if (it == _cells.end()) { + throw no_such_column(column_name); + } + if (it->second.empty()) { + return std::experimental::nullopt; + } + return std::experimental::optional{boost::any_cast(it->second)}; + } + template + T get_nonnull(const sstring& column_name) const throw (no_such_column, null_column_value) { + auto v = get(column_name); + if (v) { + return *v; + } + throw null_column_value(column_name); + } +}; + +// Result set is an in-memory representation of query results in +// deserialized format. To obtain a result set, use the result_set_builder +// class as a visitor to query_result::consume() function. +class result_set { + std::vector _rows; +public: + result_set(const std::vector& rows) + : _rows{std::move(rows)} + { } + bool empty() const { + return _rows.empty(); + } + const result_set_row& row(size_t idx) const throw (std::out_of_range) { + if (idx >= _rows.size()) { + throw std::out_of_range("no such row in result set: " + std::to_string(idx)); + } + return _rows[idx]; + } +}; + +// Result set builder is passed as a visitor to query_result::consume() +// function. You can call the build() method to obtain a result set that +// contains cells from the visited results. +class result_set_builder { + schema_ptr _schema; + std::vector _rows; + std::unordered_map _pkey_cells; +public: + result_set_builder(schema_ptr schema); + lw_shared_ptr build() const; + void accept_new_partition(const partition_key& key, uint32_t row_count); + void accept_new_partition(uint32_t row_count); + void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row); + void accept_new_row(const result_row_view &static_row, const result_row_view &row); + void accept_partition_end(const result_row_view& static_row); +private: + std::unordered_map deserialize(const partition_key& key); + std::unordered_map deserialize(const clustering_key& key); + std::unordered_map deserialize(const result_row_view& row, bool is_static); +}; + +} From dd8a08597f6dec1726c81b3bd22a09af2938f805 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Apr 2015 11:23:49 +0300 Subject: [PATCH 11/13] service: Add storage_proxy::query_local() helper Signed-off-by: Pekka Enberg --- service/storage_proxy.cc | 28 ++++++++++++++++++++++++++++ service/storage_proxy.hh | 3 +++ 2 files changed, 31 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c54ee2f5f325..31cae16782ff 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -27,6 +27,9 @@ #include "unimplemented.hh" #include "query_result_merger.hh" +#include +#include + namespace service { #if 0 @@ -1208,6 +1211,31 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev }).finally([cmd] {}); } +future>> +storage_proxy::query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key) +{ + auto shard = _db.local().shard_of(key._token); + return _db.invoke_on(shard, [ks_name, cf_name, key] (database& db) { + auto schema = db.find_schema(ks_name, cf_name); + std::vector row_ranges = {query::clustering_range::make_open_ended_both_sides()}; + std::vector regular_cols; + boost::range::push_back(regular_cols, schema->regular_columns() | boost::adaptors::transformed([] (auto&& col) { return col.id; })); + auto opts = query::partition_slice::option_set::of< + query::partition_slice::option::send_partition_key>(); + query::partition_slice slice{row_ranges, {}, regular_cols, opts}; + std::vector pr = {query::partition_range::make_open_ended_both_sides()}; + auto id = db.find_uuid(ks_name, cf_name); + auto cmd = make_lw_shared(id, pr, slice, std::numeric_limits::max()); + return db.query(*cmd).then([key, schema, slice](lw_shared_ptr&& result) { + query::result_set_builder builder{schema}; + bytes_ostream w(result->buf()); + query::result_view view(w.linearize()); + view.consume(slice, builder); + return make_foreign(builder.build()); + }).finally([cmd] {}); + }); +} + #if 0 public static List read(List commands, ConsistencyLevel consistencyLevel) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index d19d2cb6c677..7fd2b89e0d9e 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -27,6 +27,7 @@ #include "database.hh" #include "query-request.hh" #include "query-result.hh" +#include "query-result-set.hh" #include "core/distributed.hh" #include "db/consistency_level.hh" @@ -71,6 +72,8 @@ public: future<> mutate_atomically(std::vector mutations, db::consistency_level cl); future>> query(lw_shared_ptr cmd, db::consistency_level cl); + + future>> query_local(const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key); }; } From 9ec10c240f127012b49a487a5a9c6787689772f5 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 31 Mar 2015 14:39:23 +0300 Subject: [PATCH 12/13] db: Convert LegacySchemaTables keyspace merging Signed-off-by: Pekka Enberg --- db/legacy_schema_tables.cc | 266 +++++++++++++++++++++---------------- db/legacy_schema_tables.hh | 17 +++ 2 files changed, 171 insertions(+), 112 deletions(-) diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index 9dbac421f11a..b8eb0c9c72ea 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -23,8 +23,14 @@ #include "utils/UUID_gen.hh" #include "legacy_schema_tables.hh" + +#include "dht/i_partitioner.hh" #include "system_keyspace.hh" +#include "query-result-set.hh" + +#include "core/do_with.hh" + using namespace db::system_keyspace; /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ @@ -388,38 +394,45 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE mutation.add(partition.cf); } } +#endif - private static Map readSchemaForKeyspaces(String schemaTableName, Set keyspaceNames) + future + read_schema_for_keyspaces(service::storage_proxy& proxy, const sstring& schema_table_name, const std::set& keyspace_names) { - Map schema = new HashMap<>(); - - for (String keyspaceName : keyspaceNames) - { - Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName); - if (schemaEntity.cf != null) - schema.put(schemaEntity.key, schemaEntity.cf); - } - - return schema; + auto map = [&proxy, schema_table_name] (sstring keyspace_name) { return read_schema_partition_for_keyspace(proxy, schema_table_name, keyspace_name); }; + auto insert = [] (schema_result&& schema, auto&& schema_entity) { + if (schema_entity.second) { + schema.insert(std::move(schema_entity)); + } + return std::move(schema); + }; + return map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result(), insert); } +#if 0 private static ByteBuffer getSchemaKSKey(String ksName) { return AsciiType.instance.fromString(ksName); } +#endif - private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName) + future>>> + read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name) { - DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName)); - return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey); + auto schema = proxy.get_db().local().find_schema(system_keyspace::NAME, schema_table_name); + auto keyspace_key = dht::global_partitioner().decorate_key(partition_key::from_single_value(*schema, to_bytes(keyspace_name))); + return read_schema_partition_for_keyspace(proxy, schema_table_name, keyspace_key); } - private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey) + future>>> + read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key) { - QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis()); - return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter)); + return proxy.query_local(system_keyspace::NAME, schema_table_name, keyspace_key).then([keyspace_key] (auto&& rs) { + return std::make_pair(keyspace_key, std::move(rs)); + }); } +#if 0 private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName) { DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName)); @@ -461,90 +474,116 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush) { -#if 0 + schema_ptr s = keyspaces(); // compare before/after schemas of the affected keyspaces only - Set keyspaces = new HashSet<>(mutations.size()); - for (Mutation mutation : mutations) - keyspaces.add(ByteBufferUtil.string(mutation.key())); + std::set keyspaces; + for (auto&& mutation : mutations) { + keyspaces.emplace(boost::any_cast(utf8_type->deserialize(mutation.key().get_component(*s, 0)))); + } // current state of the schema - Map oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); - Map oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces); - Map oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces); - Map oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); - Map oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); + auto old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces); + auto old_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces); + auto old_types = read_schema_for_keyspaces(proxy, USERTYPES, keyspaces); + auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces); + auto old_aggregates = read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces); + + return when_all(std::move(old_keyspaces), std::move(old_column_families), std::move(old_types), + std::move(old_functions), std::move(old_aggregates)).then([&proxy, keyspaces, mutations = std::move(mutations)] (auto&& old_results) mutable { + return proxy.mutate_locally(std::move(mutations)).then([&proxy, keyspaces, old_results = std::move(old_results)] () mutable { +#if 0 + if (doFlush) + flushSchemaTables(); #endif - return proxy.mutate_locally(std::move(mutations)).then([] { + // with new data applied + auto new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces); + auto new_column_families = read_schema_for_keyspaces(proxy, COLUMNFAMILIES, keyspaces); + auto new_types = read_schema_for_keyspaces(proxy, USERTYPES, keyspaces); + auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces); + auto new_aggregates = read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces); + + // FIXME: Make the update atomic like in Origin. + return when_all(std::move(new_keyspaces), std::move(new_column_families), std::move(new_types), + std::move(new_functions), std::move(new_aggregates)).then([&proxy, old_results = std::move(old_results)] (auto&& new_results) mutable { + auto old_keyspaces = std::move(std::get(std::get<0>(old_results).get())); + auto old_column_families = std::move(std::get(std::get<1>(old_results).get())); + auto old_types = std::move(std::get(std::get<2>(old_results).get())); + auto old_functions = std::move(std::get(std::get<3>(old_results).get())); + auto old_aggregates = std::move(std::get(std::get<4>(old_results).get())); + + auto new_keyspaces = std::move(std::get(std::get<0>(new_results).get())); + auto new_column_families = std::move(std::get(std::get<1>(new_results).get())); + auto new_types = std::move(std::get(std::get<2>(new_results).get())); + auto new_functions = std::move(std::get(std::get<3>(new_results).get())); + auto new_aggregates = std::move(std::get(std::get<4>(new_results).get())); + + auto keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)); #if 0 - if (doFlush) - flushSchemaTables(); - - // with new data applied - Map newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); - Map newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces); - Map newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces); - Map newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); - Map newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); - - Set keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); - mergeTables(oldColumnFamilies, newColumnFamilies); - mergeTypes(oldTypes, newTypes); - mergeFunctions(oldFunctions, newFunctions); - mergeAggregates(oldAggregates, newAggregates); - - // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted - for (String keyspaceToDrop : keyspacesToDrop) - Schema.instance.dropKeyspace(keyspaceToDrop); + mergeTables(oldColumnFamilies, newColumnFamilies); + mergeTypes(oldTypes, newTypes); + mergeFunctions(oldFunctions, newFunctions); + mergeAggregates(oldAggregates, newAggregates); #endif - return make_ready_future<>(); + return when_all(std::move(keyspaces_to_drop)).then([proxy] (auto&& results) mutable { + auto keyspaces_to_drop = std::move(std::get>(std::get<0>(results).get())); + + return proxy.get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) { + // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted + for (auto&& keyspace_to_drop : keyspaces_to_drop) { + db.drop_keyspace(keyspace_to_drop); + } + }); + }); + }); + }); }); } -#if 0 - private static Set mergeKeyspaces(Map before, Map after) - { - List created = new ArrayList<>(); - List altered = new ArrayList<>(); - Set dropped = new HashSet<>(); - - /* - * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us - * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily - * there that only has the top-level deletion, if: - * a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place - * b) a pulled dropped keyspace that got dropped before it could find a way to this node - * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns: - * that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way - * to this node - */ - MapDifference diff = Maps.difference(before, after); - - for (Map.Entry entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.add(new Row(entry.getKey(), entry.getValue())); - - for (Map.Entry> entry : diff.entriesDiffering().entrySet()) - { - String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); - - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - altered.add(keyspaceName); - else if (pre.hasColumns()) - dropped.add(keyspaceName); - else if (post.hasColumns()) // a (re)created keyspace - created.add(new Row(entry.getKey(), post)); + future> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after) + { + std::vector>>> created; + std::vector altered; + std::set dropped; + + for (auto&& right : after) { + auto left = before.find(right.first); + if (left != before.end()) { + schema_ptr s = keyspaces(); + auto b = left->first._key.get_component(*s, 0); + sstring keyspace_name = boost::any_cast(utf8_type->deserialize(b)); + auto&& pre = left->second; + auto&& post = right.second; + if (!pre->empty() && !post->empty()) { + altered.emplace_back(keyspace_name); + } else if (!pre->empty()) { + dropped.emplace(keyspace_name); + } else if (!post->empty()) { // a (re)created keyspace + created.emplace_back(std::move(right)); + } + } else { + if (!right.second->empty()) { + created.emplace_back(std::move(right)); + } + } } - - for (Row row : created) - Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row)); - for (String name : altered) - Schema.instance.updateKeyspace(name); - return dropped; + return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) { + return proxy.get_db().invoke_on_all([&created, altered = std::move(altered)] (database& db) { + for (auto&& kv : created) { + auto ksm = create_keyspace_from_schema_partition(kv); + keyspace k; + k.create_replication_strategy(*ksm); + db.add_keyspace(ksm->name, std::move(k)); + } + for (auto&& name : altered) { + db.update_keyspace(name); + } + }); + }).then([dropped = std::move(dropped)] () { + return make_ready_future>(dropped); + }); } +#if 0 // see the comments for mergeKeyspaces() private static void mergeTables(Map before, Map after) { @@ -817,29 +856,28 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE return createKeyspaceFromSchemaPartition(partition); } +#endif /** * Deserialize only Keyspace attributes without nested tables or types * * @param partition Keyspace attributes in serialized form */ - private static KSMetaData createKeyspaceFromSchemaPartition(Row partition) + lw_shared_ptr create_keyspace_from_schema_partition(const std::pair>>& result) { - String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES); - UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one(); - try - { - return new KSMetaData(row.getString("keyspace_name"), - AbstractReplicationStrategy.getClass(row.getString("strategy_class")), - fromJsonMap(row.getString("strategy_options")), - row.getBoolean("durable_writes")); - } - catch (ConfigurationException e) - { - throw new RuntimeException(e); + auto&& rs = result.second; + if (rs->empty()) { + throw std::runtime_error("query result has no rows"); } + auto&& row = rs->row(0); + auto keyspace_name = row.get_nonnull("keyspace_name"); + auto strategy_name = row.get_nonnull("strategy_class"); + std::unordered_map strategy_options; + bool durable_writes = row.get_nonnull("durable_writes"); + return make_lw_shared(keyspace_name, strategy_name, strategy_options, durable_writes); } +#if 0 /* * User type metadata serialization/deserialization. */ @@ -928,17 +966,17 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations) { - throw std::runtime_error("not implemented"); -#if 0 // For property that can be null (and can be changed), we insert tombstones, to make sure // we don't keep a property the user has removed - ColumnFamily cells = mutation.addOrGet(Columnfamilies); - Composite prefix = Columnfamilies.comparator.make(table.cfName); - CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); - - adder.add("cf_id", table.cfId); - adder.add("type", table.cfType.toString()); - + schema_ptr s = columnfamilies(); + mutation m{pkey, s}; + mutations.emplace_back(std::move(m)); + auto ckey = clustering_key::from_single_value(*s, to_bytes(table->cf_name())); + m.set_clustered_cell(ckey, "cf_id", table->id(), timestamp); +#if 0 + m.set_clustered_cell(ckey, "type", table.cfType.toString(), timestamp); +#endif +#if 0 if (table.isSuper()) { // We need to continue saving the comparator and subcomparator separatly, otherwise @@ -951,10 +989,14 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE { adder.add("comparator", table.comparator.toString()); } +#endif +#if 0 adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance()); adder.add("caching", table.getCaching().toString()); - adder.add("comment", table.getComment()); +#endif + m.set_clustered_cell(ckey, "comment", table->comment(), timestamp); +#if 0 adder.add("compaction_strategy_class", table.compactionStrategyClass.getName()); adder.add("compaction_strategy_options", json(table.compactionStrategyOptions)); adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions())); diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index eb7a822bcc94..8d0539ab7665 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -29,11 +29,18 @@ #include "schema.hh" #include +#include + +namespace query { +class result_set; +} /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ namespace db { namespace legacy_schema_tables { +using schema_result = std::map>>; + static constexpr auto KEYSPACES = "schema_keyspaces"; static constexpr auto COLUMNFAMILIES = "schema_columnfamilies"; static constexpr auto COLUMNS = "schema_columns"; @@ -46,12 +53,22 @@ extern std::vector ALL; std::vector all_tables(); +future>>> +read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const sstring& keyspace_name); + +future>>> +read_schema_partition_for_keyspace(service::storage_proxy& proxy, const sstring& schema_table_name, const dht::decorated_key& keyspace_key); + future<> merge_schema(service::storage_proxy& proxy, std::vector mutations); future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush); +future> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after); + std::vector make_create_keyspace_mutations(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true); +lw_shared_ptr create_keyspace_from_schema_partition(const std::pair>>& partition); + void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, const partition_key& pkey, std::vector& mutations); } // namespace legacy_schema_tables From 987d7ba698a9e8e1752d043169f46e912237eacb Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 31 Mar 2015 11:08:19 +0300 Subject: [PATCH 13/13] tests: Make create keyspace test case more robust Add an assertion to check that the newly created keyspace actually exists. Signed-off-by: Pekka Enberg --- tests/urchin/cql_query_test.cc | 4 +++- tests/urchin/cql_test_env.cc | 6 ++++++ tests/urchin/cql_test_env.hh | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index 6904bcfb0ecb..b61ee90e9698 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -18,7 +18,9 @@ SEASTAR_TEST_CASE(test_create_keyspace_statement) { return do_with_cql_env([] (auto& e) { - return e.execute_cql("create keyspace ks with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").discard_result(); + return e.execute_cql("create keyspace ks2 with replication = { 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor' : 1 };").discard_result().then([&e] { + return e.require_keyspace_exists("ks2"); + }); }); } diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index 1fc2d8980bd8..017b224c60c8 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -105,6 +105,12 @@ class in_memory_cql_env : public cql_test_env { }); } + virtual future<> require_keyspace_exists(const sstring& ks_name) override { + auto& db = _db->local(); + assert(db.has_keyspace(ks_name)); + return make_ready_future<>(); + } + virtual future<> require_column_has_value(const sstring& table_name, std::vector pk, std::vector ck, diff --git a/tests/urchin/cql_test_env.hh b/tests/urchin/cql_test_env.hh index 4a75a86296e6..4b427055f7b7 100644 --- a/tests/urchin/cql_test_env.hh +++ b/tests/urchin/cql_test_env.hh @@ -32,6 +32,8 @@ public: virtual future<> create_table(std::function schema_maker) = 0; + virtual future<> require_keyspace_exists(const sstring& ks_name) = 0; + virtual future<> require_column_has_value( const sstring& table_name, std::vector pk,