Skip to content

Commit 8d95ebb

Browse files
committed
rpc implementation
1 parent 8d9966b commit 8d95ebb

File tree

8 files changed

+302
-263
lines changed

8 files changed

+302
-263
lines changed

example/rpc.cpp

Lines changed: 20 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -5,190 +5,14 @@
55
#include <print>
66
#include <iostream>
77

8-
#include <erl/rpc/rpc.hpp>
9-
#include <erl/rpc/protocol.hpp>
10-
#include <erl/queue/mpmc_bounded.hpp>
11-
#include <erl/queue/spsc_bounded.hpp>
12-
#include <erl/reflect.hpp>
8+
#include <erl/rpc.hpp>
9+
1310
#include <thread>
1411
#include "erl/log/message.hpp"
1512
#include "erl/net/message/reader.hpp"
1613

17-
template <typename Proto>
18-
struct BlockingCall {
19-
template <typename R, typename Self, typename... Args>
20-
auto call(this Self&& self, std::size_t index, Args&&... args) {
21-
auto request = Proto::request(index, std::forward<Args>(args)...);
22-
self.send(request);
23-
auto response = self.recv();
24-
return Proto::template read_response<R>(index, std::span<char const>{response});
25-
}
26-
27-
template <typename Self, typename S>
28-
void handle(this Self&& self, S&& service, std::span<char const> message) {
29-
auto reply = Proto::dispatch(std::forward<S>(service), std::span<char const>{message});
30-
std::forward<Self>(self).send(reply);
31-
}
32-
};
33-
34-
template <typename Proto>
35-
struct EventCall {
36-
template <typename R, typename Self, typename... Args>
37-
void call(this Self&& self, std::size_t index, Args&&... args) {
38-
auto request = Proto::request(index, std::forward<Args>(args)...);
39-
std::forward<Self>(self).send(request);
40-
}
41-
42-
template <typename Self, typename S>
43-
void handle(this Self&& self, S&& service, std::span<char const> message) {
44-
Proto::dispatch(std::forward<S>(service), std::span<char const>{message});
45-
}
46-
};
47-
48-
template <typename MsgType>
49-
struct RPCProtocol {
50-
using message_type = MsgType;
51-
using index_type = std::uint32_t;
52-
53-
template <typename... Args>
54-
static message_type request(index_type index, Args&&... args) {
55-
auto message = message_type{};
56-
erl::serialize(index, message);
57-
(erl::serialize(std::forward<Args>(args), message), ...);
58-
return message;
59-
}
60-
61-
template <typename S>
62-
static message_type dispatch(S&& service, std::span<char const> message) {
63-
auto reader = erl::message::MessageView{message};
64-
auto index = erl::deserialize<index_type>(reader);
65-
auto remainder = reader.buffer.subspan(reader.cursor);
66-
constexpr static auto dispatcher = erl::rpc::Dispatcher<S, RPCProtocol>{};
67-
return dispatcher(std::forward<S>(service), index, remainder);
68-
}
69-
70-
template <typename... Ts>
71-
static message_type make_response(index_type index, Ts&&... value) {
72-
auto message = message_type{};
73-
erl::serialize(index, message);
74-
(erl::serialize(std::forward<Ts>(value), message), ...);
75-
return message;
76-
}
77-
78-
template <typename T>
79-
static T read_response(index_type expected_index, std::span<char const> message) {
80-
auto reader = erl::message::MessageView{message};
81-
auto index = erl::deserialize<index_type>(reader);
82-
assert(index == expected_index);
83-
if constexpr (!std::same_as<T, void>) {
84-
return erl::deserialize<T>(reader);
85-
}
86-
}
87-
};
8814

89-
template <typename T>
90-
concept is_queue = requires(T obj) {
91-
typename T::element_type;
92-
{ obj.push(typename T::element_type{}) } -> std::same_as<void>;
93-
{ obj.pop() } -> std::same_as<typename T::element_type>;
94-
};
95-
96-
template <typename U, typename V>
97-
struct Client {
98-
U* in;
99-
V* out;
100-
101-
explicit Client(U* in, V* out) : in(in), out(out) {}
102-
103-
void send(auto const& message)
104-
requires(is_queue<U>)
105-
{
106-
in->push(message);
107-
}
108-
109-
auto recv()
110-
requires(is_queue<V>)
111-
{
112-
return out->pop();
113-
}
11415

115-
void kill()
116-
requires(is_queue<U>)
117-
{
118-
auto goodbye_message = typename U::element_type{};
119-
in->push(goodbye_message);
120-
}
121-
};
122-
123-
template <is_queue U>
124-
Client(U*, std::nullptr_t) -> Client<U, void>;
125-
126-
template <is_queue V>
127-
Client(std::nullptr_t, V*) -> Client<void, V>;
128-
129-
template <typename C>
130-
struct Server {
131-
C client;
132-
133-
explicit Server(C client) : client(client) {}
134-
135-
template <typename T>
136-
void run(T&& service) {
137-
while (true) {
138-
auto msg = client.recv();
139-
if (msg.size() == 0) {
140-
break;
141-
}
142-
143-
client.handle(service, std::span<char const>{msg});
144-
}
145-
}
146-
};
147-
148-
template <typename... T>
149-
struct Mixin : T... {};
150-
151-
template <template <typename> class Protocol>
152-
struct Pipe {
153-
using message = erl::message::HybridBuffer<58>;
154-
using protocol = Protocol<message>;
155-
using message_queue = erl::queues::BoundedSPSC<message, 64>;
156-
using call_type = BlockingCall<protocol>;
157-
158-
message_queue in{};
159-
message_queue out{};
160-
161-
auto make_server() { return Server{Mixin{Client{&out, &in}, call_type{}}}; }
162-
163-
auto make_client() { return Mixin{Client{&in, &out}, call_type{}}; }
164-
};
165-
166-
template <typename T>
167-
struct Null {
168-
Null() = default;
169-
Null(auto&&) {}
170-
171-
T pop() {
172-
std::unreachable();
173-
return {};
174-
}
175-
176-
void push(auto&&) { return; }
177-
};
178-
179-
template <template <typename> class Protocol>
180-
struct EventQueue {
181-
using message = erl::message::HybridBuffer<58>;
182-
using protocol = Protocol<message>;
183-
using call_type = EventCall<protocol>;
184-
using message_queue = erl::queues::BoundedMPMC<message, 64>;
185-
186-
message_queue events{};
187-
188-
auto make_server() { return Server{Mixin{Client{nullptr, &events}, call_type{}}}; }
189-
190-
auto make_client() { return Mixin{Client{&events, nullptr}, call_type{}}; }
191-
};
19216
using namespace std::string_literals;
19317

19418
struct TestService {
@@ -211,21 +35,21 @@ namespace erl::logging {
21135
struct LoggingService {
21236
// std::vector<std::unique_ptr<Sink>> sinks;
21337

214-
using policy = rpc::Policy<rpc::Annotated, rpc::DefaultPolicy>;
38+
using policy = rpc::Annotated;
21539
using message_type = erl::message::HybridBuffer<58>;
216-
using protocol = RPCProtocol<message_type>;
217-
using call_type = EventCall<protocol>;
40+
using protocol = rpc::RPCProtocol<message_type>;
21841

219-
void spawn(std::uint64_t thread){}
220-
void exit(std::uint64_t thread){}
221-
void rename(std::uint64_t thread, std::string_view name){}
222-
void set_parent(std::uint64_t thread, std::uint64_t parent){}
223-
void add_sink(Sink* sink){}
224-
void remove_sink(Sink* sink){}
42+
[[=rpc::callback]] void spawn(std::uint64_t thread){std::print("spawn {}", thread);}
43+
[[=rpc::callback]] void exit(std::uint64_t thread){}
44+
[[=rpc::callback]] void rename(std::uint64_t thread, std::string_view name){}
45+
[[=rpc::callback]] void set_parent(std::uint64_t thread, std::uint64_t parent){}
46+
[[=rpc::callback]] void add_sink(Sink* sink){}
47+
[[=rpc::callback]] void remove_sink(Sink* sink){}
22548

22649
private:
22750
void handle_print(std::span<char const> data){
22851
auto reader = erl::message::MessageView{data};
52+
auto severity = deserialize<Severity>(reader);
22953
auto fnc = deserialize<formatter_type>(reader);
23054
// auto prelude = deserialize<LoggingEvent>(reader);
23155
// auto fnc = reinterpret_cast<formatter_type>(prelude.handler_ptr);
@@ -261,7 +85,7 @@ struct LoggingService {
26185
} // namespace erl::logging
26286

26387

264-
auto logging_pipe = EventQueue<RPCProtocol>();
88+
auto logging_pipe = erl::EventQueue<erl::logging::LoggingService::message_type>();
26589

26690
template <typename... Args>
26791
void logg(erl::logging::FormatString<Args...> fmt, Args&&... args) {
@@ -272,6 +96,8 @@ void logg(erl::logging::FormatString<Args...> fmt, Args&&... args) {
27296

27397

27498
int main() {
99+
using namespace erl::logging;
100+
275101
auto x = std::jthread([&] {
276102
// auto service = TestService{};
277103
auto server = logging_pipe.make_server();
@@ -281,9 +107,13 @@ int main() {
281107
});
282108

283109
auto client = logging_pipe.make_client();
284-
// auto remote = erl::rpc::make_proxy<erl::logging::LoggingService>(&client);
110+
auto remote = erl::rpc::make_proxy<LoggingService>(&client);
285111
// auto remote = erl::rpc::make_proxy<TestService>(&client);
286112

287-
logg("foo {}", 42);
113+
// logg("foo {}\n", 42);
114+
remote.spawn(42);
115+
116+
// erl::rpc::call<&LoggingService::spawn>(client, 42);
117+
288118
client.kill();
289119
}

include/erl/net/null.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
template <typename T>
2+
struct Null {
3+
Null() = default;
4+
explicit Null(auto&&...) {}
5+
6+
T pop() {
7+
return {};
8+
}
9+
10+
void push(auto&&) { }
11+
};

include/erl/net/queue.hpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#pragma once
2+
#include <concepts>
3+
#include <cstddef>
4+
5+
namespace erl::net {
6+
template <typename T>
7+
concept is_queue = requires(T obj) {
8+
typename T::element_type;
9+
{ obj.push(typename T::element_type{}) } -> std::same_as<void>;
10+
{ obj.pop() } -> std::same_as<typename T::element_type>;
11+
};
12+
13+
template <typename U, typename V>
14+
struct QueueClient {
15+
U* in;
16+
V* out;
17+
18+
void send(auto const& message)
19+
requires(is_queue<U>)
20+
{
21+
in->push(message);
22+
}
23+
24+
auto recv()
25+
requires(is_queue<V>)
26+
{
27+
return out->pop();
28+
}
29+
30+
void kill()
31+
requires(is_queue<U>)
32+
{
33+
auto goodbye_message = typename U::element_type{};
34+
in->push(goodbye_message);
35+
}
36+
};
37+
38+
template <is_queue U>
39+
QueueClient(U*, std::nullptr_t) -> QueueClient<U, void>;
40+
41+
template <is_queue V>
42+
QueueClient(std::nullptr_t, V*) -> QueueClient<void, V>;
43+
}

include/erl/net/service.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
#include <span>
3+
4+
5+
namespace erl::net {
6+
template <typename C>
7+
struct Server {
8+
C client;
9+
10+
explicit Server(C client) : client(client) {}
11+
12+
template <typename T>
13+
void run(T&& service) {
14+
while (true) {
15+
auto msg = client.recv();
16+
if (msg.size() == 0) {
17+
break;
18+
}
19+
20+
client.handle(service, std::span<char const>{msg});
21+
}
22+
}
23+
};
24+
}

0 commit comments

Comments
 (0)