Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions sdk/core/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use crate::errors::{AzureError, UnexpectedHTTPResult};
use async_trait::async_trait;
use bytes::Bytes;
use http::{Request, Response, StatusCode};
use hyper::{self, body, Body};
use hyper_rustls::HttpsConnector;
use serde::Serialize;

#[async_trait]
pub trait HttpClient: Send + Sync + std::fmt::Debug {
async fn execute_request(
&self,
request: Request<&[u8]>,
request: Request<Bytes>,
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Sync + Send>>;

async fn execute_request_check_status(
&self,
request: Request<&[u8]>,
request: Request<Bytes>,
expected_status: StatusCode,
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Sync + Send>> {
let response = self.execute_request(request).await?;
Expand All @@ -30,7 +32,7 @@ pub trait HttpClient: Send + Sync + std::fmt::Debug {

async fn execute_request_check_statuses(
&self,
request: Request<&[u8]>,
request: Request<Bytes>,
expected_statuses: &[StatusCode],
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Sync + Send>> {
let response = self.execute_request(request).await?;
Expand Down Expand Up @@ -59,13 +61,13 @@ pub trait HttpClient: Send + Sync + std::fmt::Debug {
}
}

pub static EMPTY_BODY: [u8; 0] = [];
pub static EMPTY_BODY: &[u8; 0] = &[];

#[async_trait]
impl HttpClient for hyper::Client<HttpsConnector<hyper::client::HttpConnector>> {
async fn execute_request(
&self,
request: Request<&[u8]>,
request: Request<Bytes>,
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Sync + Send>> {
let mut hyper_request = hyper::Request::builder()
.uri(request.uri())
Expand All @@ -75,7 +77,8 @@ impl HttpClient for hyper::Client<HttpsConnector<hyper::client::HttpConnector>>
hyper_request = hyper_request.header(header.0, header.1);
}

let hyper_request = hyper_request.body(Body::from(request.body().to_vec()))?;
let body = request.into_body();
let hyper_request = hyper_request.body(Body::from(body))?;

let hyper_response = self.request(hyper_request).await?;

Expand All @@ -97,15 +100,15 @@ impl HttpClient for hyper::Client<HttpsConnector<hyper::client::HttpConnector>>
impl HttpClient for reqwest::Client {
async fn execute_request(
&self,
request: Request<&[u8]>,
request: Request<Bytes>,
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Sync + Send>> {
let mut reqwest_request =
self.request(request.method().clone(), &request.uri().to_string());
for header in request.headers() {
reqwest_request = reqwest_request.header(header.0, header.1);
}

let body = String::from_utf8(request.body().to_vec())?;
Copy link
Contributor Author

@ctaggart ctaggart Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that byte array reference gets cloned with to_vec & then converts it to a string, only to have the string converted to bytes later. It is more efficient to own the bytes and send them without the conversions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right, the string conversion is not really necessary.
Also the idea to use ref counting here (by using bytes) is really good. 👍 👍

let body = request.into_body();
let reqwest_request = reqwest_request.body(body).build()?;

let reqwest_response = self.execute(reqwest_request).await?;
Expand All @@ -123,3 +126,11 @@ impl HttpClient for reqwest::Client {
Ok(response)
}
}

/// Serialize to json
pub fn to_json<T>(value: &T) -> Result<Bytes, Box<dyn std::error::Error + Sync + Send>>
where
T: ?Sized + Serialize,
{
Ok(Bytes::from(serde_json::to_vec(value)?))
}
1 change: 1 addition & 0 deletions sdk/cosmos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde_json = "1.0"
url = "2.2"
uuid = { version = "0.8", features = ["v4"] }
failure = "0.1"
bytes = "1.0"

[dev-dependencies]
env_logger = "0.8"
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/examples/attachments_00.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use azure_core::HttpClient;
use azure_cosmos::prelude::*;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error;
Expand Down Expand Up @@ -128,7 +129,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.create_slug()
.consistency_level(&resp_delete)
.content_type("text/plain")
.body(b"FFFFF")
.body(Bytes::from_static(b"FFFFF"))
.execute()
.await?;

Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/src/requests/create_collection_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ impl<'a> CreateCollectionBuilder<'a> {
partition_key: &self.partition_key,
};

let body = serde_json::to_string(&collection)?;
debug!("body == {}", body);
let body = azure_core::to_json(&collection)?;
debug!("body == {:?}", body);

let req = req.body(body.as_bytes())?;
let req = req.body(body)?;
debug!("\nreq == {:?}", req);

Ok(self
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/src/requests/create_database_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<'a> CreateDatabaseBuilder<'a, Yes> {
pub id: &'a str,
}

let req = serde_json::to_string(&CreateDatabaseRequest {
let req = azure_core::to_json(&CreateDatabaseRequest {
id: self.database_name.unwrap(),
})?;

Expand All @@ -77,7 +77,7 @@ impl<'a> CreateDatabaseBuilder<'a, Yes> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(req.as_bytes())?; // todo: set content-length here and elsewhere without builders
let request = request.body(req)?; // todo: set content-length here and elsewhere without builders

debug!("create database request prepared == {:?}", request);

Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/src/requests/create_document_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ impl<'a, 'b> CreateDocumentBuilder<'a, 'b, Yes> {
req = azure_core::headers::add_mandatory_header(&self.indexing_directive, req);
req = azure_core::headers::add_mandatory_header(&self.allow_tentative_writes, req);

let serialized = serde_json::to_string(document)?;
let req = req.body(serialized.as_bytes())?;
let serialized = azure_core::to_json(document)?;
let req = req.body(serialized)?;

let response = self
.collection_client
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/src/requests/create_or_replace_trigger_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ impl<'a> CreateOrReplaceTriggerBuilder<'a, Yes, Yes, Yes> {
body: self.body(),
};

let request = serde_json::to_string(&request)?;
let request = req.body(request.as_bytes())?;
let request = azure_core::to_json(&request)?;
let request = req.body(request)?;

let expected_status = if self.is_create {
StatusCode::CREATED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl<'a, 'b> CreateOrReplaceUserDefinedFunctionBuilder<'a, 'b, Yes> {
.user_defined_function_name(),
};

let request = serde_json::to_string(&request)?;
let request = req.body(request.as_bytes())?;
let request = azure_core::to_json(&request)?;
let request = req.body(request)?;

Ok(if self.is_create {
self.user_defined_function_client
Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/src/requests/create_permission_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ impl<'a, 'b> CreatePermissionBuilder<'a, 'b> {
#[serde(rename = "permissionMode")]
permission_mode: &'x str,
resource: &'x str,
};
}

let request_body = RequestBody {
id: self.permission_client.permission_name(),
permission_mode: permission_mode.kind(),
resource: permission_mode.resource(),
};
let request_body = serde_json::to_string(&request_body)?;
let request_body = azure_core::to_json(&request_body)?;

let request = request.body(request_body.as_bytes())?;
let request = request.body(request_body)?;
debug!("\nrequest == {:#?}", request);

Ok(self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ impl<'a, 'b> CreateReferenceAttachmentBuilder<'a, 'b, Yes, Yes> {
pub media: &'r str,
}

let request = serde_json::to_string(&_Request {
let request = azure_core::to_json(&_Request {
id: self.attachment_client.attachment_name(),
content_type: self.content_type.unwrap().as_str(),
media: self.media.unwrap(),
})?;

req = req.header(http::header::CONTENT_TYPE, "application/json");
req = req.header(http::header::CONTENT_LENGTH, request.len());
let req = req.body(request.as_bytes())?;
let req = req.body(request)?;
debug!("req == {:#?}", req);

Ok(self
Expand Down
15 changes: 10 additions & 5 deletions sdk/cosmos/src/requests/create_slug_attachment_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::prelude::*;
use crate::responses::CreateSlugAttachmentResponse;
use azure_core::prelude::*;
use azure_core::{No, ToAssign, Yes};
use bytes::Bytes;
use http::StatusCode;
use std::convert::TryInto;
use std::marker::PhantomData;
Expand All @@ -13,7 +14,7 @@ where
ContentTypeSet: ToAssign,
{
attachment_client: &'a AttachmentClient,
body: Option<&'b [u8]>,
body: Option<Bytes>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this change. I don't really know how CreateSlugAttachmentBuilder is used. If It is reused, this should be a &Bytes then cloned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the docs (https://docs.rs/bytes/1.0.1/bytes/) bytes uses ref counting so cloning should be relatively cheap. I'd go with the owned struct.

content_type: Option<ContentType<'b>>,
if_match_condition: Option<IfMatchCondition<'b>>,
user_agent: Option<UserAgent<'b>>,
Expand Down Expand Up @@ -56,9 +57,12 @@ impl<'a, 'b, ContentTypeSet> CreateSlugAttachmentBuilder<'a, 'b, No, ContentType
where
ContentTypeSet: ToAssign,
{
pub fn body(self, body: &'b [u8]) -> CreateSlugAttachmentBuilder<'a, 'b, Yes, ContentTypeSet> {
pub fn body(
self,
body: impl Into<Bytes>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rylev, like this with impl Into<Bytes>?

) -> CreateSlugAttachmentBuilder<'a, 'b, Yes, ContentTypeSet> {
CreateSlugAttachmentBuilder {
body: Some(body),
body: Some(body.into()),
attachment_client: self.attachment_client,
content_type: self.content_type,
if_match_condition: self.if_match_condition,
Expand Down Expand Up @@ -111,9 +115,10 @@ impl<'a, 'b> CreateSlugAttachmentBuilder<'a, 'b, Yes, Yes> {
req = azure_core::headers::add_mandatory_header(&self.content_type.unwrap(), req);

req = req.header("Slug", self.attachment_client.attachment_name());
req = req.header(http::header::CONTENT_LENGTH, self.body.unwrap().len());
let body = self.body.clone().unwrap();
req = req.header(http::header::CONTENT_LENGTH, body.len());

let req = req.body(self.body.unwrap())?;
let req = req.body(body)?;

debug!("req == {:#?}", req);

Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/src/requests/create_stored_procedure_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ impl<'a, 'b> CreateStoredProcedureBuilder<'a, 'b, Yes> {
id: self.stored_procedure_client.stored_procedure_name(),
};

let request = serde_json::to_string(&request)?;
let request = req.body(request.as_bytes())?;
let request = azure_core::to_json(&request)?;
let request = req.body(request)?;

Ok(self
.stored_procedure_client
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/src/requests/create_user_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl<'a, 'b> CreateUserBuilder<'a, 'b> {
let request_body = RequestBody {
id: self.user_client.user_name(),
};
let request_body = serde_json::to_string(&request_body)?;
let request_body = azure_core::to_json(&request_body)?;

let req = req.body(request_body.as_bytes())?;
let req = req.body(request_body)?;
debug!("\nreq == {:?}", req);

Ok(self
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_attachment_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<'a, 'b> DeleteAttachmentBuilder<'a, 'b> {
req,
);

let req = req.body(EMPTY_BODY.as_ref())?;
let req = req.body(bytes::Bytes::from_static(EMPTY_BODY))?;

debug!("req == {:#?}", req);

Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_collection_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<'a> DeleteCollectionBuilder<'a> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(EMPTY_BODY.as_ref())?;
let request = request.body(bytes::Bytes::from_static(EMPTY_BODY))?;

Ok(self
.collection_client
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_database_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'a> DeleteDatabaseBuilder<'a> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(EMPTY_BODY.as_ref())?;
let request = request.body(bytes::Bytes::from_static(EMPTY_BODY))?;

trace!("request prepared == {:?}", request);

Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_document_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a> DeleteDocumentBuilder<'a> {

req = crate::headers::add_partition_keys_header(self.document_client.partition_keys(), req);

let req = req.body(EMPTY_BODY.as_ref())?;
let req = req.body(bytes::Bytes::from_static(EMPTY_BODY))?;
debug!("{:?}", req);

Ok(self
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_permission_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<'a, 'b> DeletePermissionsBuilder<'a, 'b> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(EMPTY_BODY.as_ref())?;
let request = request.body(bytes::Bytes::from_static(EMPTY_BODY))?;
debug!("\nrequest == {:#?}", request);

Ok(self
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_stored_procedure_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a, 'b> DeleteStoredProcedureBuilder<'a, 'b> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(EMPTY_BODY.as_ref())?;
let request = request.body(bytes::Bytes::from_static(EMPTY_BODY))?;

Ok(self
.stored_procedure_client
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_trigger_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a, 'b> DeleteTriggerBuilder<'a, 'b> {
let req = azure_core::headers::add_optional_header(&self.activity_id, req);
let req = azure_core::headers::add_optional_header(&self.consistency_level, req);

let request = req.body(EMPTY_BODY.as_ref())?;
let request = req.body(bytes::Bytes::from_static(EMPTY_BODY))?;

Ok(self
.trigger_client
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/requests/delete_user_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<'a, 'b> DeleteUserBuilder<'a, 'b> {
let req = azure_core::headers::add_optional_header(&self.activity_id, req);
let req = azure_core::headers::add_optional_header(&self.consistency_level, req);

let req = req.body(EMPTY_BODY.as_ref())?;
let req = req.body(bytes::Bytes::from_static(EMPTY_BODY))?;
debug!("\nreq == {:?}", req);

Ok(self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a, 'b> DeleteUserDefinedFunctionBuilder<'a, 'b> {
let request = azure_core::headers::add_optional_header(&self.activity_id, request);
let request = azure_core::headers::add_optional_header(&self.consistency_level, request);

let request = request.body(EMPTY_BODY.as_ref())?;
let request = request.body(bytes::Bytes::from_static(EMPTY_BODY))?;

Ok(self
.user_defined_function_client
Expand Down
9 changes: 6 additions & 3 deletions sdk/cosmos/src/requests/execute_stored_procedure_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::prelude::*;
use crate::resources::stored_procedure::Parameters;
use crate::responses::ExecuteStoredProcedureResponse;
use azure_core::prelude::*;
use bytes::Bytes;
use http::StatusCode;
use serde::de::DeserializeOwned;
use std::convert::TryInto;
Expand All @@ -17,6 +18,8 @@ pub struct ExecuteStoredProcedureBuilder<'a, 'b> {
partition_keys: Option<&'b PartitionKeys>,
}

static EMPTY_LIST: &[u8; 2] = b"[]";

impl<'a, 'b> ExecuteStoredProcedureBuilder<'a, 'b> {
pub(crate) fn new(stored_procedure_client: &'a StoredProcedureClient) -> Self {
Self {
Expand Down Expand Up @@ -66,12 +69,12 @@ impl<'a, 'b> ExecuteStoredProcedureBuilder<'a, 'b> {
let request = request.header(http::header::CONTENT_TYPE, "application/json");

let body = if let Some(parameters) = self.parameters.as_ref() {
parameters.to_json()
Bytes::from(parameters.to_json())
} else {
String::from("[]")
Bytes::from_static(EMPTY_LIST)
};

let request = request.body(body.as_bytes())?;
let request = request.body(body)?;

Ok(self
.stored_procedure_client
Expand Down
Loading