Changes from 23 commits
Commits
32 commits
2616c75
Initial commit to add global endpoint manager.
kundadebdatta Oct 11, 2025
f740ad0
Code changes to add retry policy and handlers.
kundadebdatta Oct 15, 2025
f249d29
Code changes to create a base retry policy.
kundadebdatta Oct 16, 2025
0dc70ef
Code changes to update retry handler.
kundadebdatta Oct 17, 2025
7fa5bf5
Initial commit to introduce retry handler and resource throttle retry…
kundadebdatta Oct 17, 2025
82295ad
Refactor code to use a delegate for the retry handler.
kundadebdatta Oct 17, 2025
b0e3a04
Code changes to clean up.
kundadebdatta Oct 17, 2025
281a957
Updated cargo
kundadebdatta Oct 17, 2025
93321dd
Code changes to add documentation. More refactoring.
kundadebdatta Oct 18, 2025
3452d37
Code changes to apply formatting and stlyecorp changes.
kundadebdatta Oct 18, 2025
fe73bf8
Code changes to fix CI build failures.
kundadebdatta Oct 19, 2025
cc2c1b5
Code changes to fix formatting error.
kundadebdatta Oct 19, 2025
b144368
Code changes to fix stylecorp
kundadebdatta Oct 19, 2025
945187c
Code changes to fix formatting.
kundadebdatta Oct 19, 2025
b644f40
Code changes to add conditional trait to support WASM and non-WASM ta…
kundadebdatta Oct 20, 2025
361ebbd
Formatting changes
kundadebdatta Oct 20, 2025
1f62b51
Code changes to update cargo toml
kundadebdatta Oct 20, 2025
7b97662
Code changes to fix code comment spell check.
kundadebdatta Oct 20, 2025
8a964bd
Adding valid words.
kundadebdatta Oct 20, 2025
f137f7b
Merge branch 'main' into users/kundadebdatta/3170_build_initial_retry…
kundadebdatta Oct 20, 2025
b356da1
Code changes to address review comments.
kundadebdatta Oct 21, 2025
c04ae7a
Code changes to fix formatting.
kundadebdatta Oct 21, 2025
4b88a7f
Code changes to move unit tests.
kundadebdatta Oct 21, 2025
e97078d
Update cargo format
kundadebdatta Oct 22, 2025
c74d400
Code changes to address review comments.
kundadebdatta Oct 23, 2025
0c0c4e2
Code changes to fix formatting and clippy issues.
kundadebdatta Oct 23, 2025
ae6ba7f
Code changes to fix style warnings.
kundadebdatta Oct 23, 2025
af3f3fa
Update trait def to ConditionalSend.
kundadebdatta Oct 23, 2025
79ad231
Injecting no_retry policy for the pipeline.
kundadebdatta Oct 23, 2025
d5b316b
Code changes to fix formatting issues.
kundadebdatta Oct 23, 2025
1a32cc8
Code changes to add is_retry() method in RetryResult.
kundadebdatta Oct 23, 2025
20e1cac
Code changes to use Box for single ownership.
kundadebdatta Oct 23, 2025
2 changes: 2 additions & 0 deletions sdk/cosmos/.dict.txt
@@ -5,6 +5,8 @@ euclidian
pkranges
sprocs
udfs
backoff
pluggable

# Cosmos' docs all use "Autoscale" as a single word, rather than a compound "AutoScale" or "Auto Scale"
autoscale
5 changes: 5 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/handler/mod.rs
@@ -0,0 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

//! Handler types for request processing and retry logic.
pub(crate) mod retry_handler;
142 changes: 142 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/handler/retry_handler.rs
@@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use crate::retry_policies::resource_throttle_retry_policy::ResourceThrottleRetryPolicy;
use crate::retry_policies::{RetryPolicy, RetryResult};
use async_trait::async_trait;
use azure_core::http::{request::Request, RawResponse};
use std::sync::Arc;
use typespec_client_core::async_runtime::get_async_runtime;

// Helper trait to conditionally require Send on non-WASM targets
#[cfg(not(target_arch = "wasm32"))]
pub trait CosmosConditionalSend: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> CosmosConditionalSend for T {}

#[cfg(target_arch = "wasm32")]
pub trait CosmosConditionalSend {}
#[cfg(target_arch = "wasm32")]
impl<T> CosmosConditionalSend for T {}

/// Trait defining the interface for retry handlers in Cosmos DB operations
///
/// This trait provides a contract for implementing retry logic that wraps HTTP requests
/// with automatic retry capabilities. Implementations can inject custom retry policies
/// and handle both transient failures (errors) and non-success HTTP responses.
#[allow(dead_code)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait RetryHandler: Send + Sync {
/// Sends an HTTP request with automatic retry logic
///
/// This method wraps the provided sender callback with retry logic, automatically
/// handling transient failures and implementing exponential backoff. The method
/// will continue retrying until either:
/// - The request succeeds (non-error 2xx status)
/// - The retry policy determines no more retries should be attempted
/// - Maximum retry attempts are exceeded
///
/// # Arguments
/// * `request` - Mutable reference to the HTTP request to send (may be modified by retry policy)
/// * `sender` - Callback function that performs the actual HTTP request. This function
/// takes a mutable request reference and returns a future that resolves to
/// a `RawResponse` or error.
///
/// # Type Parameters
/// * `Sender` - Function type that takes `&mut Request` and returns a Future
/// * `Fut` - Future type returned by the sender that resolves to `Result<RawResponse>`
///
/// # Returns
/// `Result<RawResponse>` - The final response (success or failure after all retry attempts)
async fn send<Sender, Fut>(
&self,
request: &mut Request,
sender: Sender,
) -> azure_core::Result<RawResponse>
where
Sender: Fn(&mut Request) -> Fut + Send + Sync,
Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + CosmosConditionalSend;
}

/// Concrete retry handler implementation with exponential backoff.
/// This handler provides automatic retry capabilities for Cosmos DB operations using
/// a pluggable retry policy system. It wraps HTTP requests with intelligent retry logic
/// that handles both transient network errors and HTTP error responses.
#[derive(Debug, Clone)]
pub struct BackOffRetryHandler {}

impl BackOffRetryHandler {
/// Creates a new instance of `BackOffRetryHandler`
///
/// This constructor initializes a retry handler with exponential backoff capabilities.
/// The handler will dynamically select appropriate retry policies based on the request
/// characteristics (e.g., `ResourceThrottleRetryPolicy` for rate limiting scenarios).
///
/// # Returns
///
/// A new `BackOffRetryHandler` instance ready to handle request retries with
/// automatic policy selection and exponential backoff logic.
///
/// # Example
///
/// ```rust,ignore
/// use azure_data_cosmos::handler::retry_handler::BackOffRetryHandler;
///
/// let retry_handler = BackOffRetryHandler::new();
/// // Use retry_handler.send() to make requests with automatic retry
/// ```
pub fn new() -> Self {
Self {}
}

/// Returns the appropriate retry policy based on the request
///
/// This method examines the underlying operation and resource types and determines which
/// retry policy should be used for this specific request.
/// # Arguments
/// * `request` - The HTTP request to analyze
pub fn get_policy_for_request(&self, _request: &Request) -> Arc<dyn RetryPolicy> {
// For now, always return ResourceThrottleRetryPolicy. Future implementation should check
// the request operation type and resource type and accordingly return the respective retry
// policy.
Arc::new(ResourceThrottleRetryPolicy::new(5, 200, 10))
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl RetryHandler for BackOffRetryHandler {
/// Sends an HTTP request with automatic retry and exponential backoff
///
/// This implementation of the `RetryHandler::send` method provides robust
/// retry logic.
///
/// # Arguments
/// * `request` - Mutable HTTP request (may be modified by retry policy between attempts)
/// * `sender` - Callback that performs the actual HTTP request
async fn send<Sender, Fut>(
&self,
request: &mut Request,
sender: Sender,
) -> azure_core::Result<RawResponse>
where
Sender: Fn(&mut Request) -> Fut + Send + Sync,
Fut: std::future::Future<Output = azure_core::Result<RawResponse>> + CosmosConditionalSend,
{
// Get the appropriate retry policy based on the request
let retry_policy = self.get_policy_for_request(request);
retry_policy.on_before_send_request(request);

loop {
// Invoke the provided sender callback instead of calling inner_send_async directly
let result = sender(request).await;
let retry_result = retry_policy.should_retry(&result).await;

match retry_result {
RetryResult::DoNotRetry => return result,
RetryResult::Retry { after } => get_async_runtime().sleep(after).await,
}
}
}
}
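
The control flow of `BackOffRetryHandler::send` above can be sketched in isolation. This is a minimal, synchronous illustration under stated assumptions, not the PR's implementation: `ThrottlePolicy`, `send_with_retry`, the `u16` status code standing in for `RawResponse`, and the 1 ms delay are all hypothetical, and `std::thread::sleep` replaces `get_async_runtime().sleep(...)`.

```rust
use std::cell::Cell;
use std::thread::sleep;
use std::time::Duration;

/// Mirrors the shape of the `RetryResult` enum added in this PR.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RetryResult {
    DoNotRetry,
    Retry { after: Duration },
}

/// Hypothetical synchronous stand-in for the PR's async `RetryPolicy` trait.
/// A `u16` status code stands in for `RawResponse`.
trait RetryPolicy {
    fn should_retry(&self, result: &Result<u16, String>) -> RetryResult;
}

/// Hypothetical policy: retry on status 429, at most `max_attempts` times.
struct ThrottlePolicy {
    max_attempts: u32,
    attempts: Cell<u32>,
}

impl RetryPolicy for ThrottlePolicy {
    fn should_retry(&self, result: &Result<u16, String>) -> RetryResult {
        let throttled = matches!(result, Ok(429));
        if throttled && self.attempts.get() < self.max_attempts {
            self.attempts.set(self.attempts.get() + 1);
            RetryResult::Retry { after: Duration::from_millis(1) }
        } else {
            RetryResult::DoNotRetry
        }
    }
}

/// Same loop shape as the handler in the diff: call the sender, ask the
/// policy, then either return the last result or sleep and try again.
fn send_with_retry<F>(policy: &dyn RetryPolicy, mut sender: F) -> Result<u16, String>
where
    F: FnMut() -> Result<u16, String>,
{
    loop {
        let result = sender();
        match policy.should_retry(&result) {
            RetryResult::DoNotRetry => return result,
            RetryResult::Retry { after } => sleep(after),
        }
    }
}

fn main() {
    let policy = ThrottlePolicy { max_attempts: 5, attempts: Cell::new(0) };
    let mut calls = 0;
    // Fake sender: throttled twice, then succeeds.
    let result = send_with_retry(&policy, || {
        calls += 1;
        if calls <= 2 { Ok(429) } else { Ok(200) }
    });
    println!("{:?} after {} calls", result, calls); // prints "Ok(200) after 3 calls"
}
```

Note that the loop returns whatever the last attempt produced, so a caller still sees the throttled response once the policy gives up.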
5 changes: 3 additions & 2 deletions sdk/cosmos/azure_data_cosmos/src/lib.rs
@@ -17,8 +17,6 @@ pub(crate) mod utils;

pub mod models;

mod location_cache;

#[doc(inline)]
pub use clients::CosmosClient;

@@ -28,3 +26,6 @@ pub use partition_key::*;
pub use query::Query;

pub use feed::{FeedPage, FeedPager};
pub(crate) mod handler;
pub mod retry_policies;
pub mod routing;
2 changes: 1 addition & 1 deletion sdk/cosmos/azure_data_cosmos/src/options/mod.rs
@@ -10,7 +10,7 @@ use std::fmt;
use std::fmt::Display;

/// Options used when creating a [`CosmosClient`](crate::CosmosClient).
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct CosmosClientOptions {
pub client_options: ClientOptions,
}
38 changes: 27 additions & 11 deletions sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
@@ -17,6 +17,7 @@ use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use url::Url;

use crate::handler::retry_handler::{BackOffRetryHandler, RetryHandler};
use crate::{
constants,
models::ThroughputProperties,
@@ -29,6 +30,7 @@ use crate::{
pub struct CosmosPipeline {
pub endpoint: Url,
pipeline: azure_core::http::Pipeline,
retry_handler: BackOffRetryHandler,
}

impl CosmosPipeline {
@@ -37,16 +39,19 @@ impl CosmosPipeline {
auth_policy: AuthorizationPolicy,
client_options: ClientOptions,
) -> Self {
let pipeline = azure_core::http::Pipeline::new(
option_env!("CARGO_PKG_NAME"),
option_env!("CARGO_PKG_VERSION"),
client_options,
Vec::new(),
vec![Arc::new(auth_policy)],
None,
);

CosmosPipeline {
Member

How does this fit into the azure_core retry policies? We should also double-check these, as the behavior from core is not necessarily what we want. We have overridden some of the retry configurations from core in the Python SDK.

Member

We intend to have the same conceptual design as other Azure SDK languages. If you can customize retry behavior in other languages, within reason we'd prefer to replicate that in core as well. Adding policies in clients should be relatively rare.

Member Author

The reasons we wanted to keep the Cosmos retry policies separate from the general azure_core retry policies are:

  • We have a lot of cross-regional routing logic, based on different status codes.
  • We will need to inject our endpoint manager, location caches, pk-range based routing into the retry layer to route the request correctly.

I understand that azure_core already has a base retry policy today. Let me explore a bit more to check whether it can be leveraged and overridden to modify the behavior.

Member

Thing is, you can't remove the default retry policy. At best, you could have your client set it to none - overriding any policy options a user might set (or honor them as a copy, but then overwrite them) - and inject your own.

We have the same design as any other Azure SDK language, though. Did you need a custom retry policy for other languages? Are there gaps in our core design that necessitate some additional options perhaps?

endpoint,
pipeline: azure_core::http::Pipeline::new(
option_env!("CARGO_PKG_NAME"),
option_env!("CARGO_PKG_VERSION"),
client_options,
Vec::new(),
vec![Arc::new(auth_policy)],
None,
),
pipeline,
retry_handler: BackOffRetryHandler::new(),
}
}

@@ -65,9 +70,20 @@ impl CosmosPipeline {
request: &mut Request,
resource_link: ResourceLink,
) -> azure_core::Result<RawResponse> {
let ctx = ctx.with_value(resource_link);
let r = self.pipeline.send(&ctx, request, None).await?;
Ok(r)
// Clone pipeline and convert context to owned so the closure can be Fn
let pipeline = self.pipeline.clone();
let ctx_owned = ctx.with_value(resource_link).into_owned();
Member

I think you could avoid this .into_owned() (which may force an allocation) by having the retry handler (and the sender closure) take it as a parameter, and flowing it down that way rather than closing over it.
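
One way to read the suggestion above is that the retry driver borrows the context and passes it to the sender on every attempt, so the closure never has to own or clone it. The sketch below is synchronous and uses hypothetical stand-in types (`Context`, `Request`, `send_with_retry`, `max_attempts`); it is not the azure_core API. An async version would also need the returned future to borrow its arguments, which this sketch does not address.

```rust
/// Hypothetical stand-ins; these are not the azure_core `Context`/`Request` types.
struct Context {
    resource_link: String,
}

struct Request {
    attempt: u32,
}

/// Retry driver that borrows the context and hands it to the sender on each
/// attempt, instead of having the sender closure capture an owned copy.
fn send_with_retry<F>(
    ctx: &Context,
    request: &mut Request,
    max_attempts: u32,
    sender: F,
) -> Result<String, String>
where
    F: Fn(&Context, &mut Request) -> Result<String, String>,
{
    let mut last = Err("sender was never called".to_string());
    for _ in 0..max_attempts {
        last = sender(ctx, request);
        if last.is_ok() {
            break;
        }
    }
    last
}

fn main() {
    let ctx = Context { resource_link: "dbs/db1/colls/c1".to_string() };
    let mut request = Request { attempt: 0 };

    // The closure captures nothing: both the context and the request are
    // supplied by the retry driver as parameters.
    let result = send_with_retry(&ctx, &mut request, 3, |ctx, req| {
        req.attempt += 1;
        if req.attempt < 2 {
            Err("transient".to_string())
        } else {
            Ok(format!("{} ok on attempt {}", ctx.resource_link, req.attempt))
        }
    });
    println!("{:?}", result);
}
```

Because the closure captures no state here, it stays `Fn` without any per-attempt cloning.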


// Build a sender closure that forwards to the inner pipeline.send
let sender = move |req: &mut Request| {
let pipeline = pipeline.clone();
let ctx = ctx_owned.clone();
let mut req_clone = req.clone();
async move { pipeline.send(&ctx, &mut req_clone, None).await }
};

// Delegate to the retry handler, providing the sender callback
self.retry_handler.send(request, sender).await
}

pub async fn send<T>(
61 changes: 61 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/retry_policies/mod.rs
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

pub mod resource_throttle_retry_policy;
use async_trait::async_trait;
use azure_core::http::RawResponse;
use azure_core::time::Duration;
use typespec_client_core::http::Request;

/// Result of a retry policy decision
///
/// This enum represents the outcome of evaluating whether an HTTP request should be retried
/// after encountering an error or receiving a response that may warrant a retry (such as
/// transient failures, rate limiting, or service unavailability).
///
/// # Variants
///
/// * `DoNotRetry` - The operation should not be retried. This is returned for successful
/// responses, permanent failures, or when retry limits have been exhausted.
///
/// * `Retry { after }` - The operation should be retried after waiting for the specified
/// duration. The delay allows for exponential backoff or respects server-provided retry
/// hints (e.g., from `Retry-After` headers).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryResult {
/// Indicates that the operation should not be retried.
DoNotRetry,
/// Indicates that the operation should be retried after waiting for the duration specified in `after`.
Retry { after: Duration },
}

/// Trait defining the retry policy interface for Cosmos DB operations
///
/// This trait provides a contract for implementing retry logic for transient failures
/// in Azure Cosmos DB operations. Implementers can define custom retry behavior for
/// both exceptions (errors) and HTTP responses based on their specific requirements.
#[async_trait]
pub trait RetryPolicy: Send + Sync {
/// Called before sending a request to allow policy-specific modifications
///
/// This method is invoked immediately before each request is sent (including retries).
/// # Arguments
/// * `request` - Mutable reference to the HTTP request being sent
fn on_before_send_request(&self, request: &mut Request);

/// Determines whether an HTTP request should be retried based on the response or error
///
/// This method evaluates the result of an HTTP request attempt and decides whether
/// the operation should be retried, and if so, how long to wait before the next attempt.
///
/// # Arguments
///
/// * `response` - A reference to the result of the HTTP request attempt. This can be:
/// - `Ok(RawResponse)` - A successful HTTP response (which may still indicate an error via status code)
/// - `Err(azure_core::Error)` - A network or client-side error
///
/// # Returns
///
/// A `RetryResult` indicating the retry decision.
async fn should_retry(&self, response: &azure_core::Result<RawResponse>) -> RetryResult;
}
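
The `RetryResult` documentation above mentions exponential backoff and server-provided retry hints. The following is a small sketch of how a policy might compute the `after` delay, offered as an illustration only. The `decide` function, its parameters, the use of status 429 as the throttling signal, and the `is_retry` helper body are assumptions (the commit list names an `is_retry()` method, but its code is not shown in this diff), and `std::time::Duration` is used in place of `azure_core::time::Duration`.

```rust
use std::time::Duration;

/// Mirrors the shape of the `RetryResult` enum in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
enum RetryResult {
    DoNotRetry,
    Retry { after: Duration },
}

impl RetryResult {
    /// Assumed helper: true when the decision is to retry.
    fn is_retry(&self) -> bool {
        matches!(self, RetryResult::Retry { .. })
    }
}

/// Hypothetical decision function. `attempt` is the number of retries already
/// made. A server hint (`retry_after`) wins over the computed backoff, and the
/// final delay never exceeds `cap`.
fn decide(
    status: u16,
    attempt: u32,
    max_attempts: u32,
    base: Duration,
    cap: Duration,
    retry_after: Option<Duration>,
) -> RetryResult {
    if status != 429 || attempt >= max_attempts {
        return RetryResult::DoNotRetry;
    }
    let after = match retry_after {
        Some(hint) => hint,
        // Exponential backoff: base * 2^attempt, saturating on overflow.
        None => base.saturating_mul(2u32.saturating_pow(attempt)),
    };
    RetryResult::Retry { after: after.min(cap) }
}

fn main() {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(1);
    for attempt in 0..6 {
        let decision = decide(429, attempt, 5, base, cap, None);
        println!("attempt {attempt}: {decision:?} (retry: {})", decision.is_retry());
    }
}
```

With these example values the delay doubles from 100 ms and is clamped at the 1 s cap, and the sixth evaluation returns `DoNotRetry` because the attempt budget is spent.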