Skip to content

Commit

Permalink
rpc: allow extending existing verbs in backwards compatable way
Browse files Browse the repository at this point in the history
This patch allows adding (but not removing or replacing) additional
parameters to existing verbs or return additional values from rpc
handlers. Suppose there is a verb:

(1) future<int> rpc_call(long, int)

this patch allow it to be modified to be:

(2) future<int, rpc::optional<long>> rpc_call(long, int, rpc::optional<char>)

and be used against binary versions that had definition 1. If
older version is a client then during the call  rpc::optional<char>
will be disengaged since older version does not send value for the
parameter.  If older version is a server it will ignore value sent for
rpc::optional<char> and call the callback with old signature. Extending
return value works in a similar way.
  • Loading branch information
Gleb Natapov committed Dec 27, 2015
1 parent c12e683 commit e8afec0
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
35 changes: 33 additions & 2 deletions rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@

namespace rpc {

template<typename T>
struct remove_optional {
using type = T;
};

template<typename T>
struct remove_optional<optional<T>> {
using type = T;
};

struct wait_type {}; // opposite of no_wait_type

// tags to tell whether we want a const client_info& parameter
Expand Down Expand Up @@ -178,10 +188,28 @@ inline std::tuple<> do_unmarshall(Serializer& serializer, Input& in) {
return std::make_tuple();
}

template<typename Serializer, typename Input, typename T>
struct unmarshal_one {
static T doit(Serializer& serializer, Input& in) {
return read(serializer, in, type<T>());
}
};

template<typename Serializer, typename Input, typename T>
struct unmarshal_one<Serializer, Input, optional<T>> {
static optional<T> doit(Serializer& serializer, Input& in) {
if (in.size()) {
return optional<T>(read(serializer, in, type<typename remove_optional<T>::type>()));
} else {
return optional<T>();
}
}
};

template <typename Serializer, typename Input, typename T0, typename... Trest>
inline std::tuple<T0, Trest...> do_unmarshall(Serializer& serializer, Input& in) {
// FIXME: something less recursive
auto first = std::make_tuple(read(serializer, in, type<T0>()));
auto first = std::make_tuple(unmarshal_one<Serializer, Input, T0>::doit(serializer, in));
auto rest = do_unmarshall<Serializer, Input, Trest...>(serializer, in);
return std::tuple_cat(std::move(first), std::move(rest));
}
Expand All @@ -199,6 +227,9 @@ public:
_p += size;
_size -= size;
}
const size_t size() const {
return _size;
}
};

template <typename Serializer, typename... T>
Expand Down Expand Up @@ -463,7 +494,7 @@ struct handler_type_impl;

template<typename Ret, typename F, std::size_t... I>
struct handler_type_impl<Ret, F, std::integer_sequence<std::size_t, I...>> {
using type = handler_type_helper<Ret, typename F::template arg<I>::type...>;
using type = handler_type_helper<Ret, typename remove_optional<typename F::template arg<I>::type>::type...>;
};

// this class is used to calculate client side rpc function signature
Expand Down
7 changes: 7 additions & 0 deletions rpc/rpc_types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stdexcept>
#include <string>
#include <boost/any.hpp>
#include <experimental/optional>

namespace rpc {

Expand Down Expand Up @@ -83,4 +84,10 @@ struct no_wait_type {};
// return this from a callback if client does not want to waiting for a reply
extern no_wait_type no_wait;

template <typename T>
class optional : public std::experimental::optional<T> {
public:
using std::experimental::optional<T>::optional;
};

} // namespace rpc
31 changes: 31 additions & 0 deletions tests/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ int main(int ac, char** av) {
if (config.count("server")) {
std::cout << "client" << std::endl;
auto test7 = myrpc.make_client<long (long a, long b)>(7);
auto test9 = myrpc.make_client<long (long a, long b)>(9); // do not send optional
auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional
auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects
auto test10 = myrpc.make_client<long ()>(10); // receive less then replied
auto test10_1 = myrpc.make_client<future<long, int> ()>(10); // receive all
auto test11 = myrpc.make_client<future<long, rpc::optional<int>> ()>(11); // receive more then replied

client = std::make_unique<rpc::protocol<serializer>::client>(myrpc, ipv4_addr{config["server"].as<std::string>()});

Expand Down Expand Up @@ -133,6 +139,12 @@ int main(int ac, char** av) {
test5(*client).then([] { print("test5 no wait ended\n"); });
test6(*client, 1).then([] { print("test6 ended\n"); });
test7(*client, 5, 6).then([] (long r) { print("test7 got %ld\n", r); });
test9(*client, 1, 2).then([] (long r) { print("test9 got %ld\n", r); });
test9_1(*client, 1, 2, 3).then([] (long r) { print("test9.1 got %ld\n", r); });
test9_2(*client, 1, 2, 3, 4).then([] (long r) { print("test9.2 got %ld\n", r); });
test10(*client).then([] (long r) { print("test10 got %ld\n", r); });
test10_1(*client).then([] (long r, int rr) { print("test10_1 got %ld and %d\n", r, rr); });
test11(*client).then([] (long r, rpc::optional<int> rr) { print("test11 got %ld and %d\n", r, bool(rr)); });
}
f.finally([] {
sleep(1s).then([] {
Expand All @@ -155,6 +167,25 @@ int main(int ac, char** av) {
t->arm(1s);
return f;
});
myrpc.register_handler(9, [] (long a, long b, rpc::optional<int> c) {
long r = 2;
print("test9 got %ld %ld ", a, b);
if (c) {
print("%d", c.value());
r++;
}
print("\n");
return r;
});
myrpc.register_handler(10, [] {
print("test 10\n");
return make_ready_future<long, int>(1, 2);
});
myrpc.register_handler(11, [] {
print("test 11\n");
return 1ul;
});

server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, ipv4_addr{port});
}
});
Expand Down

0 comments on commit e8afec0

Please sign in to comment.