Skip to content

Commit

Permalink
Merge pull request ceph#57725 from Matan-B/wip-matanb-crimson-load-exist
Browse files Browse the repository at this point in the history
crimson/osd/object_context_loader: Fix obc cache existence usage

Reviewed-by: Aishwarya Mathuria <amathuri@redhat.com>
Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Xuehan Xu <xuxuehan@qianxin.com>
Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
  • Loading branch information
athanatos authored Jun 6, 2024
2 parents 437060b + 1675ce8 commit a0d71e5
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 53 deletions.
11 changes: 11 additions & 0 deletions src/crimson/common/interruptible_future.h
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,17 @@ struct interruptor
};
}

template <typename Lock, typename Func>
[[gnu::always_inline]]
static auto with_lock(Lock& lock, Func&& func) {
return seastar::with_lock(
lock,
[func=std::move(func),
interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
return call_with_interruption(interrupt_condition, func);
});
}

template <typename Iterator,
InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction>
[[gnu::always_inline]]
Expand Down
84 changes: 65 additions & 19 deletions src/crimson/common/tri_mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

#include <seastar/util/later.hh>

seastar::future<> read_lock::lock()
SET_SUBSYS(osd);
//TODO: SET_SUBSYS(crimson_tri_mutex);

std::optional<seastar::future<>>
read_lock::lock()
{
return static_cast<tri_mutex*>(this)->lock_for_read();
}
Expand All @@ -15,7 +19,8 @@ void read_lock::unlock()
static_cast<tri_mutex*>(this)->unlock_for_read();
}

seastar::future<> write_lock::lock()
std::optional<seastar::future<>>
write_lock::lock()
{
return static_cast<tri_mutex*>(this)->lock_for_write();
}
Expand All @@ -25,7 +30,8 @@ void write_lock::unlock()
static_cast<tri_mutex*>(this)->unlock_for_write();
}

seastar::future<> excl_lock::lock()
std::optional<seastar::future<>>
excl_lock::lock()
{
return static_cast<tri_mutex*>(this)->lock_for_excl();
}
Expand Down Expand Up @@ -57,30 +63,40 @@ void excl_lock_from_write::unlock()

tri_mutex::~tri_mutex()
{
LOG_PREFIX(tri_mutex::~tri_mutex());
DEBUGDPP("", *this);
assert(!is_acquired());
}

seastar::future<> tri_mutex::lock_for_read()
std::optional<seastar::future<>>
tri_mutex::lock_for_read()
{
LOG_PREFIX(tri_mutex::lock_for_read());
DEBUGDPP("", *this);
if (try_lock_for_read()) {
return seastar::now();
DEBUGDPP("lock_for_read successfully", *this);
return std::nullopt;
}
waiters.emplace_back(seastar::promise<>(), type_t::read);
DEBUGDPP("can't lock_for_read, adding to waiters", *this);
waiters.emplace_back(seastar::promise<>(), type_t::read, name);
return waiters.back().pr.get_future();
}

bool tri_mutex::try_lock_for_read() noexcept
{
LOG_PREFIX(tri_mutex::try_lock_for_read());
DEBUGDPP("", *this);
if (!writers && !exclusively_used && waiters.empty()) {
++readers;
return true;
} else {
return false;
}
return false;
}

void tri_mutex::unlock_for_read()
{
LOG_PREFIX(tri_mutex::unlock_for_read());
DEBUGDPP("", *this);
assert(readers > 0);
if (--readers == 0) {
wake();
Expand All @@ -89,40 +105,51 @@ void tri_mutex::unlock_for_read()

void tri_mutex::promote_from_read()
{
LOG_PREFIX(tri_mutex::promote_from_read());
DEBUGDPP("", *this);
assert(readers == 1);
--readers;
exclusively_used = true;
}

void tri_mutex::demote_to_read()
{
LOG_PREFIX(tri_mutex::demote_to_read());
DEBUGDPP("", *this);
assert(exclusively_used);
exclusively_used = false;
++readers;
}

seastar::future<> tri_mutex::lock_for_write()
std::optional<seastar::future<>>
tri_mutex::lock_for_write()
{
LOG_PREFIX(tri_mutex::lock_for_write());
DEBUGDPP("", *this);
if (try_lock_for_write()) {
return seastar::now();
DEBUGDPP("lock_for_write successfully", *this);
return std::nullopt;
}
waiters.emplace_back(seastar::promise<>(), type_t::write);
DEBUGDPP("can't lock_for_write, adding to waiters", *this);
waiters.emplace_back(seastar::promise<>(), type_t::write, name);
return waiters.back().pr.get_future();
}

bool tri_mutex::try_lock_for_write() noexcept
{
if (!readers && !exclusively_used) {
if (waiters.empty()) {
++writers;
return true;
}
LOG_PREFIX(tri_mutex::try_lock_for_write());
DEBUGDPP("", *this);
if (!readers && !exclusively_used && waiters.empty()) {
++writers;
return true;
}
return false;
}

void tri_mutex::unlock_for_write()
{
LOG_PREFIX(tri_mutex::unlock_for_write());
DEBUGDPP("", *this);
assert(writers > 0);
if (--writers == 0) {
wake();
Expand All @@ -131,30 +158,41 @@ void tri_mutex::unlock_for_write()

void tri_mutex::promote_from_write()
{
LOG_PREFIX(tri_mutex::promote_from_write());
DEBUGDPP("", *this);
assert(writers == 1);
--writers;
exclusively_used = true;
}

void tri_mutex::demote_to_write()
{
LOG_PREFIX(tri_mutex::demote_to_write());
DEBUGDPP("", *this);
assert(exclusively_used);
exclusively_used = false;
++writers;
}

// for exclusive users
seastar::future<> tri_mutex::lock_for_excl()
std::optional<seastar::future<>>
tri_mutex::lock_for_excl()
{
LOG_PREFIX(tri_mutex::lock_for_excl());
DEBUGDPP("", *this);
if (try_lock_for_excl()) {
return seastar::now();
DEBUGDPP("lock_for_excl, successfully", *this);
return std::nullopt;
}
waiters.emplace_back(seastar::promise<>(), type_t::exclusive);
DEBUGDPP("can't lock_for_excl, adding to waiters", *this);
waiters.emplace_back(seastar::promise<>(), type_t::exclusive, name);
return waiters.back().pr.get_future();
}

bool tri_mutex::try_lock_for_excl() noexcept
{
LOG_PREFIX(tri_mutex::try_lock_for_excl());
DEBUGDPP("", *this);
if (readers == 0u && writers == 0u && !exclusively_used) {
exclusively_used = true;
return true;
Expand All @@ -165,13 +203,17 @@ bool tri_mutex::try_lock_for_excl() noexcept

void tri_mutex::unlock_for_excl()
{
LOG_PREFIX(tri_mutex::unlock_for_excl());
DEBUGDPP("", *this);
assert(exclusively_used);
exclusively_used = false;
wake();
}

bool tri_mutex::is_acquired() const
{
LOG_PREFIX(tri_mutex::is_acquired());
DEBUGDPP("", *this);
if (readers != 0u) {
return true;
} else if (writers != 0u) {
Expand All @@ -185,6 +227,8 @@ bool tri_mutex::is_acquired() const

void tri_mutex::wake()
{
LOG_PREFIX(tri_mutex::wake());
DEBUGDPP("", *this);
assert(!readers && !writers && !exclusively_used);
type_t type = type_t::none;
while (!waiters.empty()) {
Expand All @@ -210,7 +254,9 @@ void tri_mutex::wake()
default:
assert(0);
}
DEBUGDPP("waking up {}", *this, waiter.waiter_name);
waiter.pr.set_value();
waiters.pop_front();
}
DEBUGDPP("no waiters", *this);
}
42 changes: 35 additions & 7 deletions src/crimson/common/tri_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@

#pragma once

#include <optional>

#include <seastar/core/future.hh>
#include <seastar/core/circular_buffer.hh>
#include "crimson/common/log.h"

class read_lock {
public:
seastar::future<> lock();
std::optional<seastar::future<>> lock();
void unlock();
};

class write_lock {
public:
seastar::future<> lock();
std::optional<seastar::future<>> lock();
void unlock();
};

class excl_lock {
public:
seastar::future<> lock();
std::optional<seastar::future<>> lock();
void unlock();
};

Expand Down Expand Up @@ -62,6 +65,11 @@ class tri_mutex : private read_lock,
{
public:
tri_mutex() = default;
#ifdef NDEBUG
tri_mutex(const std::string obj_name) : name() {}
#else
tri_mutex(const std::string obj_name) : name(obj_name) {}
#endif
~tri_mutex();

read_lock& for_read() {
Expand All @@ -81,7 +89,7 @@ class tri_mutex : private read_lock,
}

// for shared readers
seastar::future<> lock_for_read();
std::optional<seastar::future<>> lock_for_read();
bool try_lock_for_read() noexcept;
void unlock_for_read();
void promote_from_read();
Expand All @@ -91,7 +99,7 @@ class tri_mutex : private read_lock,
}

// for shared writers
seastar::future<> lock_for_write();
std::optional<seastar::future<>> lock_for_write();
bool try_lock_for_write() noexcept;
void unlock_for_write();
void promote_from_write();
Expand All @@ -101,7 +109,7 @@ class tri_mutex : private read_lock,
}

// for exclusive users
seastar::future<> lock_for_excl();
std::optional<seastar::future<>> lock_for_excl();
bool try_lock_for_excl() noexcept;
void unlock_for_excl();
bool is_excl_acquired() const {
Expand All @@ -120,6 +128,10 @@ class tri_mutex : private read_lock,
}
}

std::string_view get_name() const{
return name;
}

private:
void wake();
unsigned readers = 0;
Expand All @@ -132,17 +144,33 @@ class tri_mutex : private read_lock,
none,
};
struct waiter_t {
waiter_t(seastar::promise<>&& pr, type_t type)
waiter_t(seastar::promise<>&& pr, type_t type, std::string_view waiter_name)
: pr(std::move(pr)), type(type)
{}
seastar::promise<> pr;
type_t type;
std::string_view waiter_name;
};
seastar::circular_buffer<waiter_t> waiters;
const std::string name;
friend class read_lock;
friend class write_lock;
friend class excl_lock;
friend class excl_lock_from_read;
friend class excl_lock_from_write;
friend class excl_lock_from_excl;
friend std::ostream& operator<<(std::ostream &lhs, const tri_mutex &rhs);
};

inline std::ostream& operator<<(std::ostream& os, const tri_mutex& tm)
{
os << fmt::format("tri_mutex {} writers {} readers {}"
" exclusively_used {} waiters: {}",
tm.get_name(), tm.get_writers(), tm.get_readers(),
tm.exclusively_used, tm.waiters.size());
return os;
}

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<tri_mutex> : fmt::ostream_formatter {};
#endif
Loading

0 comments on commit a0d71e5

Please sign in to comment.