Skip to content

Commit da48867

Browse files
committed
Update rpc
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 5c727cb commit da48867

File tree

3 files changed

+68
-59
lines changed

3 files changed

+68
-59
lines changed

rust/src/bin/receive_logs_topic.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures::StreamExt;
12
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
23

34
#[tokio::main]
@@ -35,7 +36,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3536
)
3637
.await?;
3738

38-
futures::future::join_all(binding_keys.into_iter().map(|binding_key| {
39+
futures::future::join_all(binding_keys.iter().map(|binding_key| {
3940
channel.queue_bind(
4041
queue.name().as_str(),
4142
"topic_logs",
@@ -46,7 +47,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4647
}))
4748
.await;
4849

49-
let consumer = channel
50+
let mut consumer = channel
5051
.basic_consume(
5152
queue.name().as_str(),
5253
"consumer",
@@ -60,14 +61,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6061

6162
println!(" [*] Waiting for logs. To exit press CTRL+C");
6263

63-
for delivery in consumer {
64-
let (_, delivery) = delivery?;
65-
println!(
66-
" [x] {}:{:?}",
67-
delivery.routing_key,
68-
std::str::from_utf8(&delivery.data)?
69-
);
64+
65+
while let Some(delivery) = consumer.next().await {
66+
if let Ok(delivery) = delivery {
67+
println!(
68+
" [x] {}:{:?}",
69+
delivery.routing_key,
70+
std::str::from_utf8(&delivery.data)?
71+
);
72+
}
7073
}
7174

75+
7276
Ok(())
7377
}

rust/src/bin/rpc_client.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use lapin::{
55
};
66
use std::convert::TryInto;
77
use std::fmt::Display;
8-
use tokio::stream::StreamExt;
98
use uuid::Uuid;
109

1110
#[derive(Debug)]
@@ -87,15 +86,16 @@ impl FibonacciRpcClient {
8786
.await?;
8887

8988
while let Some(delivery) = self.consumer.next().await {
90-
let (_, reply) = delivery?;
91-
if reply.properties.correlation_id().as_ref() == Some(&self.correlation_id) {
92-
return Ok(u64::from_le_bytes(
93-
reply
94-
.data
95-
.as_slice()
96-
.try_into()
97-
.map_err(|_| Error::CannotDecodeReply)?,
98-
));
89+
if let Ok(delivery) = delivery {
90+
if delivery.properties.correlation_id().as_ref() == Some(&self.correlation_id) {
91+
return Ok(u64::from_le_bytes(
92+
delivery
93+
.data
94+
.as_slice()
95+
.try_into()
96+
.map_err(|_| Error::CannotDecodeReply)?,
97+
));
98+
}
9999
}
100100
}
101101

rust/src/bin/rpc_server.rs

+45-40
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
21
use std::convert::TryInto;
32
use std::fmt::Display;
3+
use futures::StreamExt;
4+
use lapin::{BasicProperties, Connection, ConnectionProperties, options::*, types::FieldTable};
45

56
#[derive(Debug)]
67
enum Error {
@@ -45,7 +46,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4546

4647
channel.basic_qos(1, BasicQosOptions::default()).await?;
4748

48-
let consumer = channel
49+
let mut consumer = channel
4950
.basic_consume(
5051
"rpc_queue",
5152
"rpc_server",
@@ -56,44 +57,48 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5657

5758
println!(" [x] Awaiting RPC requests");
5859

59-
for delivery in consumer {
60-
let (channel, delivery) = delivery?;
61-
let n = u64::from_le_bytes(
62-
delivery
63-
.data
64-
.as_slice()
65-
.try_into()
66-
.map_err(|_| Error::CannotDecodeArg)?,
67-
);
68-
println!(" [.] fib({})", n);
69-
let response = fib(n);
70-
71-
let routing_key = delivery
72-
.properties
73-
.reply_to()
74-
.as_ref()
75-
.ok_or(Error::MissingReplyTo)?
76-
.as_str();
77-
78-
let correlation_id = delivery
79-
.properties
80-
.correlation_id()
81-
.clone()
82-
.ok_or(Error::MissingCorrelationId)?;
83-
84-
channel
85-
.basic_publish(
86-
"",
87-
routing_key,
88-
BasicPublishOptions::default(),
89-
response.to_le_bytes().to_vec(),
90-
BasicProperties::default().with_correlation_id(correlation_id),
91-
)
92-
.await?;
93-
94-
channel
95-
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
96-
.await?;
60+
61+
while let Some(delivery) = consumer.next().await {
62+
if let Ok(delivery) = delivery {
63+
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
64+
let n = u64::from_le_bytes(
65+
delivery
66+
.data
67+
.as_slice()
68+
.try_into()
69+
.map_err(|_| Error::CannotDecodeArg)?,
70+
);
71+
println!(" [.] fib({})", n);
72+
let response = fib(n);
73+
let payload = response.to_be_bytes();
74+
75+
let routing_key = delivery
76+
.properties
77+
.reply_to()
78+
.as_ref()
79+
.ok_or(Error::MissingReplyTo)?
80+
.as_str();
81+
82+
let correlation_id = delivery
83+
.properties
84+
.correlation_id()
85+
.clone()
86+
.ok_or(Error::MissingCorrelationId)?;
87+
88+
channel
89+
.basic_publish(
90+
"",
91+
routing_key,
92+
BasicPublishOptions::default(),
93+
&payload,
94+
BasicProperties::default().with_correlation_id(correlation_id),
95+
)
96+
.await?;
97+
98+
channel
99+
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
100+
.await?;
101+
}
97102
}
98103

99104
Ok(())

0 commit comments

Comments
 (0)