-
Notifications
You must be signed in to change notification settings - Fork 316
Cosmos: Add Basic Retry Policy for Throttled Requests #3230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 23 commits
2616c75
f740ad0
f249d29
0dc70ef
7fa5bf5
82295ad
b0e3a04
281a957
93321dd
3452d37
fe73bf8
cc2c1b5
b144368
945187c
b644f40
361ebbd
1f62b51
7b97662
8a964bd
f137f7b
b356da1
c04ae7a
4b88a7f
e97078d
c74d400
0c0c4e2
ae6ba7f
af3f3fa
79ad231
d5b316b
1a32cc8
20e1cac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| //! Handler types for request processing and retry logic. | ||
| pub(crate) mod retry_handler; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| use crate::retry_policies::resource_throttle_retry_policy::ResourceThrottleRetryPolicy; | ||
| use crate::retry_policies::{RetryPolicy, RetryResult}; | ||
| use async_trait::async_trait; | ||
| use azure_core::http::{request::Request, RawResponse}; | ||
| use std::sync::Arc; | ||
| use typespec_client_core::async_runtime::get_async_runtime; | ||
|
|
||
| // Helper trait to conditionally require Send on non-WASM targets. | ||
| // On every target except wasm32 it has `Send` as a supertrait and is blanket-implemented | ||
| // for all `T: Send`, so using it as a bound is equivalent to requiring `Send`. | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| pub trait CosmosConditionalSend: Send {} | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| impl<T: Send> CosmosConditionalSend for T {} | ||
|
|
||
| // On wasm32 the trait carries no supertrait and is blanket-implemented for every type, | ||
| // so using it as a bound imposes no requirement at all. | ||
| #[cfg(target_arch = "wasm32")] | ||
| pub trait CosmosConditionalSend {} | ||
| #[cfg(target_arch = "wasm32")] | ||
| impl<T> CosmosConditionalSend for T {} | ||
|
|
||
| /// Trait defining the interface for retry handlers in Cosmos DB operations | ||
| /// | ||
| /// A retry handler wraps a caller-supplied `sender` callback, which performs a single | ||
| /// attempt of an HTTP request, and decides whether and when to invoke it again. | ||
| /// Which outcomes are retried, and how long to wait in between, is up to each implementation. | ||
| #[allow(dead_code)] | ||
| #[cfg_attr(not(target_arch = "wasm32"), async_trait)] | ||
| #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] | ||
| pub trait RetryHandler: Send + Sync { | ||
kundadebdatta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// Sends an HTTP request through `sender`, retrying as the implementation decides | ||
| /// | ||
| /// The handler invokes `sender` for each attempt and inspects the outcome. | ||
| /// When an attempt should be retried, how long to wait beforehand, and when to | ||
| /// give up are not fixed by this trait; they are chosen by the implementation | ||
| /// (for example by consulting a retry policy). | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `request` - Mutable reference to the HTTP request to send. It is handed to | ||
| /// `sender` for every attempt. | ||
| /// * `sender` - Callback that performs one attempt. It takes a mutable request | ||
| /// reference and returns a future that resolves to a `RawResponse` or an error. | ||
| /// | ||
| /// # Type Parameters | ||
| /// | ||
| /// * `Sender` - Function type that takes `&mut Request` and returns `Fut`; must be `Send + Sync` | ||
| /// * `Fut` - Future returned by the sender; resolves to `azure_core::Result<RawResponse>` | ||
| /// and must be `Send` on non-wasm32 targets (see `CosmosConditionalSend`) | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// `azure_core::Result<RawResponse>` - The outcome the handler settles on (response or error) | ||
| async fn send<Sender, Fut>( | ||
| &self, | ||
| request: &mut Request, | ||
| sender: Sender, | ||
| ) -> azure_core::Result<RawResponse> | ||
| where | ||
| Sender: Fn(&mut Request) -> Fut + Send + Sync, | ||
| Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + CosmosConditionalSend; | ||
| } | ||
|
|
||
| /// Concrete retry handler driven by a pluggable retry policy. | ||
| /// | ||
| /// The handler holds no state. For each call it obtains a `RetryPolicy`, sends the request | ||
| /// through the supplied callback, and re-sends it after the delay the policy returns | ||
| /// until the policy answers `DoNotRetry`. | ||
| #[derive(Debug, Clone)] | ||
| pub struct BackOffRetryHandler {} | ||
kundadebdatta marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| impl BackOffRetryHandler { | ||
| /// Creates a new instance of `BackOffRetryHandler` | ||
| /// | ||
| /// The handler has no fields, so construction takes no arguments and cannot fail. | ||
| /// The retry policy is not chosen here; it is obtained per request through | ||
| /// `get_policy_for_request`. | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A new `BackOffRetryHandler` instance. | ||
| /// | ||
| /// # Example | ||
| /// | ||
| /// ```rust,ignore | ||
| /// use azure_data_cosmos::handler::retry_handler::BackOffRetryHandler; | ||
| /// | ||
| /// let retry_handler = BackOffRetryHandler::new(); | ||
| /// // Use retry_handler.send() to make requests with automatic retry | ||
| /// ``` | ||
| pub fn new() -> Self { | ||
| Self {} | ||
| } | ||
|
|
||
| /// Returns the retry policy to use for the given request | ||
| /// | ||
| /// The request is currently ignored: every call returns a freshly constructed | ||
| /// `ResourceThrottleRetryPolicy` wrapped in an `Arc`. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `request` - The HTTP request to analyze (unused for now) | ||
| pub fn get_policy_for_request(&self, _request: &Request) -> Arc<dyn RetryPolicy> { | ||
kundadebdatta marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // For now, always return ResourceThrottleRetryPolicy. Future implementation should check | ||
| // the request operation type and resource type and accordingly return the respective retry | ||
| // policy. | ||
| // NOTE(review): the meaning of the arguments (5, 200, 10) is defined by | ||
| // `ResourceThrottleRetryPolicy::new`, which is not visible here; consider named constants. | ||
| Arc::new(ResourceThrottleRetryPolicy::new(5, 200, 10)) | ||
| } | ||
| } | ||
|
|
||
| #[cfg_attr(not(target_arch = "wasm32"), async_trait)] | ||
| #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] | ||
| impl RetryHandler for BackOffRetryHandler { | ||
| /// Sends an HTTP request, retrying for as long as the selected retry policy asks for it | ||
kundadebdatta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// | ||
| /// The policy is chosen once per call via `get_policy_for_request`. Every attempt | ||
| /// (the first one and each retry) is preceded by `RetryPolicy::on_before_send_request`, | ||
| /// matching that method's documented contract. After each attempt the policy's | ||
| /// `should_retry` decides whether to return the result or to sleep and try again. | ||
| /// | ||
| /// This loop has no attempt limit of its own; it ends only when the policy | ||
| /// returns `RetryResult::DoNotRetry`. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `request` - Mutable HTTP request (may be modified by the retry policy before each attempt) | ||
| /// * `sender` - Callback that performs one attempt of the HTTP request | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// The unmodified result (response or error) of the last attempt. | ||
| async fn send<Sender, Fut>( | ||
| &self, | ||
| request: &mut Request, | ||
| sender: Sender, | ||
| ) -> azure_core::Result<RawResponse> | ||
| where | ||
| Sender: Fn(&mut Request) -> Fut + Send + Sync, | ||
| Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + CosmosConditionalSend, | ||
| { | ||
| // Get the appropriate retry policy based on the request | ||
| let retry_policy = self.get_policy_for_request(request); | ||
|
|
||
| loop { | ||
| // Let the policy adjust the request before every attempt, retries included | ||
| retry_policy.on_before_send_request(request); | ||
|
|
||
| // Perform one attempt through the caller-supplied sender callback | ||
| let result = sender(request).await; | ||
|
|
||
| match retry_policy.should_retry(&result).await { | ||
| RetryResult::DoNotRetry => return result, | ||
| // Wait for the policy-provided delay, then loop for the next attempt | ||
| RetryResult::Retry { after } => get_async_runtime().sleep(after).await, | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ use futures::TryStreamExt; | |
| use serde::de::DeserializeOwned; | ||
| use url::Url; | ||
|
|
||
| use crate::handler::retry_handler::{BackOffRetryHandler, RetryHandler}; | ||
| use crate::{ | ||
| constants, | ||
| models::ThroughputProperties, | ||
|
|
@@ -29,6 +30,7 @@ use crate::{ | |
| pub struct CosmosPipeline { | ||
| pub endpoint: Url, | ||
| pipeline: azure_core::http::Pipeline, | ||
| retry_handler: BackOffRetryHandler, | ||
| } | ||
|
|
||
| impl CosmosPipeline { | ||
|
|
@@ -37,16 +39,19 @@ impl CosmosPipeline { | |
| auth_policy: AuthorizationPolicy, | ||
| client_options: ClientOptions, | ||
| ) -> Self { | ||
| let pipeline = azure_core::http::Pipeline::new( | ||
| option_env!("CARGO_PKG_NAME"), | ||
| option_env!("CARGO_PKG_VERSION"), | ||
| client_options, | ||
| Vec::new(), | ||
| vec![Arc::new(auth_policy)], | ||
| None, | ||
| ); | ||
|
|
||
| CosmosPipeline { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this fit into the azure core retry policies? We should also double check these as the behavior from core is not necessarily what we want. We have overridden some of the retry configurations from core in the Python SDK. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We intend to have the same conceptual design as other Azure SDK languages. If you can customize retry behavior in other languages, within reason we'd prefer to replicate that in core as well. Adding policies in clients should be relatively rare. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reasons we wanted to keep the cosmos retry policies separate from azure_core general retry policies are:
I understand that there is already a base retry policy that exists today in azure_core, and let me explore a bit more to check if they can be leveraged and overridden to modify the behaviors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thing is, you can't remove the default retry policy. At best, you could have your client set it to We have the same design as any other Azure SDK language, though. Did you need a custom retry policy for other languages? Are there gaps in our core design that necessitate some additional options perhaps? |
||
| endpoint, | ||
| pipeline: azure_core::http::Pipeline::new( | ||
| option_env!("CARGO_PKG_NAME"), | ||
| option_env!("CARGO_PKG_VERSION"), | ||
| client_options, | ||
| Vec::new(), | ||
| vec![Arc::new(auth_policy)], | ||
| None, | ||
| ), | ||
| pipeline, | ||
| retry_handler: BackOffRetryHandler::new(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -65,9 +70,20 @@ impl CosmosPipeline { | |
| request: &mut Request, | ||
| resource_link: ResourceLink, | ||
| ) -> azure_core::Result<RawResponse> { | ||
| let ctx = ctx.with_value(resource_link); | ||
| let r = self.pipeline.send(&ctx, request, None).await?; | ||
| Ok(r) | ||
| // Clone pipeline and convert context to owned so the closure can be Fn | ||
| let pipeline = self.pipeline.clone(); | ||
| let ctx_owned = ctx.with_value(resource_link).into_owned(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you could avoid this |
||
|
|
||
| // Build a sender closure that forwards to the inner pipeline.send | ||
| let sender = move |req: &mut Request| { | ||
| let pipeline = pipeline.clone(); | ||
| let ctx = ctx_owned.clone(); | ||
| let mut req_clone = req.clone(); | ||
| async move { pipeline.send(&ctx, &mut req_clone, None).await } | ||
| }; | ||
|
|
||
| // Delegate to the retry handler, providing the sender callback | ||
| self.retry_handler.send(request, sender).await | ||
| } | ||
|
|
||
| pub async fn send<T>( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| pub mod resource_throttle_retry_policy; | ||
| use async_trait::async_trait; | ||
| use azure_core::http::RawResponse; | ||
| use azure_core::time::Duration; | ||
| use typespec_client_core::http::Request; | ||
|
|
||
| /// Result of a retry policy decision | ||
| /// | ||
| /// Returned by `RetryPolicy::should_retry` after it has examined the outcome of one | ||
| /// request attempt (either an error or an HTTP response). It tells the caller whether | ||
| /// to stop or to wait and try again. | ||
| /// | ||
| /// # Variants | ||
| /// | ||
| /// * `DoNotRetry` - The operation should not be retried. The caller is expected to | ||
| /// hand the attempt's result back as is. | ||
| /// | ||
| /// * `Retry { after }` - The operation should be retried after waiting for the specified | ||
| /// duration. How the delay is computed (fixed, backoff, server-provided hint) is up to | ||
| /// the policy that produced it. | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub enum RetryResult { | ||
| /// Indicates that the operation should not be retried. | ||
| DoNotRetry, | ||
| /// Indicates that the operation should be retried after waiting for the duration specified in `after`. | ||
| Retry { after: Duration }, | ||
| } | ||
|
|
||
| /// Trait defining the retry policy interface for Cosmos DB operations | ||
| /// | ||
| /// This trait provides a contract for implementing retry logic for transient failures | ||
| /// in Azure Cosmos DB operations. Implementers can define custom retry behavior for | ||
| /// both errors and HTTP responses based on their specific requirements. | ||
| #[async_trait] | ||
| pub trait RetryPolicy: Send + Sync { | ||
| /// Called before sending a request to allow policy-specific modifications | ||
| /// | ||
| /// This method is invoked immediately before each request is sent (including retries). | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `request` - Mutable reference to the HTTP request being sent | ||
| fn on_before_send_request(&self, request: &mut Request); | ||
kundadebdatta marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /// Determines whether an HTTP request should be retried based on the response or error | ||
| /// | ||
| /// This method evaluates the result of one HTTP request attempt and decides whether | ||
| /// the operation should be retried, and if so, how long to wait before the next attempt. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `response` - A reference to the result of the HTTP request attempt. This can be: | ||
| /// - `Ok(RawResponse)` - An HTTP response was received (its status code may still indicate a failure) | ||
| /// - `Err(azure_core::Error)` - The attempt failed with an error instead of a response | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A `RetryResult` indicating the retry decision. | ||
| async fn should_retry(&self, response: &azure_core::Result<RawResponse>) -> RetryResult; | ||
kundadebdatta marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.