diff --git a/.ocamlformat b/.ocamlformat index 3e47755..e69de29 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1 +0,0 @@ -version = 0.15.0 diff --git a/bench/dune b/bench/dune new file mode 100644 index 0000000..6646057 --- /dev/null +++ b/bench/dune @@ -0,0 +1,4 @@ +(executable + (name service) + (libraries bechamel notty.unix bechamel-notty grpc grpc-lwt ocaml-protoc lwt + lwt.unix h2 h2-lwt-unix bigstringaf)) diff --git a/bench/service.ml b/bench/service.ml new file mode 100644 index 0000000..612fb10 --- /dev/null +++ b/bench/service.ml @@ -0,0 +1,73 @@ +open Bechamel +open Toolkit + +let make_buffer capacity = + Staged.stage @@ fun () -> ignore (Grpc.Buffer.v ~capacity ()) + +let make_message len = + Staged.stage @@ fun () -> + ignore (Grpc.Message.make (String.init len (fun _ -> 'a'))) + +let extract_message len = + let msg = Grpc.Message.make (String.init len (fun _ -> 'a')) in + let bs = Bigstringaf.of_string ~off:0 ~len msg in + Staged.stage @@ fun () -> + let buf = Grpc.Buffer.v () in + Grpc.Buffer.copy_from_bigstringaf ~src_off:0 ~src:bs ~dst:buf ~length:len; + ignore (Grpc.Message.extract buf) + +let test_buffer = + Test.make_indexed ~name:"buffer" ~fmt:"%s %d" + ~args:[ 0; 512; 1024; 2048; 4096; 8192 ] + make_buffer + +let test_message = + Test.make_indexed ~name:"message" ~fmt:"%s %d" + ~args:[ 0; 32; 64; 128; 256; 512 ] + make_message + +let test_extract = + Test.make_indexed ~name:"extract" ~fmt:"%s %d" + ~args:[ 0; 32; 64; 128; 256; 512 ] + extract_message + +let benchmark () = + let ols = + Analyze.ols ~bootstrap:0 ~r_square:true ~predictors:Measure.[| run |] + in + let instances = + Instance.[ minor_allocated; major_allocated; monotonic_clock ] + in + let cfg = + Benchmark.cfg ~limit:2000 ~quota:(Time.second 0.5) ~kde:(Some 1000) () + in + let raw_results = + Benchmark.all cfg instances + (Test.make_grouped ~name:"make" ~fmt:"%s %s" + [ test_buffer; test_message; test_extract ]) + in + let results = + List.map (fun instance -> Analyze.all ols instance raw_results) instances + in + let results = Analyze.merge ols instances results in + (results, raw_results) + +let () = + List.iter + (fun v -> Bechamel_notty.Unit.add v (Measure.unit v)) + Instance.[ minor_allocated; major_allocated; monotonic_clock ] + +let img (window, results) = + Bechamel_notty.Multiple.image_of_ols_results ~rect:window + ~predictor:Measure.run results + +open Notty_unix + +let () = + let window = + match winsize Unix.stdout with + | Some (w, h) -> { Bechamel_notty.w; h } + | None -> { Bechamel_notty.w = 80; h = 1 } + in + let results, _ = benchmark () in + img (window, results) |> eol |> output_image diff --git a/docs/grpc-lwt/Grpc_lwt/Client/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt/Client/Rpc/index.html new file mode 100644 index 0000000..e728014 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt/Client/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt.Client.Rpc)

Module Client.Rpc

type 'a handler = [ `write ] H2.Body.t Lwt.t -> [ `read ] H2.Body.t -> 'a Lwt.t
val bidirectional_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> 'a handler

bidirectional_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and a stream of responses.

val client_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> 'a handler

client_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and promise for the response.

val server_streaming : f:(Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

server_streaming ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a stream of responses.

val unary : f:(Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

unary ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a promise for the response.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Client/index.html b/docs/grpc-lwt/Grpc_lwt/Client/index.html new file mode 100644 index 0000000..c9206f4 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt/Client/index.html @@ -0,0 +1,2 @@ + +Client (grpc-lwt.Grpc_lwt.Client)

Module Grpc_lwt.Client

module Rpc : sig ... end
type response_handler = H2.Client_connection.response_handler
type do_request = ?⁠trailers_handler:(H2.Headers.t -> unit) -> H2.Request.t -> response_handler:response_handler -> [ `write ] H2.Body.t

do_request is the type of a function that performs the request

val call : service:string -> rpc:string -> ?⁠scheme:string -> handler:'a Rpc.handler -> do_request:do_request -> unit -> ('a * Grpc.Status.tGrpc.Status.t) Stdlib.result Lwt.t

call ~service ~rpc ~handler ~do_request () calls the rpc endpoint given by service and rpc using the do_request function. The handler is called when this request is set up to send and receive data.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt/Rpc/index.html deleted file mode 100644 index 9593511..0000000 --- a/docs/grpc-lwt/Grpc_lwt/Rpc/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Rpc (grpc-lwt.Grpc_lwt.Rpc)

Module Grpc_lwt.Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Server/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt/Server/Rpc/index.html new file mode 100644 index 0000000..0cd0e0f --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt/Server/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt.Server.Rpc)

Module Server.Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Server/Service/index.html b/docs/grpc-lwt/Grpc_lwt/Server/Service/index.html new file mode 100644 index 0000000..5cfb1e6 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt/Server/Service/index.html @@ -0,0 +1,2 @@ + +Service (grpc-lwt.Grpc_lwt.Server.Service)

Module Server.Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Server/index.html b/docs/grpc-lwt/Grpc_lwt/Server/index.html index d0f6c76..ae2eea8 100644 --- a/docs/grpc-lwt/Grpc_lwt/Server/index.html +++ b/docs/grpc-lwt/Grpc_lwt/Server/index.html @@ -1,2 +1,2 @@ -Server (grpc-lwt.Grpc_lwt.Server)

Module Grpc_lwt.Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

\ No newline at end of file +Server (grpc-lwt.Grpc_lwt.Server)

Module Grpc_lwt.Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

module Rpc : sig ... end
module Service : sig ... end
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/Service/index.html b/docs/grpc-lwt/Grpc_lwt/Service/index.html deleted file mode 100644 index 9ab0e6d..0000000 --- a/docs/grpc-lwt/Grpc_lwt/Service/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Service (grpc-lwt.Grpc_lwt.Service)

Module Grpc_lwt.Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt/index.html b/docs/grpc-lwt/Grpc_lwt/index.html index c27a1d0..821335c 100644 --- a/docs/grpc-lwt/Grpc_lwt/index.html +++ b/docs/grpc-lwt/Grpc_lwt/index.html @@ -1,2 +1,2 @@ -Grpc_lwt (grpc-lwt.Grpc_lwt)

Module Grpc_lwt

module Server : sig ... end
module Service : sig ... end
module Rpc : sig ... end
\ No newline at end of file +Grpc_lwt (grpc-lwt.Grpc_lwt)

Module Grpc_lwt

module Server : sig ... end
module Client : sig ... end
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Client/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__/Client/Rpc/index.html new file mode 100644 index 0000000..480a1f7 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__/Client/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt__.Client.Rpc)

Module Client.Rpc

type 'a handler = [ `write ] H2.Body.t Lwt.t -> [ `read ] H2.Body.t -> 'a Lwt.t
val bidirectional_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> 'a handler

bidirectional_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and a stream of responses.

val client_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> 'a handler

client_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and promise for the response.

val server_streaming : f:(Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

server_streaming ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a stream of responses.

val unary : f:(Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

unary ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a promise for the response.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Client/index.html b/docs/grpc-lwt/Grpc_lwt__/Client/index.html new file mode 100644 index 0000000..c114af9 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__/Client/index.html @@ -0,0 +1,2 @@ + +Client (grpc-lwt.Grpc_lwt__.Client)

Module Grpc_lwt__.Client

module Rpc : sig ... end
type response_handler = H2.Client_connection.response_handler
type do_request = ?⁠trailers_handler:(H2.Headers.t -> unit) -> H2.Request.t -> response_handler:response_handler -> [ `write ] H2.Body.t

do_request is the type of a function that performs the request

val call : service:string -> rpc:string -> ?⁠scheme:string -> handler:'a Rpc.handler -> do_request:do_request -> unit -> ('a * Grpc.Status.tGrpc.Status.t) Stdlib.result Lwt.t

call ~service ~rpc ~handler ~do_request () calls the rpc endpoint given by service and rpc using the do_request function. The handler is called when this request is set up to send and receive data.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Connection/index.html b/docs/grpc-lwt/Grpc_lwt__/Connection/index.html index 5704fcd..73d489d 100644 --- a/docs/grpc-lwt/Grpc_lwt__/Connection/index.html +++ b/docs/grpc-lwt/Grpc_lwt__/Connection/index.html @@ -1,2 +1,2 @@ -Connection (grpc-lwt.Grpc_lwt__.Connection)

Module Grpc_lwt__.Connection

val grpc_recv_streaming : H2.Reqd.t -> (Pbrt.Decoder.t option -> unit) -> unit
val grpc_send_streaming : H2.Reqd.t -> Pbrt.Encoder.t Lwt_stream.t -> Grpc.Status.t Lwt_mvar.t -> unit Lwt.t
\ No newline at end of file +Connection (grpc-lwt.Grpc_lwt__.Connection)

Module Grpc_lwt__.Connection

val grpc_recv_streaming : [ `read ] H2.Body.t -> (Pbrt.Decoder.t option -> unit) -> unit
val grpc_send_streaming_client : [ `write ] H2.Body.t -> Pbrt.Encoder.t Lwt_stream.t -> unit Lwt.t
val grpc_send_streaming : H2.Reqd.t -> Pbrt.Encoder.t Lwt_stream.t -> Grpc.Status.t Lwt_mvar.t -> unit Lwt.t
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__/Rpc/index.html deleted file mode 100644 index 541f1c5..0000000 --- a/docs/grpc-lwt/Grpc_lwt__/Rpc/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Rpc (grpc-lwt.Grpc_lwt__.Rpc)

Module Grpc_lwt__.Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Server/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__/Server/Rpc/index.html new file mode 100644 index 0000000..74f2e4e --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__/Server/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt__.Server.Rpc)

Module Server.Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Server/Service/index.html b/docs/grpc-lwt/Grpc_lwt__/Server/Service/index.html new file mode 100644 index 0000000..ba94397 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__/Server/Service/index.html @@ -0,0 +1,2 @@ + +Service (grpc-lwt.Grpc_lwt__.Server.Service)

Module Server.Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Server/index.html b/docs/grpc-lwt/Grpc_lwt__/Server/index.html index e30658a..94fc507 100644 --- a/docs/grpc-lwt/Grpc_lwt__/Server/index.html +++ b/docs/grpc-lwt/Grpc_lwt__/Server/index.html @@ -1,2 +1,2 @@ -Server (grpc-lwt.Grpc_lwt__.Server)

Module Grpc_lwt__.Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

\ No newline at end of file +Server (grpc-lwt.Grpc_lwt__.Server)

Module Grpc_lwt__.Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

module Rpc : sig ... end
module Service : sig ... end
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/Service/index.html b/docs/grpc-lwt/Grpc_lwt__/Service/index.html deleted file mode 100644 index cd694a3..0000000 --- a/docs/grpc-lwt/Grpc_lwt__/Service/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Service (grpc-lwt.Grpc_lwt__.Service)

Module Grpc_lwt__.Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Grpc_lwt.Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__/index.html b/docs/grpc-lwt/Grpc_lwt__/index.html index 6427644..17acdb0 100644 --- a/docs/grpc-lwt/Grpc_lwt__/index.html +++ b/docs/grpc-lwt/Grpc_lwt__/index.html @@ -1,2 +1,2 @@ -Grpc_lwt__ (grpc-lwt.Grpc_lwt__)

Module Grpc_lwt__

module Connection : sig ... end
module Rpc : sig ... end
module Server : sig ... end
module Service : sig ... end
\ No newline at end of file +Grpc_lwt__ (grpc-lwt.Grpc_lwt__)

Module Grpc_lwt__

module Client : sig ... end
module Connection : sig ... end
module Server : sig ... end
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Rpc/.dune-keep b/docs/grpc-lwt/Grpc_lwt__Client/.dune-keep similarity index 100% rename from docs/grpc-lwt/Grpc_lwt__Rpc/.dune-keep rename to docs/grpc-lwt/Grpc_lwt__Client/.dune-keep diff --git a/docs/grpc-lwt/Grpc_lwt__Client/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__Client/Rpc/index.html new file mode 100644 index 0000000..bf852fa --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__Client/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt__Client.Rpc)

Module Grpc_lwt__Client.Rpc

type 'a handler = [ `write ] H2.Body.t Lwt.t -> [ `read ] H2.Body.t -> 'a Lwt.t
val bidirectional_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> 'a handler

bidirectional_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and a stream of responses.

val client_streaming : f:((Pbrt.Encoder.t -> unit) -> Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> 'a handler

client_streaming ~f write read sets up the sending and receiving logic using write and read, then calls f with a push function for requests and promise for the response.

val server_streaming : f:(Pbrt.Decoder.t Lwt_stream.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

server_streaming ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a stream of responses.

val unary : f:(Pbrt.Decoder.t option Lwt.t -> 'a Lwt.t) -> Pbrt.Encoder.t -> 'a handler

unary ~f enc write read sets up the sending and receiving logic using write and read, then sends enc and calls f with a promise for the response.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Client/index.html b/docs/grpc-lwt/Grpc_lwt__Client/index.html new file mode 100644 index 0000000..6563163 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__Client/index.html @@ -0,0 +1,2 @@ + +Grpc_lwt__Client (grpc-lwt.Grpc_lwt__Client)

Module Grpc_lwt__Client

module Rpc : sig ... end
type response_handler = H2.Client_connection.response_handler
type do_request = ?⁠trailers_handler:(H2.Headers.t -> unit) -> H2.Request.t -> response_handler:response_handler -> [ `write ] H2.Body.t

do_request is the type of a function that performs the request

val call : service:string -> rpc:string -> ?⁠scheme:string -> handler:'a Rpc.handler -> do_request:do_request -> unit -> ('a * Grpc.Status.tGrpc.Status.t) Stdlib.result Lwt.t

call ~service ~rpc ~handler ~do_request () calls the rpc endpoint given by service and rpc using the do_request function. The handler is called when this request is set up to send and receive data.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Connection/index.html b/docs/grpc-lwt/Grpc_lwt__Connection/index.html index 4f2d59b..5ce664c 100644 --- a/docs/grpc-lwt/Grpc_lwt__Connection/index.html +++ b/docs/grpc-lwt/Grpc_lwt__Connection/index.html @@ -1,2 +1,2 @@ -Grpc_lwt__Connection (grpc-lwt.Grpc_lwt__Connection)

Module Grpc_lwt__Connection

val grpc_recv_streaming : H2.Reqd.t -> (Pbrt.Decoder.t option -> unit) -> unit
val grpc_send_streaming : H2.Reqd.t -> Pbrt.Encoder.t Lwt_stream.t -> Grpc.Status.t Lwt_mvar.t -> unit Lwt.t
\ No newline at end of file +Grpc_lwt__Connection (grpc-lwt.Grpc_lwt__Connection)

Module Grpc_lwt__Connection

val grpc_recv_streaming : [ `read ] H2.Body.t -> (Pbrt.Decoder.t option -> unit) -> unit
val grpc_send_streaming_client : [ `write ] H2.Body.t -> Pbrt.Encoder.t Lwt_stream.t -> unit Lwt.t
val grpc_send_streaming : H2.Reqd.t -> Pbrt.Encoder.t Lwt_stream.t -> Grpc.Status.t Lwt_mvar.t -> unit Lwt.t
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__Rpc/index.html deleted file mode 100644 index 2344e19..0000000 --- a/docs/grpc-lwt/Grpc_lwt__Rpc/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Grpc_lwt__Rpc (grpc-lwt.Grpc_lwt__Rpc)

Module Grpc_lwt__Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Server/Rpc/index.html b/docs/grpc-lwt/Grpc_lwt__Server/Rpc/index.html new file mode 100644 index 0000000..1895941 --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__Server/Rpc/index.html @@ -0,0 +1,2 @@ + +Rpc (grpc-lwt.Grpc_lwt__Server.Rpc)

Module Grpc_lwt__Server.Rpc

type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

unary is the type for a unary grpc rpc, one request, one response.

type client_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t

client_streaming is the type for an rpc where the client streams the requests and the server responds once.

type server_streaming = Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

server_streaming is the type for an rpc where the client sends one request and the server sends multiple responses.

type bidirectional_streaming = Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t

bidirectional_streaming is the type for an rpc where both the client and server can send multiple messages.

type t =
| Unary of unary
| Client_streaming of client_streaming
| Server_streaming of server_streaming
| Bidirectional_streaming of bidirectional_streaming
val unary : f:unary -> H2.Reqd.t -> unit Lwt.t

unary ~f reqd calls f with the request obtained from reqd and handles sending the response.

val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t

client_streaming ~f reqd calls f with a stream to pull requests from and handles sending the response.

val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t

server_streaming ~f reqd calls f with the request optained from reqd and handles sending the responses pushed out.

val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t

bidirectional_streaming ~f reqd calls f with a stream to pull requests from and andles sending the responses pushed out.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Server/Service/index.html b/docs/grpc-lwt/Grpc_lwt__Server/Service/index.html new file mode 100644 index 0000000..73dfa1b --- /dev/null +++ b/docs/grpc-lwt/Grpc_lwt__Server/Service/index.html @@ -0,0 +1,2 @@ + +Service (grpc-lwt.Grpc_lwt__Server.Service)

Module Grpc_lwt__Server.Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Server/index.html b/docs/grpc-lwt/Grpc_lwt__Server/index.html index 6e20410..947119c 100644 --- a/docs/grpc-lwt/Grpc_lwt__Server/index.html +++ b/docs/grpc-lwt/Grpc_lwt__Server/index.html @@ -1,2 +1,2 @@ -Grpc_lwt__Server (grpc-lwt.Grpc_lwt__Server)

Module Grpc_lwt__Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

\ No newline at end of file +Grpc_lwt__Server (grpc-lwt.Grpc_lwt__Server)

Module Grpc_lwt__Server

include Grpc.Server.S
type t

t represents a server and its associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd routes reqd to the appropriate service in t if available.

module Rpc : sig ... end
module Service : sig ... end
\ No newline at end of file diff --git a/docs/grpc-lwt/Grpc_lwt__Service/.dune-keep b/docs/grpc-lwt/Grpc_lwt__Service/.dune-keep deleted file mode 100644 index e69de29..0000000 diff --git a/docs/grpc-lwt/Grpc_lwt__Service/index.html b/docs/grpc-lwt/Grpc_lwt__Service/index.html deleted file mode 100644 index 32ea142..0000000 --- a/docs/grpc-lwt/Grpc_lwt__Service/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Grpc_lwt__Service (grpc-lwt.Grpc_lwt__Service)

Module Grpc_lwt__Service

type t

t represents a gRPC service with potentially multiple rpcs and the information needed to route to them.

val v : unit -> t

v () creates a new service

val add_rpc : name:string -> rpc:Grpc_lwt.Rpc.t -> t -> t

add_rpc ~name ~rpc t adds rpc to t and ensures that t can route to it with name.

val handle_request : t -> H2.Reqd.t -> unit

handle_request t reqd handles routing reqd to the correct rpc if available in t.

\ No newline at end of file diff --git a/docs/grpc/Grpc/Status/index.html b/docs/grpc/Grpc/Status/index.html index 2d1e0a2..0b5c666 100644 --- a/docs/grpc/Grpc/Status/index.html +++ b/docs/grpc/Grpc/Status/index.html @@ -1,2 +1,2 @@ -Status (grpc.Grpc.Status)

Module Grpc.Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file +Status (grpc.Grpc.Status)

Module Grpc.Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

val code_of_int : int -> code option

code_of_int i returns the corresponding code for i if it exists.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file diff --git a/docs/grpc/Grpc__/Client/index.html b/docs/grpc/Grpc__/Client/index.html deleted file mode 100644 index 782aaf8..0000000 --- a/docs/grpc/Grpc__/Client/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Client (grpc.Grpc__.Client)

Module Grpc__.Client

\ No newline at end of file diff --git a/docs/grpc/Grpc__/Status/index.html b/docs/grpc/Grpc__/Status/index.html index 525eb1f..43d5ce1 100644 --- a/docs/grpc/Grpc__/Status/index.html +++ b/docs/grpc/Grpc__/Status/index.html @@ -1,2 +1,2 @@ -Status (grpc.Grpc__.Status)

Module Grpc__.Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file +Status (grpc.Grpc__.Status)

Module Grpc__.Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

val code_of_int : int -> code option

code_of_int i returns the corresponding code for i if it exists.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file diff --git a/docs/grpc/Grpc__/index.html b/docs/grpc/Grpc__/index.html index 499075e..3de6ee2 100644 --- a/docs/grpc/Grpc__/index.html +++ b/docs/grpc/Grpc__/index.html @@ -1,2 +1,2 @@ -Grpc__ (grpc.Grpc__)

Module Grpc__

module Buffer : sig ... end
module Client : sig ... end
module Message : sig ... end
module Server : sig ... end
module Status : sig ... end
\ No newline at end of file +Grpc__ (grpc.Grpc__)

Module Grpc__

module Buffer : sig ... end
module Message : sig ... end
module Server : sig ... end
module Status : sig ... end
\ No newline at end of file diff --git a/docs/grpc/Grpc__Client/index.html b/docs/grpc/Grpc__Client/index.html index c5ede63..e943de7 100644 --- a/docs/grpc/Grpc__Client/index.html +++ b/docs/grpc/Grpc__Client/index.html @@ -1,2 +1,2 @@ -Grpc__Client (grpc.Grpc__Client)

Module Grpc__Client

\ No newline at end of file +Grpc__Client (grpc.Grpc__Client)

Module Grpc__Client

module type S = sig ... end

The type of a Client

\ No newline at end of file diff --git a/docs/grpc/Grpc__Client/module-type-S/index.html b/docs/grpc/Grpc__Client/module-type-S/index.html new file mode 100644 index 0000000..c47945a --- /dev/null +++ b/docs/grpc/Grpc__Client/module-type-S/index.html @@ -0,0 +1,2 @@ + +S (grpc.Grpc__Client.S)

Module type Grpc__Client.S

The type of a Client

type t

t represents a client and the associated services and routing information.

val v : unit -> t

v () creates a new server.

val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t

add_service ~name ~service t adds service to t and ensures that it is routable via name.

val make_request : t -> service:string -> rpc:string -> request:bytes -> H2.Reqd.t

make_request t reqd routes reqd to the appropriate service in t if available.

\ No newline at end of file diff --git a/docs/grpc/Grpc__Status/index.html b/docs/grpc/Grpc__Status/index.html index 5a3316f..d8b8765 100644 --- a/docs/grpc/Grpc__Status/index.html +++ b/docs/grpc/Grpc__Status/index.html @@ -1,2 +1,2 @@ -Grpc__Status (grpc.Grpc__Status)

Module Grpc__Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file +Grpc__Status (grpc.Grpc__Status)

Module Grpc__Status

type code =
| OK
| Cancelled
| Unknown
| Invalid_argument
| Deadline_exceeded
| Not_found
| Already_exists
| Permission_denied
| Resource_exhausted
| Failed_precondition
| Aborted
| Out_of_range
| Unimplemented
| Internal
| Unavailable
| Data_loss
| Unauthenticated
val int_of_code : code -> int

int_of_code c returns the corresponding integer status code for c.

val code_of_int : int -> code option

code_of_int i returns the corresponding code for i if it exists.

type t

t represents a full gRPC status, this includes code and optional message.

val v : ?⁠message:string -> code -> t

v ~message code creates a new status with the given code and message.

val code : t -> code

code t returns the code associated with t.

val message : t -> string option

message t returns the message associated with t, if there is one.

\ No newline at end of file diff --git a/dune-project b/dune-project index 0de1e52..0e089d7 100644 --- a/dune-project +++ b/dune-project @@ -27,17 +27,18 @@ (>= 4.08)) (uri (>= 4.0.0)) - (ocaml-protoc - (>= 2.0.0)) (h2 - (>= 0.7.0)))) + (>= 0.7.0)) + ppx_deriving)) (package (name grpc-lwt) (synopsis "An Lwt implementation of gRPC") (description "Functionality for building gRPC services and rpcs with `lwt`.") (depends - (grpc :dev) + grpc (lwt (>= 5.0.0)) - (h2 :dev))) + (h2 + (>= 0.8.0)) + stringext)) diff --git a/examples/greeter-client-lwt/dune b/examples/greeter-client-lwt/dune new file mode 100644 index 0000000..584a0aa --- /dev/null +++ b/examples/greeter-client-lwt/dune @@ -0,0 +1,3 @@ +(executable + (name greeter_client_lwt) + (libraries grpc grpc-lwt ocaml-protoc lwt lwt.unix greeter h2 h2-lwt-unix)) diff --git a/examples/greeter-client-lwt/greeter_client_lwt.ml b/examples/greeter-client-lwt/greeter_client_lwt.ml new file mode 100644 index 0000000..7c5dd93 --- /dev/null +++ b/examples/greeter-client-lwt/greeter_client_lwt.ml @@ -0,0 +1,41 @@ +open Grpc_lwt +open Lwt.Syntax + +let call_server address port req = + let* addresses = + Lwt_unix.getaddrinfo address (string_of_int port) + [ Unix.(AI_FAMILY PF_INET) ] + in + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + let* () = Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr in + let error_handler _ = print_endline "error" in + let* connection = + H2_lwt_unix.Client.create_connection ~error_handler socket + in + (* code generation *) + let enc = Pbrt.Encoder.create () in + Greeter.Greeter_pb.encode_hello_request req enc; + + Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" + ~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder -> + let+ decoder = decoder in + match decoder with + | Some decoder -> + let decoder = Pbrt.Decoder.of_string decoder in + Greeter.Greeter_pb.decode_hello_reply decoder + | None -> Greeter.Greeter_types.default_hello_reply ())) + () + +let () = + let open Lwt.Syntax in + let port = 8080 in + let address = "localhost" in + let name = if Array.length Sys.argv > 1 then Sys.argv.(1) else "anonymous" in + let req = Greeter.Greeter_types.default_hello_request ~name () in + Lwt_main.run + (let+ res = call_server address port req in + match res with + | Ok (res, _) -> print_endline res.message + | Error _ -> print_endline "an error occurred") diff --git a/examples/greeter-server-lwt/greeter_server_lwt.ml b/examples/greeter-server-lwt/greeter_server_lwt.ml index bddbd8f..1a90721 100644 --- a/examples/greeter-server-lwt/greeter_server_lwt.ml +++ b/examples/greeter-server-lwt/greeter_server_lwt.ml @@ -1,9 +1,8 @@ open Grpc_lwt -let say_hello decoder = +let say_hello buffer = + let decoder = Pbrt.Decoder.of_string buffer in let req = Greeter.Greeter_pb.decode_hello_request decoder in - Format.printf "Received %a" Greeter.Greeter_pp.pp_hello_request req; - print_endline req.name; let message = if req.name = "" then "You forgot your name!" else Format.sprintf "Hello, %s!" req.name @@ -11,10 +10,10 @@ let say_hello decoder = let reply = Greeter.Greeter_types.default_hello_reply ~message () in let encoder = Pbrt.Encoder.create () in Greeter.Greeter_pb.encode_hello_reply reply encoder; - Lwt.return (Grpc.Status.(v OK), Some encoder) + Lwt.return (Grpc.Status.(v OK), Some (Pbrt.Encoder.to_string encoder)) let greeter_service = - Service.( + Server.Service.( v () |> add_rpc ~name:"SayHello" ~rpc:(Unary say_hello) |> handle_request) let server = @@ -40,7 +39,7 @@ let () = print_endline "Try running:"; print_endline ""; print_endline - {| grpcurl --plaintext --proto examples/greeter/greeter.proto -d '{"name":""}' localhost:8080 mypackage.Greeter/SayHello |}); + {| dune exec -- examples/greeter-client-lwt/greeter_client_lwt.exe |}); let forever, _ = Lwt.wait () in Lwt_main.run forever diff --git a/examples/greeter/greeter.proto b/examples/greeter/greeter.proto index 1607c70..9950cb6 100644 --- a/examples/greeter/greeter.proto +++ b/examples/greeter/greeter.proto @@ -2,10 +2,10 @@ syntax = "proto3"; package mypackage; // The greeting service definition. -service Greeter { +//service Greeter { // Sends a greeting - rpc SayHello(HelloRequest) returns (HelloReply) {} -} + //rpc SayHello(HelloRequest) returns (HelloReply) {} +//} // The request message containing the user's name. message HelloRequest { string name = 1; } diff --git a/flake.lock b/flake.lock index 8d92682..6d4b0fc 100644 --- a/flake.lock +++ b/flake.lock @@ -2,16 +2,18 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1609868504, - "narHash": "sha256-qAWYShxIMHAzN40r8QApVQgHjAHgnLtgBIZtIqhAbYo=", - "owner": "NixOS", + "lastModified": 1609870929, + "narHash": "sha256-aVGF0O3T+Xg4avzyCWhkZG6DvqItK6u/1Y4yY7jnj80=", + "owner": "sternenseemann", "repo": "nixpkgs", - "rev": "12677c44d547ea242be07651472fb39685e580a8", + "rev": "2de4f7dab09871fd05856ffde8f8e3bd40635579", "type": "github" }, "original": { - "id": "nixpkgs", - "type": "indirect" + "owner": "sternenseemann", + "ref": "ppx_deriving-5.1", + "repo": "nixpkgs", + "type": "github" } }, "root": { diff --git a/flake.nix b/flake.nix index 4c35cc6..5b5a73c 100644 --- a/flake.nix +++ b/flake.nix @@ -1,11 +1,17 @@ { description = "A modular gRPC library"; + inputs = { + nixpkgs = { + url = "github:sternenseemann/nixpkgs/ppx_deriving-5.1"; + }; + }; + outputs = { self, nixpkgs }: with import nixpkgs { system = "x86_64-linux"; }; let h2-src = fetchFromGitHub { - owner = "anmonteiro"; + owner = "jeffa5"; repo = "ocaml-h2"; rev = "36bd7bfa46fb0eb2bce184413f663a46a5e0dd3b"; sha256 = "sha256-8vsRpx0JVN6KHOVfKit6LhlQqGTO1ofRhfyDgJ7dGz0="; @@ -36,7 +42,7 @@ src = self; useDune2 = true; doCheck = true; - buildInputs = (with ocamlPackages; [ uri h2 ]); + buildInputs = (with ocamlPackages; [ uri h2 ppx_deriving ]); }; grpc-lwt = @@ -55,10 +61,18 @@ devShell.x86_64-linux = mkShell { buildInputs = [ ocaml + opam + + m4 + pkgconfig nixpkgs-fmt rnix-lsp - ] ++ (with ocamlPackages; [ dune_2 ocaml-protoc uri ]); + ]; + + shellHook = '' + eval $(opam env) + ''; }; }; } diff --git a/grpc-lwt.opam b/grpc-lwt.opam index 9a5ba8b..e4f6eeb 100644 --- a/grpc-lwt.opam +++ b/grpc-lwt.opam @@ -10,9 +10,10 @@ doc: "https://jeffas.io/ocaml-grpc" bug-reports: "https://github.com/jeffa5/ocaml-grpc/issues" depends: [ "dune" {>= "2.7"} - "grpc" {dev} + "grpc" "lwt" {>= "5.0.0"} - "h2" {dev} + "h2" {>= "0.8.0"} + "stringext" "odoc" {with-doc} ] build: [ @@ -30,6 +31,3 @@ build: [ ] ] dev-repo: "git+https://github.com/jeffa5/ocaml-grpc.git" -pin-depends: [ - [ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2#7df18604ae389e3151f79e2be5dfd4278d4377fb" ] -] diff --git a/grpc-lwt.opam.template b/grpc-lwt.opam.template index 5ff6e05..e69de29 100644 --- a/grpc-lwt.opam.template +++ b/grpc-lwt.opam.template @@ -1,3 +0,0 @@ -pin-depends: [ - [ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2#7df18604ae389e3151f79e2be5dfd4278d4377fb" ] -] diff --git a/grpc.opam b/grpc.opam index 7cae201..2152a25 100644 --- a/grpc.opam +++ b/grpc.opam @@ -13,8 +13,8 @@ depends: [ "dune" {>= "2.7"} "ocaml" {>= "4.08"} "uri" {>= "4.0.0"} - "ocaml-protoc" {>= "2.0.0"} "h2" {>= "0.7.0"} + "ppx_deriving" "odoc" {with-doc} ] build: [ diff --git a/lib/grpc-lwt/client.ml b/lib/grpc-lwt/client.ml new file mode 100644 index 0000000..6a8d4c1 --- /dev/null +++ b/lib/grpc-lwt/client.ml @@ -0,0 +1,101 @@ +open Lwt.Syntax + +type response_handler = H2.Client_connection.response_handler + +type do_request = + ?trailers_handler:(H2.Headers.t -> unit) -> + H2.Request.t -> + response_handler:response_handler -> + [ `write ] H2.Body.t + +let make_request ~scheme ~service ~rpc ~headers = + let request = + H2.Request.create ~scheme `POST ("/" ^ service ^ "/" ^ rpc) ~headers + in + request + +let default_headers = + H2.Headers.of_list + [ ("te", "trailers"); ("content-type", "application/grpc+proto") ] + +let call ~service ~rpc ?(scheme = "https") ~handler ~do_request + ?(headers = default_headers) () = + let request = make_request ~service ~rpc ~scheme ~headers in + let read_body, read_body_notify = Lwt.wait () in + let handler_res, handler_res_notify = Lwt.wait () in + let out, out_notify = Lwt.wait () in + let response_handler (response : H2.Response.t) body = + Lwt.wakeup_later read_body_notify body; + Lwt.async (fun () -> + if response.status <> `OK then ( + Lwt.wakeup_later out_notify + (Error (Grpc.Status.v Grpc.Status.Unknown)); + Lwt.return_unit) + else + let+ handler_res = handler_res in + Lwt.wakeup_later out_notify (Ok handler_res)) + in + let status, status_notify = Lwt.wait () in + let trailers_handler headers = + let code = + match H2.Headers.get headers "grpc-status" with + | None -> None + | Some s -> ( + match int_of_string_opt s with + | None -> None + | Some i -> Grpc.Status.code_of_int i) + in + match code with + | None -> () + | Some code -> + let message = H2.Headers.get headers "grpc-message" in + let status = Grpc.Status.v ?message code in + Lwt.wakeup_later status_notify status + in + let write_body = + do_request ?trailers_handler:(Some trailers_handler) request + ~response_handler + in + Lwt.async (fun () -> + let+ handler_res = handler write_body read_body in + Lwt.wakeup_later handler_res_notify handler_res); + let* out = out in + let+ status = status in + match out with Error _ as e -> e | Ok out -> Ok (out, status) + +module Rpc = struct + type 'a handler = + [ `write ] H2.Body.t -> [ `read ] H2.Body.t Lwt.t -> 'a Lwt.t + + let bidirectional_streaming ~f write_body read_body = + let decoder_stream, decoder_push = Lwt_stream.create () in + Lwt.async (fun () -> + let+ read_body in + Connection.grpc_recv_streaming read_body decoder_push); + let encoder_stream, encoder_push = Lwt_stream.create () in + Lwt.async (fun () -> + Connection.grpc_send_streaming_client write_body encoder_stream); + f encoder_push decoder_stream + + let client_streaming ~f = + bidirectional_streaming ~f:(fun encoder_push decoder_stream -> + let decoder = Lwt_stream.get decoder_stream in + f encoder_push decoder) + + let server_streaming ~f enc = + bidirectional_streaming ~f:(fun encoder_push decoder_stream -> + (fun enc -> + encoder_push (Some enc); + encoder_push None) + enc; + f decoder_stream) + + let unary ~f enc = + bidirectional_streaming ~f:(fun encoder_push decoder_stream -> + (fun enc -> + encoder_push (Some enc); + encoder_push None) + enc; + let decoder = Lwt_stream.get decoder_stream in + f decoder) +end diff --git a/lib/grpc-lwt/client.mli b/lib/grpc-lwt/client.mli new file mode 100644 index 0000000..78fb3c5 --- /dev/null +++ b/lib/grpc-lwt/client.mli @@ -0,0 +1,49 @@ +module Rpc : sig + type 'a handler = + [ `write ] H2.Body.t -> [ `read ] H2.Body.t Lwt.t -> 'a Lwt.t + + val bidirectional_streaming : + f:((string option -> unit) -> string Lwt_stream.t -> 'a Lwt.t) -> 'a handler + (** [bidirectional_streaming ~f write read] sets up the sending and receiving + logic using [write] and [read], then calls [f] with a push function for + requests and a stream of responses. *) + + val client_streaming : + f:((string option -> unit) -> string option Lwt.t -> 'a Lwt.t) -> 'a handler + (** [client_streaming ~f write read] sets up the sending and receiving + logic using [write] and [read], then calls [f] with a push function for + requests and promise for the response. *) + + val server_streaming : + f:(string Lwt_stream.t -> 'a Lwt.t) -> string -> 'a handler + (** [server_streaming ~f enc write read] sets up the sending and receiving + logic using [write] and [read], then sends [enc] and calls [f] with a + stream of responses. *) + + val unary : f:(string option Lwt.t -> 'a Lwt.t) -> string -> 'a handler + (** [unary ~f enc write read] sets up the sending and receiving + logic using [write] and [read], then sends [enc] and calls [f] with a + promise for the response. *) +end + +type response_handler = H2.Client_connection.response_handler + +type do_request = + ?trailers_handler:(H2.Headers.t -> unit) -> + H2.Request.t -> + response_handler:response_handler -> + [ `write ] H2.Body.t +(** [do_request] is the type of a function that performs the request *) + +val call : + service:string -> + rpc:string -> + ?scheme:string -> + handler:'a Rpc.handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, Grpc.Status.t) result Lwt.t +(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) diff --git a/lib/grpc-lwt/connection.ml b/lib/grpc-lwt/connection.ml index 3269fe8..6bbdc82 100644 --- a/lib/grpc-lwt/connection.ml +++ b/lib/grpc-lwt/connection.ml @@ -1,21 +1,29 @@ open Lwt.Syntax -let grpc_recv_streaming request decoder_push = - let body = H2.Reqd.request_body request in - let request_buffer = ref @@ Grpc.Buffer.v () in +let grpc_recv_streaming body buffer_push = + let request_buffer = Grpc.Buffer.v () in + let on_eof () = buffer_push None in let rec on_read buffer ~off ~len = Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer - ~dst:!request_buffer ~length:len; - let message = Grpc.Message.extract !request_buffer in - ( match message with - | Some message -> - decoder_push - (Some (Pbrt.Decoder.of_bytes (Grpc.Buffer.to_bytes message))) - | None -> () ); + ~dst:request_buffer ~length:len; + let message = Grpc.Message.extract request_buffer in + (match message with + | Some message -> buffer_push (Some message) + | None -> ()); H2.Body.schedule_read body ~on_read ~on_eof - and on_eof () = decoder_push None in + in H2.Body.schedule_read body ~on_read ~on_eof +let grpc_send_streaming_client body encoder_stream = + let+ () = + Lwt_stream.iter + (fun encoder -> + let payload = Grpc.Message.make encoder in + H2.Body.write_string body payload) + encoder_stream + in + H2.Body.close_writer body + let grpc_send_streaming request encoder_stream status_mvar = let body = H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request @@ -26,8 +34,8 @@ let grpc_send_streaming request encoder_stream status_mvar = in let* () = Lwt_stream.iter - (fun encoder -> - let payload = Grpc.Message.make (Pbrt.Encoder.to_string encoder) in + (fun input -> + let payload = Grpc.Message.make input in H2.Body.write_string body payload; H2.Body.flush body (fun () -> ())) encoder_stream @@ -35,13 +43,12 @@ let grpc_send_streaming request encoder_stream status_mvar = let+ status = Lwt_mvar.take status_mvar in H2.Reqd.schedule_trailers request (H2.Headers.of_list - ( [ - ( "grpc-status", - string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) - ); - ] + ([ + ( "grpc-status", + string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) ); + ] @ match Grpc.Status.message status with | None -> [] - | Some message -> [ ("grpc-message", message) ] )); + | Some message -> [ ("grpc-message", message) ])); H2.Body.close_writer body diff --git a/lib/grpc-lwt/dune b/lib/grpc-lwt/dune index ad5a30f..0031944 100644 --- a/lib/grpc-lwt/dune +++ b/lib/grpc-lwt/dune @@ -1,4 +1,4 @@ (library (name grpc_lwt) (public_name grpc-lwt) - (libraries grpc h2 stringext lwt ocaml-protoc)) + (libraries grpc h2 stringext lwt)) diff --git a/lib/grpc-lwt/grpc_lwt.ml b/lib/grpc-lwt/grpc_lwt.ml index 22c9e19..9313e56 100644 --- a/lib/grpc-lwt/grpc_lwt.ml +++ b/lib/grpc-lwt/grpc_lwt.ml @@ -1,3 +1,2 @@ module Server = Server -module Service = Service -module Rpc = Rpc +module Client = Client diff --git a/lib/grpc-lwt/rpc.ml b/lib/grpc-lwt/rpc.ml deleted file mode 100644 index cf4c08a..0000000 --- a/lib/grpc-lwt/rpc.ml +++ /dev/null @@ -1,54 +0,0 @@ -open Lwt.Syntax - -type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t - -type client_streaming = - Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t - -type server_streaming = - Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t - -type bidirectional_streaming = - Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t - -type t = - | Unary of unary - | Client_streaming of client_streaming - | Server_streaming of server_streaming - | Bidirectional_streaming of bidirectional_streaming - -let bidirectional_streaming ~f reqd = - let decoder_stream, decoder_push = Lwt_stream.create () in - Connection.grpc_recv_streaming reqd decoder_push; - let encoder_stream, encoder_push = Lwt_stream.create () in - let status_mvar = Lwt_mvar.create_empty () in - Lwt.async (fun () -> - Connection.grpc_send_streaming reqd encoder_stream status_mvar); - let* status = f decoder_stream (fun encoder -> encoder_push (Some encoder)) in - encoder_push None; - Lwt_mvar.put status_mvar status - -let client_streaming ~f reqd = - bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> - let+ status, encoder = f decoder_stream in - (match encoder with None -> () | Some encoder -> encoder_push encoder); - (status : Grpc.Status.t)) - -let server_streaming ~f reqd = - bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> - let* decoder = Lwt_stream.get decoder_stream in - match decoder with - | None -> Lwt.return Grpc.Status.(v OK) - | Some decoder -> f decoder encoder_push) - -let unary ~f reqd = - bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> - let* decoder = Lwt_stream.get decoder_stream in - match decoder with - | None -> Lwt.return Grpc.Status.(v OK) - | Some decoder -> - let+ status, encoder = f decoder in - ( match encoder with - | None -> () - | Some encoder -> encoder_push encoder ); - status) diff --git a/lib/grpc-lwt/rpc.mli b/lib/grpc-lwt/rpc.mli deleted file mode 100644 index 321af5a..0000000 --- a/lib/grpc-lwt/rpc.mli +++ /dev/null @@ -1,35 +0,0 @@ -type unary = Pbrt.Decoder.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t -(** [unary] is the type for a unary grpc rpc, one request, one response. *) - -type client_streaming = - Pbrt.Decoder.t Lwt_stream.t -> (Grpc.Status.t * Pbrt.Encoder.t option) Lwt.t -(** [client_streaming] is the type for an rpc where the client streams the requests and the server responds once. *) - -type server_streaming = - Pbrt.Decoder.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t -(** [server_streaming] is the type for an rpc where the client sends one request and the server sends multiple responses. *) - -type bidirectional_streaming = - Pbrt.Decoder.t Lwt_stream.t -> (Pbrt.Encoder.t -> unit) -> Grpc.Status.t Lwt.t -(** [bidirectional_streaming] is the type for an rpc where both the client and server can send multiple messages. *) - -type t = - | Unary of unary - | Client_streaming of client_streaming - | Server_streaming of server_streaming - | Bidirectional_streaming of bidirectional_streaming - -(** [t] represents the types of rpcs available in gRPC. *) - -val unary : f:unary -> H2.Reqd.t -> unit Lwt.t -(** [unary ~f reqd] calls [f] with the request obtained from [reqd] and handles sending the response. *) - -val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t -(** [client_streaming ~f reqd] calls [f] with a stream to pull requests from and handles sending the response. *) - -val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t -(** [server_streaming ~f reqd] calls [f] with the request optained from [reqd] and handles sending the responses pushed out. *) - -val bidirectional_streaming : - f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t -(** [bidirectional_streaming ~f reqd] calls [f] with a stream to pull requests from and andles sending the responses pushed out. *) diff --git a/lib/grpc-lwt/server.ml b/lib/grpc-lwt/server.ml index 0f6fd0a..5b1c30d 100644 --- a/lib/grpc-lwt/server.ml +++ b/lib/grpc-lwt/server.ml @@ -38,10 +38,101 @@ let handle_request t reqd = | Some encodings -> let encodings = String.split_on_char ',' encodings in if List.mem "identity" encodings then route () - else respond_with `Not_acceptable ) + else respond_with `Not_acceptable) | Some _ -> (* TODO: not sure if there is a specific way to handle this in grpc *) respond_with `Bad_request else respond_with `Unsupported_media_type - | None -> respond_with `Unsupported_media_type ) + | None -> respond_with `Unsupported_media_type) | _ -> respond_with `Not_found + +module Rpc = struct + open Lwt.Syntax + + type unary = string -> (Grpc.Status.t * string option) Lwt.t + + type client_streaming = + string Lwt_stream.t -> (Grpc.Status.t * string option) Lwt.t + + type server_streaming = string -> (string -> unit) -> Grpc.Status.t Lwt.t + + type bidirectional_streaming = + string Lwt_stream.t -> (string -> unit) -> Grpc.Status.t Lwt.t + + type t = + | Unary of unary + | Client_streaming of client_streaming + | Server_streaming of server_streaming + | Bidirectional_streaming of bidirectional_streaming + + let bidirectional_streaming ~f reqd = + let decoder_stream, decoder_push = Lwt_stream.create () in + let body = H2.Reqd.request_body reqd in + Connection.grpc_recv_streaming body decoder_push; + let encoder_stream, encoder_push = Lwt_stream.create () in + let status_mvar = Lwt_mvar.create_empty () in + Lwt.async (fun () -> + Connection.grpc_send_streaming reqd encoder_stream status_mvar); + let* status = + f decoder_stream (fun encoder -> encoder_push (Some encoder)) + in + encoder_push None; + Lwt_mvar.put status_mvar status + + let client_streaming ~f reqd = + bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> + let+ status, encoder = f decoder_stream in + (match encoder with None -> () | Some encoder -> encoder_push encoder); + (status : Grpc.Status.t)) + + let server_streaming ~f reqd = + bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> + let* decoder = Lwt_stream.get decoder_stream in + match decoder with + | None -> Lwt.return Grpc.Status.(v OK) + | Some decoder -> f decoder encoder_push) + + let unary ~f reqd = + bidirectional_streaming reqd ~f:(fun decoder_stream encoder_push -> + let* decoder = Lwt_stream.get decoder_stream in + match decoder with + | None -> Lwt.return Grpc.Status.(v OK) + | Some decoder -> + let+ status, encoder = f decoder in + (match encoder with + | None -> () + | Some encoder -> encoder_push encoder); + status) +end + +module Service = struct + module RpcMap = Map.Make (String) + + type t = Rpc.t RpcMap.t + + let v () = RpcMap.empty + + let add_rpc ~name ~rpc t = RpcMap.add name rpc t + + let handle_request (t : t) reqd = + let request = H2.Reqd.request reqd in + let respond_with code = + H2.Reqd.respond_with_string reqd (H2.Response.create code) "" + in + let parts = String.split_on_char '/' request.target in + if List.length parts > 1 then + let rpc_name = List.nth parts (List.length parts - 1) in + let rpc = RpcMap.find_opt rpc_name t in + match rpc with + | Some rpc -> ( + match rpc with + | Unary f -> Lwt.async (fun () -> Rpc.unary ~f reqd) + | Client_streaming f -> + Lwt.async (fun () -> Rpc.client_streaming ~f reqd) + | Server_streaming f -> + Lwt.async (fun () -> Rpc.server_streaming ~f reqd) + | Bidirectional_streaming f -> + Lwt.async (fun () -> Rpc.bidirectional_streaming ~f reqd)) + | None -> respond_with `Not_found + else respond_with `Not_found +end diff --git a/lib/grpc-lwt/server.mli b/lib/grpc-lwt/server.mli index a64a057..a319fa0 100644 --- a/lib/grpc-lwt/server.mli +++ b/lib/grpc-lwt/server.mli @@ -1 +1,52 @@ include Grpc.Server.S + +module Rpc : sig + type unary = string -> (Grpc.Status.t * string option) Lwt.t + (** [unary] is the type for a unary grpc rpc, one request, one response. *) + + type client_streaming = + string Lwt_stream.t -> (Grpc.Status.t * string option) Lwt.t + (** [client_streaming] is the type for an rpc where the client streams the requests and the server responds once. *) + + type server_streaming = string -> (string -> unit) -> Grpc.Status.t Lwt.t + (** [server_streaming] is the type for an rpc where the client sends one request and the server sends multiple responses. *) + + type bidirectional_streaming = + string Lwt_stream.t -> (string -> unit) -> Grpc.Status.t Lwt.t + (** [bidirectional_streaming] is the type for an rpc where both the client and server can send multiple messages. *) + + type t = + | Unary of unary + | Client_streaming of client_streaming + | Server_streaming of server_streaming + | Bidirectional_streaming of bidirectional_streaming + + (** [t] represents the types of rpcs available in gRPC. *) + + val unary : f:unary -> H2.Reqd.t -> unit Lwt.t + (** [unary ~f reqd] calls [f] with the request obtained from [reqd] and handles sending the response. *) + + val client_streaming : f:client_streaming -> H2.Reqd.t -> unit Lwt.t + (** [client_streaming ~f reqd] calls [f] with a stream to pull requests from and handles sending the response. *) + + val server_streaming : f:server_streaming -> H2.Reqd.t -> unit Lwt.t + (** [server_streaming ~f reqd] calls [f] with the request optained from [reqd] and handles sending the responses pushed out. *) + + val bidirectional_streaming : + f:bidirectional_streaming -> H2.Reqd.t -> unit Lwt.t + (** [bidirectional_streaming ~f reqd] calls [f] with a stream to pull requests from and andles sending the responses pushed out. *) +end + +module Service : sig + type t + (** [t] represents a gRPC service with potentially multiple rpcs and the information needed to route to them. *) + + val v : unit -> t + (** [v ()] creates a new service *) + + val add_rpc : name:string -> rpc:Rpc.t -> t -> t + (** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that [t] can route to it with [name]. *) + + val handle_request : t -> H2.Reqd.t -> unit + (** [handle_request t reqd] handles routing [reqd] to the correct rpc if available in [t]. *) +end diff --git a/lib/grpc-lwt/service.ml b/lib/grpc-lwt/service.ml deleted file mode 100644 index 0a62cde..0000000 --- a/lib/grpc-lwt/service.ml +++ /dev/null @@ -1,29 +0,0 @@ -module RpcMap = Map.Make (String) - -type t = Rpc.t RpcMap.t - -let v () = RpcMap.empty - -let add_rpc ~name ~rpc t = RpcMap.add name rpc t - -let handle_request (t : t) reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - let rpc_name = List.nth parts (List.length parts - 1) in - let rpc = RpcMap.find_opt rpc_name t in - match rpc with - | Some rpc -> ( - match rpc with - | Unary f -> Lwt.async (fun () -> Rpc.unary ~f reqd) - | Client_streaming f -> - Lwt.async (fun () -> Rpc.client_streaming ~f reqd) - | Server_streaming f -> - Lwt.async (fun () -> Rpc.server_streaming ~f reqd) - | Bidirectional_streaming f -> - Lwt.async (fun () -> Rpc.bidirectional_streaming ~f reqd) ) - | None -> respond_with `Not_found - else respond_with `Not_found diff --git a/lib/grpc-lwt/service.mli b/lib/grpc-lwt/service.mli deleted file mode 100644 index 1dedad9..0000000 --- a/lib/grpc-lwt/service.mli +++ /dev/null @@ -1,11 +0,0 @@ -type t -(** [t] represents a gRPC service with potentially multiple rpcs and the information needed to route to them. *) - -val v : unit -> t -(** [v ()] creates a new service *) - -val add_rpc : name:string -> rpc:Rpc.t -> t -> t -(** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that [t] can route to it with [name]. *) - -val handle_request : t -> H2.Reqd.t -> unit -(** [handle_request t reqd] handles routing [reqd] to the correct rpc if available in [t]. *) diff --git a/lib/grpc/buffer.ml b/lib/grpc/buffer.ml index a80a020..3cfb2ef 100644 --- a/lib/grpc/buffer.ml +++ b/lib/grpc/buffer.ml @@ -1,20 +1,22 @@ type t = { mutable contents : bytes; mutable length : int } -let v ?(capacity = 2048) () = { contents = Bytes.create capacity; length = 0 } +let v ?(capacity = 1024) () = { contents = Bytes.create capacity; length = 0 } let length t = t.length let capacity t = Bytes.length t.contents -let extend t amount = - t.contents <- Bytes.extend t.contents 0 amount; - t.length <- t.length + amount +let extend t amount = t.contents <- Bytes.extend t.contents 0 amount -let log2 i = log (float_of_int i) /. log 2. +let rec nearest_power_of_2 acc target = + if acc >= target then acc else nearest_power_of_2 (acc * 2) target let ensure_size t ~extra = - if t.length + extra >= capacity t then - extend t (int_of_float (log2 (t.length + extra)) - capacity t) + let current_capacity = capacity t in + let needed_capacity = t.length + extra in + if needed_capacity >= current_capacity then + extend t + (nearest_power_of_2 current_capacity needed_capacity - current_capacity) let copy_from_bigstringaf ~src_off ~src ~dst ~length = ensure_size dst ~extra:length; @@ -28,6 +30,8 @@ let sub ~start ~length t = let to_bytes t = Bytes.sub t.contents 0 t.length +let to_string t = to_bytes t |> Bytes.to_string + let shift_left ~by t = Bytes.blit t.contents by t.contents 0 (t.length - by); t.length <- t.length - by diff --git a/lib/grpc/buffer.mli b/lib/grpc/buffer.mli index 775f759..61437a7 100644 --- a/lib/grpc/buffer.mli +++ b/lib/grpc/buffer.mli @@ -15,6 +15,9 @@ val capacity : t -> int val to_bytes : t -> bytes (** [to_bytes t] converts the valid data in the buffer into bytes. *) +val to_string : t -> string +(** [to_string t] converts the valid data in the buffer into a string. *) + val copy_from_bigstringaf : src_off:int -> src:Bigstringaf.t -> dst:t -> length:int -> unit (** [copy_from_bigstringaf ~src_off ~src ~dst ~length] copies data from [src] diff --git a/lib/grpc/client.ml b/lib/grpc/client.ml deleted file mode 100644 index e69de29..0000000 diff --git a/lib/grpc/dune b/lib/grpc/dune index 4de153b..1170fe5 100644 --- a/lib/grpc/dune +++ b/lib/grpc/dune @@ -1,4 +1,6 @@ (library (name grpc) (public_name grpc) + (preprocess + (pps ppx_deriving.show)) (libraries h2 bigstringaf uri)) diff --git a/lib/grpc/message.ml b/lib/grpc/message.ml index 5cb9197..cccdcd3 100644 --- a/lib/grpc/message.ml +++ b/lib/grpc/message.ml @@ -31,8 +31,8 @@ let extract_message buf = in if compressed then failwith "Compressed flag set but not supported"; if Buffer.length buf - 5 >= length then - Some (Buffer.sub buf ~start:5 ~length) - else None ) + Some (Buffer.sub buf ~start:5 ~length |> Buffer.to_string) + else None) else None (** [get_message_and_shift buf] tries to extract the first grpc message @@ -42,7 +42,7 @@ let get_message_and_shift buf = match message with | None -> None | Some message -> - let mlen = Buffer.length message in + let mlen = String.length message in Buffer.shift_left buf ~by:(5 + mlen); Some message diff --git a/lib/grpc/message.mli b/lib/grpc/message.mli index fc1c6e4..46ca6a5 100644 --- a/lib/grpc/message.mli +++ b/lib/grpc/message.mli @@ -1,5 +1,5 @@ val make : string -> string (** [make s] encodes a string as a gRPC message. *) -val extract : Buffer.t -> Buffer.t option +val extract : Buffer.t -> string option (** [extract b] attempts to extract a gRPC message from [b]. *) diff --git a/lib/grpc/status.ml b/lib/grpc/status.ml index c1cf0fe..2d4e8eb 100644 --- a/lib/grpc/status.ml +++ b/lib/grpc/status.ml @@ -16,6 +16,7 @@ type code = | Unavailable | Data_loss | Unauthenticated +[@@deriving show] let int_of_code = function | OK -> 0 @@ -36,7 +37,27 @@ let int_of_code = function | Data_loss -> 15 | Unauthenticated -> 16 -type t = { code : code; message : string option } +let code_of_int = function + | 0 -> Some OK + | 1 -> Some Cancelled + | 2 -> Some Unknown + | 3 -> Some Invalid_argument + | 4 -> Some Deadline_exceeded + | 5 -> Some Not_found + | 6 -> Some Already_exists + | 7 -> Some Permission_denied + | 8 -> Some Resource_exhausted + | 9 -> Some Failed_precondition + | 10 -> Some Aborted + | 11 -> Some Out_of_range + | 12 -> Some Unimplemented + | 13 -> Some Internal + | 14 -> Some Unavailable + | 15 -> Some Data_loss + | 16 -> Some Unauthenticated + | _ -> None + +type t = { code : code; message : string option } [@@deriving show] let v ?message code = { code; message } diff --git a/lib/grpc/status.mli b/lib/grpc/status.mli index 87a4958..842eeb2 100644 --- a/lib/grpc/status.mli +++ b/lib/grpc/status.mli @@ -16,13 +16,17 @@ type code = | Unavailable | Data_loss | Unauthenticated +[@@deriving show] (** [code] represents the valid gRPC status codes to respond with. *) val int_of_code : code -> int (** [int_of_code c] returns the corresponding integer status code for [c]. *) -type t +val code_of_int : int -> code option +(** [code_of_int i] returns the corresponding code for [i] if it exists. *) + +type t [@@deriving show] (** [t] represents a full gRPC status, this includes code and optional message. *) val v : ?message:string -> code -> t