Skip to content

Commit 4ed2f8e

Browse files
Service work
1 parent ed55640 commit 4ed2f8e

File tree

4 files changed

+186
-16
lines changed

4 files changed

+186
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ docopt = "0.6"
1717
rustc-serialize = "0.3"
1818
lazy_static = "0.1.*"
1919
serde = "*"
20+
tangle = "0.3.0"
2021

2122
[dependencies.mio]
2223
git = "https://github.com/carllerche/mio"

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ extern crate nom;
77
extern crate lazy_static;
88
extern crate mio;
99
extern crate byteorder;
10+
extern crate tangle;
1011

1112
pub use generator::Generator;
1213
use nom::{IResult};

src/protocol.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ pub trait ThriftSerializer {
157157
}
158158
}
159159

160+
#[derive(Debug)]
160161
pub struct ThriftMessage {
161162
pub name: String,
162163
pub ty: ThriftMessageType,
@@ -179,7 +180,7 @@ pub trait ThriftDeserializer {
179180
}
180181

181182
pub trait Deserialize: Sized {
182-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error>;
183+
fn deserialize<D>(de: &mut D) -> Result<Self, Error> where D: Deserializer + ThriftDeserializer;
183184
}
184185

185186
/// ```
@@ -192,13 +193,17 @@ pub trait Deserialize: Sized {
192193
/// assert_eq!(val, 100);
193194
/// ```
194195
impl Deserialize for u8 {
195-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
196+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
197+
where D: Deserializer + ThriftDeserializer
198+
{
196199
de.deserialize_u8()
197200
}
198201
}
199202

200203
impl Deserialize for bool {
201-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
204+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
205+
where D: Deserializer + ThriftDeserializer
206+
{
202207
de.deserialize_bool()
203208
}
204209
}
@@ -213,49 +218,65 @@ impl Deserialize for bool {
213218
/// assert_eq!(val, 100);
214219
/// ```
215220
impl Deserialize for i8 {
216-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
221+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
222+
where D: Deserializer + ThriftDeserializer
223+
{
217224
de.deserialize_i8()
218225
}
219226
}
220227

221228
impl Deserialize for u16 {
222-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
229+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
230+
where D: Deserializer + ThriftDeserializer
231+
{
223232
de.deserialize_u16()
224233
}
225234
}
226235

227236
impl Deserialize for i16 {
228-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
237+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
238+
where D: Deserializer + ThriftDeserializer
239+
{
229240
de.deserialize_i16()
230241
}
231242
}
232243

233244
impl Deserialize for u32 {
234-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
245+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
246+
where D: Deserializer + ThriftDeserializer
247+
{
235248
de.deserialize_u32()
236249
}
237250
}
238251

239252
impl Deserialize for i32 {
240-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
253+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
254+
where D: Deserializer + ThriftDeserializer
255+
{
241256
de.deserialize_i32()
242257
}
243258
}
244259

245260
impl Deserialize for u64 {
246-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
261+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
262+
where D: Deserializer + ThriftDeserializer
263+
{
247264
de.deserialize_u64()
248265
}
249266
}
250267

251268
impl Deserialize for i64 {
252-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
269+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
270+
where D: Deserializer + ThriftDeserializer
271+
{
253272
de.deserialize_i64()
254273
}
255274
}
256275

257276
impl Deserialize for String {
258-
fn deserialize(de: &mut Deserializer) -> Result<Self, Error> {
277+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
278+
where D: Deserializer + ThriftDeserializer
279+
{
259280
de.deserialize_str()
260281
}
261282
}

src/service.rs

Lines changed: 152 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,84 @@
1-
use protocol::{Serializer, ThriftSerializer, Serialize, ThriftType, ThriftMessageType, Error};
1+
use protocol::{Serializer, ThriftSerializer, ThriftMessage, ThriftDeserializer, Deserializer,
2+
Deserialize, Serialize, ThriftType, ThriftMessageType, Error};
23
use binary_protocol::{BinarySerializer, BinaryDeserializer};
4+
use std::io::Cursor;
35

46
pub trait Service {
57
fn query(&mut self, val: bool);
68
}
79

10+
/// TransformCall trait is the transformation from a thrift object to
11+
/// a Rust method call.
12+
pub struct TransformIncomingCall<'a, D: 'a> {
13+
de: &'a mut D,
14+
service: &'a mut Service
15+
}
16+
17+
pub trait TransformCall {
18+
fn call_query(&mut self) -> Result<(), Error>;
19+
}
20+
21+
/// XXX: Add support for RPC return/reply.
22+
/// XXX: Add Future support for RPC calls. This will allow us to support return types as we can
23+
/// essentially have:
24+
///
25+
/// ```notrust
26+
/// TransformIncomingCall::new(&mut de).call_query()
27+
/// ```
28+
impl<'a, D> TransformIncomingCall<'a, D>
29+
where D: 'a + Deserializer + ThriftDeserializer
30+
{
31+
pub fn new(de: &'a mut D, service: &'a mut Service) -> TransformIncomingCall<'a, D> {
32+
TransformIncomingCall {
33+
de: de,
34+
service: service
35+
}
36+
}
37+
}
38+
39+
impl<'a, D> TransformCall for TransformIncomingCall<'a, D>
40+
where D: 'a + Deserializer + ThriftDeserializer
41+
{
42+
fn call_query(&mut self) -> Result<(), Error> {
43+
// Deserialize into QueryArgs
44+
let args: QueryArgs = try!(Deserialize::deserialize(self.de));
45+
46+
self.service.query(args.val);
47+
Ok(())
48+
}
49+
}
50+
51+
// Generated
52+
fn transform_msg<D>(msg: ThriftMessage, de: &mut D, service: &mut Service) -> Result<(), Error>
53+
where D: Deserializer + ThriftDeserializer
54+
{
55+
match &*msg.name {
56+
"query" => {
57+
let args: QueryArgs = try!(Deserialize::deserialize(de));
58+
service.query(args.val);
59+
},
60+
_ => {
61+
// Return Err.
62+
}
63+
}
64+
65+
Ok(())
66+
}
67+
868
pub struct QueryArgs {
969
val: bool
1070
}
1171

72+
impl Deserialize for QueryArgs {
73+
fn deserialize<D>(de: &mut D) -> Result<Self, Error>
74+
where D: Deserializer + ThriftDeserializer
75+
{
76+
Ok(QueryArgs {
77+
val: try!(de.deserialize_bool())
78+
})
79+
}
80+
}
81+
1282
impl Serialize for QueryArgs {
1383
fn serialize<S>(&self, s: &mut S) -> Result<(), Error>
1484
where S: Serializer + ThriftSerializer
@@ -40,14 +110,22 @@ fn dispatch_query(service: &mut Service, args: QueryArgs) {
40110
service.query(args.val);
41111
}
42112

43-
pub struct RpcClient;
113+
pub struct RpcClient {
114+
buf: Vec<u8>
115+
}
116+
117+
impl RpcClient {
118+
pub fn new() -> RpcClient {
119+
RpcClient {
120+
buf: Vec::new()
121+
}
122+
}
123+
}
44124

45125
impl Service for RpcClient {
46126
fn query(&mut self, val: bool) {
47-
let mut v = Vec::new();
48-
49127
{
50-
let mut proto = BinarySerializer::new(&mut v);
128+
let mut proto = BinarySerializer::new(&mut self.buf);
51129
let args = QueryArgs {
52130
val: val
53131
};
@@ -58,3 +136,72 @@ impl Service for RpcClient {
58136
}
59137
}
60138
}
139+
140+
pub struct MessagePipeline<'a, D> {
141+
de: D,
142+
service: &'a mut Service
143+
}
144+
145+
impl<'a, D> MessagePipeline<'a, D>
146+
where D: Deserializer + ThriftDeserializer
147+
{
148+
pub fn new(de: D, service: &'a mut Service) -> MessagePipeline<D> {
149+
MessagePipeline {
150+
de: de,
151+
service: service
152+
}
153+
}
154+
155+
/// Dispatch the incoming RPC call to the respective service method.
156+
pub fn dispatch(&mut self, msg: ThriftMessage) -> Result<(), Error> {
157+
try!(transform_msg(msg, &mut self.de, self.service));
158+
Ok(())
159+
}
160+
161+
/// XXX: The fn signature should be `Result<Future<Vec<u8>>, Error>` where the serialized
162+
/// response is returned into the future.
163+
pub fn run(&mut self) -> Result<(), Error> {
164+
let msg = try!(self.de.read_message_begin());
165+
166+
match msg.ty {
167+
// Dispatch on an RPC method call.
168+
ThriftMessageType::Call => {
169+
try!(self.dispatch(msg));
170+
},
171+
_ => {}
172+
}
173+
174+
Ok(())
175+
}
176+
}
177+
178+
#[test]
179+
fn call_query() {
180+
let mut buf = {
181+
let mut rpc = RpcClient::new();
182+
rpc.query(true);
183+
rpc.buf
184+
};
185+
186+
struct Server;
187+
188+
impl Service for Server {
189+
fn query(&mut self, val: bool) {
190+
assert_eq!(val, true);
191+
}
192+
}
193+
194+
let mut de = BinaryDeserializer::new(Cursor::new(buf));
195+
let mut s = Server;
196+
let mut pipe = MessagePipeline::new(de, &mut s);
197+
// XXX: Expect a future as return value.
198+
//
199+
// ```notrust
200+
// pipe.run().and_then(|res| {
201+
// // ...
202+
// })
203+
// ```
204+
//
205+
// Where `res` is the serialized response.
206+
pipe.run().unwrap();
207+
}

0 commit comments

Comments
 (0)