Skip to content

Commit

Permalink
feat(response-headers): forward the response headers from the storage…
Browse files Browse the repository at this point in the history
… engine to the stakeholders (#109)

Co-authored-by: bogdan.vidrean <bogdan.vidrean@olx.com>
  • Loading branch information
BogdanVidrean and bogdan.vidrean authored Sep 16, 2024
1 parent 6eb8d1e commit cbbb92f
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 29 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/pull-requests-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ jobs:
build-base=0.5-r3
clang=16.0.6-r1
clang16-libclang=16.0.6-r1
expat-dev=2.6.0-r0
giflib-dev=5.2.1-r4
expat-dev=2.6.3-r0
giflib-dev=5.2.2-r0
glib-dev=2.76.6-r0
lcms2-dev=2.15-r2
libexif-dev=0.6.24-r1
Expand All @@ -58,8 +58,8 @@ jobs:
libpng-dev=1.6.39-r3
librsvg-dev=2.56.3-r0
libwebp-dev=1.3.2-r0
openssl-dev=3.1.4-r5
orc-dev=0.4.34-r0
openssl-dev=3.1.7-r0
orc-dev=0.4.39-r0
pkgconf=1.9.5-r0
tiff-dev=4.5.1-r0
tar
Expand Down Expand Up @@ -93,8 +93,8 @@ jobs:
run: RUSTFLAGS="-C target-feature=-crt-static $(pkg-config vips --libs)" cargo build
- name: Run Dali
run: ./target/debug/dali >> /dev/null &
- name: Check if Dali is running
run: sleep 5 && nc -z localhost 8080
- name: Wait for Dali to start
run: sleep 5
- name: Run tests
run: |
set +e
Expand Down
16 changes: 8 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ RUN apk add --update --no-cache --repository https://dl-cdn.alpinelinux.org/alpi
build-base=0.5-r3 \
clang=16.0.6-r1 \
clang16-libclang=16.0.6-r1 \
expat-dev=2.6.0-r0 \
giflib-dev=5.2.1-r4 \
expat-dev=2.6.3-r0 \
giflib-dev=5.2.2-r0 \
glib-dev=2.76.6-r0 \
lcms2-dev=2.15-r2 \
libexif-dev=0.6.24-r1 \
Expand All @@ -19,8 +19,8 @@ RUN apk add --update --no-cache --repository https://dl-cdn.alpinelinux.org/alpi
libpng-dev=1.6.39-r3 \
librsvg-dev=2.56.3-r0 \
libwebp-dev=1.3.2-r0 \
openssl-dev=3.1.4-r5 \
orc-dev=0.4.34-r0 \
openssl-dev=3.1.7-r0 \
orc-dev=0.4.39-r0 \
pkgconf=1.9.5-r0 \
tiff-dev=4.5.1-r0

Expand Down Expand Up @@ -51,8 +51,8 @@ COPY --from=build /usr/local/lib /usr/local/lib
RUN apk add --update --no-cache \
--repository=https://dl-cdn.alpinelinux.org/alpine/v3.18/main \
--repository=https://dl-cdn.alpinelinux.org/alpine/v3.18/community \
expat=2.6.0-r0 \
giflib=5.2.1-r4 \
expat=2.6.3-r0 \
giflib=5.2.2-r0 \
glib=2.76.6-r0 \
lcms2=2.15-r2 \
libde265=1.0.15-r0 \
Expand All @@ -64,8 +64,8 @@ RUN apk add --update --no-cache \
libpng=1.6.39-r3 \
librsvg=2.56.3-r0 \
libwebp=1.3.2-r0 \
openssl=3.1.4-r5 \
orc=0.4.34-r0 \
openssl=3.1.7-r0 \
orc=0.4.39-r0 \
tiff=4.5.1-r0

COPY --from=build /usr/src/dali/target/release/dali /usr/local/bin/dali
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ The application will compute the number of threads by the following formula: `po
* Libvips
* An HTTP server for images
* Docker
* Rust
* Rust (1.74.0)

This application relies on the C libvips library, which means libvips has to be installed on the system before compiling and/or running the application.

For installation, follow these [instructions](https://libvips.github.io/libvips/install.html). (Minimum required version: 8.10.1)

Using `rustup` is the recommended way to install `rust`. It is a tool that manages and updates rust versions (like `nvm` for node for example). To install it, simply run `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh`. Then run `rustup update`.
Using `rustup` is the recommended way to install `rust`. It is a tool that manages and updates rust versions (like `nvm` for node for example). To install it, simply run `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh`. Then run `rustup install 1.74.0`.

To build and run the application, run the following command:

Expand Down
9 changes: 8 additions & 1 deletion src/image_provider/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use async_trait::async_trait;

#[cfg(feature = "reqwest")]
Expand All @@ -13,9 +15,14 @@ pub mod s3;
#[cfg(not(any(feature = "reqwest", feature = "s3")))]
compile_error!("only 's3' is available as an extra feature for the image storage service");

/// A file fetched by an `ImageProvider`, together with the headers the
/// storage backend returned alongside it.
pub struct ImageResponse {
/// Raw bytes of the downloaded file.
pub bytes: Vec<u8>,
/// Headers reported by the storage backend, keyed by header name, with the
/// values kept as raw bytes. The S3 provider fills this from the object's
/// metadata, prefixing each key with `x-amz-meta-`.
pub response_headers: HashMap<String, Vec<u8>>,
}

#[async_trait]
pub trait ImageProvider: Send + Sync {
async fn get_file(&self, resource: &str) -> Result<Vec<u8>, ImageProcessingError>;
async fn get_file(&self, resource: &str) -> Result<ImageResponse, ImageProcessingError>;
}

#[allow(unreachable_code)]
Expand Down
21 changes: 17 additions & 4 deletions src/image_provider/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod client {
ClientReturnedErrorStatusCode, ImageDownloadFailed, ImageDownloadTimedOut,
InvalidResourceUriProvided,
};
use crate::image_provider::ImageProvider;
use crate::image_provider::{ImageProvider, ImageResponse};
use crate::routes::image::ImageProcessingError;

pub struct ReqwestImageProvider {
Expand Down Expand Up @@ -49,7 +49,7 @@ pub mod client {

#[async_trait]
impl ImageProvider for ReqwestImageProvider {
async fn get_file(&self, resource: &str) -> Result<Vec<u8>, ImageProcessingError> {
async fn get_file(&self, resource: &str) -> Result<ImageResponse, ImageProcessingError> {
let url = Url::parse(resource).map_err(|_| {
error!(
"the provided resource uri is not a valid http url: '{}'",
Expand All @@ -69,8 +69,18 @@ pub mod client {
ImageDownloadFailed
}
})?;

let status = response.status();
let headers = response
.headers()
.into_iter()
.map(|header| {
(
String::from(header.0.as_str()),
header.1.as_bytes().to_vec(),
)
})
.collect();
if status.is_success() {
let bytes = response.bytes().await.map_err(|e| {
error!(
Expand All @@ -79,7 +89,10 @@ pub mod client {
);
ImageDownloadFailed
})?;
Ok(bytes.to_vec())
Ok(ImageResponse {
bytes: bytes.to_vec(),
response_headers: headers,
})
} else if status.is_client_error() {
error!(
"the requested image '{}' couldn't be downloaded. received status code: {}",
Expand Down
23 changes: 21 additions & 2 deletions src/image_provider/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod s3 {
use aws_sdk_s3::error::SdkError;
use axum::http::StatusCode;
use log::error;
use std::collections::HashMap;
use std::io::Write;
use thiserror::Error;

Expand All @@ -13,6 +14,7 @@ pub mod s3 {
use aws_sdk_s3::error::ProvideErrorMetadata;

use crate::commons::config::Configuration;
use crate::image_provider::ImageResponse;
use crate::image_provider::{
ImageProcessingError::{
self, ClientReturnedErrorStatusCode, ImageDownloadFailed, ImageDownloadTimedOut,
Expand Down Expand Up @@ -78,7 +80,7 @@ pub mod s3 {

#[async_trait]
impl ImageProvider for S3ImageProvider {
async fn get_file(&self, resource: &str) -> Result<Vec<u8>, ImageProcessingError> {
async fn get_file(&self, resource: &str) -> Result<ImageResponse, ImageProcessingError> {
if String::from(resource).is_empty() {
error!("the provided resource uri is empty");
return Err(InvalidResourceUriProvided(String::new()));
Expand Down Expand Up @@ -129,6 +131,20 @@ pub mod s3 {
}
})?;

let headers = match result.metadata() {
None => HashMap::new(),
Some(metadata) => metadata
.into_iter()
.map(|(key, value)| {
let mut response_header_key = String::from("x-amz-meta-");
response_header_key.push_str(key);
(
response_header_key,
value.as_bytes().to_vec(),
)
})
.collect(),
};
let mut binary_payload: Vec<u8> = Vec::new();
while let Some(bytes) = result.body.try_next().await.map_err(|e| {
error!(
Expand All @@ -146,7 +162,10 @@ pub mod s3 {
})?;
}

Ok(binary_payload)
Ok(ImageResponse {
bytes: binary_payload,
response_headers: headers,
})
}
}
}
24 changes: 18 additions & 6 deletions src/routes/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum::{
http::{Response, StatusCode},
response::IntoResponse,
};
use core::str;
use futures::future::join_all;
use log::{error, warn};
use serde::de::DeserializeOwned;
Expand All @@ -19,6 +20,12 @@ use crate::{

use super::metric::{FETCH_DURATION, INPUT_SIZE, OUTPUT_SIZE};

// The following response headers are determined by Dali, because it re-encodes the image downloaded from the provided source.
// Thus the length and type of the resulting image might differ from what the storage engine has returned.
// To match header names regardless of case (lower/upper), they are specified in lowercase here and the names
// we compare against are converted to lowercase first.
const HEADERS_DETERMINED_BY_DALI: [&str; 2] = ["content-type", "content-length"];

pub struct ProcessImageRequestExtractor<T>(pub T);

#[async_trait]
Expand Down Expand Up @@ -113,7 +120,7 @@ pub async fn process_image(
) -> Result<Response<Body>, ImageProcessingError> {
let now = SystemTime::now();
let main_img = image_provider.get_file(&params.image_address).await?;
let mut total_input_size = main_img.len();
let mut total_input_size = main_img.bytes.len();

let watermarks_futures = params
.watermarks
Expand All @@ -133,8 +140,8 @@ pub async fn process_image(
})
.map(|r| {
let watermark = r.unwrap();
total_input_size += watermark.len();
watermark
total_input_size += watermark.bytes.len();
watermark.bytes
})
.collect();

Expand All @@ -151,7 +158,7 @@ pub async fn process_image(
// response time and memory used
let (send, recv) = tokio::sync::oneshot::channel();
rayon::spawn(move || {
let image = image_processor::process_image(main_img, watermarks, params);
let image = image_processor::process_image(main_img.bytes, watermarks, params);
let _ = send.send(image);
});
let processed_image = recv.await.map_err(|e| {
Expand All @@ -170,8 +177,13 @@ pub async fn process_image(
})?;

log_size_metrics(&format, total_input_size, processed_image.len());
Ok(Response::builder()
.status(StatusCode::OK)
let mut response_builder = Response::builder().status(StatusCode::OK);
for (key, value) in main_img.response_headers.into_iter() {
if !HEADERS_DETERMINED_BY_DALI.contains(&key.to_lowercase().as_str()) {
response_builder = response_builder.header(key, value);
}
}
Ok(response_builder
.header("Content-Type", format!("image/{}", format))
.body(Body::from(processed_image))
.unwrap())
Expand Down

0 comments on commit cbbb92f

Please sign in to comment.