Skip to content

Commit

Permalink
feat: image processor docker
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Jul 14, 2024
1 parent d2076ea commit 5b18cf7
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 76 deletions.
7 changes: 6 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
.vscode/
node_modules/
.env*
Dockerfile
**/Dockerfile
**/.dockerignore
**/.gitignore
.git/
**/*.dockerfile
dev/
.dockerignore
target/
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ debug = true

[profile.release-fat]
inherits = "release"
debug = false
strip = true
lto = "fat"

[profile.wasm]
Expand Down
43 changes: 2 additions & 41 deletions docker/ffmpeg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,11 @@ apt-get install -y --no-install-recommends \
libpng-dev \
libjpeg-dev \
libtiff-dev \
libpng-16-16 \
libpng16-16 \
libjpeg62 \
libtiff6


git clone https://github.com/ScuffleTV/external.git --depth 1 --recurse-submodule --shallow-submodules /tmp/external
/tmp/external/build.sh --prefix /usr/local --build "x264 x265 svt-av1 libvpx opus dav1d ffmpeg opencv"
/tmp/external/build.sh --prefix /usr/local --build "x264 x265 svt-av1 libvpx dav1d ffmpeg"
ldconfig
rm -rf /tmp/external

apt-get remove -y --purge \
make \
zip \
unzip \
curl \
wget \
git \
ssh \
ca-certificates \
pkg-config \
gnupg2 \
cmake \
clang-format \
ninja-build \
nasm \
yasm \
meson \
libtool \
autoconf \
automake \
build-essential \
libpython3.11-stdlib \
libpython3.11-minimal \
libpython3.11 \
python3.11 \
python3.11-minimal \
g++ \
g++-12 \
gcc \
gcc-12 \
"*-dev" \
"*-dev-*"

apt-get autoremove -y
apt-get clean
rm -rf /var/lib/apt/lists/*
rm /etc/ssh -rf
23 changes: 0 additions & 23 deletions docker/platform/image-processor.Dockerfile

This file was deleted.

9 changes: 9 additions & 0 deletions docker/rust.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
set -ex

apt-get update
apt-get install -y --no-install-recommends \
build-essential \
curl \
ca-certificates

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile minimal
1 change: 0 additions & 1 deletion foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -251,5 +251,4 @@ default = [
"http",
"http-tls",
"http2",
# "http3",
]
47 changes: 47 additions & 0 deletions image-processor/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
FROM bitnami/minideb as builder

WORKDIR /tmp

ENV CARGO_HOME=/usr/local/cargo \
PATH=/usr/local/cargo/bin:$PATH

RUN --mount=type=bind,src=docker/ffmpeg.sh,dst=/mount/ffmpeg.sh \
/mount/ffmpeg.sh

RUN --mount=type=bind,src=docker/rust.sh,dst=/mount/rust.sh \
/mount/rust.sh

RUN apt-get update && \
apt-get install -y --no-install-recommends \
libclang-dev \
protobuf-compiler \
patchelf

COPY . .

ARG PROFILE=release-fat

RUN cargo build --profile ${PROFILE} --bin scuffle-image-processor -p scuffle-image-processor --locked

RUN mkdir /out && \
mv target/${PROFILE}/scuffle-image-processor /out/image-processor && \
ldd /out/image-processor | grep -o '/[^ ]*' | xargs -I '{}' cp {} /out && \
patchelf --set-rpath '$ORIGIN' /out/image-processor

FROM gcr.io/distroless/base-nossl-debian12

LABEL org.opencontainers.image.source=https://github.com/scuffletv/scuffle
LABEL org.opencontainers.image.description="Scuffle Image Processor"
LABEL org.opencontainers.image.licenses=BSD-4-Clause

WORKDIR /app

ENV LD_LIBRARY_PATH=/app:$LD_LIBRARY_PATH

COPY --from=builder /out /app

STOPSIGNAL SIGTERM

USER 1000

ENTRYPOINT ["/app/image-processor"]
1 change: 0 additions & 1 deletion image-processor/src/drive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ impl Drive for AnyDrive {
}

async fn write(&self, path: &str, data: Bytes, options: Option<DriveWriteOptions>) -> Result<(), DriveError> {
tracing::info!("writing to drive: {}", path);
match self {
AnyDrive::Local(drive) => drive.write(path, data, options).await,
AnyDrive::S3(drive) => drive.write(path, data, options).await,
Expand Down
2 changes: 1 addition & 1 deletion image-processor/src/drive/public_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl PublicHttpDrive {
pub async fn new(config: &PublicHttpDriveConfig) -> Result<Self, DriveError> {
tracing::debug!("setting up public http disk");
if !config.blacklist.is_empty() || !config.whitelist.is_empty() {
tracing::error!("blacklist and whitelist are not supported for public http disk");
tracing::error!("blacklist and whitelist are not currently implemented for public http disk");
return Err(PublicHttpDriveError::Unsupported("blacklist and whitelist").into());
}

Expand Down
9 changes: 6 additions & 3 deletions image-processor/src/management/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ use tonic::{Request, Response};
use super::ManagementServer;

impl ManagementServer {
pub async fn run_grpc(&self) -> Result<(), tonic::transport::Error> {
let addr = self.global.config().management.grpc.bind;
#[tracing::instrument(skip_all)]
pub async fn run_grpc(&self, addr: std::net::SocketAddr) -> Result<(), tonic::transport::Error> {
let server = tonic::transport::Server::builder()
.add_service(scuffle_image_processor_proto::image_processor_server::ImageProcessorServer::new(self.clone()))
.serve_with_shutdown(addr, scuffle_foundations::context::Context::global().into_done());

tracing::info!(%addr, "gRPC server listening");
tracing::info!("gRPC management server listening on {}", addr);

server.await
}
}

#[async_trait::async_trait]
impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for ManagementServer {
#[tracing::instrument(skip_all)]
async fn process_image(&self, request: Request<ProcessImageRequest>) -> tonic::Result<Response<ProcessImageResponse>> {
let resp = match self.process_image(request.into_inner()).await {
Ok(resp) => resp,
Expand All @@ -30,6 +32,7 @@ impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for M
Ok(Response::new(resp))
}

#[tracing::instrument(skip_all)]
async fn cancel_task(&self, request: Request<CancelTaskRequest>) -> tonic::Result<Response<CancelTaskResponse>> {
let resp = match self.cancel_task(request.into_inner()).await {
Ok(resp) => resp,
Expand Down
9 changes: 7 additions & 2 deletions image-processor/src/management/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use scuffle_image_processor_proto::{
use super::ManagementServer;

impl ManagementServer {
pub async fn run_http(&self) -> Result<(), scuffle_foundations::http::server::Error> {
#[tracing::instrument(skip_all)]
pub async fn run_http(&self, addr: std::net::SocketAddr) -> Result<(), scuffle_foundations::http::server::Error> {
let router = Router::new()
.route("/process_image", post(process_image))
.route("/cancel_task", post(cancel_task))
.fallback(not_found)
.with_state(self.clone());

let addr = self.global.config().management.http.bind;
tracing::info!("HTTP management server listening on {}", addr);

scuffle_foundations::http::server::Server::builder()
.bind(addr)
.build(router)?
Expand All @@ -24,10 +26,12 @@ impl ManagementServer {
}
}

#[tracing::instrument(skip_all)]
async fn not_found() -> (http::StatusCode, &'static str) {
(http::StatusCode::NOT_FOUND, "Not Found")
}

#[tracing::instrument(skip_all)]
async fn process_image(
State(server): State<ManagementServer>,
Json(request): Json<ProcessImageRequest>,
Expand All @@ -48,6 +52,7 @@ async fn process_image(
(status, Json(resp))
}

#[tracing::instrument(skip_all)]
async fn cancel_task(
State(server): State<ManagementServer>,
Json(request): Json<CancelTaskRequest>,
Expand Down
14 changes: 12 additions & 2 deletions image-processor/src/management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::worker::process::DecoderFrontend;
pub mod grpc;
pub mod http;

mod utils;
mod validation;

#[derive(Clone)]
Expand All @@ -25,9 +26,12 @@ struct ManagementServer {
}

impl ManagementServer {
#[tracing::instrument(skip_all)]
async fn process_image(&self, mut request: ProcessImageRequest) -> Result<ProcessImageResponse, Error> {
let mut fragment = FragmentBuf::new();

tracing::info!("new process image request");

validate_task(
&self.global,
fragment.push("task"),
Expand Down Expand Up @@ -128,7 +132,10 @@ impl ManagementServer {
})
}

#[tracing::instrument(skip_all)]
async fn cancel_task(&self, request: CancelTaskRequest) -> Result<CancelTaskResponse, Error> {
tracing::info!("new cancel task request");

match Job::cancel(
&self.global,
request.id.parse().map_err(|err| Error {
Expand All @@ -154,19 +161,22 @@ impl ManagementServer {
}
}

#[tracing::instrument(skip_all)]
pub async fn start(global: Arc<Global>) -> anyhow::Result<()> {
let server = ManagementServer { global };

let http = async {
if server.global.config().management.http.enabled {
server.run_http().await.context("http")
let addr = utils::true_bind(server.global.config().management.http.bind).await?;
server.run_http(addr).await.context("http")
} else {
Ok(())
}
};
let grpc = async {
if server.global.config().management.grpc.enabled {
server.run_grpc().await.context("grpc")
let addr = utils::true_bind(server.global.config().management.grpc.bind).await?;
server.run_grpc(addr).await.context("grpc")
} else {
Ok(())
}
Expand Down
8 changes: 8 additions & 0 deletions image-processor/src/management/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub async fn true_bind(addr: std::net::SocketAddr) -> std::io::Result<std::net::SocketAddr> {
if addr.port() == 0 {
let bind = tokio::net::TcpListener::bind(addr).await?;
bind.local_addr()
} else {
Ok(addr)
}
}
8 changes: 7 additions & 1 deletion image-processor/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub async fn start(global: Arc<Global>) -> anyhow::Result<()> {

let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));

tracing::info!("worker started with {} threads", concurrency);

let mut error_count = 0;
let (_, handle) = context::Context::new();

Expand All @@ -38,8 +40,12 @@ pub async fn start(global: Arc<Global>) -> anyhow::Result<()> {
};

let job = match Job::fetch(&global).await {
Ok(Some(job)) => job,
Ok(Some(job)) => {
tracing::debug!("fetched job");
job
}
Ok(None) => {
tracing::debug!("no jobs found");
tokio::time::sleep(config.worker.polling_interval).await;
continue;
}
Expand Down

0 comments on commit 5b18cf7

Please sign in to comment.