Skip to content

Commit

Permalink
Wait for AMQP connection in worker (useful in docker-compose deployme…
Browse files Browse the repository at this point in the history
…nts)
  • Loading branch information
GamePad64 committed Dec 2, 2024
1 parent 8b3de3a commit 8db9daa
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
1 change: 1 addition & 0 deletions notifico-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ notifico-dbpipeline = { path = "../notifico-dbpipeline" }

anyhow = "1.0.93"
async-trait = "0.1.83"
backoff = { version = "0.4.0", features = ["tokio"] }
clap = { workspace = true }
dotenvy = "0.15.7"
fe2o3-amqp = { version = "0.13.1" }
Expand Down
7 changes: 6 additions & 1 deletion notifico-worker/src/amqp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use async_trait::async_trait;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use fe2o3_amqp::connection::ConnectionHandle;
use fe2o3_amqp::session::SessionHandle;
use fe2o3_amqp::{Connection, Receiver, Sender, Session};
Expand All @@ -18,7 +20,10 @@ pub struct AmqpClient {
impl AmqpClient {
pub async fn connect(url: Url, container_id: String) -> anyhow::Result<Self> {
info!("Connecting to AMQP broker: {}", url);
let mut connection = Connection::open(container_id, url.clone()).await?;
let mut connection = retry(ExponentialBackoff::default(), || async {
Ok(Connection::open(container_id.clone(), url.clone()).await?)
})
.await?;
info!("Connected to AMQP broker: {}", url);
let session = Session::begin(&mut connection).await?;
Ok(Self {
Expand Down

0 comments on commit 8db9daa

Please sign in to comment.