Skip to content

Commit

Permalink
rpc: add protocol negotiation
Browse files Browse the repository at this point in the history
Currently rpc does not have any support for protocol negotiation. Client
and server are expected to implement same protocol. This is not robust
against protocol changes. This patch implement protocol features
negotiation that can be used later to extend the protocol. The
negotiation works by exchanging negotiation frame immediately after
connection establishment. The negotiation frame looks like this:

uint8_t magic[8] = SSTARRPC
uint32_t required_features_mask
uint32_t optional_features_mask
uint32_t len
uint8_t data[len]

Actual negotiation looks like this:

Client                                  Server
---------------------------------------------------------
send negotiation frame
                                    recv frame
                                    check magic (disconnect on error)
                                    check required (disconnect on error)
                                    send negotiation frame back
recv frame
  • Loading branch information
Gleb Natapov committed Dec 28, 2015
1 parent e8afec0 commit 53068ce
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 18 deletions.
18 changes: 18 additions & 0 deletions rpc/rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ struct SerializerConcept {
// Input and Output expose void read(char*, size_t) and write(const char*, size_t).
};

static constexpr char rpc_magic[] = "SSTARRPC";

struct negotiation_frame {
char magic[sizeof(rpc_magic) - 1];
uint32_t required_features_mask;
uint32_t optional_features_mask;
uint32_t len; // additional negotiation data length
} __attribute__((packed));

// MsgType is a type that holds type of a message. The type should be hashable
// and serializable. It is preferable to use enum for message types, but
// do not forget to provide hash function for it
Expand Down Expand Up @@ -85,8 +94,10 @@ public:
client_info _info;
stats _stats;
private:
future<negotiation_frame> negotiate_protocol(input_stream<char>& in);
future<MsgType, int64_t, std::experimental::optional<temporary_buffer<char>>>
read_request_frame(input_stream<char>& in);

public:
connection(server& s, connected_socket&& fd, socket_address&& addr, protocol& proto);
future<> process();
Expand All @@ -100,6 +111,9 @@ public:
stats& get_stats_internal() {
return _stats;
}
ipv4_addr peer_address() const {
return ipv4_addr(_info.addr);
}
};
private:
protocol& _proto;
Expand Down Expand Up @@ -158,6 +172,7 @@ public:
stats _stats;
ipv4_addr _server_addr;
private:
future<negotiation_frame> negotiate_protocol(input_stream<char>& in);
future<int64_t, std::experimental::optional<temporary_buffer<char>>>
read_response_frame(input_stream<char>& in);
public:
Expand Down Expand Up @@ -199,6 +214,9 @@ public:
return this->_stopped.get_future();
}
}
ipv4_addr peer_address() const {
return _server_addr;
}
};
friend server;
private:
Expand Down
127 changes: 109 additions & 18 deletions rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,81 @@ protocol<Serializer, MsgType>::server::connection::connection(protocol<Serialize
_info.addr = std::move(addr);
}

template<typename Connection>
static bool verify_frame(Connection& c, temporary_buffer<char>& buf, size_t expected, const char* log) {
if (buf.size() != expected) {
if (buf.size() != 0) {
c.get_protocol().log(c.peer_address(), log);
}
return false;
}
return true;
}

template<typename Connection>
static void send_negotiation_frame(Connection& c, const negotiation_frame& nf) {
sstring reply(sstring::initialized_later(), sizeof(negotiation_frame));
auto p = reply.begin();
p = std::copy_n(rpc_magic, sizeof(nf.magic), p);
*unaligned_cast<uint32_t*>(p ) = net::hton(nf.required_features_mask);
*unaligned_cast<uint32_t*>(p + 4) = net::hton(nf.optional_features_mask);
*unaligned_cast<uint32_t*>(p + 8) = net::hton(nf.len);
c.out_ready() = c.out().write(reply).then([&c] {
return c.out().flush();
});
}

template<typename Connection>
static
future<negotiation_frame> receive_negotiation_frame(Connection& c, input_stream<char>& in) {
return in.read_exactly(sizeof(negotiation_frame)).then([&c, &in] (temporary_buffer<char> neg) {
if (!verify_frame(c, neg, sizeof(negotiation_frame), "unexpected eof during negotiation frame")) {
return make_exception_future<negotiation_frame>(closed_error());
}
negotiation_frame* frame = reinterpret_cast<negotiation_frame*>(neg.get_write());
if (std::memcmp(frame->magic, rpc_magic, sizeof(frame->magic)) != 0) {
c.get_protocol().log(c.peer_address(), "wrong protocol magic");
return make_exception_future<negotiation_frame>(closed_error());
}
frame->required_features_mask = net::ntoh(frame->required_features_mask);
frame->optional_features_mask = net::ntoh(frame->required_features_mask);
frame->len = net::ntoh(frame->len);
return make_ready_future<negotiation_frame>(*frame);
});
}

template<typename Connection>
static
future<temporary_buffer<char>> receive_additional_negotiation_data(Connection& c, input_stream<char>& in, size_t len) {
if (len == 0) {
return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>());
} else {
return in.read_exactly(len).then([&c, &in, len] (temporary_buffer<char> neg) {
if (!verify_frame(c, neg, len, "unexpected eof during negotiation frame")) {
return make_exception_future<temporary_buffer<char>>(closed_error());
}
return make_ready_future<temporary_buffer<char>>(std::move(neg));
});
}
}

template<typename Serializer, typename MsgType>
future<negotiation_frame>
protocol<Serializer, MsgType>::server::connection::negotiate_protocol(input_stream<char>& in) {
return receive_negotiation_frame(*this, in).then([this, &in] (negotiation_frame nf) {
return receive_additional_negotiation_data(*this, in, nf.len).then([this, nf] (temporary_buffer<char> buf) mutable {
if (nf.required_features_mask != 0) {
this->get_protocol().log(_info, "negotiation failed: unsupported required features");
return make_exception_future<negotiation_frame>(closed_error());
}
nf.optional_features_mask = 0;
nf.len = 0;
send_negotiation_frame(*this, nf);
return make_ready_future<negotiation_frame>(nf);
});
});
}

template <typename Serializer, typename MsgType>
future<MsgType, int64_t, std::experimental::optional<temporary_buffer<char>>>
protocol<Serializer, MsgType>::server::connection::read_request_frame(input_stream<char>& in) {
Expand All @@ -607,14 +682,16 @@ protocol<Serializer, MsgType>::server::connection::read_request_frame(input_stre

template<typename Serializer, typename MsgType>
future<> protocol<Serializer, MsgType>::server::connection::process() {
return do_until([this] { return this->_read_buf.eof() || this->_error; }, [this] () mutable {
return this->read_request_frame(this->_read_buf).then([this] (MsgType type, int64_t msg_id, std::experimental::optional<temporary_buffer<char>> data) {
auto it = _server._proto._handlers.find(type);
if (data && it != _server._proto._handlers.end()) {
it->second(this->shared_from_this(), msg_id, std::move(data.value()));
} else {
this->_error = true;
}
return this->negotiate_protocol(this->_read_buf).then([this] (negotiation_frame frame) mutable {
return do_until([this] { return this->_read_buf.eof() || this->_error; }, [this] () mutable {
return this->read_request_frame(this->_read_buf).then([this] (MsgType type, int64_t msg_id, std::experimental::optional<temporary_buffer<char>> data) {
auto it = _server._proto._handlers.find(type);
if (data && it != _server._proto._handlers.end()) {
it->second(this->shared_from_this(), msg_id, std::move(data.value()));
} else {
this->_error = true;
}
});
});
}).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
Expand All @@ -636,6 +713,18 @@ future<> protocol<Serializer, MsgType>::server::connection::process() {
});
}

template<typename Serializer, typename MsgType>
future<negotiation_frame>
protocol<Serializer, MsgType>::client::negotiate_protocol(input_stream<char>& in) {
negotiation_frame nf = {{}, 0, 0, 0};
send_negotiation_frame(*this, nf);
return receive_negotiation_frame(*this, in).then([this, &in] (negotiation_frame nf) {
return receive_additional_negotiation_data(*this, in, nf.len).then([nf] (temporary_buffer<char> buf){
return nf;
});
});
}

// FIXME: take out-of-line?
template<typename Serializer, typename MsgType>
inline
Expand Down Expand Up @@ -672,16 +761,18 @@ protocol<Serializer, MsgType>::client::client(protocol<Serializer, MsgType>& pro
this->_write_buf = this->_fd.output();
this->_connected_promise.set_value();
this->_connected = true;
return do_until([this] { return this->_read_buf.eof() || this->_error; }, [this] () mutable {
return this->read_response_frame(this->_read_buf).then([this] (int64_t msg_id, std::experimental::optional<temporary_buffer<char>> data) {
auto it = _outstanding.find(::abs(msg_id));
if (data && it != _outstanding.end()) {
auto handler = std::move(it->second);
_outstanding.erase(it);
(*handler)(*this, msg_id, std::move(data.value()));
} else {
this->_error = true;
}
return this->negotiate_protocol(this->_read_buf).then([this] (negotiation_frame frame) {
return do_until([this] { return this->_read_buf.eof() || this->_error; }, [this] () mutable {
return this->read_response_frame(this->_read_buf).then([this] (int64_t msg_id, std::experimental::optional<temporary_buffer<char>> data) {
auto it = _outstanding.find(::abs(msg_id));
if (data && it != _outstanding.end()) {
auto handler = std::move(it->second);
_outstanding.erase(it);
(*handler)(*this, msg_id, std::move(data.value()));
} else {
this->_error = true;
}
});
});
});
}).then_wrapped([this] (future<> f){
Expand Down
5 changes: 5 additions & 0 deletions tests/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ int main(int ac, char** av) {
static std::unique_ptr<rpc::protocol<serializer>::client> client;
static double x = 30.0;

myrpc.set_logger([] (const sstring& log) {
print("%s", log);
std::cout << std::flush;
});

return app.run_deprecated(ac, av, [&] {
auto&& config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
Expand Down

0 comments on commit 53068ce

Please sign in to comment.