From 68dfcf5256190fac717d487bf9b7472b579436a5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 6 Jan 2017 15:22:27 -0200 Subject: [PATCH] db: avoid excessive memory usage during resharding After resharding, sstables may be owned by all shards, which means that file descriptors and memory usage for metadata will increase by a factor equal to number of shards. That can easily lead to OOM. SSTable components are immutable, so they can be stored in one shard and shared with others that need it. We use the following formula to decide which shard will open the sstable and share it with the others: (generation % smp::count), which is the inverse of how we calculate generation for new sstables. So if no resharding is performed, everything is shard-local. With this approach, resource usage due to loaded sstables will be evenly distributed among shards. For this approach to work, we now only populate keyspaces from shard 0. It's now the sole responsible for iterating through column family dirs. In addition, most of population functions are now free and take distributed database object as parameter. Fixes #1951. Signed-off-by: Raphael S. Carvalho --- checked-file-impl.hh | 2 +- database.cc | 313 +++++++++++++++++++++++-------------- database.hh | 40 +++-- main.cc | 39 ++--- service/storage_service.cc | 7 +- sstables/sstables.cc | 71 ++++++--- sstables/sstables.hh | 28 ++++ tests/cql_test_env.cc | 4 +- 8 files changed, 311 insertions(+), 193 deletions(-) diff --git a/checked-file-impl.hh b/checked-file-impl.hh index cc365004a24a..a1c6dd884d8f 100644 --- a/checked-file-impl.hh +++ b/checked-file-impl.hh @@ -117,7 +117,7 @@ private: file _file; }; -inline file make_checked_file(const io_error_handler& error_handler, file& f) +inline file make_checked_file(const io_error_handler& error_handler, file f) { return file(::make_shared(error_handler, f)); } diff --git a/database.cc b/database.cc index e0095c633856..689dc23129b6 100644 --- a/database.cc +++ b/database.cc @@ -779,17 +779,16 @@ static bool belongs_to_other_shard(const std::vector& shards) { } future -column_family::open_sstable(sstring dir, int64_t generation, sstables::sstable::version_types v, sstables::sstable::format_types f) { +column_family::open_sstable(sstables::foreign_sstable_open_info info, sstring dir, int64_t generation, + sstables::sstable::version_types v, sstables::sstable::format_types f) { auto sst = make_lw_shared(_schema, dir, generation, v, f); - return sst->get_owning_shards_from_unloaded().then([this, sst] (std::vector shards) mutable { - if (!belongs_to_current_shard(shards)) { - dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename()); - sst->mark_for_deletion(); - return make_ready_future(); - } - return sst->load().then([sst] () mutable { - return make_ready_future(std::move(sst)); - }); + if (!belongs_to_current_shard(info.owners)) { + dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename()); + sst->mark_for_deletion(); + return make_ready_future(); + } + return sst->load(std::move(info)).then([sst] () mutable { + return make_ready_future(std::move(sst)); }); } @@ -834,50 +833,6 @@ void column_family::start_rewrite() { _sstables_need_rewrite.clear(); } -future column_family::probe_file(sstring sstdir, sstring fname) { - - using namespace sstables; - - entry_descriptor comps = entry_descriptor::make_descriptor(fname); - - // Every table will have a TOC. Using a specific file as a criteria, as - // opposed to, say verifying _sstables.count() to be zero is more robust - // against parallel loading of the directory contents. - if (comps.component != sstable::component_type::TOC) { - return make_ready_future(std::move(comps)); - } - - update_sstables_known_generation(comps.generation); - - { - auto i = boost::range::find_if(*_sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; }); - if (i != _sstables->all()->end()) { - auto new_toc = sstdir + "/" + fname; - throw std::runtime_error(sprint("Attempted to add sstable generation %d twice: new=%s existing=%s", - comps.generation, new_toc, (*i)->toc_filename())); - } - } - - return open_sstable(sstdir, comps.generation, comps.version, comps.format).then([this] (sstables::shared_sstable sst) mutable { - if (sst) { - load_sstable(sst); - } - return make_ready_future<>(); - }).then_wrapped([fname, comps] (future<> f) { - try { - f.get(); - } catch (malformed_sstable_exception& e) { - dblog.error("malformed sstable {}: {}. Refusing to boot", fname, e.what()); - throw; - } catch(...) { - dblog.error("Unrecognized error while processing {}: {}. Refusing to boot", - fname, std::current_exception()); - throw; - } - return make_ready_future(std::move(comps)); - }); -} - void column_family::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) { _stats.live_disk_space_used += disk_space_used_by_sstable; _stats.total_disk_space_used += disk_space_used_by_sstable; @@ -1411,30 +1366,6 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip }); } -future<> -column_family::load_new_sstables(std::vector new_tables) { - return do_with(std::vector(), [this, new_tables = std::move(new_tables)] (auto& sstables) { - return parallel_for_each(new_tables, [this, &sstables] (auto comps) { - return this->open_sstable(_config.datadir, comps.generation, comps.version, comps.format).then([&sstables] (sstables::shared_sstable sst) { - if (sst) { - sstables.push_back(std::move(sst)); - } - return make_ready_future<>(); - }); - }).then([this, &sstables] { - // atomically load all preloaded sstables into column family. - for (auto& sst : sstables) { - this->load_sstable(sst, true); - } - this->start_rewrite(); - this->trigger_compaction(); - // Drop entire cache for this column family because it may be populated - // with stale data. - return this->get_row_cache().clear(); - }); - }); -} - // FIXME: this is just an example, should be changed to something more general // Note: We assume that the column_family does not get destroyed during compaction. future<> @@ -1550,7 +1481,119 @@ inline bool column_family::manifest_json_filter(const sstring& fname) { return true; } -future<> column_family::populate(sstring sstdir) { +// TODO: possibly move it to seastar +template +static future<> invoke_all_with_ptr(distributed& s, PtrType ptr, Func&& func) { + return parallel_for_each(smp::all_cpus(), [&s, &func, ptr] (unsigned id) { + return s.invoke_on(id, [func, foreign = make_foreign(ptr)] (Service& s) mutable { + return func(s, std::move(foreign)); + }); + }); +} + +future<> distributed_loader::open_sstable(distributed& db, sstables::entry_descriptor comps, + std::function (column_family&, sstables::foreign_sstable_open_info)> func) { + // loads components of a sstable from shard S and share it with all other + // shards. Which shard a sstable will be opened at is decided using + // calculate_shard_from_sstable_generation(), which is the inverse of + // calculate_generation_for_new_table(). That ensures every sstable is + // shard-local if reshard wasn't performed. This approach is also expected + // to distribute evenly the resource usage among all shards. + + return db.invoke_on(column_family::calculate_shard_from_sstable_generation(comps.generation), + [&db, comps = std::move(comps), func = std::move(func)] (database& local) { + auto& cf = local.find_column_family(comps.ks, comps.cf); + + auto f = sstables::sstable::load_shared_components(cf.schema(), cf._config.datadir, comps.generation, comps.version, comps.format); + return f.then([&db, comps = std::move(comps), func = std::move(func)] (sstables::sstable_open_info info) { + // shared components loaded, now opening sstable in all shards with shared components + return do_with(std::move(info), [&db, comps = std::move(comps), func = std::move(func)] (auto& info) { + return invoke_all_with_ptr(db, std::move(info.components), + [owners = info.owners, data = info.data.dup(), index = info.index.dup(), comps, func] (database& db, auto components) { + auto& cf = db.find_column_family(comps.ks, comps.cf); + return func(cf, sstables::foreign_sstable_open_info{std::move(components), owners, data, index}); + }); + }); + }); + }); +} + +future<> distributed_loader::load_new_sstables(distributed& db, sstring ks, sstring cf, std::vector new_tables) { + return parallel_for_each(new_tables, [&db] (auto comps) { + auto cf_sstable_open = [comps] (column_family& cf, sstables::foreign_sstable_open_info info) { + auto f = cf.open_sstable(std::move(info), cf._config.datadir, comps.generation, comps.version, comps.format); + return f.then([&cf] (sstables::shared_sstable sst) mutable { + if (sst) { + cf._sstables_opened_but_not_loaded.push_back(sst); + } + return make_ready_future<>(); + }); + }; + return distributed_loader::open_sstable(db, comps, cf_sstable_open); + }).then([&db, ks = std::move(ks), cf = std::move(cf)] { + return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { + auto& cf = db.find_column_family(ks, cfname); + // atomically load all opened sstables into column family. + for (auto& sst : cf._sstables_opened_but_not_loaded) { + cf.load_sstable(sst, true); + } + cf._sstables_opened_but_not_loaded.clear(); + cf.start_rewrite(); + cf.trigger_compaction(); + // Drop entire cache for this column family because it may be populated + // with stale data. + return cf.get_row_cache().clear(); + }); + }); +} + +future distributed_loader::probe_file(distributed& db, sstring sstdir, sstring fname) { + using namespace sstables; + + entry_descriptor comps = entry_descriptor::make_descriptor(fname); + + // Every table will have a TOC. Using a specific file as a criteria, as + // opposed to, say verifying _sstables.count() to be zero is more robust + // against parallel loading of the directory contents. + if (comps.component != sstable::component_type::TOC) { + return make_ready_future(std::move(comps)); + } + auto cf_sstable_open = [sstdir, comps, fname] (column_family& cf, sstables::foreign_sstable_open_info info) { + cf.update_sstables_known_generation(comps.generation); + { + auto i = boost::range::find_if(*cf._sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; }); + if (i != cf._sstables->all()->end()) { + auto new_toc = sstdir + "/" + fname; + throw std::runtime_error(sprint("Attempted to add sstable generation %d twice: new=%s existing=%s", + comps.generation, new_toc, (*i)->toc_filename())); + } + } + return cf.open_sstable(std::move(info), sstdir, comps.generation, comps.version, comps.format).then([&cf] (sstables::shared_sstable sst) mutable { + if (sst) { + cf.load_sstable(sst); + } + return make_ready_future<>(); + }); + }; + + return distributed_loader::open_sstable(db, comps, cf_sstable_open).then_wrapped([fname] (future<> f) { + try { + f.get(); + } catch (malformed_sstable_exception& e) { + dblog.error("malformed sstable {}: {}. Refusing to boot", fname, e.what()); + throw; + } catch(...) { + dblog.error("Unrecognized error while processing {}: {}. Refusing to boot", + fname, std::current_exception()); + throw; + } + return make_ready_future<>(); + }).then([comps] () mutable { + return make_ready_future(std::move(comps)); + }); +} + +future<> distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { // We can catch most errors when we try to load an sstable. But if the TOC // file is the one missing, we won't try to load the sstable at all. This // case is still an invalid case, but it is way easier for us to treat it @@ -1570,10 +1613,10 @@ future<> column_family::populate(sstring sstdir) { auto verifier = make_lw_shared>(); auto descriptor = make_lw_shared(); - return do_with(std::vector>(), [this, sstdir, verifier, descriptor] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) { + return do_with(std::vector>(), [&db, sstdir, verifier, descriptor, ks, cf] (std::vector>& futures) { + return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, sstdir, verifier, descriptor, &futures] (directory_entry de) { // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - auto f = probe_file(sstdir, de.name).then([verifier, descriptor, sstdir, de] (auto entry) { + auto f = distributed_loader::probe_file(db, sstdir, de.name).then([verifier, descriptor, sstdir, de] (auto entry) { auto filename = sstdir + "/" + de.name; if (entry.component == sstables::sstable::component_type::TemporaryStatistics) { return remove_file(sstables::sstable::filename(sstdir, entry.ks, entry.cf, entry.version, entry.generation, @@ -1618,7 +1661,7 @@ future<> column_family::populate(sstring sstdir) { futures.push_back(std::move(f)); return make_ready_future<>(); - }, &manifest_json_filter).then([&futures] { + }, &column_family::manifest_json_filter).then([&futures] { return when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { std::exception_ptr eptr; @@ -1639,8 +1682,8 @@ future<> column_family::populate(sstring sstdir) { } return make_ready_future<>(); }); - }).then([verifier, sstdir, descriptor, this] { - return parallel_for_each(*verifier, [sstdir = std::move(sstdir), descriptor, this] (auto v) { + }).then([verifier, sstdir, descriptor, ks = std::move(ks), cf = std::move(cf)] { + return parallel_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), descriptor] (auto v) { if (v.second == status::has_temporary_toc_file) { unsigned long gen = v.first; assert(descriptor->version); @@ -1653,17 +1696,21 @@ future<> column_family::populate(sstring sstdir) { return make_ready_future<>(); } // shard 0 is the responsible for removing a partial sstable. - return sstables::sstable::remove_sstable_with_temp_toc(_schema->ks_name(), _schema->cf_name(), sstdir, gen, version, format); + return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format); } else if (v.second != status::has_toc_file) { throw sstables::malformed_sstable_exception(sprint("At directory: %s: no TOC found for SSTable with generation %d!. Refusing to boot", sstdir, v.first)); } return make_ready_future<>(); }); }); - }).then([this] { - // Make sure this is called even if CF is empty - mark_ready_for_writes(); + }).then([&db, ks, cf] { + return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { + auto& cf = db.find_column_family(ks, cfname); + // Make sure this is called even if CF is empty + cf.mark_ready_for_writes(); + }); }); + } utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{}); @@ -1893,24 +1940,27 @@ const utils::UUID& database::get_version() const { return _version; } -future<> database::populate_keyspace(sstring datadir, sstring ks_name) { +future<> distributed_loader::populate_keyspace(distributed& db, sstring datadir, sstring ks_name) { auto ksdir = datadir + "/" + ks_name; - auto i = _keyspaces.find(ks_name); - if (i == _keyspaces.end()) { + auto& keyspaces = db.local().get_keyspaces(); + auto i = keyspaces.find(ks_name); + if (i == keyspaces.end()) { dblog.warn("Skipping undefined keyspace: {}", ks_name); return make_ready_future<>(); } else { dblog.info("Populating Keyspace {}", ks_name); auto& ks = i->second; + auto& column_families = db.local().get_column_families(); + return parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, - [ks_name, &ks, this] (schema_ptr s) { + [ks_name, &ks, &column_families, &db] (schema_ptr s) { utils::UUID uuid = s->id(); - lw_shared_ptr cf = _column_families[uuid]; + lw_shared_ptr cf = column_families[uuid]; sstring cfname = cf->schema()->cf_name(); auto sstdir = ks.column_family_directory(cfname, uuid); dblog.info("Keyspace {}: Reading CF {} ", ks_name, cfname); - return ks.make_directory_for_column_family(cfname, uuid).then([cf, sstdir] { - return cf->populate(sstdir); + return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] { + return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname); }).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) { std::string msg = sprint("Exception while populating keyspace '%s' with column family '%s' from file '%s': %s", @@ -1923,13 +1973,13 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) { } } -future<> database::populate(sstring datadir) { - return lister::scan_dir(datadir, { directory_entry_type::directory }, [this, datadir] (directory_entry de) { +static future<> populate(distributed& db, sstring datadir) { + return lister::scan_dir(datadir, { directory_entry_type::directory }, [&db, datadir] (directory_entry de) { auto& ks_name = de.name; if (ks_name == "system") { return make_ready_future<>(); } - return populate_keyspace(datadir, ks_name); + return distributed_loader::populate_keyspace(db, datadir, ks_name); }); } @@ -2003,31 +2053,54 @@ future<> database::parse_system_tables(distributed& prox }); } -future<> -database::init_system_keyspace() { - return init_commitlog().then([this] { - bool durable = _cfg->data_file_directories().size() > 0; - db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing()); +future<> distributed_loader::init_system_keyspace(distributed& db) { + return seastar::async([&db] { + // We need to init commitlog on shard0 before it is inited on other shards + // because it obtains the list of pre-existing segments for replay, which must + // not include reserve segments created by active commitlogs. + db.invoke_on(0, [] (database& db) { + return db.init_commitlog(); + }).get(); + db.invoke_on_all([] (database& db) { + if (engine().cpu_id() == 0) { + return make_ready_future<>(); + } + return db.init_commitlog(); + }).get(); + + db.invoke_on_all([] (database& db) { + auto& cfg = db.get_config(); + bool durable = cfg.data_file_directories().size() > 0; + db::system_keyspace::make(db, durable, cfg.volatile_system_keyspace_for_testing()); + }).get(); + // FIXME support multiple directories - return io_check(touch_directory, _cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] { - return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME); - }); - }).then([this] { - auto& ks = find_keyspace(db::system_keyspace::NAME); - return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) { - auto cfm = pair.second; - auto& cf = this->find_column_family(cfm); - cf.mark_ready_for_writes(); + const auto& cfg = db.local().get_config(); + auto data_dir = cfg.data_file_directories()[0]; + io_check(touch_directory, data_dir + "/" + db::system_keyspace::NAME).get(); + distributed_loader::populate_keyspace(db, data_dir, db::system_keyspace::NAME).get(); + + db.invoke_on_all([] (database& db) { + auto& ks = db.find_keyspace(db::system_keyspace::NAME); + for (auto& pair : ks.metadata()->cf_meta_data()) { + auto cfm = pair.second; + auto& cf = db.find_column_family(cfm); + cf.mark_ready_for_writes(); + } return make_ready_future<>(); - }); + }).get(); }); -} + } -future<> -database::load_sstables(distributed& proxy) { - return parse_system_tables(proxy).then([this] { - return populate(_cfg->data_file_directories()[0]); - }); +future<> distributed_loader::init_non_system_keyspaces(distributed& db, distributed& proxy) { + return seastar::async([&db, &proxy] { + db.invoke_on_all([&proxy] (database& db) { + return db.parse_system_tables(proxy); + }).get(); + + const auto& cfg = db.local().get_config(); + populate(db, cfg.data_file_directories()[0]).get(); + }); } future<> diff --git a/database.hh b/database.hh index 49e421620d28..8077420bbd84 100644 --- a/database.hh +++ b/database.hh @@ -483,6 +483,11 @@ private: // have not been deleted yet, so must not GC any tombstones in other sstables // that may delete data in these sstables: std::vector _sstables_compacted_but_not_deleted; + // sstables that have been opened but not loaded yet, that's because refresh + // needs to load all opened sstables atomically, and now, we open a sstable + // in all shards at the same time, which makes it hard to store all sstables + // we need to load later on for all shards. + std::vector _sstables_opened_but_not_loaded; // sstables that are shared between several shards so we want to rewrite // them (split the data belonging to this shard to a separate sstable), // but for correct compaction we need to start the compaction only after @@ -522,8 +527,8 @@ private: // Doesn't trigger compaction. void add_sstable(lw_shared_ptr sstable); // returns an empty pointer if sstable doesn't belong to current shard. - future> open_sstable(sstring dir, int64_t generation, - sstables::sstable::version_types v, sstables::sstable::format_types f); + future> open_sstable(sstables::foreign_sstable_open_info info, sstring dir, + int64_t generation, sstables::sstable::version_types v, sstables::sstable::format_types f); void load_sstable(lw_shared_ptr& sstable, bool reset_level = false); lw_shared_ptr new_memtable(); lw_shared_ptr new_streaming_memtable(); @@ -546,6 +551,12 @@ private: return (*_sstable_generation)++ * smp::count + engine().cpu_id(); } + // inverse of calculate_generation_for_new_table(), used to determine which + // shard a sstable should be opened at. + static int64_t calculate_shard_from_sstable_generation(int64_t sstable_generation) { + return sstable_generation % smp::count; + } + // Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it. void rebuild_sstable_list(const std::vector& new_sstables, const std::vector& sstables_to_remove); @@ -663,8 +674,6 @@ public: query::result_memory_limiter& memory_limiter, uint64_t max_result_size); - future<> populate(sstring datadir); - void start(); future<> stop(); future<> flush(); @@ -746,7 +755,6 @@ public: future snapshot_exists(sstring name); - future<> load_new_sstables(std::vector new_tables); future<> snapshot(sstring name); future<> clear_snapshot(sstring name); future> get_snapshot_details(); @@ -863,7 +871,6 @@ private: // Func signature: bool (const decorated_key& dk, const mutation_partition& mp) template future for_all_partitions(schema_ptr, Func&& func) const; - future probe_file(sstring sstdir, sstring fname); void check_valid_rp(const db::replay_position&) const; public: void start_rewrite(); @@ -874,6 +881,8 @@ public: friend std::ostream& operator<<(std::ostream& out, const column_family& cf); // Testing purposes. friend class column_family_test; + + friend class distributed_loader; }; class user_types_metadata { @@ -1099,8 +1108,6 @@ private: future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position, timeout_clock::time_point timeout); - future<> populate(sstring datadir); - future<> populate_keyspace(sstring datadir, sstring ks_name); private: // Unless you are an earlier boostraper or the database itself, you should @@ -1143,9 +1150,6 @@ public: return _compaction_manager; } - future<> init_system_keyspace(); - future<> load_sstables(distributed& p); // after init_system_keyspace() - void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg); future<> add_column_family_and_make_directory(schema_ptr schema); @@ -1245,6 +1249,8 @@ public: semaphore& system_keyspace_read_concurrency_sem() { return _system_read_concurrency_sem; } + + friend class distributed_loader; }; // FIXME: stub @@ -1252,4 +1258,16 @@ class secondary_index_manager {}; future<> update_schema_version_and_announce(distributed& proxy); +class distributed_loader { +public: + static future<> open_sstable(distributed& db, sstables::entry_descriptor comps, + std::function (column_family&, sstables::foreign_sstable_open_info)> func); + static future<> load_new_sstables(distributed& db, sstring ks, sstring cf, std::vector new_tables); + static future probe_file(distributed& db, sstring sstdir, sstring fname); + static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); + static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); + static future<> init_system_keyspace(distributed& db); + static future<> init_non_system_keyspaces(distributed& db, distributed& proxy); +}; + #endif /* DATABASE_HH_ */ diff --git a/main.cc b/main.cc index 18b212a06141..e353132bd1ee 100644 --- a/main.cc +++ b/main.cc @@ -458,26 +458,15 @@ int main(int ac, char** av) { return disk_sanity(pathname, db.local().get_config().developer_mode()); }).get(); - // Deletion of previous stale, temporary SSTables is done by Shard0. Therefore, - // let's run Shard0 first. Technically, we could just have all shards agree on - // the deletion and just delete it later, but that is prone to races. - // - // Those races are not supposed to happen during normal operation, but if we have - // bugs, they can. Scylla's Github Issue #1014 is an example of a situation where - // that can happen, making existing problems worse. So running a single shard first - // and getting making sure that all temporary tables are deleted provides extra - // protection against such situations. - // - // We also need to init commitlog on shard0 before it is inited on other shards - // because it obtains the list of pre-existing segments for replay, which must - // not include reserve segments created by active commitlogs. - db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get(); - db.invoke_on_all([] (database& db) { - if (engine().cpu_id() == 0) { - return make_ready_future<>(); - } - return db.init_system_keyspace(); - }).get(); + // Initialization of a keyspace is done by shard 0 only. For system + // keyspace, the procedure will go through the hardcoded column + // families, and in each of them, it will load the sstables for all + // shards using distributed database object. + // Iteration through column family directory for sstable loading is + // done only by shard 0, so we'll no longer face race conditions as + // described here: https://github.com/scylladb/scylla/issues/1014 + distributed_loader::init_system_keyspace(db).get(); + supervisor::notify("starting gossip"); // Moved local parameters here, esp since with the // ssl stuff it gets to be a lot. @@ -529,15 +518,7 @@ int main(int ac, char** av) { return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id()); }).get(); supervisor::notify("loading sstables"); - // See comment on top of our call to init_system_keyspace as per why we invoke - // on Shard0 first. Scylla's Github Issue #1014 for details - db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get(); - db.invoke_on_all([&proxy] (database& db) { - if (engine().cpu_id() == 0) { - return make_ready_future<>(); - } - return db.load_sstables(proxy); - }).get(); + distributed_loader::init_non_system_keyspaces(db, proxy).get(); supervisor::notify("setting up system keyspace"); db::system_keyspace::setup(db, qp).get(); supervisor::notify("starting commit log"); diff --git a/service/storage_service.cc b/service/storage_service.cc index a9a5309b1527..747c4d6698d3 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2829,11 +2829,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { return make_ready_future>(std::move(new_tables)); }); }).then([this, ks_name, cf_name] (std::vector new_tables) { - return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] { - logger.info("Done loading new SSTables for {}.{}", ks_name, cf_name); - }); + return distributed_loader::load_new_sstables(_db, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] { + logger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name); }); }).finally([this] { _loading_new_sstables = false; diff --git a/sstables/sstables.cc b/sstables/sstables.cc index a8d51588942e..ed4d16c565a8 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1147,32 +1147,35 @@ future<> sstable::open_data() { .then([this] (auto files) { _index_file = std::get(std::get<0>(files).get()); _data_file = std::get(std::get<1>(files).get()); - return _data_file.stat().then([this] (struct stat st) { - if (this->has_component(sstable::component_type::CompressionInfo)) { - _components->compression.update(st.st_size); - } else { - _data_file_size = st.st_size; - } - _data_file_write_time = db_clock::from_time_t(st.st_mtime); - }).then([this] { - return _index_file.size().then([this] (auto size) { - _index_file_size = size; - }); - }).then([this] { - this->set_clustering_components_ranges(); - this->set_first_and_last_keys(); - - // Get disk usage for this sstable (includes all components). - _bytes_on_disk = 0; - return do_for_each(_recognized_components, [this] (component_type c) { - return this->sstable_write_io_check([&] { - return engine().file_size(this->filename(c)); - }).then([this] (uint64_t bytes) { - _bytes_on_disk += bytes; - }); - }); + return this->update_info_for_opened_data(); + }); +} + +future<> sstable::update_info_for_opened_data() { + return _data_file.stat().then([this] (struct stat st) { + if (this->has_component(sstable::component_type::CompressionInfo)) { + _components->compression.update(st.st_size); + } else { + _data_file_size = st.st_size; + } + _data_file_write_time = db_clock::from_time_t(st.st_mtime); + }).then([this] { + return _index_file.size().then([this] (auto size) { + _index_file_size = size; }); + }).then([this] { + this->set_clustering_components_ranges(); + this->set_first_and_last_keys(); + // Get disk usage for this sstable (includes all components). + _bytes_on_disk = 0; + return do_for_each(_recognized_components, [this] (component_type c) { + return this->sstable_write_io_check([&] { + return engine().file_size(this->filename(c)); + }).then([this] (uint64_t bytes) { + _bytes_on_disk += bytes; + }); + }); }); } @@ -1212,6 +1215,26 @@ future<> sstable::load() { }); } +future<> sstable::load(sstables::foreign_sstable_open_info info) { + return read_toc().then([this, info = std::move(info)] () mutable { + _components = std::move(info.components); + _data_file = make_checked_file(_read_error_handler, info.data.to_file()); + _index_file = make_checked_file(_read_error_handler, info.index.to_file()); + validate_min_max_metadata(); + return update_info_for_opened_data(); + }); +} + +future sstable::load_shared_components(const schema_ptr& s, sstring dir, int generation, version_types v, format_types f) { + auto sst = make_lw_shared(s, dir, generation, v, f); + return sst->load().then([sst] () mutable { + auto shards = sst->get_shards_for_this_sstable(); + auto info = sstable_open_info{make_lw_shared(std::move(*sst->_components)), + std::move(shards), std::move(sst->_data_file), std::move(sst->_index_file)}; + return make_ready_future(std::move(info)); + }); +} + static void output_promoted_index_entry(bytes_ostream& promoted_index, const bytes& first_col, const bytes& last_col, diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 24df96b80d83..6898f26f9941 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -114,6 +114,8 @@ public: class key; class sstable_writer; +struct foreign_sstable_open_info; +struct sstable_open_info; using index_list = std::vector; class index_reader; @@ -235,8 +237,14 @@ public: static future<> remove_sstable_with_temp_toc(sstring ks, sstring cf, sstring dir, int64_t generation, version_types v, format_types f); + // load sstable using components shared by a shard + future<> load(foreign_sstable_open_info info); + // load all components from disk + // this variant will be useful for testing purposes and also when loading + // a new sstable from scratch for sharing its components. future<> load(); future<> open_data(); + future<> update_info_for_opened_data(); future<> set_generation(int64_t generation); @@ -693,6 +701,9 @@ public: // relevant to the current shard, thus can be deleted by the deletion manager. static void mark_sstable_for_deletion(const schema_ptr& schema, sstring dir, int64_t generation, version_types v, format_types f); + // returns all info needed for a sstable to be shared with other shards. + static future load_shared_components(const schema_ptr& s, sstring dir, int generation, version_types v, format_types f); + // Allow the test cases from sstable_test.cc to test private methods. We use // a placeholder to avoid cluttering this class too much. The sstable_test class // will then re-export as public every method it needs. @@ -809,4 +820,21 @@ public: void consume_end_of_stream(); }; +// contains data for loading a sstable using components shared by a single shard; +// can be moved across shards +struct foreign_sstable_open_info { + foreign_ptr> components; + std::vector owners; + seastar::file_handle data; + seastar::file_handle index; +}; + +// can only be used locally +struct sstable_open_info { + lw_shared_ptr components; + std::vector owners; + file data; + file index; +}; + } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index e6c15298d23a..3ecd74ff3e67 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -309,9 +309,7 @@ class single_node_cql_env : public cql_test_env { bm.start(std::ref(qp)).get(); auto stop_bm = defer([&bm] { bm.stop().get(); }); - db->invoke_on_all([] (database& db) { - return db.init_system_keyspace(); - }).get(); + distributed_loader::init_system_keyspace(*db).get(); auto& ks = db->local().find_keyspace(db::system_keyspace::NAME); parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {