Skip to content

Commit

Permalink
exceptions: Shutdown communications on non file I/O errors
Browse files Browse the repository at this point in the history
Apply the same treatment to non file filesystem I/O errors.

Signed-off-by: Benoît Canet <benoit@scylladb.com>
Message-Id: <1458154098-9977-2-git-send-email-benoit@scylladb.com>
  • Loading branch information
Benoît Canet authored and avikivity committed Mar 17, 2016
1 parent 1fb9a48 commit 3b1d3d9
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 57 deletions.
11 changes: 11 additions & 0 deletions checked-file-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,14 @@ inline open_checked_file_dma(disk_error_signal_type& signal,
});
});
}

future<file>
inline open_checked_directory(disk_error_signal_type& signal,
sstring name)
{
return do_io_check(signal, [&] {
return engine().open_directory(name).then([&] (file f) {
return make_ready_future<file>(make_checked_file(signal, f));
});
});
}
37 changes: 19 additions & 18 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "service/priority_manager.hh"

#include "checked-file-impl.hh"
#include "disk-error-handler.hh"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -432,9 +433,9 @@ class lister {


future<> lister::scan_dir(sstring name, lister::dir_entry_types type, walker_type walker, filter_type filter) {
return engine().open_directory(name).then([type, walker = std::move(walker), filter = std::move(filter), name] (file f) {
auto l = make_lw_shared<lister>(std::move(f), type, walker, filter, name);
return l->done().then([l] { });
return open_checked_directory(general_disk_error, name).then([type, walker = std::move(walker), filter = std::move(filter), name] (file f) {
auto l = make_lw_shared<lister>(std::move(f), type, walker, filter, name);
return l->done().then([l] { });
});
}

Expand Down Expand Up @@ -1202,7 +1203,7 @@ database::init_system_keyspace() {
db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing());

// FIXME support multiple directories
return touch_directory(_cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] {
return io_check(touch_directory, _cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] {
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
return init_commitlog();
});
Expand Down Expand Up @@ -1441,7 +1442,7 @@ keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const {

future<>
keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) {
return touch_directory(column_family_directory(name, uuid));
return io_check(touch_directory, column_family_directory(name, uuid));
}

no_such_keyspace::no_such_keyspace(const sstring& ks_name)
Expand Down Expand Up @@ -1507,7 +1508,7 @@ database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
create_in_memory_keyspace(ksm);
auto& datadir = _keyspaces.at(ksm->name()).datadir();
if (datadir != "") {
return touch_directory(datadir);
return io_check(touch_directory, datadir);
} else {
return make_ready_future<>();
}
Expand Down Expand Up @@ -2062,7 +2063,7 @@ seal_snapshot(sstring jsondir) {

dblog.debug("Storing manifest {}", jsonfile);

return recursive_touch_directory(jsondir).then([jsonfile, json = std::move(json)] {
return io_check(recursive_touch_directory, jsondir).then([jsonfile, json = std::move(json)] {
return open_checked_file_dma(general_disk_error, jsonfile, open_flags::wo | open_flags::create | open_flags::truncate).then([json](file f) {
return do_with(make_file_output_stream(std::move(f)), [json] (output_stream<char>& out) {
return out.write(json.c_str(), json.size()).then([&out] {
Expand All @@ -2073,7 +2074,7 @@ seal_snapshot(sstring jsondir) {
});
});
}).then([jsondir] {
return sync_directory(std::move(jsondir));
return io_check(sync_directory, std::move(jsondir));
}).finally([jsondir] {
pending_snapshots.erase(jsondir);
return make_ready_future<>();
Expand All @@ -2088,7 +2089,7 @@ future<> column_family::snapshot(sstring name) {

return parallel_for_each(tables, [name](sstables::shared_sstable sstable) {
auto dir = sstable->get_dir() + "/snapshots/" + name;
return recursive_touch_directory(dir).then([sstable, dir] {
return io_check(recursive_touch_directory, dir).then([sstable, dir] {
return sstable->create_links(dir).then_wrapped([] (future<> f) {
// If the SSTables are shared, one of the CPUs will fail here.
// That is completely fine, though. We only need one link.
Expand All @@ -2106,7 +2107,7 @@ future<> column_family::snapshot(sstring name) {
// This is not just an optimization. If we have no files, jsondir may not have been created,
// and sync_directory would throw.
if (tables.size()) {
return sync_directory(std::move(jsondir));
return io_check(sync_directory, std::move(jsondir));
} else {
return make_ready_future<>();
}
Expand Down Expand Up @@ -2151,7 +2152,7 @@ future<> column_family::snapshot(sstring name) {

future<bool> column_family::snapshot_exists(sstring tag) {
sstring jsondir = _config.datadir + "/snapshots/" + tag;
return engine().open_directory(std::move(jsondir)).then_wrapped([] (future<file> f) {
return open_checked_directory(general_disk_error, std::move(jsondir)).then_wrapped([] (future<file> f) {
try {
f.get0();
return make_ready_future<bool>(true);
Expand Down Expand Up @@ -2200,20 +2201,20 @@ future<> column_family::clear_snapshot(sstring tag) {
}
auto newdir = curr_dir + "/" + de.name;
recurse = lister::scan_dir(newdir, dir_and_files, [this, curr_dir = newdir] (directory_entry de) {
return remove_file(curr_dir + "/" + de.name);
return io_check(remove_file, curr_dir + "/" + de.name);
});
}
return recurse.then([fname = curr_dir + "/" + de.name] {
return remove_file(fname);
return io_check(remove_file, fname);
});
}).then_wrapped([jsondir] (future<> f) {
// Fine if directory does not exist. If it did, we delete it
if (file_missing(std::move(f)) == missing::no) {
return remove_file(jsondir);
return io_check(remove_file, jsondir);
}
return make_ready_future<>();
}).then([parent] {
return sync_directory(parent).then_wrapped([] (future<> f) {
return io_check(sync_directory, parent).then_wrapped([] (future<> f) {
// Should always exist for empty tags, but may not exist for a single tag if we never took
// snapshots. We will check this here just to mask out the exception, without silencing
// unexpected ones.
Expand All @@ -2226,7 +2227,7 @@ future<> column_family::clear_snapshot(sstring tag) {
future<std::unordered_map<sstring, column_family::snapshot_details>> column_family::get_snapshot_details() {
std::unordered_map<sstring, snapshot_details> all_snapshots;
return do_with(std::move(all_snapshots), [this] (auto& all_snapshots) {
return engine().file_exists(_config.datadir + "/snapshots").then([this, &all_snapshots](bool file_exists) {
return io_check([&] { return engine().file_exists(_config.datadir + "/snapshots"); }).then([this, &all_snapshots](bool file_exists) {
if (!file_exists) {
return make_ready_future<>();
}
Expand All @@ -2235,7 +2236,7 @@ future<std::unordered_map<sstring, column_family::snapshot_details>> column_fami
auto snapshot = _config.datadir + "/snapshots/" + snapshot_name;
all_snapshots.emplace(snapshot_name, snapshot_details());
return lister::scan_dir(snapshot, { directory_entry_type::regular }, [this, &all_snapshots, snapshot, snapshot_name] (directory_entry de) {
return file_size(snapshot + "/" + de.name).then([this, &all_snapshots, snapshot_name, name = de.name] (auto size) {
return io_check(file_size, snapshot + "/" + de.name).then([this, &all_snapshots, snapshot_name, name = de.name] (auto size) {
// The manifest is the only file expected to be in this directory not belonging to the SSTable.
// For it, we account the total size, but zero it for the true size calculation.
//
Expand All @@ -2251,7 +2252,7 @@ future<std::unordered_map<sstring, column_family::snapshot_details>> column_fami
}).then([this, &all_snapshots, snapshot_name, name = de.name] (auto size) {
// FIXME: When we support multiple data directories, the file may not necessarily
// live in this same location. May have to test others as well.
return file_size(_config.datadir + "/" + name).then_wrapped([&all_snapshots, snapshot_name, size] (auto fut) {
return io_check(file_size, _config.datadir + "/" + name).then_wrapped([&all_snapshots, snapshot_name, size] (auto fut) {
try {
// File exists in the main SSTable directory. Snapshots are not contributing to size
fut.get0();
Expand Down
9 changes: 7 additions & 2 deletions db/commitlog/commitlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include <boost/range/adaptor/transformed.hpp>

#include "checked-file-impl.hh"
#include "disk-error-handler.hh"

static logging::logger logger("commitlog");

Expand Down Expand Up @@ -464,7 +465,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.total_size_on_disk -= size_on_disk();
_segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
::unlink(
commit_io_check(::unlink,
(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str());
} else {
logger.warn("Segment {} is dirty and is left on disk.", *this);
Expand Down Expand Up @@ -897,7 +898,7 @@ db::commitlog::segment_manager::list_descriptors(sstring dirname) {
}
};

return engine().open_directory(dirname).then([this, dirname](file dir) {
return open_checked_directory(commit_error, dirname).then([this, dirname](file dir) {
auto h = make_lw_shared<helper>(std::move(dirname), std::move(dir));
return h->done().then([h]() {
return make_ready_future<std::vector<db::commitlog::descriptor>>(std::move(h->_result));
Expand Down Expand Up @@ -1443,6 +1444,8 @@ const db::commitlog::config& db::commitlog::active_config() const {
return _segment_manager->cfg;
}

// No commit_io_check needed in the log reader since the database will fail
// on error at startup if required
future<std::unique_ptr<subscription<temporary_buffer<char>, db::replay_position>>>
db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next, position_type off) {
return open_checked_file_dma(commit_error, filename, open_flags::ro).then([next = std::move(next), off](file f) {
Expand All @@ -1451,6 +1454,8 @@ db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func ne
});
}

// No commit_io_check needed in the log reader since the database will fail
// on error at startup if required
subscription<temporary_buffer<char>, db::replay_position>
db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) {
struct work {
Expand Down
21 changes: 21 additions & 0 deletions disk-error-handler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,24 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
throw;
}
}

template<typename Func, typename... Args>
auto commit_io_check(Func&& func, Args&&... args) {
return do_io_check(commit_error, func, std::forward<Args>(args)...);
}

template<typename Func, typename... Args>
auto sstable_read_io_check(Func&& func, Args&&... args) {
return do_io_check(sstable_read_error, func, std::forward<Args>(args)...);
}

template<typename Func, typename... Args>
auto sstable_write_io_check(Func&& func, Args&&... args) {
return do_io_check(sstable_write_error, func, std::forward<Args>(args)...);
}

template<typename Func, typename... Args>
auto io_check(Func&& func, Args&&... args) {
return do_io_check(general_disk_error, func, std::forward<Args>(args)...);
}

2 changes: 1 addition & 1 deletion main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ static void apply_logger_settings(sstring default_level, db::config::string_map
class directories {
public:
future<> touch_and_lock(sstring path) {
return recursive_touch_directory(path).then_wrapped([this, path] (future<> f) {
return io_check(recursive_touch_directory, path).then_wrapped([this, path] (future<> f) {
try {
f.get();
return utils::file_lock::acquire(path + "/.lock").then([this](utils::file_lock lock) {
Expand Down
6 changes: 5 additions & 1 deletion sstables/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "sstables.hh"
#include "utils/bloom_filter.hh"

#include "disk-error-handler.hh"

namespace sstables {

future<> sstable::read_filter(const io_priority_class& pc) {
Expand All @@ -43,7 +45,9 @@ future<> sstable::read_filter(const io_priority_class& pc) {
bs.load(filter.buckets.elements.begin(), filter.buckets.elements.end());
_filter = utils::filter::create_filter(filter.hashes, std::move(bs));
}).then([this] {
return engine().file_size(this->filename(sstable::component_type::Filter));
return io_check([&] {
return engine().file_size(this->filename(sstable::component_type::Filter));
});
});
}).then([this] (auto size) {
_filter_file_size = size;
Expand Down
Loading

0 comments on commit 3b1d3d9

Please sign in to comment.