Cosmos: Add Basic Retry Policy for Throttled Requests #3230
API Change Check
APIView identified API level changes in this PR and created the following API reviews
Pull Request Overview
This PR implements a basic retry policy infrastructure for handling throttled (429 status) requests in Azure Cosmos DB, including an exponential backoff mechanism and comprehensive test coverage.
Key changes:
- Added retry policy framework with `RetryPolicy` trait and `ResourceThrottleRetryPolicy` implementation
- Implemented `BackOffRetryHandler` to wrap requests with automatic retry logic
- Integrated retry handler into `CosmosPipeline` for transparent retry behavior
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `sdk/cosmos/azure_data_cosmos/src/retry_policies/mod.rs` | Defines core retry policy infrastructure including `RetryPolicy` trait, `ShouldRetryResult`, `BaseRetryPolicy`, and configuration |
| `sdk/cosmos/azure_data_cosmos/src/retry_policies/resource_throttle_retry_policy.rs` | Implements `ResourceThrottleRetryPolicy` for handling 429 errors with exponential backoff |
| `sdk/cosmos/azure_data_cosmos/src/handler/retry_handler.rs` | Defines `AbstractRetryHandler` trait and `BackOffRetryHandler` implementation for wrapping requests with retry logic |
| `sdk/cosmos/azure_data_cosmos/src/handler/mod.rs` | Module definition for handler types |
| `sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs` | Integrates `BackOffRetryHandler` into `CosmosPipeline` request flow |
| `sdk/cosmos/azure_data_cosmos/tests/retry_policy_429_test.rs` | Comprehensive unit tests validating retry behavior, backoff calculation, and limit enforcement |
| `sdk/cosmos/azure_data_cosmos/src/routing/mod.rs` | Adds type aliases for routing-related collections |
| `sdk/cosmos/azure_data_cosmos/src/routing/location_cache.rs` | Adds `Debug` derives and helper methods for endpoint management |
| `sdk/cosmos/azure_data_cosmos/src/options/mod.rs` | Adds `Debug` derive to `CosmosClientOptions` |
| `sdk/cosmos/azure_data_cosmos/src/lib.rs` | Exposes new public modules and reorganizes `location_cache` |
| `sdk/cosmos/azure_data_cosmos/Cargo.toml` | Adds `tokio` dependency with `time` feature |
| `sdk/cosmos/.dict.txt` | Adds spelling dictionary entries for "backoff" and "pluggable" |
Definitely some things to look into. In general, there's a fair bit of "Object-Oriented" design in here that I don't think translates well into Rust. We should try to avoid translating C#/Java concepts into Rust and look at a Rust-first approach. Rust code typically avoids a lot of virtual dispatch and high-level abstractions.
#[allow(dead_code)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait AbstractRetryHandler: Send + Sync {
Just `RetryHandler` would be the style here. Rust code generally doesn't refer to OOP terms like `Abstract`.
Agreed. Updated the trait naming.
/// Result of a retry policy decision
///
/// This struct encapsulates the decision made by a retry policy about whether
/// an operation should be retried and how long to wait before the retry attempt.
///
/// # Fields
/// * `should_retry` - Whether the operation should be retried
/// * `backoff_time` - Duration to wait before retrying (meaningful only if `should_retry` is true)
#[derive(Debug, Clone)]
pub struct ShouldRetryResult {
    pub should_retry: bool,
    pub back_off_time: Duration,
}
The more idiomatic way to implement this in Rust is to use an `enum`. In Rust, enum variants can carry values, which you can use to implement the back-off time:
pub enum RetryResult {
    /// Indicates that the operation should not be retried.
    DoNotRetry,
    /// Indicates that the operation should be retried after waiting for the duration specified in `after`.
    Retry { after: Duration },
}
Agreed. Refactored the code.
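For readers following along, the enum suggested above can be sketched as a small standalone program. This is only an illustration: the `decide` and `delay_for` helpers are hypothetical names that do not appear in the PR, and the rule "retry only on 429" is a simplification of whatever the real policy does.

```rust
use std::time::Duration;

/// Outcome of a retry decision. A back-off duration only exists when a
/// retry is actually requested, so no "meaningless" field is possible.
#[derive(Debug, Clone, PartialEq)]
pub enum RetryResult {
    /// The operation should not be retried.
    DoNotRetry,
    /// The operation should be retried after waiting for `after`.
    Retry { after: Duration },
}

/// Hypothetical decision function: retry only on HTTP 429.
pub fn decide(status: u16, retry_after: Duration) -> RetryResult {
    if status == 429 {
        RetryResult::Retry { after: retry_after }
    } else {
        RetryResult::DoNotRetry
    }
}

/// Hypothetical helper: turns a decision into the delay to sleep, if any.
pub fn delay_for(result: &RetryResult) -> Option<Duration> {
    match result {
        RetryResult::Retry { after } => Some(*after),
        RetryResult::DoNotRetry => None,
    }
}

fn main() {
    let throttled = decide(429, Duration::from_millis(100));
    println!("{:?} -> {:?}", throttled, delay_for(&throttled));

    let ok = decide(200, Duration::ZERO);
    println!("{:?} -> {:?}", ok, delay_for(&ok));
}
```

Because the variants are constructed directly (`RetryResult::DoNotRetry`, `RetryResult::Retry { after }`), no separate constructor functions are needed.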
impl ShouldRetryResult {
    /// Creates a result indicating the operation should not be retried
    ///
    /// # Example
    /// ```
    /// use azure_data_cosmos::retry_policies::ShouldRetryResult;
    ///
    /// let result = ShouldRetryResult::no_retry();
    /// assert!(!result.should_retry);
    /// ```
    pub fn no_retry() -> Self {
        Self {
            should_retry: false,
            back_off_time: Duration::ZERO,
        }
    }

    /// Creates a result indicating the operation should be retried after a delay
    ///
    /// # Arguments
    /// * `backoff` - The duration to wait before retrying
    ///
    /// # Example
    /// ```
    /// use azure_data_cosmos::retry_policies::ShouldRetryResult;
    /// use std::time::Duration;
    ///
    /// let result = ShouldRetryResult::retry_after(Duration::from_secs(5));
    /// assert!(result.should_retry);
    /// assert_eq!(result.back_off_time, Duration::from_secs(5));
    /// ```
    pub fn retry_after(backoff: Duration) -> Self {
        Self {
            should_retry: true,
            back_off_time: backoff,
        }
    }
}
You wouldn't really need this if you're using an enum; you'd already have it in the standard construction syntax:
RetryResult::DoNotRetry
RetryResult::Retry { after: backoff }
Agreed. Refactored the code.
Ok(r)
// Clone pipeline and convert context to owned so the closure can be Fn
let pipeline = self.pipeline.clone();
let ctx_owned = ctx.with_value(resource_link).into_owned();
I think you could avoid this `.into_owned()` (which may force an allocation) by having the retry handler (and the sender closure) take it as a parameter, and flowing it down that way rather than closing over it.
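A rough, synchronous sketch of that idea follows. Everything here is a stand-in: the `Context` struct and `send_with_retry` function are hypothetical and much simpler than the real `azure_core` context and the async handler in the PR. The point is only that the context is lent to the sender on each attempt instead of being captured by the closure.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for a request context that may borrow its data.
#[derive(Debug, Clone)]
pub struct Context<'a> {
    pub resource_link: Cow<'a, str>,
}

/// Retry helper that receives the context as a parameter and lends it to
/// the sender on every attempt, so the sender never needs an owned copy.
pub fn send_with_retry<F>(
    ctx: &Context<'_>,
    max_attempts: usize,
    sender: F,
) -> Result<String, String>
where
    F: Fn(&Context<'_>, usize) -> Result<String, String>,
{
    let mut last_err = String::from("no attempts made");
    for attempt in 1..=max_attempts {
        match sender(ctx, attempt) {
            Ok(body) => return Ok(body),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}

fn main() {
    let link = String::from("dbs/db1/colls/c1");
    // The context borrows `link`; nothing is cloned or converted to owned.
    let ctx = Context { resource_link: Cow::Borrowed(&link) };

    let result = send_with_retry(&ctx, 3, |ctx, attempt| {
        if attempt < 3 {
            Err(format!("429 on attempt {attempt}"))
        } else {
            Ok(format!("ok: {}", ctx.resource_link))
        }
    });
    println!("{result:?}");
}
```

The closure stays `Fn` because it captures nothing mutable; the borrowed context simply flows through the handler's parameter list.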
/// Type alias matching C# `ReadOnlyCollection<Uri>` semantics in a lightweight form.
/// We use `Arc<[Url]>` for cheap cloning and slice immutability.
pub type ReadOnlyUrlCollection = Arc<[Url]>;

/// Type alias representing `ReadOnlyDictionary<string, Uri>`.
pub type ReadOnlyLocationMap = Arc<HashMap<String, Url>>;
I don't think we need these. First off, we shouldn't just be porting C# concepts over to Rust since we're aiming to make an idiomatic Rust SDK. Second, the idea of separate "read-only collection" types in Rust isn't really necessary. Rust already enforces that only the owner of a collection, or the holder of a `&mut` reference, can modify it, so the only place you really need to enforce "read-only" behavior in a collection type is if you want to restrict even the owner from modifying the collection and that's not usually necessary.
Yes, you are right. These are not needed anymore. Removed.
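To illustrate the borrowing point from this thread, here is a small sketch in which ordinary collections are exposed read-only simply by handing out shared references. The `LocationCache` shape below is hypothetical, and plain `String` values stand in for `Url` to keep the example dependency-free.

```rust
use std::collections::HashMap;

/// Owner of the endpoint data. Only code holding the value itself or a
/// `&mut LocationCache` can change it.
#[derive(Debug, Default)]
pub struct LocationCache {
    read_endpoints: Vec<String>,
    by_region: HashMap<String, String>,
}

impl LocationCache {
    /// Mutation requires `&mut self`.
    pub fn add(&mut self, region: &str, url: &str) {
        self.read_endpoints.push(url.to_string());
        self.by_region.insert(region.to_string(), url.to_string());
    }

    /// Read-only view: callers get a shared slice and cannot push to it.
    pub fn read_endpoints(&self) -> &[String] {
        &self.read_endpoints
    }

    /// Read-only lookup by region name.
    pub fn endpoint_for(&self, region: &str) -> Option<&str> {
        self.by_region.get(region).map(String::as_str)
    }
}

fn main() {
    let mut cache = LocationCache::default();
    cache.add("West US", "https://westus.example.com");

    // `view` is a shared borrow; `view.push(..)` would not compile.
    let view = cache.read_endpoints();
    println!("{view:?} {:?}", cache.endpoint_for("West US"));
}
```

No dedicated "read-only" wrapper type is involved; the `&` in the return types is what prevents modification.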
sdk/cosmos/azure_data_cosmos/src/retry_policies/resource_throttle_retry_policy.rs
max_attempt_count: usize,
backoff_delay_factor: u32,
max_wait_time: Duration,
current_attempt_count: AtomicUsize,
cumulative_retry_delay: Arc<AtomicUsize>,
The state management here isn't quite right. The `ResourceThrottleRetryPolicy` is a singleton for the entire application, but most of this state is per-request.
Now that you're creating an instance of this per-request, I don't think these need to be `Arc`s or `AtomicUsize`s at all. Regular `usize` should be fine, since the instance is being used entirely within a single request "thread".
Ah, and because all the `RetryPolicy` methods take `&self`, you end up having to use atomic types. That's the Rust compiler giving you a hint that something is a little wrong about how you're handling mutability :).
If a method can mutate fields on `self`, it needs to take it in as `&mut self`. Rust will then guarantee that as long as the caller is using safe Rust (i.e. not inside an `unsafe` block), NO other running code is holding a mutable reference.
I suggest a quick review of References and Borrowing in the Rust Book! Getting the hang of these is a key part of understanding the magic of Rust :)
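As a sketch of what the `&mut self` version might look like, the following keeps the field names from the quoted struct but replaces the atomics with plain values. The type name `ThrottleRetryPolicy`, the `should_retry` signature, and the back-off formula (base delay multiplied by `backoff_delay_factor` raised to the attempt number) are all assumptions made for illustration, not the PR's actual code.

```rust
use std::time::Duration;

/// Retry decision, following the enum shape suggested earlier in this review.
#[derive(Debug, Clone, PartialEq)]
pub enum RetryResult {
    DoNotRetry,
    Retry { after: Duration },
}

/// Per-request throttle policy with plain (non-atomic) state.
#[derive(Debug)]
pub struct ThrottleRetryPolicy {
    max_attempt_count: usize,
    backoff_delay_factor: u32,
    max_wait_time: Duration,
    current_attempt_count: usize,
    cumulative_retry_delay: Duration,
}

impl ThrottleRetryPolicy {
    pub fn new(max_attempt_count: usize, backoff_delay_factor: u32, max_wait_time: Duration) -> Self {
        Self {
            max_attempt_count,
            backoff_delay_factor,
            max_wait_time,
            current_attempt_count: 0,
            cumulative_retry_delay: Duration::ZERO,
        }
    }

    /// Takes `&mut self` because it updates the attempt counter and the
    /// cumulative delay. The back-off formula here is an assumption.
    pub fn should_retry(&mut self, status: u16, base_delay: Duration) -> RetryResult {
        if status != 429 || self.current_attempt_count >= self.max_attempt_count {
            return RetryResult::DoNotRetry;
        }
        let exponent = self.current_attempt_count as u32;
        let delay = base_delay.saturating_mul(self.backoff_delay_factor.saturating_pow(exponent));
        let total = self.cumulative_retry_delay.saturating_add(delay);
        if total > self.max_wait_time {
            return RetryResult::DoNotRetry;
        }
        self.current_attempt_count += 1;
        self.cumulative_retry_delay = total;
        RetryResult::Retry { after: delay }
    }
}

fn main() {
    let mut policy = ThrottleRetryPolicy::new(3, 2, Duration::from_secs(10));
    for _ in 0..4 {
        println!("{:?}", policy.should_retry(429, Duration::from_secs(1)));
    }
}
```

With exclusive access guaranteed by `&mut self`, the counters can be ordinary `usize` and `Duration` fields and the compiler rules out concurrent mutation.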
/// retry policy should be used for this specific request.
/// # Arguments
/// * `request` - The HTTP request to analyze
pub fn get_policy_for_request(&self, _request: &Request) -> Arc<dyn RetryPolicy> {
I think a `RetryPolicy` should be constructed for each request. That seems to me like the cleanest way to have clean per-request state. In Rust, most allocations will be done on the stack, and are thus very cheap, so don't be afraid to create new instances of structs. Heap allocations (which are what we generally try to avoid in managed languages like C#/Java) only happen when you're using wrappers like `Box`, `Arc`, or `Vec`, or for very specific types, like `String`, where heap-allocation is essential. Creating a new value of a struct containing only primitive (number) types and returning it is pretty cheap.
Yes, you are right. I was not in favor of creating the retry policy per request, in order to avoid any memory allocation. However, after reading further, I think this is safe to do.
Refactored the code to reflect the changes.
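One way to picture the per-request arrangement discussed in this thread is sketched below. The type names (`RetryOptions`, `RequestRetryPolicy`) and the `try_record_attempt` method are hypothetical; the sketch only shows a long-lived handler that holds configuration and returns a fresh policy value, made of primitive fields, for each request.

```rust
/// Immutable, client-wide configuration (cheap to copy).
#[derive(Debug, Clone, Copy)]
pub struct RetryOptions {
    pub max_attempts: u32,
}

/// Mutable state that belongs to exactly one request.
#[derive(Debug)]
pub struct RequestRetryPolicy {
    options: RetryOptions,
    attempts: u32,
}

impl RequestRetryPolicy {
    /// Records an attempt; returns `false` once the budget is used up.
    pub fn try_record_attempt(&mut self) -> bool {
        if self.attempts < self.options.max_attempts {
            self.attempts += 1;
            true
        } else {
            false
        }
    }
}

/// Long-lived handler that holds only configuration.
#[derive(Debug, Clone)]
pub struct RetryHandler {
    options: RetryOptions,
}

impl RetryHandler {
    pub fn new(options: RetryOptions) -> Self {
        Self { options }
    }

    /// Builds a fresh policy by value: two integers, no `Box` or `Arc`.
    pub fn policy_for_request(&self) -> RequestRetryPolicy {
        RequestRetryPolicy { options: self.options, attempts: 0 }
    }
}

fn main() {
    let handler = RetryHandler::new(RetryOptions { max_attempts: 2 });
    let mut first = handler.policy_for_request();
    let mut second = handler.policy_for_request();

    // Exhausting the first request's budget does not affect the second.
    while first.try_record_attempt() {}
    println!("second can still retry: {}", second.try_record_attempt());
}
```

Each request owns its counters outright, so no state is shared across requests and nothing needs synchronization.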
How is this different than the retry policy we already have in `ClientOptions` for all HTTP clients? It already handles 429, and clients can customize the behavior in several ways.
None,
);

CosmosPipeline {
How does this fit into the azure core retry policies? We should also double check these, as the behavior from core is not necessarily what we want. We have overridden some of the retry configurations from core in the Python SDK.
We intend to have the same conceptual design as other Azure SDK languages. If you can customize retry behavior in other languages, within reason we'd prefer to replicate that in core as well. Adding policies in clients should be relatively rare.
The reasons we wanted to keep the Cosmos retry policies separate from the general azure_core retry policies are:
- We have a lot of cross-regional routing logic, based on different status codes.
- We will need to inject our endpoint manager, location caches, and pk-range based routing into the retry layer to route the request correctly.
I understand that there is already a base retry policy in azure_core today. Let me explore a bit more to check whether it can be leveraged and overridden to modify the behavior.
Thing is, you can't remove the default retry policy. At best, you could have your client set it to `none` - overriding any policy options a user might set (or honor them as a copy, but then overwrite them) - and inject your own.
We have the same design as any other Azure SDK language, though. Did you need a custom retry policy for other languages? Are there gaps in our core design that necessitate some additional options perhaps?
You're correct that the current behavior is essentially the same as the existing policy. Cosmos has a lot more retry logic that we'll need to add in the future though (for partition topology changes, cross-region retry, cross-region failover/circuit breakers, etc.), and right now it isn't really possible to re-use the @kundadebdatta and I did chat about plugging our retry policy in to the |
Definitely better state management. We talked about seeing if we can rework this to using `azure_core::http::policies::Policy`. I'd still like you to look at that, but the rest looks good aside from some style/idiomatic suggestions.

// Helper trait to conditionally require Send on non-WASM targets
#[cfg(not(target_arch = "wasm32"))]
pub trait CosmosConditionalSend: Send {}
This should just be `ConditionalSend`. It's private to our crate, and even if it was a public API, we generally avoid prefixing types with the crate name (i.e. `Cosmos`) in Rust. There are exceptions (`CosmosClient`) but it's not like in .NET where we tend to prefix a lot of types with a product/package name like `Cosmos`.
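For context, a conditional `Send` helper of this kind is commonly written as a pair of `cfg`-gated definitions. The quoted diff only shows the non-WASM half, so the `wasm32` half, the blanket impls, and the `accepts` function below are assumptions added to make a complete, compilable sketch under the suggested name.

```rust
// On non-WASM targets, anything used as `ConditionalSend` must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub trait ConditionalSend: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> ConditionalSend for T {}

// On WASM targets the bound is dropped, so every type qualifies.
#[cfg(target_arch = "wasm32")]
pub trait ConditionalSend {}
#[cfg(target_arch = "wasm32")]
impl<T> ConditionalSend for T {}

/// Hypothetical generic function that relies on the conditional bound.
pub fn accepts<T: ConditionalSend>(value: T) -> T {
    value
}

fn main() {
    // `u32` is `Send`, so this compiles on every target.
    let n = accepts(42u32);
    println!("{n}");
}
```

Generic code can then write `T: ConditionalSend` once instead of repeating the `cfg` attributes at every bound.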
/// a pluggable retry policy system. It wraps HTTP requests with intelligent retry logic
/// that handles both transient network errors and HTTP error responses.
#[derive(Debug, Clone)]
pub struct BackOffRetryHandler {}
nit: It's generally idiomatic to just remove the `{}` entirely if you're writing a type with no fields:
- pub struct BackOffRetryHandler {}
+ pub struct BackOffRetryHandler;
/// retry policy should be used for this specific request.
/// # Arguments
/// * `request` - The HTTP request to analyze
pub fn get_policy_for_request(&self, _request: &Request) -> Arc<dyn RetryPolicy> {
nit: `policy_for_request`. "Getters" are usually just unprefixed in Rust.
#[allow(dead_code)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait RetryHandler: Send + Sync {
As mentioned offline, I think we can just have our retry handler be an `azure_core::http::policies::Policy`. Even if we don't want it to be a Policy for some reason, we generally avoid introducing traits until we actually have a significant need for them (it's not like in .NET/Java where interfaces/abstract classes get used often for "testability").
let backoff1 = match result1 {
    RetryResult::Retry { after } => after,
    _ => panic!("Expected retry result"),
};
There are a few more compact ways to express "two-arm" matches (where one arm is just `_`). I'd suggest this:
let RetryResult::Retry { after: backoff1 } = result1 else {
    panic!("Expected retry result")
};
That will match on `RetryResult::Retry` and then bind the `after` field to the local variable `backoff1`. If the result doesn't match that pattern, the `else` block will be taken. It's the same code as you have above, just a slightly more compact and idiomatic way to write it.
There's also a similar inverted syntax:
let backoff1 = if let RetryResult::Retry { after } = result1 {
    after
} else {
    panic!("Expected retry result");
};
That one is just FYI; I think the first syntax is preferable in this case.
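Both forms can be tried out in a small standalone program. The helper function names below are hypothetical, and the second helper returns a default instead of panicking purely so that both branches can be exercised; `let ... else` requires Rust 1.65 or later.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RetryResult {
    DoNotRetry,
    Retry { after: Duration },
}

/// `let ... else`: bind `after` to `backoff`, or diverge in the else block.
fn backoff_or_panic(result: RetryResult) -> Duration {
    let RetryResult::Retry { after: backoff } = result else {
        panic!("Expected retry result");
    };
    backoff
}

/// Inverted `if let ... else` form, used here with a non-panicking fallback.
fn backoff_or_zero(result: RetryResult) -> Duration {
    if let RetryResult::Retry { after } = result {
        after
    } else {
        Duration::ZERO
    }
}

fn main() {
    let retry = RetryResult::Retry { after: Duration::from_millis(250) };
    println!("{:?}", backoff_or_panic(retry));
    println!("{:?}", backoff_or_zero(RetryResult::DoNotRetry));
}
```

Note that the `else` block of a `let ... else` must diverge (`panic!`, `return`, `break`, and so on), and the statement ends with a semicolon.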
match result4 {
    RetryResult::DoNotRetry => {
        println!("Fourth attempt - should not retry");
    }
    RetryResult::Retry { .. } => panic!("Should NOT retry after exceeding max attempts"),
}
This can be an `assert_eq!` I think:
- match result4 {
-     RetryResult::DoNotRetry => {
-         println!("Fourth attempt - should not retry");
-     }
-     RetryResult::Retry { .. } => panic!("Should NOT retry after exceeding max attempts"),
- }
+ assert_eq!(result4, RetryResult::DoNotRetry);
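One detail worth keeping in mind for this suggestion: `assert_eq!` needs the compared type to implement both `PartialEq` and `Debug`. A minimal sketch, assuming the enum derives those traits (the `decide` function and its parameters are hypothetical):

```rust
use std::time::Duration;

// `PartialEq` lets `assert_eq!` compare values; `Debug` lets it print them
// when the assertion fails.
#[derive(Debug, Clone, PartialEq)]
enum RetryResult {
    DoNotRetry,
    Retry { after: Duration },
}

/// Hypothetical stand-in for a policy that has exhausted its attempts.
fn decide(attempt: u32, max_attempts: u32) -> RetryResult {
    if attempt > max_attempts {
        RetryResult::DoNotRetry
    } else {
        RetryResult::Retry { after: Duration::from_millis(100) }
    }
}

fn main() {
    // Exact comparison replaces the whole `match` block.
    assert_eq!(decide(4, 3), RetryResult::DoNotRetry);

    // When the duration is irrelevant, `matches!` checks just the variant.
    assert!(matches!(decide(1, 3), RetryResult::Retry { .. }));
    println!("assertions passed");
}
```

On failure, `assert_eq!` prints both values using their `Debug` output, which replaces the hand-written panic messages.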
match result {
    RetryResult::DoNotRetry => {
        // Success - should not retry on client errors
    }
    RetryResult::Retry { .. } => panic!("Should NOT retry on {}", description),
}
Another place for an assert_eq!
match result {
    RetryResult::DoNotRetry => {
        // Success - should not retry
    }
    RetryResult::Retry { .. } => panic!("Should NOT retry on successful response"),
}
Another place for an assert_eq!
)))
}
#[tokio::test]
async fn test_retry_policy_handles_429_status() {
nit (across all these tests): The `test_` prefix on test functions isn't usually used in Rust.
Description
Adds structs and traits to define the structure of the cross-regional retry logic. Adds a basic implementation of a `ResourceThrottleRetryPolicy` to retry throttled requests.
Changes
- `RetryHandler`.
- `BackoffRetryHandler`.
- `retry_policies` module and implemented `ResourceThrottleRetryPolicy`.
All existing functionality remains unchanged and fully backward compatible.
Additional instructions:
Fixes #3170