Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial KV Tx RPC implementation #626

Merged
merged 50 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
ea2ca0a
Add KV Tx RPC implementation
canepat Apr 1, 2022
cf188ae
Finish RPC with error when idle timer expires
canepat Apr 1, 2022
171ab86
Add implementation for some Tx RPC cursor operations
canepat Apr 1, 2022
915dfae
Add Tx RPC unit test for cursor FIRST
canepat Apr 2, 2022
369837a
Refactor BackEndKvServer unit tests
canepat Apr 3, 2022
56ede10
Add BackEndKvServer unit tests
canepat Apr 3, 2022
000ada5
Try to fix gRPC data race
canepat Apr 4, 2022
e432953
Fix gRPC data race in unit tests: try 2
canepat Apr 4, 2022
71fa8df
Fix gRPC data race in unit tests: try 3
canepat Apr 4, 2022
1920a41
Fix gRPC data race in unit tests: try 4
canepat Apr 4, 2022
df07efe
Fix gRPC data race in unit tests: try 5
canepat Apr 4, 2022
ac62703
Fix gRPC data race in unit tests: try 6
canepat Apr 4, 2022
24a6225
Fix gRPC data race in unit tests: try 7
canepat Apr 4, 2022
c2347cc
Refactor bidi streaming RPC close to avoid double finish
canepat Apr 5, 2022
0f0462b
Change Sentry mock server from sync to async
canepat Apr 6, 2022
43598ce
Add missing KV Tx cursor operations
canepat Apr 6, 2022
586650e
Add unit tests
canepat Apr 6, 2022
bc8fe65
Remove unused server RPC method
canepat Apr 7, 2022
655ce78
Configure max idle timeout for bidirectional RPC
canepat Apr 7, 2022
25b81bc
Configure max TTL duration in Tx RPC
canepat Apr 7, 2022
cbedefb
Add error handling for cursor operations in Tx RPC
canepat Apr 8, 2022
4dcf7f1
Comment out FIRST_DUP unit tests
canepat Apr 8, 2022
91db211
Comment out sleep_for in max TTL unit test
canepat Apr 8, 2022
92ea7c6
Print message to diagnose max TTL error on Windows
canepat Apr 8, 2022
e610c13
Readd one FIRST_DUP unit test
canepat Apr 9, 2022
5458264
Print MDBX results for debugging
canepat Apr 9, 2022
264cf32
Hand-made print
canepat Apr 9, 2022
b5c00f0
Change restore for single-value cursors in Tx RPC
canepat Apr 9, 2022
84875c9
Add unit test
canepat Apr 9, 2022
6ae13a3
Add unit test
canepat Apr 10, 2022
18cc285
Fix FIRST_DUP implementation
canepat Apr 11, 2022
b073fdd
Fix restore for DUPSORT cursor
canepat Apr 11, 2022
f77e452
Fix NEXT_DUP unit test
canepat Apr 11, 2022
57b97b1
Comment out unit test
canepat Apr 11, 2022
0d389a3
Uncomment out unit test w/o sleep
canepat Apr 11, 2022
547a38c
Add trace log in restore
canepat Apr 12, 2022
a792429
Reinsert delay in max TTL unit test
canepat Apr 12, 2022
ea14e63
Add print
canepat Apr 12, 2022
faefad2
Add PREV_DUP and PREV_NO_DUP cursor operations
canepat Apr 12, 2022
dfae185
Rebind existent cursors in restore
canepat Apr 12, 2022
e687b3c
Add unit test for exceeded max readers
canepat Apr 13, 2022
9787a67
Add unit test triggering server-side write error
canepat Apr 13, 2022
45859b5
Add unit tests for CURRENT cursor operation
canepat Apr 13, 2022
69c43f0
Add unit tests
canepat Apr 13, 2022
20acf74
Comment out unit test
canepat Apr 13, 2022
07b01ac
Retry test for server-side write error
canepat Apr 13, 2022
8af28d0
Add unit tests
canepat Apr 13, 2022
cbb8cbf
Convert unfiltered logs to debug logs
canepat Apr 13, 2022
3eb6503
Avoid unnecessary copies when sending responses
canepat Apr 19, 2022
d90a87e
Merge branch 'master' into backend_kv_server6
canepat Apr 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ int parse_command_line(int argc, char* argv[], BackEndKvSettings& settings) {
std::string data_dir{silkworm::DataDirectory::get_default_storage_path().string()};
std::string etherbase_address{""};
uint32_t num_contexts{std::thread::hardware_concurrency() / 2};
uint32_t max_readers{silkworm::db::EnvConfig{}.max_readers};
app.add_option("--datadir", data_dir, "The path to data directory", true);
app.add_option("--etherbase", etherbase_address, "The chain identifier as string", true);
// TODO(canepat) add check on etherbase using EthAddressValidator [TBD]
app.add_option("--numContexts", num_contexts, "The number of running contexts", true);
app.add_option("--contexts", num_contexts, "The number of running contexts", true);
app.add_option("--mdbx.max.readers", max_readers, "The maximum number of MDBX readers", true)
->check(CLI::Range(1, 32767));

// RPC Server options
app.add_option("--private.api.addr", node_settings.private_api_addr,
Expand Down Expand Up @@ -147,6 +150,7 @@ int parse_command_line(int argc, char* argv[], BackEndKvSettings& settings) {
/*create=*/false,
/*readonly=*/true
};
node_settings.chaindata_env_config.max_readers = max_readers;

server_settings.set_address_uri(node_settings.private_api_addr);
server_settings.set_num_contexts(num_contexts);
Expand Down Expand Up @@ -187,6 +191,8 @@ int main(int argc, char* argv[]) {
silkworm::EthereumBackEnd backend{node_settings, &database_env};
backend.set_node_name(node_name);

SILK_INFO << "BackEndKvServer MDBX max readers: " << database_env.max_readers();

silkworm::rpc::BackEndKvServer server{server_settings, backend};
server.build_and_start();

Expand Down
2 changes: 1 addition & 1 deletion node/silkworm/backend/ethereum_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

namespace silkworm {

EthereumBackEnd::EthereumBackEnd(const NodeSettings& node_settings, mdbx::env_managed* chaindata_env)
EthereumBackEnd::EthereumBackEnd(const NodeSettings& node_settings, mdbx::env* chaindata_env)
: node_settings_(node_settings), chaindata_env_(chaindata_env) {
// Get the numeric chain identifier from node settings
if (node_settings_.chain_config) {
Expand Down
6 changes: 3 additions & 3 deletions node/silkworm/backend/ethereum_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ constexpr const char kSentryAddressDelimiter{','};

class EthereumBackEnd {
public:
explicit EthereumBackEnd(const NodeSettings& node_settings, mdbx::env_managed* chaindata_env);
explicit EthereumBackEnd(const NodeSettings& node_settings, mdbx::env* chaindata_env);

EthereumBackEnd(const EthereumBackEnd&) = delete;
EthereumBackEnd& operator=(const EthereumBackEnd&) = delete;

mdbx::env_managed* chaindata_env() const noexcept { return chaindata_env_; }
mdbx::env* chaindata_env() const noexcept { return chaindata_env_; }
const std::string& node_name() const noexcept { return node_name_; }
std::optional<uint64_t> chain_id() const noexcept { return chain_id_; }
std::optional<evmc::address> etherbase() const noexcept { return node_settings_.etherbase; }
Expand All @@ -47,7 +47,7 @@ class EthereumBackEnd {

private:
const NodeSettings& node_settings_;
mdbx::env_managed* chaindata_env_;
mdbx::env* chaindata_env_;
std::string node_name_{kDefaultNodeName};
std::optional<uint64_t> chain_id_{std::nullopt};
std::vector<std::string> sentry_addresses_;
Expand Down
50 changes: 25 additions & 25 deletions node/silkworm/rpc/backend_calls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ void EtherbaseCall::fill_predefined_reply(const EthereumBackEnd& backend) {
}
}

EtherbaseCall::EtherbaseCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::EtherbaseRequest, remote::EtherbaseReply>(service, queue, handlers) {
EtherbaseCall::EtherbaseCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::EtherbaseRequest, remote::EtherbaseReply>(scheduler, service, queue, handlers) {
}

void EtherbaseCall::process(const remote::EtherbaseRequest* request) {
Expand Down Expand Up @@ -82,8 +82,8 @@ void NetVersionCall::fill_predefined_reply(const EthereumBackEnd& backend) {
}
}

NetVersionCall::NetVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NetVersionRequest, remote::NetVersionReply>(service, queue, handlers) {
NetVersionCall::NetVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NetVersionRequest, remote::NetVersionReply>(scheduler, service, queue, handlers) {
}

void NetVersionCall::process(const remote::NetVersionRequest* request) {
Expand All @@ -109,8 +109,8 @@ void NetPeerCountCall::remove_sentry(SentryClient* sentry) {
NetPeerCountCall::sentries_.erase(sentry);
}

NetPeerCountCall::NetPeerCountCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NetPeerCountRequest, remote::NetPeerCountReply>(service, queue, handlers) {
NetPeerCountCall::NetPeerCountCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NetPeerCountRequest, remote::NetPeerCountReply>(scheduler, service, queue, handlers) {
}

void NetPeerCountCall::process(const remote::NetPeerCountRequest* request) {
Expand Down Expand Up @@ -169,8 +169,8 @@ void BackEndVersionCall::fill_predefined_reply() {
BackEndVersionCall::response_.set_patch(std::get<2>(kEthBackEndApiVersion));
}

BackEndVersionCall::BackEndVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, google::protobuf::Empty, types::VersionReply>(service, queue, handlers) {
BackEndVersionCall::BackEndVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, google::protobuf::Empty, types::VersionReply>(scheduler, service, queue, handlers) {
}

void BackEndVersionCall::process(const google::protobuf::Empty* request) {
Expand All @@ -192,8 +192,8 @@ void ProtocolVersionCall::fill_predefined_reply() {
ProtocolVersionCall::response_.set_id(kEthDevp2pProtocolVersion);
}

ProtocolVersionCall::ProtocolVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::ProtocolVersionRequest, remote::ProtocolVersionReply>(service, queue, handlers) {
ProtocolVersionCall::ProtocolVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::ProtocolVersionRequest, remote::ProtocolVersionReply>(scheduler, service, queue, handlers) {
}

void ProtocolVersionCall::process(const remote::ProtocolVersionRequest* request) {
Expand All @@ -215,8 +215,8 @@ void ClientVersionCall::fill_predefined_reply(const EthereumBackEnd& backend) {
ClientVersionCall::response_.set_nodename(backend.node_name());
}

ClientVersionCall::ClientVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::ClientVersionRequest, remote::ClientVersionReply>(service, queue, handlers) {
ClientVersionCall::ClientVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::ClientVersionRequest, remote::ClientVersionReply>(scheduler, service, queue, handlers) {
}

void ClientVersionCall::process(const remote::ClientVersionRequest* request) {
Expand All @@ -232,8 +232,8 @@ ClientVersionCallFactory::ClientVersionCallFactory(const EthereumBackEnd& backen
ClientVersionCall::fill_predefined_reply(backend);
}

SubscribeCall::SubscribeCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: ServerStreamingRpc<remote::ETHBACKEND::AsyncService, remote::SubscribeRequest, remote::SubscribeReply>(service, queue, handlers) {
SubscribeCall::SubscribeCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: ServerStreamingRpc<remote::ETHBACKEND::AsyncService, remote::SubscribeRequest, remote::SubscribeReply>(scheduler, service, queue, handlers) {
}

void SubscribeCall::process(const remote::SubscribeRequest* request) {
Expand Down Expand Up @@ -268,8 +268,8 @@ void NodeInfoCall::remove_sentry(SentryClient* sentry) {
NodeInfoCall::sentries_.erase(sentry);
}

NodeInfoCall::NodeInfoCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NodesInfoRequest, remote::NodesInfoReply>(service, queue, handlers) {
NodeInfoCall::NodeInfoCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers)
: UnaryRpc<remote::ETHBACKEND::AsyncService, remote::NodesInfoRequest, remote::NodesInfoReply>(scheduler, service, queue, handlers) {
}

void NodeInfoCall::process(const remote::NodesInfoRequest* request) {
Expand Down Expand Up @@ -316,16 +316,16 @@ BackEndService::BackEndService(const EthereumBackEnd& backend)
: etherbase_factory_{backend}, net_version_factory_{backend}, client_version_factory_{backend} {
}

void BackEndService::register_backend_request_calls(remote::ETHBACKEND::AsyncService* async_service, grpc::ServerCompletionQueue* queue) {
void BackEndService::register_backend_request_calls(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* async_service, grpc::ServerCompletionQueue* queue) {
// Register one requested call for each RPC factory
etherbase_factory_.create_rpc(async_service, queue);
net_version_factory_.create_rpc(async_service, queue);
net_peer_count_factory_.create_rpc(async_service, queue);
backend_version_factory_.create_rpc(async_service, queue);
protocol_version_factory_.create_rpc(async_service, queue);
client_version_factory_.create_rpc(async_service, queue);
subscribe_factory_.create_rpc(async_service, queue);
node_info_factory_.create_rpc(async_service, queue);
etherbase_factory_.create_rpc(scheduler, async_service, queue);
net_version_factory_.create_rpc(scheduler, async_service, queue);
net_peer_count_factory_.create_rpc(scheduler, async_service, queue);
backend_version_factory_.create_rpc(scheduler, async_service, queue);
protocol_version_factory_.create_rpc(scheduler, async_service, queue);
client_version_factory_.create_rpc(scheduler, async_service, queue);
subscribe_factory_.create_rpc(scheduler, async_service, queue);
node_info_factory_.create_rpc(scheduler, async_service, queue);
}

void BackEndService::add_sentry(std::unique_ptr<SentryClient>&& sentry) {
Expand Down
18 changes: 9 additions & 9 deletions node/silkworm/rpc/backend_calls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class EtherbaseCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, remote::
public:
static void fill_predefined_reply(const EthereumBackEnd& backend);

EtherbaseCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
EtherbaseCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::EtherbaseRequest* request) override;

Expand All @@ -68,7 +68,7 @@ class NetVersionCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, remote:
public:
static void fill_predefined_reply(const EthereumBackEnd& backend);

NetVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
NetVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::NetVersionRequest* request) override;

Expand All @@ -88,7 +88,7 @@ class NetPeerCountCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, remot
static void add_sentry(SentryClient* sentry);
static void remove_sentry(SentryClient* sentry);

NetPeerCountCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
NetPeerCountCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::NetPeerCountRequest* request) override;

Expand All @@ -111,7 +111,7 @@ class BackEndVersionCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, goo
public:
static void fill_predefined_reply();

BackEndVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
BackEndVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const google::protobuf::Empty* request) override;

Expand All @@ -130,7 +130,7 @@ class ProtocolVersionCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, re
public:
static void fill_predefined_reply();

ProtocolVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
ProtocolVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::ProtocolVersionRequest* request) override;

Expand All @@ -149,7 +149,7 @@ class ClientVersionCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, remo
public:
static void fill_predefined_reply(const EthereumBackEnd& backend);

ClientVersionCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
ClientVersionCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::ClientVersionRequest* request) override;

Expand All @@ -166,7 +166,7 @@ class ClientVersionCallFactory : public CallFactory<remote::ETHBACKEND::AsyncSer
//! Server-streaming RPC for Subscribe method of 'ethbackend' gRPC protocol.
class SubscribeCall : public ServerStreamingRpc<remote::ETHBACKEND::AsyncService, remote::SubscribeRequest, remote::SubscribeReply> {
public:
SubscribeCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
SubscribeCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::SubscribeRequest* request) override;
};
Expand All @@ -183,7 +183,7 @@ class NodeInfoCall : public UnaryRpc<remote::ETHBACKEND::AsyncService, remote::N
static void add_sentry(SentryClient* sentry);
static void remove_sentry(SentryClient* sentry);

NodeInfoCall(remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);
NodeInfoCall(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* service, grpc::ServerCompletionQueue* queue, Handlers handlers);

void process(const remote::NodesInfoRequest* request) override;

Expand All @@ -207,7 +207,7 @@ struct BackEndService {
explicit BackEndService(const EthereumBackEnd& backend);
~BackEndService();

void register_backend_request_calls(remote::ETHBACKEND::AsyncService* async_service, grpc::ServerCompletionQueue* queue);
void register_backend_request_calls(boost::asio::io_context& scheduler, remote::ETHBACKEND::AsyncService* async_service, grpc::ServerCompletionQueue* queue);

void add_sentry(std::unique_ptr<SentryClient>&& sentry);

Expand Down
7 changes: 4 additions & 3 deletions node/silkworm/rpc/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ void BackEndKvServer::register_request_calls() {
// Start one server-side RPC request for each available server context
for (auto& backend_kv_svc : backend_kv_services_) {
const auto& server_context = next_context();
const auto server_queue = server_context.server_queue();
const auto client_queue = server_context.client_queue();

// Complete the service initialization
Expand All @@ -55,8 +54,10 @@ void BackEndKvServer::register_request_calls() {
}

// Register initial requested calls for ETHBACKEND and KV services
backend_kv_svc->register_backend_request_calls(&backend_async_service_, server_queue);
backend_kv_svc->register_kv_request_calls(&kv_async_service_, server_queue);
const auto io_context = server_context.io_context();
const auto server_queue = server_context.server_queue();
backend_kv_svc->register_backend_request_calls(*io_context, &backend_async_service_, server_queue);
backend_kv_svc->register_kv_request_calls(*io_context, &kv_async_service_, server_queue);
}
}

Expand Down
Loading