Skip to content

Commit

Permalink
raft service: make one way raft messages truly one way
Browse files Browse the repository at this point in the history
Raft core does not expect replies for most messages it sends, but they
are defined as two way by the IDL currently. Fix them to be one way.
  • Loading branch information
Gleb Natapov committed Jan 13, 2022
1 parent b1fea20 commit c500a90
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
14 changes: 7 additions & 7 deletions idl/raft.idl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ struct entry_id {
};

verb [[with_client_info, with_timeout]] raft_send_snapshot (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::install_snapshot) -> raft::snapshot_reply;
verb [[with_client_info, with_timeout]] raft_append_entries (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::append_request);
verb [[with_client_info, with_timeout]] raft_append_entries_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::append_reply);
verb [[with_client_info, with_timeout]] raft_vote_request (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::vote_request);
verb [[with_client_info, with_timeout]] raft_vote_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::vote_reply);
verb [[with_client_info, with_timeout]] raft_timeout_now (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now);
verb [[with_client_info, with_timeout]] raft_read_quorum (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::read_quorum);
verb [[with_client_info, with_timeout]] raft_read_quorum_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::read_quorum_reply);
verb [[with_client_info, with_timeout, one_way]] raft_append_entries (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::append_request);
verb [[with_client_info, with_timeout, one_way]] raft_append_entries_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::append_reply);
verb [[with_client_info, with_timeout, one_way]] raft_vote_request (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::vote_request);
verb [[with_client_info, with_timeout, one_way]] raft_vote_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::vote_reply);
verb [[with_client_info, with_timeout, one_way]] raft_timeout_now (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now);
verb [[with_client_info, with_timeout, one_way]] raft_read_quorum (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::read_quorum);
verb [[with_client_info, with_timeout, one_way]] raft_read_quorum_reply (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::read_quorum_reply);
verb [[with_client_info, with_timeout]] raft_execute_read_barrier_on_leader (raft::group_id, raft::server_id from_id, raft::server_id dst_id) -> raft::read_barrier_reply;
verb [[with_client_info, with_timeout]] raft_add_entry (raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::command cmd) -> raft::add_entry_reply;
verb [[with_client_info, with_timeout]] raft_modify_config (raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, std::vector<raft::server_address> add, std::vector<raft::server_id> del) -> raft::add_entry_reply;
Expand Down
14 changes: 7 additions & 7 deletions service/raft/raft_group_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,55 +64,55 @@ void raft_group_registry::init_rpc_verbs() {
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request)] (raft_rpc& rpc) mutable {
rpc.append_entries(std::move(from), std::move(append_request));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_append_entries_reply(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_reply reply) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, reply = std::move(reply)] (raft_rpc& rpc) mutable {
rpc.append_entries_reply(std::move(from), std::move(reply));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_vote_request(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::vote_request vote_request) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, vote_request] (raft_rpc& rpc) mutable {
rpc.request_vote(std::move(from), std::move(vote_request));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_vote_reply(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::vote_reply vote_reply) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, vote_reply] (raft_rpc& rpc) mutable {
rpc.request_vote_reply(std::move(from), std::move(vote_reply));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_timeout_now(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::timeout_now timeout_now) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, timeout_now] (raft_rpc& rpc) mutable {
rpc.timeout_now_request(std::move(from), std::move(timeout_now));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_read_quorum(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::read_quorum read_quorum) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, read_quorum] (raft_rpc& rpc) mutable {
rpc.read_quorum_request(std::move(from), std::move(read_quorum));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

ser::raft_rpc_verbs::register_raft_read_quorum_reply(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::read_quorum_reply read_quorum_reply) mutable {
return handle_raft_rpc(cinfo, gid, from, dst, [from, read_quorum_reply] (raft_rpc& rpc) mutable {
rpc.read_quorum_reply(std::move(from), std::move(read_quorum_reply));
return make_ready_future<>();
return netw::messaging_service::no_wait();
});
});

Expand Down

0 comments on commit c500a90

Please sign in to comment.