Skip to content

Commit

Permalink
fix: unregister termination callback when AcceptServer exits (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Nov 20, 2024
1 parent e76edd0 commit 944be56
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
1 change: 1 addition & 0 deletions util/accept_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class AcceptServer {
fb2::BlockingCounter ref_bc_; // to synchronize listener threads during the shutdown.

bool was_run_ = false;
bool break_on_int_;

uint16_t backlog_ = 128;
};
Expand Down
7 changes: 5 additions & 2 deletions util/fibers/accept_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ using namespace boost;
using namespace std;

AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool break_on_int)
: pool_(pool), mr_(mr), ref_bc_(0) {
: pool_(pool), mr_(mr), ref_bc_(0), break_on_int_(break_on_int) {
if (break_on_int) {
ProactorBase* proactor = pool_->GetNextProactor();
proactor->RegisterSignal({SIGINT, SIGTERM}, [this](int signal) {
ProactorBase::RegisterSignal({SIGINT, SIGTERM}, proactor, [this](int signal) {
LOG(INFO) << "Exiting on signal " << strsignal(signal);
if (on_break_hook_) {
on_break_hook_();
Expand All @@ -32,6 +32,9 @@ AcceptServer::AcceptServer(ProactorPool* pool, PMR_NS::memory_resource* mr, bool
}

AcceptServer::~AcceptServer() {
if (break_on_int_) {
ProactorBase::ClearSignal({SIGINT, SIGTERM});
}
list_interface_.clear();
}

Expand Down
5 changes: 3 additions & 2 deletions util/fibers/proactor_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void ProactorBase::Migrate(ProactorBase* dest) {
});
}

void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb) {
void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, ProactorBase* proactor,
std::function<void(int)> cb) {
auto* state = get_signal_state();

struct sigaction sa;
Expand All @@ -243,7 +244,7 @@ void ProactorBase::RegisterSignal(std::initializer_list<uint16_t> l, std::functi
for (uint16_t val : l) {
CHECK(!state->signal_map[val].cb) << "Signal " << val << " was already registered";
state->signal_map[val].cb = cb;
state->signal_map[val].proactor = this;
state->signal_map[val].proactor = proactor;

CHECK_EQ(0, sigaction(val, &sa, NULL));
}
Expand Down
19 changes: 9 additions & 10 deletions util/fibers/proactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ class ProactorBase {
return tl_info_.owner;
}

void RegisterSignal(std::initializer_list<uint16_t> l, std::function<void(int)> cb);
static void RegisterSignal(std::initializer_list<uint16_t> l, ProactorBase* proactor,
std::function<void(int)> cb);

void ClearSignal(std::initializer_list<uint16_t> l) {
RegisterSignal(l, nullptr);
static void ClearSignal(std::initializer_list<uint16_t> l) {
RegisterSignal(l, nullptr, nullptr);
}

// Returns an approximate (cached) time with nano-sec granularity.
Expand Down Expand Up @@ -253,7 +254,6 @@ class ProactorBase {
return absl::GetCurrentTimeNanos();
}


// Returns true if we should poll scheduler tasks that run periodically but not too often.
bool ShouldPollL2Tasks() const;

Expand All @@ -265,12 +265,12 @@ class ProactorBase {
static uint64_t GetCPUCycleCount() {
#if defined(__x86_64__)
uint64_t low, high;
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return static_cast<int64_t>((high << 32) | low);
__asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
return static_cast<int64_t>((high << 32) | low);
#elif defined(__aarch64__)
int64_t tv;
asm volatile("mrs %0, cntvct_el0" : "=r"(tv));
return tv;
int64_t tv;
asm volatile("mrs %0, cntvct_el0" : "=r"(tv));
return tv;
#else
return absl::base_internal::CycleClock::Now();
#endif
Expand Down Expand Up @@ -367,7 +367,6 @@ inline void ProactorBase::WakeupIfNeeded() {
// memory_order_acq_rel until further notice.
auto current = tq_seq_.fetch_add(2, std::memory_order_acq_rel);
if (current == WAIT_SECTION_STATE) {

// We protect WakeRing using tq_seq_. That means only one thread at a time
// can enter here. Moreover tq_seq_ == WAIT_SECTION_STATE only when
// proactor enters WAIT section, therefore we do not race over SQE ring with proactor thread.
Expand Down

0 comments on commit 944be56

Please sign in to comment.