diff --git a/cas/store/BUILD b/cas/store/BUILD index 23aeab01e..4a1b8d1b6 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -270,11 +270,13 @@ rust_library( "//util:buf_channel", "//util:common", "//util:error", + "//util:retry", "//util:write_request_stream_wrapper", "@crate_index//:bytes", "@crate_index//:futures", "@crate_index//:parking_lot", "@crate_index//:prost", + "@crate_index//:rand", "@crate_index//:shellexpand", "@crate_index//:tokio", "@crate_index//:tonic", diff --git a/cas/store/grpc_store.rs b/cas/store/grpc_store.rs index f65a55900..97e852ded 100644 --- a/cas/store/grpc_store.rs +++ b/cas/store/grpc_store.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::marker::Send; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use bytes::BytesMut; use futures::stream::{unfold, FuturesUnordered}; -use futures::{future, Stream, TryStreamExt}; +use futures::{future, Future, Stream, TryStreamExt}; use prost::Message; +use rand::{rngs::OsRng, Rng}; +use tokio::time::sleep; use tonic::{transport, IntoRequest, Request, Response, Streaming}; use uuid::Uuid; @@ -38,6 +42,7 @@ use proto::google::bytestream::{ byte_stream_client::ByteStreamClient, QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest, WriteResponse, }; +use retry::{ExponentialBackoff, Retrier, RetryResult}; use traits::{StoreTrait, UploadSizeInfo}; use write_request_stream_wrapper::WriteRequestStreamWrapper; @@ -50,10 +55,32 @@ pub struct GrpcStore { bytestream_client: ByteStreamClient, ac_client: ActionCacheClient, store_type: config::stores::StoreType, + jitter_fn: Box Duration + Send + Sync>, + retry: config::stores::Retry, + retrier: Retrier, } impl GrpcStore { pub async fn new(config: &config::stores::GrpcStore) -> Result { + let jitter_amt = config.retry.jitter; + Self::new_with_jitter( + config, + Box::new(move |delay: Duration| { + if jitter_amt == 0. { + return delay; + } + let min = 1. - (jitter_amt / 2.); + let max = 1. + (jitter_amt / 2.); + delay.mul_f32(OsRng.gen_range(min..max)) + }), + ) + .await + } + + pub async fn new_with_jitter( + config: &config::stores::GrpcStore, + jitter_fn: Box Duration + Send + Sync>, + ) -> Result { error_if!(config.endpoints.is_empty(), "Expected at least 1 endpoint in GrpcStore"); let mut endpoints = Vec::with_capacity(config.endpoints.len()); for endpoint in &config.endpoints { @@ -77,9 +104,38 @@ impl GrpcStore { bytestream_client: ByteStreamClient::new(conn.clone()), ac_client: ActionCacheClient::new(conn), store_type: config.store_type, + jitter_fn, + retry: config.retry.to_owned(), + retrier: Retrier::new(Box::new(|duration| Box::pin(sleep(duration)))), }) } + async fn perform_request(&self, input: I, mut request: F) -> Result + where + F: FnMut(I) -> Fut + Send + Copy, + Fut: Future> + Send, + R: Send, + I: Send + Clone, + { + let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64)) + .map(|d| (self.jitter_fn)(d)) + .take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1. + self.retrier + .retry( + retry_config, + unfold(input, move |input| async move { + let input_clone = input.clone(); + Some(( + request(input_clone) + .await + .map_or_else(RetryResult::Retry, RetryResult::Ok), + input, + )) + }), + ) + .await + } + pub async fn find_missing_blobs( &self, grpc_request: Request, @@ -91,11 +147,14 @@ impl GrpcStore { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.cas_client.clone(); - client - .find_missing_blobs(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::find_missing_blobs") + self.perform_request(request, |request| async move { + self.cas_client + .clone() + .find_missing_blobs(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::find_missing_blobs") + }) + .await } pub async fn batch_update_blobs( @@ -109,11 +168,14 @@ impl GrpcStore { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.cas_client.clone(); - client - .batch_update_blobs(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::batch_update_blobs") + self.perform_request(request, |request| async move { + self.cas_client + .clone() + .batch_update_blobs(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::batch_update_blobs") + }) + .await } pub async fn batch_read_blobs( @@ -127,11 +189,14 @@ impl GrpcStore { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.cas_client.clone(); - client - .batch_read_blobs(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::batch_read_blobs") + self.perform_request(request, |request| async move { + self.cas_client + .clone() + .batch_read_blobs(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::batch_read_blobs") + }) + .await } pub async fn get_tree( @@ -145,11 +210,14 @@ impl GrpcStore { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.cas_client.clone(); - client - .get_tree(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::get_tree") + self.perform_request(request, |request| async move { + self.cas_client + .clone() + .get_tree(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::get_tree") + }) + .await } pub async fn read( @@ -174,11 +242,14 @@ impl GrpcStore { request.resource_name.get((first_slash_pos + 1)..).unwrap() ); - let mut client = self.bytestream_client.clone(); - client - .read(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::read") + self.perform_request(request, |request| async move { + self.bytestream_client + .clone() + .read(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::read") + }) + .await } pub async fn write(&self, stream: WriteRequestStreamWrapper) -> Result, Error> @@ -259,11 +330,14 @@ impl GrpcStore { request.resource_name.get((first_slash_pos + 1)..).unwrap() ); - let mut client = self.bytestream_client.clone(); - client - .query_write_status(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::query_write_status") + self.perform_request(request, |request| async move { + self.bytestream_client + .clone() + .query_write_status(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::query_write_status") + }) + .await } pub async fn get_action_result( @@ -272,11 +346,14 @@ impl GrpcStore { ) -> Result, Error> { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.ac_client.clone(); - client - .get_action_result(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::get_action_result") + self.perform_request(request, |request| async move { + self.ac_client + .clone() + .get_action_result(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::get_action_result") + }) + .await } pub async fn update_action_result( @@ -285,11 +362,14 @@ impl GrpcStore { ) -> Result, Error> { let mut request = grpc_request.into_inner(); request.instance_name = self.instance_name.clone(); - let mut client = self.ac_client.clone(); - client - .update_action_result(Request::new(request)) - .await - .err_tip(|| "in GrpcStore::update_action_result") + self.perform_request(request, |request| async move { + self.ac_client + .clone() + .update_action_result(Request::new(request)) + .await + .err_tip(|| "in GrpcStore::update_action_result") + }) + .await } async fn get_action_result_from_digest(&self, digest: DigestInfo) -> Result, Error> { diff --git a/config/stores.rs b/config/stores.rs index 8cb30cf9e..05e3cabf8 100644 --- a/config/stores.rs +++ b/config/stores.rs @@ -408,6 +408,10 @@ pub struct GrpcStore { /// The type of the upstream store, this ensures that the correct server calls are made. pub store_type: StoreType, + + /// Retry configuration to use when a network request fails. + #[serde(default)] + pub retry: Retry, } /// Retry configuration. This configuration is exponential and each iteration