Skip to content

Commit 33ea0cb

Browse files
committed
feat: added retry to service
1 parent 259aaea commit 33ea0cb

File tree

2 files changed

+62
-41
lines changed

2 files changed

+62
-41
lines changed

src/flow_service/mod.rs

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crate::flow_definition::Reader;
1+
use crate::{flow_definition::Reader, flow_service::retry::create_channel_with_retry};
2+
use tonic::transport::Channel;
23
use tucana::{
34
aquila::{
45
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
@@ -9,18 +10,20 @@ use tucana::{
910
shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
1011
};
1112

13+
mod retry;
14+
1215
pub struct FlowUpdateService {
13-
aquila_url: String,
1416
data_types: Vec<DataType>,
1517
runtime_definitions: Vec<RuntimeFunctionDefinition>,
1618
flow_types: Vec<FlowType>,
19+
channel: Channel,
1720
}
1821

1922
impl FlowUpdateService {
2023
/// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
2124
///
2225
/// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types.
23-
pub fn from_url(aquila_url: String, definition_path: &str) -> Self {
26+
pub async fn from_url(aquila_url: String, definition_path: &str) -> Self {
2427
let mut data_types = Vec::new();
2528
let mut runtime_definitions = Vec::new();
2629
let mut flow_types = Vec::new();
@@ -41,11 +44,13 @@ impl FlowUpdateService {
4144
runtime_definitions.append(&mut feature.functions.clone());
4245
}
4346

47+
let channel = create_channel_with_retry("Aquila", aquila_url).await;
48+
4449
Self {
45-
aquila_url,
4650
data_types,
4751
runtime_definitions,
4852
flow_types,
53+
channel,
4954
}
5055
}
5156

@@ -80,17 +85,7 @@ impl FlowUpdateService {
8085
}
8186

8287
log::info!("Updating the current DataTypes!");
83-
let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
84-
Ok(client) => {
85-
log::info!("Successfully connected to the DataTypeService");
86-
client
87-
}
88-
Err(err) => {
89-
log::error!("Failed to connect to the DataTypeService: {:?}", err);
90-
return;
91-
}
92-
};
93-
88+
let mut client = DataTypeServiceClient::new(self.channel.clone());
9489
let request = DataTypeUpdateRequest {
9590
data_types: self.data_types.clone(),
9691
};
@@ -115,21 +110,7 @@ impl FlowUpdateService {
115110
}
116111

117112
log::info!("Updating the current RuntimeDefinitions!");
118-
let mut client =
119-
match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
120-
Ok(client) => {
121-
log::info!("Connected to RuntimeFunctionDefinitionService");
122-
client
123-
}
124-
Err(err) => {
125-
log::error!(
126-
"Failed to connect to RuntimeFunctionDefinitionService: {:?}",
127-
err
128-
);
129-
return;
130-
}
131-
};
132-
113+
let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
133114
let request = RuntimeFunctionDefinitionUpdateRequest {
134115
runtime_functions: self.runtime_definitions.clone(),
135116
};
@@ -154,17 +135,7 @@ impl FlowUpdateService {
154135
}
155136

156137
log::info!("Updating the current FlowTypes!");
157-
let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
158-
Ok(client) => {
159-
log::info!("Connected to FlowTypeService!");
160-
client
161-
}
162-
Err(err) => {
163-
log::error!("Failed to connect to FlowTypeService: {:?}", err);
164-
return;
165-
}
166-
};
167-
138+
let mut client = FlowTypeServiceClient::new(self.channel.clone());
168139
let request = FlowTypeUpdateRequest {
169140
flow_types: self.flow_types.clone(),
170141
};

src/flow_service/retry.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use tokio::time::{Duration, sleep};
2+
use tonic::transport::{Channel, Endpoint};
3+
4+
const MAX_BACKOFF: u64 = 2000 * 60;
5+
const MAX_RETRIES: i8 = 10;
6+
7+
// Will create a channel and retry if its not possible
8+
pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Channel {
9+
let mut backoff = 100;
10+
let mut retries = 0;
11+
12+
loop {
13+
let channel = match Endpoint::from_shared(url.clone()) {
14+
Ok(c) => {
15+
log::debug!("Creating a new endpoint for the: {} Service", channel_name);
16+
c.connect_timeout(Duration::from_secs(2))
17+
.timeout(Duration::from_secs(10))
18+
}
19+
Err(err) => {
20+
panic!(
21+
"Cannot create Endpoint for Service: `{}`. Reason: {:?}",
22+
channel_name, err
23+
);
24+
}
25+
};
26+
27+
match channel.connect().await {
28+
Ok(ch) => {
29+
return ch;
30+
}
31+
Err(err) => {
32+
log::warn!(
33+
"Retry connect to `{}` using url: `{}` failed: {:?}, retrying in {}ms",
34+
channel_name,
35+
url,
36+
err,
37+
backoff
38+
);
39+
sleep(Duration::from_millis(backoff)).await;
40+
41+
backoff = (backoff * 2).min(MAX_BACKOFF);
42+
retries += 1;
43+
44+
if retries >= MAX_RETRIES {
45+
panic!("Reached max retries to url {}", url)
46+
}
47+
}
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)