Skip to content

Commit

Permalink
Merge 'commitlog: recalculate disk footprint on delete_segment except…
Browse files Browse the repository at this point in the history
…ions' from Calle Wilund

If we get errors/exceptions in delete_segments we can (and probably will) loose track of disk footprint counters. This can in turn, if using hard limits, cause us to block indefinitely on segment allocation since we might think we have larger footprint than we actually do.

Of course, if we actually fail deleting a segment, it is 100% true that we still technically hold this disk footprint (now unreachable), but for cases where for example outside forces (or wacky tests) delete a file behind our backs, this might not be true. One could also argue that our footprint is the segments and file names we keep track of, and the rest is exterior sludge.

In any case, if we have any exceptions in delete_segments, we should recalculate disk footprint based on current state, and restart all new_segment paths etc.

Fixes scylladb#9348

(Note: this is based on previous PR scylladb#9344 - so shows these commits as well. Actual changes are only the latter two).

Closes scylladb#9349

* github.com:scylladb/scylla:
  commitlog: Recalculate footprint on delete_segment exceptions
  commitlog_test: Add test for exception in alloc w. deleted underlying file
  commitlog: Ensure failed-to-create-segment is re-deleted
  commitlog::allocate_segment_ex: Don't re-throw out of function
  • Loading branch information
avikivity committed Nov 16, 2021
2 parents bf6898a + dcc73c5 commit e2c27ee
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 3 deletions.
90 changes: 87 additions & 3 deletions db/commitlog/commitlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);

future<> recalculate_footprint();

future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
Expand All @@ -444,6 +446,7 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};

template<typename T>
Expand Down Expand Up @@ -1225,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
Expand All @@ -1250,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
Expand Down Expand Up @@ -1521,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:

if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
auto nf = co_await ext->wrap_file(filename, f, flags);
if (nf) {
f = std::move(nf);
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
Expand All @@ -1531,13 +1540,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:

f = make_checked_file(commit_error_handler, std::move(f));
} catch (...) {
ep = std::current_exception();
commit_error_handler(ep);
try {
commit_error_handler(std::current_exception());
} catch (...) {
ep = std::current_exception();
}
}
if (ep && f) {
co_await f.close();
}
if (ep) {
add_file_to_delete(filename, d);
co_return coroutine::exception(std::move(ep));
}

Expand Down Expand Up @@ -1867,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi

std::exception_ptr recycle_error;

size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
Expand Down Expand Up @@ -1916,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
}
}
co_await delete_file(filename);
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
}
}

Expand All @@ -1930,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
// If recycle failed and turned into a delete, we should fake-wakeup waiters
// since we might still have cleaned up disk space.
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
}

// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}

void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
Expand All @@ -1944,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
std::exchange(_disk_deletions, {}).set_exception(ep);
}

future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();

auto guard = get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}

// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;

totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->_size_on_disk;
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->_size_on_disk;
}

// now we need to adjust the actual sizes of recycled files

uint64_t actual_recycled_size = 0;

try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}

for (auto&& filename : recycles) {
_recycled_segments.push(std::move(filename));
}
for (auto&& s : reserves) {
_reserve_segments.push(std::move(s)); // you can have it back now.
}

totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}

future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});
Expand Down
112 changes: 112 additions & 0 deletions test/boost/commitlog_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
#include "test/lib/tmpdir.hh"
#include "db/commitlog/commitlog.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog_extensions.hh"
#include "db/commitlog/rp_set.hh"
#include "db/extensions.hh"
#include "log.hh"
#include "service/priority_manager.hh"
#include "test/lib/exception_utils.hh"
Expand Down Expand Up @@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
co_await log.clear();
}
}

static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
commitlog::config cfg;

constexpr auto max_size_mb = 1;

cfg.commitlog_segment_size_in_mb = max_size_mb;
cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
cfg.commitlog_sync_period_in_ms = 10;
cfg.reuse_segments = reuse;
cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always
cfg.use_o_dsync = true; // make sure we pre-allocate.

// not using cl_test, because we need to be able to abandon
// the log.

tmpdir tmp;
cfg.commit_log_location = tmp.path().string();

class myfail : public std::exception {
public:
using std::exception::exception;
};

struct myext: public db::commitlog_file_extension {
public:
bool fail = false;
bool thrown = false;
bool do_file_delete;

myext(bool dd)
: do_file_delete(dd)
{}

seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
if (fail && !thrown) {
thrown = true;
if (do_file_delete) {
co_await f.close();
co_await seastar::remove_file(filename);
}
throw myfail{};
}
co_return f;
}
seastar::future<> before_delete(const seastar::sstring&) override {
co_return;
}
};

auto ep = std::make_unique<myext>(do_file_delete);
auto& mx = *ep;

db::extensions myexts;
myexts.add_commitlog_file_extension("hufflepuff", std::move(ep));

cfg.extensions = &myexts;

auto log = co_await commitlog::create_commitlog(cfg);

rp_set rps;
// uncomment for verbosity
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);

auto uuid = utils::UUID_gen::get_time_UUID();
auto size = log.max_record_size();

auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) {
log.discard_completed_segments(id, rps);
mx.fail = true;
});

try {
while (!mx.thrown) {
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.fill('1', size);
});
rps.put(std::move(h));
}
} catch (...) {
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
}

co_await log.shutdown();
co_await log.clear();
}

/**
* Test generating an exception in segment file allocation
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
co_await do_test_exception_in_allocate_ex(false);
}

SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
co_await do_test_exception_in_allocate_ex(false, false);
}

/**
* Test generating an exception in segment file allocation, but also
* delete the file, which in turn should cause follow-up exceptions
* in cleanup delete. Which CL should handle
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
co_await do_test_exception_in_allocate_ex(true, false);
}

SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
co_await do_test_exception_in_allocate_ex(true);
}

0 comments on commit e2c27ee

Please sign in to comment.