Skip to content

Commit 3d1c9e5

Browse files
committed
Update receivers
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 735b801 commit 3d1c9e5

File tree

3 files changed

+27
-20
lines changed

3 files changed

+27
-20
lines changed

rust/src/bin/emit_log_direct.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
77
let args: Vec<_> = std::env::args().skip(1).collect();
88
let severity = args.first().map(String::as_str).unwrap_or("info");
99
let message = match args.len() {
10-
x if x < 2 => b"Hello, world!".to_vec(),
11-
_ => args[1..].join(" ").into_bytes(),
10+
x if x < 2 => "Hello, world!".to_string(),
11+
_ => args[1..].join(" ").to_string(),
1212
};
1313

1414
let addr = "amqp://127.0.0.1:5672";
@@ -29,15 +29,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2929
"direct_logs",
3030
severity,
3131
BasicPublishOptions::default(),
32-
&*message.clone(),
32+
message.as_bytes(),
3333
BasicProperties::default(),
3434
)
3535
.await?;
3636

3737
println!(
3838
" [x] Sent {}:{:?}",
3939
severity,
40-
std::str::from_utf8(&message)?
40+
std::str::from_utf8(message.as_bytes())?
4141
);
4242

4343
conn.close(0, "").await?;

rust/src/bin/receive_logs.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures::StreamExt;
12
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
23

34
#[tokio::main]
@@ -36,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3637
)
3738
.await?;
3839

39-
let consumer = channel
40+
let mut consumer = channel
4041
.basic_consume(
4142
queue.name().as_str(),
4243
"consumer",
@@ -50,10 +51,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5051

5152
println!(" [*] Waiting for logs. To exit press CTRL+C");
5253

53-
for delivery in consumer {
54-
let (_, delivery) = delivery?;
55-
println!(" [x] {:?}", std::str::from_utf8(&delivery.data)?);
54+
55+
while let Some(delivery) = consumer.next().await {
56+
if let Ok(delivery) = delivery {
57+
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
58+
}
5659
}
5760

61+
5862
Ok(())
5963
}

rust/src/bin/receive_logs_direct.rs

+15-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
1+
use std::borrow::Borrow;
2+
3+
use futures::StreamExt;
4+
use lapin::{Connection, ConnectionProperties, ExchangeKind, options::*, types::FieldTable};
25

36
#[tokio::main]
47
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -35,6 +38,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3538
)
3639
.await?;
3740

41+
3842
futures::future::join_all(severities.into_iter().map(|severity| {
3943
channel.queue_bind(
4044
queue.name().as_str(),
@@ -43,10 +47,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4347
QueueBindOptions::default(),
4448
FieldTable::default(),
4549
)
46-
}))
47-
.await;
50+
})).await;
4851

49-
let consumer = channel
52+
let mut consumer = channel
5053
.basic_consume(
5154
queue.name().as_str(),
5255
"consumer",
@@ -60,14 +63,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6063

6164
println!(" [*] Waiting for logs. To exit press CTRL+C");
6265

63-
for delivery in consumer {
64-
let (_, delivery) = delivery?;
65-
println!(
66-
" [x] {}:{:?}",
67-
delivery.routing_key,
68-
std::str::from_utf8(&delivery.data)?
69-
);
66+
while let Some(delivery) = consumer.next().await {
67+
if let Ok(delivery) = delivery {
68+
println!(
69+
" [x] {}:{:?}",
70+
delivery.routing_key,
71+
std::str::from_utf8(&delivery.data)?
72+
);
73+
}
7074
}
71-
7275
Ok(())
7376
}

0 commit comments

Comments
 (0)