diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 05931da1562a..927d5b528da1 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -428,6 +428,8 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this recalculate_footprint(); + future<> rename_file(sstring, sstring) const; size_t max_request_controller_units() const; segment_id_type _ids = 0; @@ -444,6 +446,7 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this _disk_write_alignment; + seastar::semaphore _reserve_recalculation_guard; }; template @@ -1225,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c) , _recycled_segments(std::numeric_limits::max()) , _reserve_replenisher(make_ready_future<>()) , _background_sync(make_ready_future<>()) + , _reserve_recalculation_guard(1) { assert(max_size > 0); assert(max_mutation_size < segment::multi_entry_size_magic); @@ -1250,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() { } try { gate::holder g(_gate); + auto guard = co_await get_units(_reserve_recalculation_guard, 1); + if (_reserve_segments.full()) { + // can happen if recalculate_footprint() refilled the reserve while we waited for the guard + continue; + } // note: if we were strict with disk size, we would refuse to do this // unless disk footprint is lower than threshold. but we cannot (yet?) // trust that flush logic will absolutely free up an existing @@ -1521,7 +1530,7 @@ future db::commitlog::segment_manager: if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) { for (auto * ext : cfg.extensions->commitlog_file_extensions()) { - auto nf = co_await ext->wrap_file(std::move(filename), f, flags); + auto nf = co_await ext->wrap_file(filename, f, flags); if (nf) { f = std::move(nf); align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment(); @@ -1531,13 +1540,17 @@ future db::commitlog::segment_manager: f = make_checked_file(commit_error_handler, std::move(f)); } catch (...) 
{ - ep = std::current_exception(); - commit_error_handler(ep); + try { + commit_error_handler(std::current_exception()); + } catch (...) { + ep = std::current_exception(); + } } if (ep && f) { co_await f.close(); } if (ep) { + add_file_to_delete(filename, d); co_return coroutine::exception(std::move(ep)); } @@ -1867,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi std::exception_ptr recycle_error; + size_t num_deleted = 0; + bool except = false; while (!files.empty()) { auto filename = std::move(files.back()); files.pop_back(); @@ -1916,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi } } co_await delete_file(filename); + ++num_deleted; } catch (...) { clogger.error("Could not delete segment {}: {}", filename, std::current_exception()); + except = true; } } @@ -1930,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi if (recycle_error && _recycled_segments.empty()) { abort_recycled_list(recycle_error); } + // If recycle failed and turned into a delete, we should fake-wakeup waiters + // since we might still have cleaned up disk space. + if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) { + abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files"))); + } + + // #9348 - if we had an exception, we can't trust our bookkeeping any more. recalculate. 
+ if (except) { + co_await recalculate_footprint(); + } } void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) { @@ -1944,6 +1971,66 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e std::exchange(_disk_deletions, {}).set_exception(ep); } +// Recomputes totals.total_size_on_disk after bookkeeping can no longer be trusted: sums the +// on-disk size of live and reserve segments plus the actual file sizes of recycled segments. +// Failures are logged rather than propagated. +future<> db::commitlog::segment_manager::recalculate_footprint() { + try { + co_await do_pending_deletes(); + + auto guard = co_await get_units(_reserve_recalculation_guard, 1); + auto segments_copy = _segments; + std::vector reserves; + std::vector recycles; + // this causes haywire things while we steal stuff, but... + while (!_reserve_segments.empty()) { + reserves.push_back(_reserve_segments.pop()); + } + while (!_recycled_segments.empty()) { + recycles.push_back(_recycled_segments.pop()); + } + + // first, guesstimate sizes + uint64_t recycle_size = recycles.size() * max_size; + auto old = totals.total_size_on_disk; + + totals.total_size_on_disk = recycle_size; + for (auto& s : _segments) { + totals.total_size_on_disk += s->_size_on_disk; + } + for (auto& s : reserves) { + totals.total_size_on_disk += s->_size_on_disk; + } + + // now we need to adjust the actual sizes of recycled files + + uint64_t actual_recycled_size = 0; + + try { + for (auto& filename : recycles) { + auto s = co_await seastar::file_size(filename); + actual_recycled_size += s; + } + } catch (...) { + clogger.error("Exception reading disk footprint ({}).", std::current_exception()); + actual_recycled_size = recycle_size; // best we got + } + + for (auto&& filename : recycles) { + _recycled_segments.push(std::move(filename)); + } + for (auto&& s : reserves) { + _reserve_segments.push(std::move(s)); // you can have it back now. + } + + totals.total_size_on_disk += actual_recycled_size - recycle_size; + // pushing things to reserve/recycled queues will have resumed any + // waiters, so we should be done. + } catch (...) { + clogger.error("Exception recalculating disk footprint ({}). 
Values might be off...", std::current_exception()); + } +} + future<> db::commitlog::segment_manager::do_pending_deletes() { auto ftc = std::exchange(_files_to_close, {}); auto ftd = std::exchange(_files_to_delete, {}); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 87e0c3e8fcc8..83487af12eb6 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -44,7 +44,9 @@ #include "test/lib/tmpdir.hh" #include "db/commitlog/commitlog.hh" #include "db/commitlog/commitlog_replayer.hh" +#include "db/commitlog/commitlog_extensions.hh" #include "db/commitlog/rp_set.hh" +#include "db/extensions.hh" #include "log.hh" #include "service/priority_manager.hh" #include "test/lib/exception_utils.hh" @@ -947,3 +949,118 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) { co_await log.clear(); } } + +/** + * Writes to a commitlog until the installed file extension throws from wrap_file + * (optionally closing and removing the new segment file first), then shuts the log down. + * do_file_delete selects the removal; reuse is passed to cfg.reuse_segments. + */ +static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count; + cfg.commitlog_sync_period_in_ms = 10; + cfg.reuse_segments = reuse; + cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always + cfg.use_o_dsync = true; // make sure we pre-allocate. + + // not using cl_test, because we need to be able to abandon + // the log. 
+ + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + + class myfail : public std::exception { + public: + using std::exception::exception; + }; + + struct myext: public db::commitlog_file_extension { + public: + bool fail = false; + bool thrown = false; + bool do_file_delete; + + myext(bool dd) + : do_file_delete(dd) + {} + + seastar::future wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override { + if (fail && !thrown) { + thrown = true; + if (do_file_delete) { + co_await f.close(); + co_await seastar::remove_file(filename); + } + throw myfail{}; + } + co_return f; + } + seastar::future<> before_delete(const seastar::sstring&) override { + co_return; + } + }; + + auto ep = std::make_unique(do_file_delete); + auto& mx = *ep; + + db::extensions myexts; + myexts.add_commitlog_file_extension("hufflepuff", std::move(ep)); + + cfg.extensions = &myexts; + + auto log = co_await commitlog::create_commitlog(cfg); + + rp_set rps; + // uncomment for verbosity + // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug); + + auto uuid = utils::UUID_gen::get_time_UUID(); + auto size = log.max_record_size(); + + auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) { + log.discard_completed_segments(id, rps); + mx.fail = true; + }); + + try { + while (!mx.thrown) { + rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.fill('1', size); + }); + rps.put(std::move(h)); + } + } catch (...) { + BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. 
ASAN errors and leaks will follow..."); + } + + co_await log.shutdown(); + co_await log.clear(); +} + +/** + * Test generating an exception in segment file allocation + */ +SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) { + co_await do_test_exception_in_allocate_ex(false); +} + +SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) { + co_await do_test_exception_in_allocate_ex(false, false); +} + +/** + * Test generating an exception in segment file allocation, but also + * delete the file, which in turn should cause follow-up exceptions + * in cleanup delete, which the commitlog should handle. + */ +SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) { + co_await do_test_exception_in_allocate_ex(true); +} + +SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) { + co_await do_test_exception_in_allocate_ex(true, false); +}