Skip to content

Commit

Permalink
Correct handling of /images/create response stream (#6451)
Browse files Browse the repository at this point in the history
The `/images/create` Docker API returns a stream of JSON objects whose last one indicates success (irrespective of initial status code). `release/1.2` properly handles this, but the corrected handling was lost during the upgrade to `async/await`. These changes reintroduce the correct handling and add verification tests.

## Azure IoT Edge PR checklist:
  • Loading branch information
onalante-msft authored Jun 17, 2022
1 parent 59b192c commit 287629d
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 84 deletions.
12 changes: 4 additions & 8 deletions edgelet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions edgelet/aziot-edged/src/watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::Error as EdgedError;
pub(crate) async fn run_until_shutdown(
settings: edgelet_settings::docker::Settings,
device_info: &aziot_identity_common::AzureIoTSpec,
runtime: edgelet_docker::DockerModuleRuntime,
runtime: edgelet_docker::DockerModuleRuntime<http_common::Connector>,
identity_client: &aziot_identity_client_async::Client,
mut action_rx: tokio::sync::mpsc::UnboundedReceiver<edgelet_core::WatchdogAction>,
) -> Result<edgelet_core::WatchdogAction, EdgedError> {
Expand Down Expand Up @@ -76,7 +76,7 @@ pub(crate) async fn run_until_shutdown(
async fn watchdog(
settings: &edgelet_settings::docker::Settings,
device_info: &aziot_identity_common::AzureIoTSpec,
runtime: &edgelet_docker::DockerModuleRuntime,
runtime: &edgelet_docker::DockerModuleRuntime<http_common::Connector>,
identity_client: &aziot_identity_client_async::Client,
) -> Result<(), EdgedError> {
log::info!("Watchdog checking Edge runtime status");
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn watchdog(

async fn restart_modules(
settings: &edgelet_settings::docker::Settings,
runtime: &edgelet_docker::DockerModuleRuntime,
runtime: &edgelet_docker::DockerModuleRuntime<http_common::Connector>,
) {
let agent_name = settings.agent().name();

Expand Down Expand Up @@ -186,7 +186,7 @@ async fn restart_modules(
async fn create_and_start_agent(
settings: &edgelet_settings::docker::Settings,
device_info: &aziot_identity_common::AzureIoTSpec,
runtime: &edgelet_docker::DockerModuleRuntime,
runtime: &edgelet_docker::DockerModuleRuntime<http_common::Connector>,
identity_client: &aziot_identity_client_async::Client,
) -> Result<(), EdgedError> {
let agent_name = settings.agent().name();
Expand Down
9 changes: 7 additions & 2 deletions edgelet/docker-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ version = "0.1.0"
[dependencies]
async-trait = "0.1"
base64 = "0.13"
futures = "0.3"
hyper = "0.14"
futures-core = "0.3"
hyper = { version = "0.14", features = ["client", "http1"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio = { version = "1", features = ["time"] }
url = "2"

http-common = { git = "https://github.com/Azure/iot-identity-service", branch = "main" }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }

edgelet-test-utils = { path = "../edgelet-test-utils" }
106 changes: 83 additions & 23 deletions edgelet/docker-rs/src/apis/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::borrow::Borrow;
use std::error::Error;
use std::sync::Arc;

use futures::{Future, Stream};
use futures_core::{Future, Stream};
use serde_json;

use http_common::{Connector, ErrorBody, HttpRequest};
use http_common::{ErrorBody, HttpRequest};
use hyper::{Body, Client, Uri};

use super::configuration::Configuration;
Expand All @@ -16,13 +16,16 @@ use crate::models;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[derive(Clone)]
pub struct DockerApiClient {
connector: Connector,
pub struct DockerApiClient<C> {
connector: C,
configuration: Arc<Configuration>,
}

impl DockerApiClient {
pub fn new(connector: Connector) -> Self {
impl<C> DockerApiClient<C>
where
C: Clone + hyper::client::connect::Connect + Send + Sync + 'static,
{
pub fn new(connector: C) -> Self {
Self {
connector,
configuration: Arc::new(Configuration::default()),
Expand All @@ -34,7 +37,7 @@ impl DockerApiClient {
self
}

fn add_user_agent<TBody>(&self, request: &mut HttpRequest<TBody, Connector>) -> Result<()>
fn add_user_agent<TBody>(&self, request: &mut HttpRequest<TBody, C>) -> Result<()>
where
TBody: serde::Serialize,
{
Expand Down Expand Up @@ -113,7 +116,10 @@ pub trait DockerApi {
}

#[async_trait::async_trait]
impl DockerApi for DockerApiClient {
impl<C> DockerApi for DockerApiClient<C>
where
C: Clone + hyper::client::connect::Connect + Send + Sync + 'static,
{
async fn system_info(&self) -> Result<models::SystemInfo> {
let uri_str = format!("/info");
let uri = (self.configuration.uri_composer)(&self.configuration.base_path, &uri_str)?;
Expand Down Expand Up @@ -147,25 +153,43 @@ impl DockerApi for DockerApiClient {
.finish();
let uri_str = format!("/images/create?{}", query);
let uri = (self.configuration.uri_composer)(&self.configuration.base_path, &uri_str)?;
let uri = uri.to_string();

let mut request = HttpRequest::post(self.connector.clone(), &uri, Some(input_image));
request.add_header(
// Response body is a sequence of JSON objects.
// Each object is either a `{ "status": ... }` or an `{ "errorDetail": ... }`
//
// The overall success or failure of the operation is determined by which one
// the last object is.
let client = hyper::client::Client::builder().build::<_, Body>(self.connector.clone());
let mut request = hyper::Request::post(uri).header(
hyper::header::HeaderName::from_static("x-registry-auth"),
x_registry_auth,
)?;
self.add_user_agent(&mut request)?;

let response = request
.response(true)
.await
.map_err(ApiError::with_context("Could not create image."))?;
let (status, _) = response.into_parts();
);
if let Some(agent) = &self.configuration.user_agent {
request = request.header(hyper::header::USER_AGENT, agent);
}
let request = request.body(serde_json::to_string(input_image)?.into())?;

if status == hyper::StatusCode::OK {
Ok(())
let response =
tokio::time::timeout(std::time::Duration::from_secs(30), client.request(request))
.await?
.map_err(ApiError::with_context("Could not create image."))?;
let (parts, body) = response.into_parts();

if parts.status == hyper::StatusCode::OK {
let response_bytes = hyper::body::to_bytes(body).await?;
let last = serde_json::Deserializer::from_slice(&response_bytes)
.into_iter::<serde_json::Map<String, serde_json::Value>>()
.last()
.ok_or_else(|| {
ApiError::with_message("received empty response from container runtime")
})??;
if let Some(detail) = last.get("errorDetail") {
Err(ApiError::with_message(serde_json::to_string(detail)?).into())
} else {
Ok(())
}
} else {
Err(ApiError::with_message(format!("Bad status code: {}", status)).into())
Err(ApiError::with_message(format!("Bad status code: {}", parts.status)).into())
}
}

Expand Down Expand Up @@ -459,7 +483,7 @@ impl DockerApi for DockerApiClient {
.expect("could not build hyper::Request");

// send request
let client = self.connector.clone().into_client();
let client = hyper::client::Client::builder().build(self.connector.clone());
let resp = client.request(req).await?;
let (hyper::http::response::Parts { status, .. }, body) = resp.into_parts();
if status.is_success() {
Expand Down Expand Up @@ -514,3 +538,39 @@ impl DockerApi for DockerApiClient {
Ok(response)
}
}

#[cfg(test)]
mod tests {
use super::DockerApi;

#[tokio::test]
async fn image_create_stream_ok() {
let payload = format!(
"{}{}",
serde_json::to_string(&serde_json::json!({"status":"STATUS"})).unwrap(),
serde_json::to_string(&serde_json::json!({"status":"STATUS"})).unwrap(),
);
let client = super::DockerApiClient::new(edgelet_test_utils::JsonConnector::ok(&payload));
assert!(client
.image_create("", "", "", "", "", "", "")
.await
.is_ok());
}

#[tokio::test]
async fn image_create_stream_error() {
let payload = format!(
"{}{}",
serde_json::to_string(&serde_json::json!({"status":"STATUS"})).unwrap(),
serde_json::to_string(
&serde_json::json!({"errorDetail":{"code":"CODE","message":"MESSAGE"}})
)
.unwrap()
);
let client = super::DockerApiClient::new(edgelet_test_utils::JsonConnector::ok(&payload));
assert!(client
.image_create("", "", "", "", "", "", "")
.await
.is_err());
}
}
15 changes: 9 additions & 6 deletions edgelet/edgelet-docker/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ use crate::error::Error;
pub const MODULE_TYPE: &str = "docker";
pub const MIN_DATE: &str = "0001-01-01T00:00:00Z";

pub struct DockerModule {
client: DockerApiClient,
pub struct DockerModule<C> {
client: DockerApiClient<C>,
name: String,
config: DockerConfig,
}

impl std::fmt::Debug for DockerModule {
impl<C> std::fmt::Debug for DockerModule<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DockerModule").finish()
}
}

impl DockerModule {
impl<C> DockerModule<C> {
pub fn new(
client: DockerApiClient,
client: DockerApiClient<C>,
name: String,
config: DockerConfig,
) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -91,7 +91,10 @@ pub fn runtime_state(
}

#[async_trait::async_trait]
impl Module for DockerModule {
impl<C> Module for DockerModule<C>
where
C: Clone + hyper::client::connect::Connect + Send + Sync + 'static,
{
type Config = DockerConfig;

fn name(&self) -> &str {
Expand Down
Loading

0 comments on commit 287629d

Please sign in to comment.