db: avoid excessive memory usage during resharding
After resharding, sstables may be owned by all shards, which
means that file descriptors and memory usage for metadata will
increase by a factor equal to the number of shards. That can
easily lead to OOM.

SSTable components are immutable, so they can be stored on one
shard and shared with the others that need them. We use the
following formula to decide which shard will open an sstable and
share it with the others: (generation % smp::count), which is
the inverse of how we calculate generations for new sstables.
So if no resharding is performed, everything remains shard-local.
With this approach, resource usage for loaded sstables is evenly
distributed among shards.
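
As an illustration (not part of the diff), the round trip between
generation and owning shard; smp_count stands in for smp::count
and cpu_id for engine().cpu_id():

    #include <cassert>
    #include <cstdint>

    // generation = counter * smp_count + cpu_id, mirroring
    // calculate_generation_for_new_table() in database.hh below.
    int64_t generation_for_new_table(int64_t counter, unsigned cpu_id, unsigned smp_count) {
        return counter * smp_count + cpu_id;
    }

    // shard = generation % smp_count, the inverse used when reopening sstables.
    unsigned shard_for_sstable_generation(int64_t generation, unsigned smp_count) {
        return generation % smp_count;
    }

    int main() {
        // With 8 shards, shard 3 produces generations 3, 11, 19, ...;
        // each maps back to shard 3, so sstables that were never
        // resharded stay shard-local.
        assert(shard_for_sstable_generation(generation_for_new_table(1, 3, 8), 8) == 3);
        return 0;
    }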

For this approach to work, we now populate keyspaces from
shard 0 only; it is now solely responsible for iterating through
column family directories. In addition, most of the population
functions are now free functions that take the distributed
database object as a parameter.
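
A minimal sketch of the resulting control flow, with hypothetical
names (populate_sketch is illustrative, not the committed API; the
real entry points are on distributed_loader, declared in database.hh
below; the usual Seastar/Scylla headers are assumed):

    // Hypothetical sketch: shard 0 discovers sstable generations, then each
    // sstable is opened on the shard given by (generation % smp::count),
    // which later shares its components with the other owning shards.
    future<> populate_sketch(distributed<database>& db, std::vector<int64_t> generations) {
        return do_with(std::move(generations), [&db] (std::vector<int64_t>& gens) {
            return do_for_each(gens, [&db] (int64_t gen) {
                unsigned shard = gen % smp::count; // inverse of generation allocation
                return db.invoke_on(shard, [gen] (database& local_db) {
                    // placeholder: the real code opens the sstable's components
                    // here once and publishes a foreign_sstable_open_info for
                    // the other owning shards (see sstables.hh below).
                    return make_ready_future<>();
                });
            });
        });
    }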

Fixes scylladb#1951.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
raphaelsc committed Jan 9, 2017
1 parent 9200e38 commit 68dfcf5
Showing 8 changed files with 311 additions and 193 deletions.
2 changes: 1 addition & 1 deletion checked-file-impl.hh
@@ -117,7 +117,7 @@ private:
file _file;
};

inline file make_checked_file(const io_error_handler& error_handler, file& f)
inline file make_checked_file(const io_error_handler& error_handler, file f)
{
return file(::make_shared<checked_file_impl>(error_handler, f));
}
313 changes: 193 additions & 120 deletions database.cc

Large diffs are not rendered by default.

40 changes: 29 additions & 11 deletions database.hh
@@ -483,6 +483,11 @@ private:
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// sstables that have been opened but not loaded yet; refresh needs to load
// all opened sstables atomically, and since an sstable is now opened on all
// shards at the same time, we keep track here of the ones each shard still
// has to load.
std::vector<sstables::shared_sstable> _sstables_opened_but_not_loaded;
// sstables that are shared between several shards so we want to rewrite
// them (split the data belonging to this shard to a separate sstable),
// but for correct compaction we need to start the compaction only after
@@ -522,8 +527,8 @@ private:
// Doesn't trigger compaction.
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
// returns an empty pointer if sstable doesn't belong to current shard.
future<lw_shared_ptr<sstables::sstable>> open_sstable(sstring dir, int64_t generation,
sstables::sstable::version_types v, sstables::sstable::format_types f);
future<lw_shared_ptr<sstables::sstable>> open_sstable(sstables::foreign_sstable_open_info info, sstring dir,
int64_t generation, sstables::sstable::version_types v, sstables::sstable::format_types f);
void load_sstable(lw_shared_ptr<sstables::sstable>& sstable, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
@@ -546,6 +551,12 @@ private:
return (*_sstable_generation)++ * smp::count + engine().cpu_id();
}

// inverse of calculate_generation_for_new_table(), used to determine which
// shard an sstable should be opened on.
static int64_t calculate_shard_from_sstable_generation(int64_t sstable_generation) {
return sstable_generation % smp::count;
}

// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove);
@@ -663,8 +674,6 @@ public:
query::result_memory_limiter& memory_limiter,
uint64_t max_result_size);

future<> populate(sstring datadir);

void start();
future<> stop();
future<> flush();
@@ -746,7 +755,6 @@ public:

future<bool> snapshot_exists(sstring name);

future<> load_new_sstables(std::vector<sstables::entry_descriptor> new_tables);
future<> snapshot(sstring name);
future<> clear_snapshot(sstring name);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
@@ -863,7 +871,6 @@ private:
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
template <typename Func>
future<bool> for_all_partitions(schema_ptr, Func&& func) const;
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
void check_valid_rp(const db::replay_position&) const;
public:
void start_rewrite();
@@ -874,6 +881,8 @@ public:
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
// Testing purposes.
friend class column_family_test;

friend class distributed_loader;
};

class user_types_metadata {
@@ -1099,8 +1108,6 @@ private:

future<> init_commitlog();
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position, timeout_clock::time_point timeout);
future<> populate(sstring datadir);
future<> populate_keyspace(sstring datadir, sstring ks_name);

private:
// Unless you are an earlier boostraper or the database itself, you should
@@ -1143,9 +1150,6 @@ public:
return _compaction_manager;
}

future<> init_system_keyspace();
future<> load_sstables(distributed<service::storage_proxy>& p); // after init_system_keyspace()

void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg);
future<> add_column_family_and_make_directory(schema_ptr schema);

@@ -1245,11 +1249,25 @@ public:
semaphore& system_keyspace_read_concurrency_sem() {
return _system_read_concurrency_sem;
}

friend class distributed_loader;
};

// FIXME: stub
class secondary_index_manager {};

future<> update_schema_version_and_announce(distributed<service::storage_proxy>& proxy);

class distributed_loader {
public:
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func);
static future<> load_new_sstables(distributed<database>& db, sstring ks, sstring cf, std::vector<sstables::entry_descriptor> new_tables);
static future<sstables::entry_descriptor> probe_file(distributed<database>& db, sstring sstdir, sstring fname);
static future<> populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);
static future<> init_system_keyspace(distributed<database>& db);
static future<> init_non_system_keyspaces(distributed<database>& db, distributed<service::storage_proxy>& proxy);
};

#endif /* DATABASE_HH_ */
39 changes: 10 additions & 29 deletions main.cc
@@ -458,26 +458,15 @@ int main(int ac, char** av) {
return disk_sanity(pathname, db.local().get_config().developer_mode());
}).get();

// Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
// let's run Shard0 first. Technically, we could just have all shards agree on
// the deletion and just delete it later, but that is prone to races.
//
// Those races are not supposed to happen during normal operation, but if we have
// bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
// that can happen, making existing problems worse. So running a single shard first
// and getting making sure that all temporary tables are deleted provides extra
// protection against such situations.
//
// We also need to init commitlog on shard0 before it is inited on other shards
// because it obtains the list of pre-existing segments for replay, which must
// not include reserve segments created by active commitlogs.
db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get();
db.invoke_on_all([] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return db.init_system_keyspace();
}).get();
// Initialization of a keyspace is done by shard 0 only. For the system
// keyspace, the procedure will go through the hardcoded column
// families, and in each of them, it will load the sstables for all
// shards using the distributed database object.
// Iteration through a column family directory for sstable loading is
// done only by shard 0, so we no longer face the race conditions
// described here: https://github.com/scylladb/scylla/issues/1014
distributed_loader::init_system_keyspace(db).get();

supervisor::notify("starting gossip");
// Moved local parameters here, esp since with the
// ssl stuff it gets to be a lot.
@@ -529,15 +518,7 @@ int main(int ac, char** av) {
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
supervisor::notify("loading sstables");
// See comment on top of our call to init_system_keyspace as per why we invoke
// on Shard0 first. Scylla's Github Issue #1014 for details
db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get();
db.invoke_on_all([&proxy] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return db.load_sstables(proxy);
}).get();
distributed_loader::init_non_system_keyspaces(db, proxy).get();
supervisor::notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor::notify("starting commit log");
7 changes: 2 additions & 5 deletions service/storage_service.cc
@@ -2829,11 +2829,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(new_tables));
});
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
return cf.load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
logger.info("Done loading new SSTables for {}.{}", ks_name, cf_name);
});
return distributed_loader::load_new_sstables(_db, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] {
logger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name);
});
}).finally([this] {
_loading_new_sstables = false;
71 changes: 47 additions & 24 deletions sstables/sstables.cc
@@ -1147,32 +1147,35 @@ future<> sstable::open_data() {
.then([this] (auto files) {
_index_file = std::get<file>(std::get<0>(files).get());
_data_file = std::get<file>(std::get<1>(files).get());
return _data_file.stat().then([this] (struct stat st) {
if (this->has_component(sstable::component_type::CompressionInfo)) {
_components->compression.update(st.st_size);
} else {
_data_file_size = st.st_size;
}
_data_file_write_time = db_clock::from_time_t(st.st_mtime);
}).then([this] {
return _index_file.size().then([this] (auto size) {
_index_file_size = size;
});
}).then([this] {
this->set_clustering_components_ranges();
this->set_first_and_last_keys();

// Get disk usage for this sstable (includes all components).
_bytes_on_disk = 0;
return do_for_each(_recognized_components, [this] (component_type c) {
return this->sstable_write_io_check([&] {
return engine().file_size(this->filename(c));
}).then([this] (uint64_t bytes) {
_bytes_on_disk += bytes;
});
});
return this->update_info_for_opened_data();
});
}

future<> sstable::update_info_for_opened_data() {
return _data_file.stat().then([this] (struct stat st) {
if (this->has_component(sstable::component_type::CompressionInfo)) {
_components->compression.update(st.st_size);
} else {
_data_file_size = st.st_size;
}
_data_file_write_time = db_clock::from_time_t(st.st_mtime);
}).then([this] {
return _index_file.size().then([this] (auto size) {
_index_file_size = size;
});
}).then([this] {
this->set_clustering_components_ranges();
this->set_first_and_last_keys();

// Get disk usage for this sstable (includes all components).
_bytes_on_disk = 0;
return do_for_each(_recognized_components, [this] (component_type c) {
return this->sstable_write_io_check([&] {
return engine().file_size(this->filename(c));
}).then([this] (uint64_t bytes) {
_bytes_on_disk += bytes;
});
});
});
}

@@ -1212,6 +1215,26 @@ future<> sstable::load() {
});
}

future<> sstable::load(sstables::foreign_sstable_open_info info) {
return read_toc().then([this, info = std::move(info)] () mutable {
_components = std::move(info.components);
_data_file = make_checked_file(_read_error_handler, info.data.to_file());
_index_file = make_checked_file(_read_error_handler, info.index.to_file());
validate_min_max_metadata();
return update_info_for_opened_data();
});
}

future<sstable_open_info> sstable::load_shared_components(const schema_ptr& s, sstring dir, int generation, version_types v, format_types f) {
auto sst = make_lw_shared<sstables::sstable>(s, dir, generation, v, f);
return sst->load().then([sst] () mutable {
auto shards = sst->get_shards_for_this_sstable();
auto info = sstable_open_info{make_lw_shared<shareable_components>(std::move(*sst->_components)),
std::move(shards), std::move(sst->_data_file), std::move(sst->_index_file)};
return make_ready_future<sstable_open_info>(std::move(info));
});
}

static void output_promoted_index_entry(bytes_ostream& promoted_index,
const bytes& first_col,
const bytes& last_col,
28 changes: 28 additions & 0 deletions sstables/sstables.hh
@@ -114,6 +114,8 @@ public:

class key;
class sstable_writer;
struct foreign_sstable_open_info;
struct sstable_open_info;

using index_list = std::vector<index_entry>;
class index_reader;
@@ -235,8 +237,14 @@ public:
static future<> remove_sstable_with_temp_toc(sstring ks, sstring cf, sstring dir, int64_t generation,
version_types v, format_types f);

// load sstable using components shared by a shard
future<> load(foreign_sstable_open_info info);
// load all components from disk
// this variant will be useful for testing purposes and also when loading
// a new sstable from scratch for sharing its components.
future<> load();
future<> open_data();
future<> update_info_for_opened_data();

future<> set_generation(int64_t generation);

@@ -693,6 +701,9 @@ public:
// relevant to the current shard, thus can be deleted by the deletion manager.
static void mark_sstable_for_deletion(const schema_ptr& schema, sstring dir, int64_t generation, version_types v, format_types f);

// returns all info needed for a sstable to be shared with other shards.
static future<sstable_open_info> load_shared_components(const schema_ptr& s, sstring dir, int generation, version_types v, format_types f);

// Allow the test cases from sstable_test.cc to test private methods. We use
// a placeholder to avoid cluttering this class too much. The sstable_test class
// will then re-export as public every method it needs.
@@ -809,4 +820,21 @@ public:
void consume_end_of_stream();
};

// contains data for loading an sstable using components shared by a single
// shard; can be moved across shards (a usage sketch follows this file's diff)
struct foreign_sstable_open_info {
foreign_ptr<lw_shared_ptr<sstable::shareable_components>> components;
std::vector<shard_id> owners;
seastar::file_handle data;
seastar::file_handle index;
};

// can only be used locally
struct sstable_open_info {
lw_shared_ptr<sstable::shareable_components> components;
std::vector<shard_id> owners;
file data;
file index;
};

}
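
For context, a hedged sketch of how a foreign_sstable_open_info like
the one declared above can travel between shards (send_to_shard is
illustrative, not part of the diff; it relies on file_handle::to_file()
and foreign_ptr semantics, just as sstable::load() does in sstables.cc
above; the usual Seastar/Scylla headers are assumed):

    // Illustrative only: move shared components and open-file handles from
    // the owning shard to another shard, then rebind the handles locally.
    future<> send_to_shard(unsigned shard, sstables::foreign_sstable_open_info info) {
        return smp::submit_to(shard, [info = std::move(info)] () mutable {
            file data = info.data.to_file();   // rebind the data file on this shard
            file index = info.index.to_file(); // rebind the index file on this shard
            // info.components remains owned by the originating shard; the
            // foreign_ptr routes its eventual destruction back there.
            return make_ready_future<>();
        });
    }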
4 changes: 1 addition & 3 deletions tests/cql_test_env.cc
@@ -309,9 +309,7 @@ class single_node_cql_env : public cql_test_env {
bm.start(std::ref(qp)).get();
auto stop_bm = defer([&bm] { bm.stop().get(); });

db->invoke_on_all([] (database& db) {
return db.init_system_keyspace();
}).get();
distributed_loader::init_system_keyspace(*db).get();

auto& ks = db->local().find_keyspace(db::system_keyspace::NAME);
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
