Minor doc fixes and improvements
KingMob committed Sep 4, 2021
1 parent 892c4a4 commit c128663
Showing 5 changed files with 76 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -3,6 +3,7 @@
Contributions by Matthew Davidson

* Updated docs to use cljdoc.org by default
* Minor doc improvements

### 0.1.9-alpha6

19 changes: 10 additions & 9 deletions README.md
@@ -1,13 +1,13 @@
[![Clojars Project](https://img.shields.io/clojars/v/manifold.svg)](https://clojars.org/manifold)
[![cljdoc badge](https://cljdoc.org/badge/manifold/manifold)](https://cljdoc.org/d/manifold/manifold/CURRENT)
[![cljdoc badge](https://cljdoc.org/badge/manifold/manifold)](https://cljdoc.org/d/manifold/manifold)
[![CircleCI](https://circleci.com/gh/clj-commons/manifold.svg?style=svg)](https://circleci.com/gh/clj-commons/manifold)
![](docs/manifold.png)

This library provides basic building blocks for asynchronous programming, and can be used as a translation layer between libraries which use similar but incompatible abstractions.

Manifold provides two core abstractions: **deferreds**, which represent a single asynchronous value, and **streams**, which represent an ordered sequence of asynchronous values.
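A minimal sketch of the two abstractions (the `result` and `messages` names here are purely illustrative):

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; a deferred represents a single asynchronous value
(def result (d/deferred))
(d/success! result 42)
@result               ;=> 42

;; a stream represents an ordered sequence of asynchronous values
(def messages (s/stream))
(s/put! messages :hello)
@(s/take! messages)   ;=> :hello
```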

A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](https://cljdoc.org/d/manifold/manifold/CURRENT).
A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](https://cljdoc.org/d/manifold/manifold).


```clojure
@@ -112,13 +112,17 @@ nil

Manifold can use any transducer, which is applied via `transform`. It also provides stream-specific transforms, including `zip`, `reduce`, `buffer`, `batch`, and `throttle`. [To learn more about streams, go here](/docs/stream.md).
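A minimal sketch of applying a transducer with `transform` (the `src` and `odds` names are illustrative, and `reduce` here is the stream variant, which returns a deferred):

```clojure
(require '[manifold.stream :as s])

;; any ordinary Clojure transducer can be applied to a stream
(def src  (s/->source (range 10)))
(def odds (s/transform (comp (filter odd?) (map inc)) src))

;; the stream-specific `reduce` yields the final value as a deferred
@(s/reduce + odds)    ;=> 30
```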

### Java 8 extensions
### Clojurescript

A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs).

### Older Java support

Manifold includes support for a few classes introduced in Java 8:
`java.util.concurrent.CompletableFuture` and `java.util.stream.BaseStream`.
Support for Java 8 is detected automatically at compile time; if you are
AOT compiling Manifold on Java 8 or newer, and will be running the compiled
jar with a Java 7 or older JRE, you will need to disable this feature, by
Support for Java 8+ is detected automatically at compile time; if you are
AOT compiling Manifold on Java 8 or newer, but will be running the compiled
jar with a Java 7 or older JRE, you will need to disable them, by
setting the JVM option `"manifold.disable-jvm8-primitives"`, either at the
command line with

@@ -130,9 +134,6 @@ or by adding

to your application's project.clj.

### Clojurescript

A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs).

### License

32 changes: 16 additions & 16 deletions docs/deferred.md
@@ -1,8 +1,8 @@
### deferreds
### Deferreds

A deferred in Manifold is similar to a Clojure promise:

```clj
```clojure
> (require '[manifold.deferred :as d])
nil

@@ -18,7 +18,7 @@ true

However, similar to Clojure's futures, deferreds in Manifold can also represent errors. Crucially, they also allow for callbacks to be registered, rather than simply blocking on dereferencing.

```clj
```clojure
> (def d (d/deferred))
#'d

@@ -29,7 +29,7 @@ true
Exception: boom
```

```clj
```clojure
> (def d (d/deferred))
#'d

@@ -43,13 +43,13 @@ success! :foo
true
```

### composing with deferreds
### Composing with deferreds

Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. In practice, no one should ever use `on-realized`.

Instead, they should use `manifold.deferred/chain`, which chains together callbacks, left to right:

```clj
```clojure
> (def d (d/deferred))
#'d

@@ -65,7 +65,7 @@ true

Values that can be coerced into a deferred include Clojure futures, Java futures, and Clojure promises.

```clj
```clojure
> (def d (d/deferred))
#'d

@@ -81,7 +81,7 @@ true

If any stage in `chain` throws an exception or returns a deferred that yields an error, all subsequent stages are skipped, and the deferred returned by `chain` yields that same error. To handle these cases, you can use `manifold.deferred/catch`:

```clj
```clojure
> (def d (d/deferred))
#'d

@@ -99,14 +99,14 @@ Using the `->` threading operator, `chain` and `catch` can be easily and arbitra

To combine multiple deferrable values into a single deferred that yields all their results, we can use `manifold.deferred/zip`:

```clj
```clojure
> @(d/zip (future 1) (future 2) (future 3))
(1 2 3)
```

Finally, we can use `manifold.deferred/timeout!` to register a timeout on the deferred which will yield either a specified timeout value or a `TimeoutException` if the deferred is not realized within `n` milliseconds.

```clj
```clojure
> @(d/timeout!
(d/future (Thread/sleep 1000) :foo)
100
@@ -126,7 +126,7 @@ Wherever possible, use `manifold.deferred/deferred` instead of `promise`, and `m

Let's say that we have two services which provide us numbers, and want to get their sum. By using `zip` and `chain` together, this is relatively straightforward:

```clj
```clojure
(defn deferred-sum []
(let [a (call-service-a)
b (call-service-b)]
@@ -137,7 +137,7 @@ Let's say that we have two services which provide us numbers, and want to get th

However, this isn't a very direct expression of what we're doing. For more complex relationships between deferred values, our code will become even more difficult to understand. In these cases, it's often best to use `let-flow`.

```clj
```clojure
(defn deferred-sum []
(let-flow [a (call-service-a)
b (call-service-b)]
@@ -146,7 +146,7 @@ However, this isn't a very direct expression of what we're doing. For more comp

In `let-flow`, we can treat deferred values as if they're realized. This is only true of values declared within or closed over by `let-flow`, however. So we can do this:

```clj
```clojure
(let [a (future 1)]
(let-flow [b (future (+ a 1))
c (+ b 1)]
@@ -155,7 +155,7 @@ In `let-flow`, we can treat deferred values as if they're realized. This is onl

but not this:

```clj
```clojure
(let-flow [a (future 1)
b (let [c (future 1)]
(+ a c))]
@@ -170,7 +170,7 @@ It can be helpful to think of `let-flow` as similar to Prismatic's [Graph](https

Manifold also provides a `loop` macro, which allows for asynchronous loops to be defined. Consider `manifold.stream/consume`, which allows a function to be invoked with each new message from a stream. We can implement similar behavior like so:

```clj
```clojure
(require
'[manifold.deferred :as d]
'[manifold.stream :as s])
@@ -196,6 +196,6 @@ Here we define a loop which takes messages one at a time from `stream`, and pass

While Manifold doesn't provide anything as general purpose as core.async's `go` macro, the combination of `loop` and `let-flow` can allow for the specification of highly intricate asynchronous workflows.
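A hedged sketch of such a combination; `process-all` and `handle-message` are invented names, and `handle-message` stands in for any function that may return a deferred:

```clojure
(require '[manifold.deferred :as d :refer [let-flow]]
         '[manifold.stream :as s])

(defn process-all
  "Take messages from `stream` one at a time, waiting on the (possibly
  deferred) result of `handle-message` before taking the next one."
  [stream handle-message]
  (d/loop []
    (d/chain (s/take! stream ::drained)
      (fn [msg]
        (if (identical? ::drained msg)
          ::done
          (let-flow [_ (handle-message msg)]
            (d/recur)))))))

;; e.g. (process-all (s/->source [1 2 3]) #(d/future (println %)))
```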

### custom execution models
### Custom execution models

Both deferreds and streams allow for custom execution models to be specified. To learn more, [go here](/docs/execution.md).
25 changes: 12 additions & 13 deletions docs/execution.md
@@ -1,33 +1,32 @@
Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmer wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hardcoded, making interop between different stream representations much harder than necessary.
Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmer wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hard-coded, making interop between different stream representations much harder than necessary.

Manifold tries to make its execution model as configurable as possible, while still remaining functional for users who don't want to fiddle with the low-level details. Under different circumstances, Manifold will lazily construct three different pools:

* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`.
* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or when `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`.
* *execute-pool* - Used to execute `manifold.deferred/future` bodies, and only created if that macro is used. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-execute-pool-stats-callback`.
* *scheduler-pool* - Used to execute delayed tasks, periodic tasks, or timeouts. Only created when `manifold.time/in`, `manifold.time/every`, `manifold.stream/periodically`, or take/put timeouts are used.
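As a small illustration of the instrumentation hooks mentioned above (the exact contents of the stats map aren't assumed here; the callback simply prints whatever it receives):

```clojure
(require '[manifold.executor :as executor])

;; invoked with statistics gathered from the wait-pool, once that pool exists
(executor/register-wait-pool-stats-callback
  (fn [stats]
    (println "wait-pool stats:" stats)))
```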

However, by default messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks.
However, by default, messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks.

However, this also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations.
This also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations.

In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.

```clj
(require
'[manifold.deferred :as d]
'[manifold.stream :as s])
```clojure
(require '[manifold.deferred :as d]
'[manifold.stream :as s])

(def executor (fixed-thread-executor 42))

(-> (d/future 1)
(d/onto executor)
(d/chain inc inc inc))
(d/onto executor)
(d/chain inc inc inc))

(->> (s/->source (range 1e3))
(s/onto executor)
(s/map inc))
(s/onto executor)
(s/map inc))
```

If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artifically inflate (or at least it shouldn't be).
If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artificially inflate (or at least it shouldn't be).
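A hedged illustration of that caveat; `my-executor` is a plain JDK pool with an unbounded queue, not something Manifold itself provides:

```clojure
(require '[manifold.deferred :as d])
(import '(java.util.concurrent Executors))

;; newFixedThreadPool backs its workers with an unbounded queue, so enqueued
;; actions are never rejected partway through a message topology
(def my-executor (Executors/newFixedThreadPool 4))

(-> (d/future 1)
    (d/onto my-executor)
    (d/chain inc inc inc))
```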

This configurability is necessary given Manifold's goal of interop with other stream representations, but is only meant to be used by those who need it. Most can, and should, ignore it.