Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] add server-side C API #2807

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod client;
mod error;
mod http_types;
mod io;
mod server;
mod task;

pub use self::body::*;
Expand Down
236 changes: 236 additions & 0 deletions src/ffi/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
use http::{Request, Response};
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll};

use libc::{c_int, c_uint, c_void, size_t};

use crate::body::Body;
use crate::common::exec::ConnStreamExec;
use crate::server::conn::{Connection, Http};
use crate::service::Service;

use super::error::hyper_code;
use super::http_types::{hyper_request, hyper_response};
use super::io::hyper_io;
use super::task::{hyper_executor, hyper_task, WeakExec};

pub struct hyper_http(Http<WeakExec>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You couldn't know this, but I'm not a fan of the name that's currently Http in hyper. I think this C API type might want to follow the client-side naming, so maybe hyper_serverconn_options? And then similarly with the related functions.


// Runtime-related functions are not wrapped.

ffi_fn! {
fn hyper_http_new(exec: *mut hyper_executor) -> *mut hyper_http {
let exec = non_null! { Arc::from_raw(exec) ?= ptr::null_mut() };
let weak_exec = hyper_executor::downgrade(&exec);
std::mem::forget(exec);
Box::into_raw(Box::new(hyper_http(Http::new().with_executor(weak_exec))))
}
}

ffi_fn! {
fn hyper_http_http1_only(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_only(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http1_half_close(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_half_close(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http1_keep_alive(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_keep_alive(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http1_title_case_headers(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_title_case_headers(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http1_preserve_header_case(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_preserve_header_case(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http1_writev(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http1_writev(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_only(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_only(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_initial_stream_window_size(http: *mut hyper_http, window_size: c_uint) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_initial_stream_window_size(if window_size == 0 { None } else { Some(window_size) });
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_initial_connection_window_size(http: *mut hyper_http, window_size: c_uint) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_initial_connection_window_size(if window_size == 0 { None } else { Some(window_size) });
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_adaptive_window(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_adaptive_window(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_max_frame_size(http: *mut hyper_http, frame_size: c_uint) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_max_frame_size(if frame_size == 0 { None } else { Some(frame_size) });
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_max_concurrent_streams(http: *mut hyper_http, max_streams: c_uint) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_max_concurrent_streams(if max_streams == 0 { None } else { Some(max_streams) });
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_max_send_buf_size(http: *mut hyper_http, max_buf_size: usize) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_max_send_buf_size(max_buf_size);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_http2_enable_connect_protocol(http: *mut hyper_http) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.http2_enable_connect_protocol();
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_max_buf_size(http: *mut hyper_http, max_buf_size: usize) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.max_buf_size(max_buf_size);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_pipeline_flush(http: *mut hyper_http, enabled: c_int) -> hyper_code {
let http = non_null! { &mut *http ?= hyper_code::HYPERE_INVALID_ARG };
http.0.pipeline_flush(enabled != 0);
hyper_code::HYPERE_OK
}
}

ffi_fn! {
fn hyper_http_serve_connection(http: *mut hyper_http, io: *mut hyper_io, service: *mut hyper_service) -> *mut hyper_serverconn {
let http = non_null! { &mut *http ?= ptr::null_mut() };
let io = non_null! { Box::from_raw(io) ?= ptr::null_mut() };
let service = non_null! { Box::from_raw(service) ?= ptr::null_mut() };

Box::into_raw(Box::new(hyper_serverconn(http.0.serve_connection(*io, *service))))
} ?= ptr::null_mut()
}

pub struct hyper_service {
service_func: hyper_service_callback,
userdata: *mut c_void,
}

type hyper_service_callback =
extern "C" fn(*mut c_void, *mut hyper_request, *mut hyper_response) -> ();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it looks like this would require that in C, the callback is synchronous. It must return a response, it can't return a future. I worry that this might make servers in C just do everything in a blocking style, which would be bad for the server.

Would it be hard to have this callback return a *mut hyper_task? I haven't thought too hard about it


ffi_fn! {
fn hyper_service_new() -> *mut hyper_service {
Box::into_raw(Box::new(hyper_service {
service_func: service_noop,
userdata: ptr::null_mut(),
}))
} ?= ptr::null_mut()
}

ffi_fn! {
fn hyper_service_set_func(service: *mut hyper_service, func: hyper_service_callback){
let s = non_null!{ &mut *service ?= () };
s.service_func = func;
}
}

ffi_fn! {
fn hyper_service_set_userdata(service: *mut hyper_service, userdata: *mut c_void){
let s = non_null!{ &mut *service ?= () };
s.userdata = userdata;
}
}

impl Service<Request<Body>> for hyper_service {
type Response = Response<Body>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_ptr = Box::into_raw(Box::new(hyper_request(req)));
let res = Response::new(Body::empty());
let res_ptr = Box::into_raw(Box::new(hyper_response(res)));

(self.service_func)(self.userdata, req_ptr, res_ptr);

let hyper_res = non_null! {
Box::from_raw(res_ptr) ?= Box::pin(async { Err(crate::error::Error::new(
crate::error::Kind::Io
))})
};

Box::pin(async move { Ok((*hyper_res).0) })
}
}

/// cbindgen:ignore
extern "C" fn service_noop(
_userdata: *mut c_void,
_req: *mut hyper_request,
_res: *mut hyper_response,
) {
}

pub struct hyper_serverconn(Connection<hyper_io, hyper_service, WeakExec>);
17 changes: 16 additions & 1 deletion src/ffi/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use std::task::{Context, Poll};
use futures_util::stream::{FuturesUnordered, Stream};
use libc::c_int;

use crate::body::HttpBody;
use crate::common::exec::ConnStreamExec;
use crate::proto::h2::server::H2Stream;
use crate::rt::Executor;

use super::error::hyper_code;
use super::UserDataPointer;

Expand Down Expand Up @@ -177,14 +182,24 @@ impl WeakExec {
}
}

impl crate::rt::Executor<BoxFuture<()>> for WeakExec {
impl Executor<BoxFuture<()>> for WeakExec {
fn execute(&self, fut: BoxFuture<()>) {
if let Some(exec) = self.0.upgrade() {
exec.spawn(hyper_task::boxed(fut));
}
}
}

impl<F, B> ConnStreamExec<F, B> for WeakExec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: HttpBody,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(Box::pin(fut))
}
}

ffi_fn! {
/// Creates a new task executor.
fn hyper_executor_new() -> *const hyper_executor {
Expand Down