-
Notifications
You must be signed in to change notification settings - Fork 28
/
basic_pub_sub.rs
102 lines (89 loc) · 2.85 KB
/
basic_pub_sub.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
};
use tokio::time;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
// construct a subscriber that prints formatted traces to stdout
// global subscriber with log level according to RUST_LOG
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.try_init()
.ok();
// open a connection to RabbitMQ server
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"user",
"bitnami",
))
.await
.unwrap();
connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();
// open a channel on the connection
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
// declare a durable queue
let (queue_name, _, _) = channel
.queue_declare(QueueDeclareArguments::durable_client_named(
"amqprs.examples.basic",
))
.await
.unwrap()
.unwrap();
// bind the queue to exchange
let routing_key = "amqprs.example";
let exchange_name = "amq.topic";
channel
.queue_bind(QueueBindArguments::new(
&queue_name,
exchange_name,
routing_key,
))
.await
.unwrap();
//////////////////////////////////////////////////////////////////////////////
// start consumer with given name
let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub");
channel
.basic_consume(DefaultConsumer::new(args.no_ack), args)
.await
.unwrap();
//////////////////////////////////////////////////////////////////////////////
// publish message
let content = String::from(
r#"
{
"publisher": "example"
"data": "Hello, amqprs!"
}
"#,
)
.into_bytes();
// create arguments for basic_publish
let args = BasicPublishArguments::new(exchange_name, routing_key);
channel
.basic_publish(BasicProperties::default(), content, args)
.await
.unwrap();
// keep the `channel` and `connection` object from dropping before pub/sub is done.
// channel/connection will be closed when drop.
time::sleep(time::Duration::from_secs(1)).await;
// explicitly close
channel.close().await.unwrap();
connection.close().await.unwrap();
}