Skip to content

Commit abbc8a3

Browse files
authored
Policy Architecture (Azure#204)
* Start policy architecture * Request and Response wrapper types * Options
1 parent d4c5f9a commit abbc8a3

28 files changed

+451
-120
lines changed

sdk/core/src/headers/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pub const MS_DATE: &str = "x-ms-date";
99

1010
pub trait AddAsHeader {
1111
fn add_as_header(&self, builder: Builder) -> Builder;
12+
fn add_as_header2(&self, _request: &mut crate::Request) {
13+
unimplemented!()
14+
}
1215
}
1316

1417
#[must_use]
@@ -27,6 +30,12 @@ pub fn add_optional_header<T: AddAsHeader>(item: &Option<T>, mut builder: Builde
2730
builder
2831
}
2932

33+
pub fn add_optional_header2<T: AddAsHeader>(item: &Option<T>, request: &mut crate::Request) {
34+
if let Some(item) = item {
35+
item.add_as_header2(request);
36+
}
37+
}
38+
3039
#[must_use]
3140
pub fn add_mandatory_header<T: AddAsHeader>(item: &T, builder: Builder) -> Builder {
3241
item.add_as_header(builder)

sdk/core/src/http.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
pub struct Request {
2+
inner: http::Request<bytes::Bytes>,
3+
}
4+
5+
impl Request {
6+
/// Get the inner http::Request object, replacing it
7+
/// with an empty one.
8+
/// Note: this method will soon be replaced
9+
pub fn take_inner(&mut self) -> http::Request<bytes::Bytes> {
10+
std::mem::replace(&mut self.inner, http::Request::new(bytes::Bytes::new()))
11+
}
12+
13+
pub fn body<T: serde::Serialize>(
14+
&mut self,
15+
body: T,
16+
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
17+
let b = self.inner.body_mut();
18+
*b = crate::to_json(&body)?;
19+
Ok(())
20+
}
21+
}
22+
23+
impl From<http::Request<bytes::Bytes>> for Request {
24+
fn from(inner: http::Request<bytes::Bytes>) -> Self {
25+
Self { inner }
26+
}
27+
}
28+
29+
pub struct Response {
30+
inner: http::Response<bytes::Bytes>,
31+
}
32+
33+
impl Response {
34+
/// TODO: get rid of this
35+
pub fn into_inner(self) -> http::Response<bytes::Bytes> {
36+
self.inner
37+
}
38+
}
39+
40+
impl From<http::Response<bytes::Bytes>> for Response {
41+
fn from(inner: http::Response<bytes::Bytes>) -> Self {
42+
Self { inner }
43+
}
44+
}

sdk/core/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ extern crate serde_derive;
1212
mod macros;
1313

1414
pub mod errors;
15-
mod etag;
1615
pub mod headers;
16+
mod http;
1717
mod http_client;
1818
pub mod incompletevector;
19-
pub mod lease;
19+
mod models;
2020
pub mod parsing;
21+
mod policy;
2122
pub mod prelude;
2223
mod request_options;
23-
mod stored_access_policy;
2424
pub mod util;
2525

2626
use chrono::{DateTime, Utc};
@@ -30,9 +30,11 @@ use oauth2::AccessToken;
3030
use std::fmt::Debug;
3131
use uuid::Uuid;
3232

33+
pub use self::http::{Request, Response};
3334
pub use headers::AddAsHeader;
3435
pub use http_client::{to_json, HttpClient};
35-
pub use stored_access_policy::{StoredAccessPolicy, StoredAccessPolicyList};
36+
pub use models::*;
37+
pub use policy::*;
3638

3739
pub type RequestId = Uuid;
3840
pub type SessionToken = String;
File renamed without changes.
File renamed without changes.

sdk/core/src/models/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub(crate) mod etag;
2+
pub mod lease;
3+
mod stored_access_policy;
4+
5+
pub use stored_access_policy::{StoredAccessPolicy, StoredAccessPolicyList};
File renamed without changes.

sdk/core/src/policy.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use crate::{Request, Response};
2+
3+
use async_trait::async_trait;
4+
5+
use std::error::Error;
6+
use std::future::Future;
7+
use std::pin::Pin;
8+
use std::sync::{Arc, Mutex};
9+
10+
pub type PolicyResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
11+
12+
#[derive(Copy, Clone, Debug)]
13+
pub struct RetryOptions {
14+
num_retries: usize,
15+
}
16+
17+
impl RetryOptions {
18+
pub fn new(num_retries: usize) -> Self {
19+
Self { num_retries }
20+
}
21+
}
22+
23+
#[derive(Copy, Clone, Debug)]
24+
pub struct RetryPolicy {
25+
options: RetryOptions,
26+
}
27+
28+
impl RetryPolicy {
29+
pub fn new(options: RetryOptions) -> Self {
30+
Self { options }
31+
}
32+
}
33+
34+
#[async_trait]
35+
impl Policy for RetryPolicy {
36+
async fn send(
37+
&self,
38+
ctx: Context,
39+
request: &mut Request,
40+
next: &[Arc<dyn Policy>],
41+
) -> PolicyResult<Response> {
42+
let retries = self.options.num_retries;
43+
let mut last_result = next[0].send(ctx.clone(), request, &next[1..]).await;
44+
loop {
45+
if last_result.is_ok() || retries == 0 {
46+
return last_result;
47+
}
48+
49+
last_result = next[0].send(ctx.clone(), request, &next[1..]).await;
50+
}
51+
}
52+
}
53+
54+
type BoxedFuture<T> = Box<dyn Future<Output = PolicyResult<T>> + Send>;
55+
type Transport = dyn Fn(Context, &mut Request) -> Pin<BoxedFuture<Response>> + Send;
56+
57+
pub struct TransportOptions {
58+
send: Box<Mutex<Transport>>,
59+
}
60+
61+
impl TransportOptions {
62+
pub fn new<F>(send: F) -> Self
63+
where
64+
F: Fn(Context, &mut Request) -> Pin<BoxedFuture<Response>> + Send + 'static,
65+
{
66+
Self {
67+
send: Box::new(Mutex::new(send)),
68+
}
69+
}
70+
}
71+
72+
impl std::fmt::Debug for TransportOptions {
73+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74+
f.write_str("TransportOptions")
75+
}
76+
}
77+
#[derive(Debug)]
78+
pub struct TransportPolicy {
79+
options: TransportOptions,
80+
}
81+
82+
impl TransportPolicy {
83+
pub fn new(options: TransportOptions) -> Self {
84+
Self { options }
85+
}
86+
}
87+
88+
#[async_trait::async_trait]
89+
impl Policy for TransportPolicy {
90+
async fn send(
91+
&self,
92+
ctx: Context,
93+
request: &mut Request,
94+
next: &[Arc<dyn Policy>],
95+
) -> PolicyResult<Response> {
96+
if !next.is_empty() {
97+
panic!("Transport policy was not last policy")
98+
}
99+
let response = {
100+
let transport = self.options.send.lock().unwrap();
101+
(transport)(ctx, request)
102+
};
103+
Ok(response.await?)
104+
}
105+
}
106+
107+
#[derive(Clone)]
108+
pub struct Context {
109+
// Temporary hack to make sure that Context is not initializeable
110+
// Soon Context will have proper data fields
111+
_priv: (),
112+
}
113+
114+
impl Context {
115+
pub fn new() -> Self {
116+
Self { _priv: () }
117+
}
118+
}
119+
120+
#[async_trait::async_trait]
121+
pub trait Policy: Send + Sync + std::fmt::Debug {
122+
async fn send(
123+
&self,
124+
ctx: Context,
125+
request: &mut Request,
126+
next: &[Arc<dyn Policy>],
127+
) -> PolicyResult<Response>;
128+
}
129+
130+
#[derive(Debug, Clone)]
131+
pub struct Pipeline {
132+
policies: Vec<Arc<dyn Policy>>,
133+
}
134+
135+
impl Pipeline {
136+
// TODO: how can we ensure that the transport policy is the last policy?
137+
// Make this more idiot proof
138+
pub fn new(policies: Vec<Arc<dyn Policy>>) -> Self {
139+
Self { policies }
140+
}
141+
142+
pub async fn send(&self, ctx: Context, mut request: Request) -> PolicyResult<Response> {
143+
self.policies[0]
144+
.send(ctx, &mut request, &self.policies[1..])
145+
.await
146+
}
147+
}

sdk/cosmos/examples/create_delete_database.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3131
// authorization token at later time if you need, for example, to escalate the privileges for a
3232
// single operation.
3333
let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
34-
let client = CosmosClient::new(http_client, account, authorization_token);
34+
let client = CosmosClient::new(
35+
http_client.clone(),
36+
account.clone(),
37+
authorization_token.clone(),
38+
);
3539

3640
// The Cosmos' client exposes a lot of methods. This one lists the databases in the specified
3741
// account. Database do not implement Display but deref to &str so you can pass it to methods
@@ -40,7 +44,18 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4044
let list_databases_response = client.list_databases().execute().await?;
4145
println!("list_databases_response = {:#?}", list_databases_response);
4246

43-
let db = client.create_database().execute(&database_name).await?;
47+
let cosmos_client = CosmosClient::with_pipeline(
48+
account,
49+
authorization_token,
50+
CosmosOptions::with_client(http_client),
51+
);
52+
let db = cosmos_client
53+
.create_database(
54+
azure_core::Context::new(),
55+
&database_name,
56+
azure_cosmos::operations::create_database::Options::new(),
57+
)
58+
.await?;
4459
println!("created database = {:#?}", db);
4560

4661
// create collection!

sdk/cosmos/examples/document_00.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
5151
// Next we will create a Cosmos client. You need an authorization_token but you can later
5252
// change it if needed.
5353
let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
54-
let client = CosmosClient::new(http_client, account, authorization_token);
54+
let client = CosmosClient::new(
55+
http_client.clone(),
56+
account.clone(),
57+
authorization_token.clone(),
58+
);
5559

5660
// list_databases will give us the databases available in our account. If there is
5761
// an error (for example, the given key is not valid) you will receive a
@@ -65,10 +69,24 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
6569
.into_iter()
6670
.find(|db| db.id == DATABASE);
6771

72+
let database_client = CosmosClient::with_pipeline(
73+
account,
74+
authorization_token,
75+
CosmosOptions::with_client(http_client),
76+
);
6877
// If the requested database is not found we create it.
6978
let database = match db {
7079
Some(db) => db,
71-
None => client.create_database().execute(DATABASE).await?.database,
80+
None => {
81+
database_client
82+
.create_database(
83+
azure_core::Context::new(),
84+
DATABASE,
85+
azure_cosmos::operations::create_database::Options::new(),
86+
)
87+
.await?
88+
.database
89+
}
7290
};
7391
println!("database == {:?}", database);
7492

0 commit comments

Comments
 (0)