Merged
12 changes: 9 additions & 3 deletions src/httpserver.cpp
Contributor @hodlinator commented on Feb 12, 2026:
Commit message suggestion for 5326821:

Given that the unclean shutdown is already fixed in the first commit by using util::Expected instead of throwing exceptions, I think this is somewhat more correct for the second:

http: properly respond to HTTP request during shutdown

Makes sure we respond to the client as the HTTP request attempts to submit a task to
the threadpool during server shutdown.

Roughly what happens:

1) The server receives an HTTP request and starts calling http_request_cb().
2) Meanwhile on another thread, shutdown is triggered, which calls InterruptHTTPServer(),
   unregisters http_request_cb(), and interrupts the thread pool.
3) The request (step 1) resumes and tries to submit a task
   to the now-interrupted server.

This fix detects failed submissions immediately, and the server
responds with HTTP_SERVICE_UNAVAILABLE.

I don't think it's necessary to classify this as a race condition. I guess it depends on how you define it.

Member Author:

Taken, thanks!

> I don't think it's necessary to classify this as a race condition. I guess it depends on how you define it.

There is a thread (shutdown - main thread) modifying a resource (thread pool) just before another thread (libevent http handler) accesses it. It is pretty much a race condition to me.

Contributor:

> There is a thread (shutdown - main thread) modifying a resource (thread pool) just before another thread (libevent http handler) accesses it. It is pretty much a race condition to me.

Fair.

@@ -211,7 +211,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
}
}
}
-auto hreq{std::make_unique<HTTPRequest>(req, *static_cast<const util::SignalInterrupt*>(arg))};
+auto hreq{std::make_shared<HTTPRequest>(req, *static_cast<const util::SignalInterrupt*>(arg))};
Contributor:
remark: Was experimenting with having ThreadPool::Submit() take a "task_creator" lambda that only gets called when no error condition occurs in Submit; it would then return the inner lambda, which would take ownership of the unique_ptr as on master. Even though I have a distaste for shared_ptr, your version is more elegant.


// Early address-based allow check
if (!ClientAllowed(hreq->GetPeer())) {
@@ -258,7 +258,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
return;
}

-auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
+auto item = [req = hreq, in_path = std::move(path), fn = i->handler]() {
std::string err_msg;
try {
fn(req.get(), in_path);
@@ -276,7 +276,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
};

-[[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))};
+if (auto res = g_threadpool_http.Submit(std::move(item)); !res.has_value()) {
+    Assume(hreq.use_count() == 1); // ensure request will be deleted
+    // Both SubmitError::Inactive and SubmitError::Interrupted mean shutdown
+    LogWarning("HTTP request rejected during server shutdown: '%s'", SubmitErrorString(res.error()));
+    hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Request rejected during server shutdown");
+    return;
+}
} else {
hreq->WriteReply(HTTP_NOT_FOUND);
}
4 changes: 2 additions & 2 deletions src/test/fuzz/threadpool.cpp
@@ -87,10 +87,10 @@ FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(
std::future<void> fut;
if (will_throw) {
expected_fail_tasks++;
-fut = g_pool.Submit(ThrowTask{});
+fut = *Assert(g_pool.Submit(ThrowTask{}));
} else {
expected_task_counter++;
-fut = g_pool.Submit(CounterTask{task_counter});
+fut = *Assert(g_pool.Submit(CounterTask{task_counter}));
}

// If caller wants to wait immediately, consume the future here (safe).
48 changes: 27 additions & 21 deletions src/test/threadpool_tests.cpp
@@ -44,6 +44,13 @@ BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
} \
} while (0)

+// Helper to unwrap a valid pool submission
+template <typename F>
+[[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
+{
+    return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
+}

// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::shared_future<void>& blocker_future, int num_of_threads_to_block)
@@ -58,7 +65,7 @@ std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::s
std::vector<std::future<void>> blocking_tasks;
for (int i = 0; i < num_of_threads_to_block; i++) {
std::promise<void>& ready = ready_promises[i];
-blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
+blocking_tasks.emplace_back(Submit(threadPool, [blocker_future, &ready]() {
ready.set_value();
blocker_future.wait();
}));
@@ -73,10 +80,9 @@ std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::s
BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
{
ThreadPool threadPool(POOL_NAME);
-BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
-BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-return true;
-});
+auto res = threadPool.Submit([]{ return false; });
+BOOST_CHECK(!res);
+BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
}

// Test 1, submit tasks and verify completion
Expand All @@ -92,7 +98,7 @@ BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 1; i <= num_tasks; i++) {
-futures.emplace_back(threadPool.Submit([&counter, i]() {
+futures.emplace_back(Submit(threadPool, [&counter, i]() {
counter.fetch_add(i, std::memory_order_relaxed);
}));
}
@@ -121,7 +127,7 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)

// Store futures to wait on
std::vector<std::future<void>> futures(num_tasks);
-for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
+for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });

WAIT_FOR(futures);
BOOST_CHECK_EQUAL(counter, num_tasks);
@@ -138,7 +144,7 @@ BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic<bool> flag = false;
-std::future<void> future = threadPool.Submit([&flag]() {
+std::future<void> future = Submit(threadPool, [&flag]() {
UninterruptibleSleep(200ms);
flag.store(true, std::memory_order_release);
});
@@ -151,10 +157,10 @@ BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
-std::future<bool> future_bool = threadPool.Submit([]() { return true; });
+std::future<bool> future_bool = Submit(threadPool, []() { return true; });
BOOST_CHECK(future_bool.get());

-std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
+std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
std::string result = future_str.get();
BOOST_CHECK_EQUAL(result, "true");
}
@@ -170,7 +176,7 @@ BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
-futures.emplace_back(threadPool.Submit([err_msg, i]() {
+futures.emplace_back(Submit(threadPool, [err_msg, i]() {
throw std::runtime_error(err_msg + util::ToString(i));
}));
}
@@ -197,7 +203,7 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
int num_tasks = 20;
std::atomic<int> counter = 0;
for (int i = 0; i < num_tasks; i++) {
-(void)threadPool.Submit([&counter]() {
+(void)Submit(threadPool, [&counter]() {
counter.fetch_add(1, std::memory_order_relaxed);
});
}
@@ -222,8 +228,8 @@ BOOST_AUTO_TEST_CASE(recursive_task_submission)
threadPool.Start(NUM_WORKERS_DEFAULT);

std::promise<void> signal;
-(void)threadPool.Submit([&]() {
-(void)threadPool.Submit([&]() {
+(void)Submit(threadPool, [&]() {
+(void)Submit(threadPool, [&]() {
signal.set_value();
});
});
@@ -243,7 +249,7 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);

// Submit an extra task that should execute once a worker is free
-std::future<bool> future = threadPool.Submit([]() { return true; });
+std::future<bool> future = Submit(threadPool, []() { return true; });

// At this point, all workers are blocked, and the extra task is queued
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
@@ -280,7 +286,7 @@ BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
-futures.emplace_back(threadPool.Submit([&counter] {
+futures.emplace_back(Submit(threadPool, [&counter] {
counter.fetch_add(1, std::memory_order_relaxed);
}));
}
@@ -296,10 +302,10 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
threadPool.Interrupt();
-BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
-BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-return true;
-});
+
+auto res = threadPool.Submit([]{});
+BOOST_CHECK(!res);
+BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");

// Reset pool
threadPool.Stop();
Expand All @@ -310,7 +316,7 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
std::atomic<int> counter{0};
std::promise<void> blocker;
const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1);
-threadPool.Submit([&threadPool, &counter]{
+Submit(threadPool, [&threadPool, &counter]{
threadPool.Interrupt();
counter.fetch_add(1, std::memory_order_relaxed);
}).get();
47 changes: 35 additions & 12 deletions src/util/threadpool.h
@@ -7,6 +7,7 @@

#include <sync.h>
#include <tinyformat.h>
+#include <util/expected.h>
#include <util/check.h>
#include <util/thread.h>

@@ -15,8 +16,8 @@
#include <functional>
#include <future>
#include <queue>
-#include <stdexcept>
#include <thread>
+#include <type_traits>
#include <utility>
#include <vector>

@@ -143,28 +144,39 @@ class ThreadPool
// Note: m_interrupt is left true until next Start()
}

+enum class SubmitError {
+Inactive,
+Interrupted,
+};

/**
* @brief Enqueues a new task for asynchronous execution.
*
* Returns a `std::future` that provides the task's result or propagates
* any exception it throws.
* Note: Ignoring the returned future requires guarding the task against
* uncaught exceptions, as they would otherwise be silently discarded.
* @param fn Callable to execute asynchronously.
* @return On success, a future containing fn's result.
* On failure, an error indicating why the task was rejected:
* - SubmitError::Inactive: Pool has no workers (never started or already stopped).
* - SubmitError::Interrupted: Pool task acceptance has been interrupted.
*
* Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
*
* @warning Ignoring the returned future requires guarding the task against
* uncaught exceptions, as they would otherwise be silently discarded.
*/
-template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
-auto Submit(F&& fn)
+template <class F>
+[[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
-std::packaged_task task{std::forward<F>(fn)};
+std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
auto future{task.get_future()};
{
LOCK(m_mutex);
-if (m_interrupt || m_workers.empty()) {
-throw std::runtime_error("No active workers; cannot accept new tasks");
-}
+if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
+if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};

m_work_queue.emplace(std::move(task));
}
m_cv.notify_one();
-return future;
+return {std::move(future)};
}

/**
@@ -208,4 +220,15 @@
}
};

+constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
+switch (err) {
+case ThreadPool::SubmitError::Inactive:
+return "No active workers";
+case ThreadPool::SubmitError::Interrupted:
+return "Interrupted";
+}
+Assume(false); // Unreachable
+return "Unknown error";
+}

#endif // BITCOIN_UTIL_THREADPOOL_H