Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 71 additions & 63 deletions CHANGES.md

Large diffs are not rendered by default.

43 changes: 35 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,18 @@ Calling an operation that performs an effect (such as `yield`) can switch to a d

## Tracing

The library can write traces in CTF format, showing when threads (fibers) are created, when they run, and how they interact.
We can run the previous code with tracing enabled (writing to a new `trace.ctf` file) like this:
The tracing infrastructure is being switched from the CTF format to using OCaml
runtime events.

### Starting from OCaml 5.1

The `eio.runtime_events` library provides a set of custom events emitted by
eio's runtime. They are written in a ring-buffer and can be consumed by various
tools. We can run the previous code with tracing enabled like this:

```ocaml
# let () =
Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () ->
Eio.Tracing.with_tracing @@ fun () ->
Eio_main.run main;;
+x = 1
+y = 1
Expand All @@ -236,9 +242,11 @@ We can run the previous code with tracing enabled (writing to a new `trace.ctf`
+y = 3
```

The trace can be viewed using [mirage-trace-viewer][].
This should work even while the program is still running.
The file is a ring buffer, so when it gets full, old events will start to be overwritten with new ones.
These events can be consumed using:
- [Meio][] (Monitoring for Eio) project provides an interactive console-based
UI for exploring running fibers.
- [Olly][]: traces and statistics. (WIP: https://github.com/tarides/runtime_events_tools/pull/12)
- [mirage-trace-viewer][]: render an event trace. (WIP: https://github.com/talex5/mirage-trace-viewer/pull/14)

<p align='center'>
<img src="./doc/trace.svg"/>
Expand All @@ -248,8 +256,26 @@ This shows the two counting threads as two horizonal lines.
The white regions indicate when each thread was running.
Note that the output from `traceln` appears in the trace as well as on the console.

The [Meio][] (Monitoring for Eio) project provides an interactive console-based UI for exploring running fibers,
using the new runtime events support in OCaml 5.1.
### Before OCaml 5.1

The library can write traces in CTF format, showing when threads (fibers) are created, when they run, and how they interact.
To write the trace to a new `trace.ctf` file, use `Eio_unix.Ctf.with_tracing`:

```ocaml
# let () =
Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () ->
Eio_main.run main;;
+x = 1
+y = 1
+x = 2
+y = 2
+x = 3
+y = 3
```

The trace can be viewed using [mirage-trace-viewer][].
This should work even while the program is still running.
The file is a ring buffer, so when it gets full, old events will start to be overwritten with new ones.

## Cancellation

Expand Down Expand Up @@ -1863,3 +1889,4 @@ Some background about the effects system can be found in:
[Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/
[Eio.Process]: https://ocaml-multicore.github.io/eio/eio/Eio/Process/index.html
[Dev meetings]: https://docs.google.com/document/d/1ZBfbjAkvEkv9ldumpZV5VXrEc_HpPeYjHPW_TiwJe4Q
[Olly]: https://github.com/tarides/runtime_events_tools
2 changes: 1 addition & 1 deletion doc/prelude.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Eio_main = struct

(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
method run fn =
method run ?loc fn =
(* Since we're in the same domain, cancelling the calling fiber will
cancel the fake spawned one automatically. *)
let cancelled, _ = Promise.create () in
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
(fmt (>= 0.8.9))
(hmap (>= 0.8.1))
(domain-local-await (>= 0.1.0))
(astring (>= 0.8.5))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 2.0.0))
(mdx (and (>= 2.2.0) :with-test))
Expand Down
1 change: 1 addition & 0 deletions eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ depends: [
"fmt" {>= "0.8.9"}
"hmap" {>= "0.8.1"}
"domain-local-await" {>= "0.1.0"}
"astring" {>= "0.8.5"}
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "2.0.0"}
"mdx" {>= "2.2.0" & with-test}
Expand Down
8 changes: 5 additions & 3 deletions lib_eio/buf_write.ml
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,12 @@ let copy t flow =
try aux ()
with End_of_file -> ()

let with_flow ?(initial_size=0x1000) flow fn =
Switch.run @@ fun sw ->
let with_flow ?name ?(loc = Tracing.get_caller ()) ?(initial_size=0x1000) flow fn =
Switch.run ?name ~loc @@ fun sw ->
let t = create ~sw initial_size in
Fiber.fork ~sw (fun () -> copy t flow);
Fiber.fork ~sw (fun () ->
Tracing.set_name "eio.buf_write.with_flow writer";
copy t flow);
match fn t with
| x ->
close t;
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/buf_write.mli
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ exception Flush_aborted

(** {2 Running} *)

val with_flow : ?initial_size:int -> #Flow.sink -> (t -> 'a) -> 'a
val with_flow : ?name:string -> ?loc:string -> ?initial_size:int -> #Flow.sink -> (t -> 'a) -> 'a
(** [with_flow flow fn] runs [fn writer], where [writer] is a buffer that flushes to [flow].

Concurrently with [fn], it also runs a fiber that copies from [writer] to [flow].
Expand Down
50 changes: 33 additions & 17 deletions lib_eio/core/cancel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ type state =
If a function can succeed in a separate domain,
the user's cancel function is responsible for ensuring that this is done atomically. *)
type t = {
id : Tracing.id;
mutable state : state;
children : t Lwt_dllist.t;
fibers : fiber_context Lwt_dllist.t;
protected : bool;
domain : Domain.id; (* Prevent access from other domains *)
}
and fiber_context = {
tid : Ctf.id;
tid : Tracing.id;
mutable cancel_context : t;
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
Expand Down Expand Up @@ -85,41 +86,55 @@ let is_finished t =

let move_fiber_to t fiber =
let new_node = Lwt_dllist.add_r fiber t.fibers in (* Add to new context *)
Tracing.note_parent ~child:fiber.tid ~parent:t.id;
fiber.cancel_context <- t;
Option.iter Lwt_dllist.remove fiber.cancel_node; (* Remove from old context *)
fiber.cancel_node <- Some new_node

(* Note: the new value is not linked into the cancellation tree. *)
let create ~protected =
let create ?name ?loc ~protected purpose =
let children = Lwt_dllist.create () in
let fibers = Lwt_dllist.create () in
{ state = Finished; children; protected; fibers; domain = Domain.self () }
let id = Tracing.mint_id () in
Tracing.note_created id (Tracing.Cancellation_context {purpose; protected});
Option.iter (Tracing.note_name id) name;
Option.iter (Tracing.note_loc id) loc;
{
id;
state = Finished;
children;
protected;
fibers;
domain = Domain.self ()
}

(* Links [t] into the tree as a child of [parent] and returns a function to remove it again. *)
let activate t ~parent =
assert (t.state = Finished);
assert (parent.state <> Finished);
t.state <- On;
Tracing.note_parent ~child:t.id ~parent:parent.id;
let node = Lwt_dllist.add_r t parent.children in
fun () ->
assert (parent.state <> Finished);
t.state <- Finished;
Tracing.note_resolved t.id ~ex:None;
Lwt_dllist.remove node

(* Runs [fn] with a fresh cancellation context. *)
let with_cc ~ctx:fiber ~parent ~protected fn =
let with_cc ?(name="Cancel.with_cc") ?loc ~ctx:fiber ~parent ~protected purpose fn =
if not protected then check parent;
let t = create ~protected in
let t = create ~name ?loc ~protected purpose in
let deactivate = activate t ~parent in
move_fiber_to t fiber;
let cleanup () = move_fiber_to parent fiber; deactivate () in
match fn t with
| x -> cleanup (); x
| exception ex -> cleanup (); raise ex

let protect fn =
let protect ?loc ?(name="Cancel.protect") ?(purpose=Tracing.Protect) fn =
let ctx = Effect.perform Get_context in
with_cc ~ctx ~parent:ctx.cancel_context ~protected:true @@ fun _ ->
with_cc ~name ?loc ~ctx ~parent:ctx.cancel_context ~protected:true purpose @@ fun _ ->
(* Note: there is no need to check the new context after [fn] returns;
the goal of cancellation is only to finish the thread promptly, not to report the error.
We also do not check the parent context, to make sure the caller has a chance to handle the result. *)
Expand Down Expand Up @@ -164,18 +179,18 @@ let cancel t ex =
| exns -> raise (Cancel_hook_failed exns)
)

let sub fn =
let sub ?loc ?(name="Cancel.sub") ?(purpose=Tracing.Sub) fn =
let ctx = Effect.perform Get_context in
let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false @@ fun t ->
with_cc ?loc ~name ~ctx ~parent ~protected:false purpose @@ fun t ->
fn t

(* Like [sub], but it's OK if the new context is cancelled.
(instead, return the parent context on exit so the caller can check that) *)
let sub_unchecked fn =
let sub_unchecked ?loc ?(name="Cancel.protect") ~purpose fn =
let ctx = Effect.perform Get_context in
let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false @@ fun t ->
with_cc ?loc ~name ~ctx ~parent ~protected:false purpose @@ fun t ->
fn t;
parent

Expand All @@ -193,17 +208,18 @@ module Fiber_context = struct
let clear_cancel_fn t =
t.cancel_fn <- ignore

let make ~cc ~vars =
let tid = Ctf.mint_id () in
Ctf.note_created tid Ctf.Task;
let make ?loc ~cc ~vars () =
let tid = Tracing.mint_id () in
Tracing.note_created ?loc tid Tracing.Task;
Tracing.note_parent ~child:tid ~parent:cc.id;
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
t

let make_root () =
let cc = create ~protected:false in
let make_root ?loc () =
let cc = create ?loc ~name:"root" ~protected:false Root in
cc.state <- On;
make ~cc ~vars:Hmap.empty
make ?loc ~cc ~vars:Hmap.empty ()

let destroy t =
Option.iter Lwt_dllist.remove t.cancel_node
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/core/debug.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt =
Format.pp_close_box f ();
Format.pp_print_flush f ();
let msg = Buffer.contents b in
Ctf.label msg;
Tracing.log msg;
let lines = String.split_on_char '\n' msg in
Mutex.lock traceln_mutex;
Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () ->
Expand Down
6 changes: 5 additions & 1 deletion lib_eio/core/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
(library
(name eio__core)
(public_name eio.core)
(libraries cstruct hmap lwt-dllist fmt optint domain-local-await))
(libraries astring cstruct hmap lwt-dllist fmt optint domain-local-await
(select tracing.ml from
(eio.runtime_events -> tracing.re.ml)
( -> tracing.ctf.ml))
))
2 changes: 1 addition & 1 deletion lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Private = struct
module Suspend = Suspend
module Cells = Cells
module Broadcast = Broadcast
module Ctf = Ctf
module Tracing = Tracing
module Fiber_context = Cancel.Fiber_context
module Debug = Debug

Expand Down
Loading