Skip to content

Commit d77826f

Browse files
committed
Add pipe to eio_unix
1 parent 7cb0da3 commit d77826f

File tree

5 files changed

+46
-0
lines changed

5 files changed

+46
-0
lines changed

lib_eio/unix/eio_unix.ml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module Private = struct
1717
| Get_system_clock : Eio.Time.clock Effect.t
1818
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> socket Effect.t
1919
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> (socket * socket) Effect.t
20+
| Pipe : Eio.Switch.t -> (<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t
2021
end
2122

2223
let await_readable fd = Effect.perform (Private.Await_readable fd)
@@ -48,6 +49,8 @@ end
4849
let socketpair ~sw ?(domain=Unix.PF_UNIX) ?(ty=Unix.SOCK_STREAM) ?(protocol=0) () =
4950
Effect.perform (Private.Socketpair (sw, domain, ty, protocol))
5051

52+
let pipe sw = Effect.perform (Private.Pipe sw)
53+
5154
module Ipaddr = struct
5255
let to_unix : _ Eio.Net.Ipaddr.t -> Unix.inet_addr = Obj.magic
5356
let of_unix : Unix.inet_addr -> _ Eio.Net.Ipaddr.t = Obj.magic

lib_eio/unix/eio_unix.mli

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ val socketpair :
7777
This creates OS-level resources using [socketpair(2)].
7878
Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *)
7979

80+
val pipe : Switch.t -> <Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>
81+
(** [pipe sw] returns a connected pair of flows [src] and [sink]. Data written to [sink]
82+
can be read from [src].
83+
Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *)
84+
8085
(** API for Eio backends only. *)
8186
module Private : sig
8287
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty
@@ -90,6 +95,8 @@ module Private : sig
9095
socket Effect.t (** See {!FD.as_socket} *)
9196
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int ->
9297
(socket * socket) Effect.t (** See {!socketpair} *)
98+
| Pipe : Eio.Switch.t ->
99+
(<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t (** See {!pipe} *)
93100
end
94101

95102
module Ctf = Ctf_unix

lib_eio_linux/eio_linux.ml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,6 +1446,12 @@ let rec run : type a.
14461446
let b = FD.of_unix ~sw ~seekable:false ~close_unix:true b |> flow in
14471447
continue k ((a :> Eio_unix.socket), (b :> Eio_unix.socket))
14481448
)
1449+
| Eio_unix.Private.Pipe sw -> Some (fun k ->
1450+
let r, w = Unix.pipe ~cloexec:true () in
1451+
let r = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true r) :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
1452+
let w = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true w) :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
1453+
continue k (r, w)
1454+
)
14491455
| Low_level.Alloc -> Some (fun k ->
14501456
match st.mem with
14511457
| None -> continue k None

lib_eio_luv/eio_luv.ml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ let get_fd_opt t = Eio.Generic.probe t FD
596596
let flow fd = object (_ : <source; sink; ..>)
597597
method fd = fd
598598
method close = Low_level.File.close fd
599+
method unix_fd op = File.to_unix op fd
599600

600601
method probe : type a. a Eio.Generic.ty -> a option = function
601602
| FD -> Some fd
@@ -1155,6 +1156,12 @@ let rec run : type a. (_ -> a) -> a = fun main ->
11551156
with Luv_error _ as ex ->
11561157
discontinue k ex
11571158
)
1159+
| Eio_unix.Private.Pipe sw -> Some (fun k ->
1160+
let r, w = Luv.Pipe.pipe ~read_flags:[] ~write_flags:[] () |> or_raise in
1161+
let r = (flow (File.of_luv ~close_unix:true ~sw r) :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
1162+
let w = (flow (File.of_luv ~close_unix:true ~sw w) :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
1163+
continue k (r, w)
1164+
)
11581165
| _ -> None
11591166
}
11601167
in

tests/flow.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,26 @@ Copying from src using `Read_source_buffer`:
118118
+dst: wrote "foobar"
119119
- : unit = ()
120120
```
121+
122+
## Pipes
123+
124+
Writing to and reading from a pipe.
125+
126+
```ocaml
127+
# Eio_main.run @@ fun env ->
128+
Switch.run @@ fun sw ->
129+
let r, w = Eio_unix.pipe sw in
130+
let msg = "Hello, world" in
131+
Eio.Fiber.both
132+
(fun () ->
133+
let buf = Cstruct.create (String.length msg) in
134+
let () = Eio.Flow.read_exact r buf in
135+
traceln "Got: %s" (Cstruct.to_string buf)
136+
)
137+
(fun () ->
138+
Eio.Flow.copy_string msg w;
139+
Eio.Flow.close w
140+
);;
141+
+Got: Hello, world
142+
- : unit = ()
143+
```

0 commit comments

Comments
 (0)