Skip to content

Adding support for Cloudflare Workers #65

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
/node_modules/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Expand All @@ -13,4 +14,4 @@
.idea
*.iml

Cargo.lock
Cargo.lock
27 changes: 23 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ name = "schema"
path = "examples/schema.rs"
required-features = ["schemars"]

[[example]]
name = "cf_workers"
path = "examples/cf_workers.rs"
required-features = ["cf_workers"]

[lib]
crate-type = ["cdylib", "rlib"]

[features]
default = ["http_server", "rand", "uuid", "tracing-span-filter"]
cf_workers = ["worker", "web-sys", "http-body-util", "getrandom"]
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
tracing-span-filter = ["dep:tracing-subscriber"]
Expand All @@ -28,8 +37,6 @@ bytes = "1.10"
futures = "0.3"
http = "1.3"
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.6", optional = true}
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
pin-project-lite = "0.2"
rand = { version = "0.9", optional = true }
regress = "0.10"
Expand All @@ -43,14 +50,26 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
uuid = { version = "1.16.0", optional = true }
web-sys = {version = "0.3.70", optional = true}
worker = {version = "0.6.0", features=["http"], optional = true }
hyper = { version = "1.6", optional = true}
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
getrandom = { version = "0.3.3", features = ["wasm_js"], optional = true}
futures-util = "0.3.31"
wasm-streams = "0.4.2"
async-stream = "0.3.6"
wasm-bindgen-futures = "0.4"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] }
trybuild = "1.0"
reqwest = { version = "0.12", features = ["json"] }
rand = "0.9"
schemars = "1.0.0-alpha.17"
wasm-bindgen-test = "0.3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1", features = ["full"] }

[build-dependencies]
jsonptr = "0.5.1"
Expand Down
40 changes: 40 additions & 0 deletions examples/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#![cfg(target_family = "wasm")]

// this is an example of a Rust file that uses the restate-sdk crate to create a Cloudflare Worker service.
// this code needs to be compiled to WebAssembly using the wrangler dev tools with extra rust flags to enable wasm support
//
// for local development use this command:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler dev
//
// or push to production using this command:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler deploy
//
// The Cloudflare Worker automated build pipeline doesn't currently support this code due to missing clang binaries
//

use restate_sdk::prelude::*;

#[restate_sdk::service]
trait MyService {
async fn my_handler() -> HandlerResult<()>;
}

struct MyServiceImpl;

impl MyService for MyServiceImpl {
async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> {
Ok(())
}
}

Check warning on line 28 in examples/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/examples/cf_workers.rs

#[worker::event(fetch)]
pub async fn main( req:worker::HttpRequest, _env: worker::Env, _ctx: worker::Context) -> worker::Result<http::Response<worker::Body>> {
let endpoint = Endpoint::builder()
.with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloudflare Workers don't support bidi streams
.bind(MyServiceImpl.serve())
.build();

let cf_worker = CfWorkerServer::new(endpoint);

return cf_worker.call(req).await;
}
149 changes: 149 additions & 0 deletions src/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@

Check warning on line 1 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
use std::str::FromStr;
use http::{HeaderName, HeaderValue, StatusCode};
use tokio::sync::mpsc;
use web_sys::{js_sys::Uint8Array, Reflect, Object, wasm_bindgen::{prelude::Closure, JsCast, JsValue},
ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader};
use wasm_bindgen_futures;
use worker::*;
use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint};

// Convert Bytes to ReadableStream using Web API bindings
fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result<ReadableStream, JsValue> {
let underlying_source = Object::new();

let start_closure = Closure::wrap(Box::new(move |controller: ReadableStreamDefaultController| {
// Convert bytes to Uint8Array
let uint8_array = Uint8Array::new_with_length(data.len() as u32);
uint8_array.copy_from(&data[..]);

// Enqueue the data
let _ = controller.enqueue_with_chunk(&uint8_array);
let _ = controller.close();

}) as Box<dyn FnMut(ReadableStreamDefaultController)>);

Reflect::set(
&underlying_source,
&JsValue::from_str("start"),
start_closure.as_ref().unchecked_ref(),
)?;

start_closure.forget(); // Prevent cleanup

ReadableStream::new_with_underlying_source(&underlying_source)
}

/// Cloudflare Worker server to expose Restate services.
pub struct CfWorkerServer {
endpoint: Endpoint,
}

impl From<Endpoint> for CfWorkerServer {
fn from(endpoint: Endpoint) -> Self {
Self { endpoint }
}
}

Check warning on line 46 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

impl CfWorkerServer {

pub fn new(endpoint: Endpoint) -> Self {
Self { endpoint }
}

pub async fn call(&self, req: HttpRequest) -> worker::Result<http::Response<worker::Body>> {
let headers = req.headers().to_owned();
let (parts, request_body) = req.into_parts();
let result = self.endpoint.resolve(parts.uri.path(), headers);

Check warning on line 58 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
if let Ok(response) = result {
match response {
crate::endpoint::Response::ReplyNow { status_code, headers, body } => {

// convert outbound body data from a buffer into a readable stream
let readable_stream = bytes_to_readable_stream(body)?;
let mut http_response = http::Response::builder()
.status(status_code)
.body(Body::new(readable_stream))?;

for header in headers {
let key = HeaderName::from_str(header.key.as_ref())?;
let value = HeaderValue::from_str(header.value.as_ref())?;
http_response.headers_mut().insert(key, value);
}

Check warning on line 74 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
Ok(http_response)
}
crate::endpoint::Response::BidiStream { status_code, headers, handler} => {

// Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams
// Implenting this as a workaround using existing handler api, expecting a hyper request stream
// Reads entire request/reponse body to proxy data across WASM boundary

Check warning on line 82 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
let js_stream: ReadableStream = request_body.into_inner().unwrap().unchecked_into();
let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into();


// Drain the inbound Request API body data from a Stream API ReadableStream into a buffer
let mut request_body = Vec::new();
loop {
let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await;

Check warning on line 90 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
match read_result {
Ok(js_value) => {
let done = js_sys::Reflect::get(&js_value, &JsValue::from_str("done")).unwrap();
if done.as_bool().unwrap_or(false) {
break;
}
let value = js_sys::Reflect::get(&js_value, &JsValue::from_str("value")).unwrap();

Check warning on line 97 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
let uint8_array: Uint8Array = value.unchecked_into();
let mut bytes = vec![0u8; uint8_array.length() as usize];
uint8_array.copy_to(&mut bytes);
request_body.extend_from_slice(&bytes);
}
Err(_) => break,
}
}

// Create a Rust stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders
let request_bytes = bytes::Bytes::from(request_body);
let stream = futures_util::stream::once(async move {
Ok::<bytes::Bytes, Box<dyn std::error::Error + Send + Sync>>(request_bytes)
});

let input_receiver = InputReceiver::from_stream(stream);

let (output_tx, output_rx) = mpsc::unbounded_channel();
let output_sender = OutputSender::from_channel(output_tx);

// Execute handler and collect output
let _ = handler.handle(input_receiver, output_sender).await;

// Collect all output chunks to proxy body response across WASM boundary
let mut response_body = Vec::new();
let mut rx = output_rx;
while let Some(chunk) = rx.recv().await {
response_body.extend_from_slice(&chunk);
}

Check warning on line 126 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

// Build HTTP Response-- converting outbound buffer into readable stream
let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?;
let mut http_response = http::Response::builder()
.status(status_code)
.body(Body::new(readable_stream))?;

for header in headers {
let key = HeaderName::from_str(header.key.as_ref())?;
let value = HeaderValue::from_str(header.value.as_ref())?;
http_response.headers_mut().insert(key, value);
}

Check warning on line 138 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

Ok(http_response)
},
}
}
else {
let http_response = http::Response::builder().status(StatusCode::BAD_REQUEST).body(worker::Body::empty())?;
Ok(http_response)
}
}
}
5 changes: 5 additions & 0 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ impl Builder {
Self::default()
}

pub fn with_protocol_mode(mut self, mode: crate::discovery::ProtocolMode) -> Self {
self.discovery.protocol_mode = Some(mode);
self
}

/// Add a [`Service`] to this endpoint.
///
/// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ pub mod filter;
pub mod http_server;
#[cfg(feature = "hyper")]
pub mod hyper;
#[cfg(feature = "cf_workers")]
pub mod cf_workers;
pub mod serde;

/// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1).
Expand Down Expand Up @@ -509,6 +511,9 @@ pub mod prelude {
#[cfg(feature = "http_server")]
pub use crate::http_server::HttpServer;

#[cfg(feature = "cf_workers")]
pub use crate::cf_workers::CfWorkerServer;

pub use crate::context::{
CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState,
ContextSideEffects, ContextTimers, ContextWriteState, HeaderMap, InvocationHandle,
Expand Down
31 changes: 31 additions & 0 deletions tests/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#![cfg(target_family = "wasm")]

// test requires wasm-pack / wasm-bindgen-test framework
// use following command to execute test:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-pack test --node -- --no-default-features --features cf_workers --test cf_workers
use restate_sdk::{cf_workers::CfWorkerServer, prelude::*};
use wasm_bindgen_test::*;

#[restate_sdk::service]
trait MyService {
async fn my_handler() -> HandlerResult<()>;
}

struct MyServiceImpl;

impl MyService for MyServiceImpl {
async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> {
Ok(())
}
}

#[wasm_bindgen_test]
fn cf_workerservice_handler() {
let endpoint = Endpoint::builder()
.bind(MyServiceImpl.serve())
.build();

let cf_server = CfWorkerServer::new(endpoint);
let health_check_request = http::Request::builder().uri("/health").body(worker::Body::empty()).unwrap();
let result = cf_server.call(health_check_request);
}
Loading