-
Notifications
You must be signed in to change notification settings - Fork 105
/
retry.rs
87 lines (75 loc) · 2.48 KB
/
retry.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright 2022 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::future::Future;
use futures::stream::StreamExt;
use std::pin::Pin;
use std::time::Duration;
use error::{make_err, Code, Error};
pub struct ExponentialBackoff {
current: Duration,
}
impl ExponentialBackoff {
pub fn new(base: Duration) -> Self {
ExponentialBackoff { current: base }
}
}
impl Iterator for ExponentialBackoff {
type Item = Duration;
fn next(&mut self) -> Option<Duration> {
self.current *= 2;
Some(self.current)
}
}
type SleepFn = Arc<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Sync + Send>;
#[derive(PartialEq, Eq, Debug)]
pub enum RetryResult<T> {
Ok(T),
Retry(Error),
Err(Error),
}
/// Class used to retry a job with a sleep function in between each retry.
#[derive(Clone)]
pub struct Retrier {
sleep_fn: SleepFn,
}
impl Retrier {
pub fn new(sleep_fn: SleepFn) -> Self {
Retrier { sleep_fn }
}
pub fn retry<'a, T, Fut, I>(
&'a self,
duration_iter: I,
operation: Fut,
) -> Pin<Box<dyn Future<Output = Result<T, Error>> + 'a + Send>>
where
Fut: futures::stream::Stream<Item = RetryResult<T>> + Send + 'a,
I: IntoIterator<Item = Duration> + Send + 'a,
<I as IntoIterator>::IntoIter: Send,
T: Send,
{
Box::pin(async move {
let mut iter = duration_iter.into_iter();
let mut operation = Box::pin(operation);
loop {
match operation.next().await {
None => return Err(make_err!(Code::Internal, "Retry stream ended abruptly",)),
Some(RetryResult::Ok(value)) => return Ok(value),
Some(RetryResult::Err(e)) => return Err(e),
Some(RetryResult::Retry(e)) => (self.sleep_fn)(iter.next().ok_or(e)?).await,
}
}
})
}
}