Remove unnecessary mutexes in RPC server #646

Merged: 3 commits, Apr 26, 2022
cmd/backend_kv_server.cpp (5 changes: 1 addition & 4 deletions)

@@ -203,10 +203,7 @@ int main(int argc, char* argv[]) {
     signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
         std::cout << "\n";
         SILK_INFO << "Signal caught, error: " << error << " number: " << signal_number;
-        std::thread shutdown_thread{[&server]() {
-            server.shutdown();
-        }};
-        shutdown_thread.detach();
+        server.shutdown();
     });

     SILK_LOG << "BackEndKvServer is now running [pid=" << pid << ", main thread=" << tid << "]";
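For reference, the call site above is Boost.Asio's asynchronous signal wait: the handler lambda runs inside the io_context loop when a registered signal arrives, and after this change it invokes shutdown() inline instead of detaching a one-shot thread. A minimal sketch of that pattern, assuming SIGINT/SIGTERM registration and a hypothetical Server stand-in rather than the Silkworm class:

#include <csignal>
#include <iostream>

#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/system/error_code.hpp>

// Hypothetical stand-in for the RPC server being shut down.
struct Server {
    void shutdown() { std::cout << "shutting down\n"; }
};

int main() {
    boost::asio::io_context ioc;
    Server server;

    // Register interest in SIGINT/SIGTERM; the handler runs inside ioc.run().
    boost::asio::signal_set signals{ioc, SIGINT, SIGTERM};
    signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
        std::cout << "\nsignal " << signal_number << " error: " << error.message() << "\n";
        server.shutdown();  // called inline, no detached helper thread needed
    });

    ioc.run();  // returns once the handler completes and no work remains
}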
node/silkworm/rpc/server.hpp (113 changes: 54 additions & 59 deletions)
@@ -18,7 +18,6 @@
 #define SILKWORM_RPC_SERVER_HPP_

 #include <memory>
-#include <mutex>
 #include <vector>

 #include <grpcpp/grpcpp.h>

@@ -49,46 +48,45 @@ class Server {
     //! Build the RPC server according to its configuration.
     void build_and_start() {
         SILK_TRACE << "Server::build_and_start " << this << " START";
-        {
-            std::unique_lock<std::mutex> lock(mutex_);
-            if (shutdown_) {
-                SILK_TRACE << "Server::build_and_start " << this << " already shut down END";
-                return;
-            }
-
-            grpc::ServerBuilder builder;
-
-            // Disable SO_REUSEPORT socket option to obtain "address already in use" on Windows.
-            builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
-
-            // Add the local endpoint to bind the RPC server to (selected_port will be set *after* BuildAndStart call).
-            int selected_port;
-            builder.AddListeningPort(config_.address_uri(), config_.credentials(), &selected_port);
-
-            // Add one server-side gRPC completion queue for each execution context.
-            for (std::size_t i{0}; i < config_.num_contexts(); ++i) {
-                context_pool_.add_context(builder.AddCompletionQueue());
-            }
-
-            // gRPC async model requires the server to register one responded call for each RPC in advance.
-            SILK_DEBUG << "Server " << this << " registering async services";
-            register_async_services(builder);
-
-            server_ = builder.BuildAndStart();
-            SILK_DEBUG << "Server " << this << " bound at selected port: " << selected_port;
-            if (server_ == nullptr) {
-                SILK_ERROR << "Server " << this << " BuildAndStart failed [" << config_.address_uri() << "]";
-                throw std::runtime_error("cannot start gRPC server at " + config_.address_uri());
-            }
-
-            // gRPC async model requires the server to register one request call for each RPC in advance.
-            SILK_DEBUG << "Server " << this << " registering request calls";
-            register_request_calls();
-
-            // Start the server execution: the context pool will spawn the context threads.
-            SILK_DEBUG << "Server " << this << " starting execution loop";
-            context_pool_.start();
-        }
+        if (shutdown_) {
+            SILK_TRACE << "Server::build_and_start " << this << " already shut down END";
+            return;
+        }
+
+        grpc::ServerBuilder builder;
+
+        // Disable SO_REUSEPORT socket option to obtain "address already in use" on Windows.
+        builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
+
+        // Add the local endpoint to bind the RPC server to (selected_port will be set *after* BuildAndStart call).
+        int selected_port;
+        builder.AddListeningPort(config_.address_uri(), config_.credentials(), &selected_port);
+
+        // Add one server-side gRPC completion queue for each execution context.
+        for (std::size_t i{0}; i < config_.num_contexts(); ++i) {
+            context_pool_.add_context(builder.AddCompletionQueue());
+        }
+
+        // gRPC async model requires the server to register one responded call for each RPC in advance.
+        SILK_DEBUG << "Server " << this << " registering async services";
+        register_async_services(builder);
+
+        server_ = builder.BuildAndStart();
+        SILK_DEBUG << "Server " << this << " bound at selected port: " << selected_port;
+        if (server_ == nullptr) {
+            SILK_ERROR << "Server " << this << " BuildAndStart failed [" << config_.address_uri() << "]";
+            throw std::runtime_error("cannot start gRPC server at " + config_.address_uri());
+        }
+
+        // gRPC async model requires the server to register one request call for each RPC in advance.
+        SILK_DEBUG << "Server " << this << " registering request calls";
+        register_request_calls();
+
+        // Start the server execution: the context pool will spawn the context threads.
+        SILK_DEBUG << "Server " << this << " starting execution loop";
+        context_pool_.start();

         SILK_TRACE << "Server::build_and_start " << this << " END";
     }
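The builder sequence in build_and_start() is the stock gRPC C++ async-server bootstrap: channel arguments and listening port first, completion queues created through the builder before BuildAndStart, then the returned server handle checked for null. A compilable sketch of the same sequence outside Silkworm; the "localhost:0" endpoint and the single queue are arbitrary choices for the sketch:

#include <iostream>
#include <memory>

#include <grpcpp/grpcpp.h>

int main() {
    grpc::ServerBuilder builder;

    // Same option as the PR: disable SO_REUSEPORT so a second bind fails loudly.
    builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);

    // selected_port is filled in only after BuildAndStart returns.
    int selected_port{0};
    builder.AddListeningPort("localhost:0", grpc::InsecureServerCredentials(), &selected_port);

    // Completion queues must be created via the builder, before BuildAndStart.
    auto cq = builder.AddCompletionQueue();

    std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
    if (!server) {
        std::cerr << "BuildAndStart failed\n";
        return 1;
    }
    std::cout << "gRPC server bound to port " << selected_port << "\n";

    // Teardown in the required order: server first, then drain the queue.
    server->Shutdown();
    cq->Shutdown();
    void* tag{nullptr};
    bool ok{false};
    while (cq->Next(&tag, &ok)) {
    }
    return 0;
}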

@@ -102,26 +100,26 @@ class Server {
     //! Stop this Server instance forever. Any subsequent call to \ref build_and_start() has no effect.
     void shutdown() {
         SILK_TRACE << "Server::shutdown " << this << " START";
-        {
-            std::lock_guard<std::mutex> guard(mutex_);
-            if (shutdown_) {
-                SILK_TRACE << "Server::shutdown " << this << " already shut down END";
-                return;
-            }
-            shutdown_ = true;
-
-            SILK_DEBUG << "Server::shutdown " << this << " shutting down server immediately";
-
-            // Order matters here: 1) shutdown the server (immediate deadline)
-            if (server_) {
-                server_->Shutdown(gpr_time_0(GPR_CLOCK_REALTIME));
-            }
-
-            SILK_DEBUG << "Server::shutdown " << this << " stopping context pool";
-
-            // Order matters here: 2) shutdown and drain the queues
-            context_pool_.stop();
-        }
+        if (shutdown_) {
+            SILK_TRACE << "Server::shutdown " << this << " already shut down END";
+            return;
+        }
+        shutdown_ = true;
+
+        SILK_DEBUG << "Server::shutdown " << this << " shutting down server immediately";
+
+        // Order matters here: 1) shutdown the server (immediate deadline)
+        if (server_) {
+            server_->Shutdown(gpr_time_0(GPR_CLOCK_REALTIME));
+            server_->Wait();
+        }
+
+        SILK_DEBUG << "Server::shutdown " << this << " stopping context pool";
+
+        // Order matters here: 2) shutdown and drain the queues
+        context_pool_.stop();

         SILK_TRACE << "Server::shutdown " << this << " END";
     }
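The two numbered comments encode the canonical gRPC teardown order, and the newly added Wait() makes it explicit: stop the server with an immediate deadline, block until in-flight handlers finish, and only then shut down and drain the completion queues. A generic sketch of that sequence; shutdown_server and its cqs parameter are illustrative, not Silkworm API:

#include <memory>
#include <vector>

#include <grpc/support/time.h>
#include <grpcpp/grpcpp.h>

// Illustrative helper: 'server' and 'cqs' are assumed to come from
// ServerBuilder::BuildAndStart and ServerBuilder::AddCompletionQueue.
void shutdown_server(grpc::Server& server,
                     std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs) {
    // 1) Refuse new RPCs and cancel pending ones: an immediate deadline
    //    turns the otherwise graceful shutdown into a hard stop.
    server.Shutdown(gpr_time_0(GPR_CLOCK_REALTIME));
    server.Wait();

    // 2) Shut down each queue and drain every remaining event; gRPC requires
    //    all tags to be extracted before a completion queue is destroyed.
    for (auto& cq : cqs) {
        cq->Shutdown();
        void* tag{nullptr};
        bool ok{false};
        while (cq->Next(&tag, &ok)) {
            // discard (or delete) the tag owner here
        }
    }
}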

@@ -151,9 +149,6 @@ class Server {
     //! Pool of server schedulers used to run the execution loops.
     ServerContextPool context_pool_;

-    //! Mutual exclusion to synchronize run/shutdown operations.
-    std::mutex mutex_;
-
     bool shutdown_{false};
 };
node/silkworm/rpc/server_context_pool.cpp (2 changes: 0 additions & 2 deletions)

@@ -83,7 +83,6 @@ void ServerContextPool::add_context(std::unique_ptr<grpc::ServerCompletionQueue>
 void ServerContextPool::start() {
     SILK_TRACE << "ServerContextPool::start START";

-    std::unique_lock<std::mutex> lock(mutex_);
     if (!stopped_) {
         // Create a pool of threads to run all the contexts (each context having 1 thread)
         for (std::size_t i{0}; i < contexts_.size(); ++i) {

@@ -113,7 +112,6 @@ void ServerContextPool::join() {
 void ServerContextPool::stop() {
     SILK_TRACE << "ServerContextPool::stop START";

-    std::lock_guard<std::mutex> guard(mutex_);
     if (!stopped_) {
         // Explicitly stop all context runnable components
         for (std::size_t i{0}; i < contexts_.size(); ++i) {
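With the locks removed, the pool's start()/stop() rely on a calling contract rather than synchronization: the PR's premise is that both are driven from a single control path (the server's build_and_start/shutdown), so the plain stopped_ flag suffices. A self-contained sketch of the same shape under that assumption; ContextPoolSketch, Context, and their internals are hypothetical stand-ins, not the Silkworm classes:

#include <cstddef>
#include <list>
#include <thread>
#include <vector>

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>

// Hypothetical stand-in for the per-context scheduler: each context owns
// an io_context kept alive by a work guard until stop() is called.
class Context {
  public:
    Context() : work_{boost::asio::make_work_guard(ioc_)} {}
    void execution_loop() { ioc_.run(); }
    void stop() { ioc_.stop(); }

  private:
    boost::asio::io_context ioc_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_;
};

class ContextPoolSketch {
  public:
    void add_context() { contexts_.emplace_back(); }

    // start/stop/join are expected to run on one control thread only;
    // that single-caller contract is what makes the mutex removable.
    void start() {
        if (!stopped_) {
            for (auto& ctx : contexts_) {
                threads_.emplace_back([&ctx]() { ctx.execution_loop(); });
            }
        }
    }

    void stop() {
        if (!stopped_) {
            for (auto& ctx : contexts_) ctx.stop();
            stopped_ = true;
        }
    }

    void join() {
        for (auto& t : threads_) t.join();
    }

  private:
    std::list<Context> contexts_;  // std::list: Context is not movable
    std::vector<std::thread> threads_;
    bool stopped_{false};
};

int main() {
    ContextPoolSketch pool;
    pool.add_context();
    pool.add_context();
    pool.start();
    pool.stop();
    pool.join();
}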
node/silkworm/rpc/server_context_pool.hpp (4 changes: 0 additions & 4 deletions)

@@ -21,7 +21,6 @@
 #include <ostream>
 #include <list>
 #include <memory>
-#include <mutex>
 #include <vector>

 #include <boost/asio/io_context.hpp>

@@ -97,9 +96,6 @@ class ServerContextPool {
     //! The index for obtaining next context to use (round-robin).
     std::size_t next_index_;

-    //! Mutual exclusion to synchronize run/stop operations.
-    std::mutex mutex_;
-
     //! Flag indicating if pool has been stopped.
     bool stopped_{false};
 };
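next_index_ survives the change; per its comment it backs round-robin selection of the next scheduler. The accessor's body is not shown in this diff, so the following is a hypothetical reconstruction of that shape, and without the mutex it is only safe when callers stay on one thread:

#include <cstddef>
#include <vector>

struct ServerContext {};  // stand-in for the real per-context state

// Hypothetical reconstruction of the round-robin accessor the comment
// describes; not the actual Silkworm implementation.
class PoolIndexSketch {
  public:
    explicit PoolIndexSketch(std::size_t n) : contexts_(n) {}

    ServerContext& next_context() {
        ServerContext& context = contexts_[next_index_];
        next_index_ = (next_index_ + 1) % contexts_.size();
        return context;
    }

  private:
    std::vector<ServerContext> contexts_;
    std::size_t next_index_{0};
};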
node/silkworm/rpc/server_context_pool_test.cpp (12 changes: 12 additions & 0 deletions)

@@ -84,4 +84,16 @@ TEST_CASE("ServerContext", "[silkworm][rpc][server_context]") {
     }
 }

+TEST_CASE("ServerContextPool", "[silkworm][rpc][server_context]") {
+    grpc::ServerBuilder builder;
+    ServerContextPool server_context_pool{2};
+
+    SECTION("ServerContextPool") {
+        REQUIRE(server_context_pool.num_contexts() == 0);
+        server_context_pool.add_context(builder.AddCompletionQueue());
+        server_context_pool.add_context(builder.AddCompletionQueue());
+        CHECK(server_context_pool.num_contexts() == 2);
+    }
+}
+
 } // namespace silkworm::rpc