diff --git a/CHANGES.md b/CHANGES.md index 08147f26..01526ed2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ Contributions by Matthew Davidson * Updated docs to use cljdoc.org by default +* Minor doc improvements ### 0.1.9-alpha6 diff --git a/README.md b/README.md index 259e6a54..0532a5c3 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ [![Clojars Project](https://img.shields.io/clojars/v/manifold.svg)](https://clojars.org/manifold) -[![cljdoc badge](https://cljdoc.org/badge/manifold/manifold)](https://cljdoc.org/d/manifold/manifold/CURRENT) +[![cljdoc badge](https://cljdoc.org/badge/manifold/manifold)](https://cljdoc.org/d/manifold/manifold) [![CircleCI](https://circleci.com/gh/clj-commons/manifold.svg?style=svg)](https://circleci.com/gh/clj-commons/manifold) ![](docs/manifold.png) @@ -7,7 +7,7 @@ This library provides basic building blocks for asynchronous programming, and ca Manifold provides two core abstractions: **deferreds**, which represent a single asynchronous value, and **streams**, which represent an ordered sequence of asynchronous values. -A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](https://cljdoc.org/d/manifold/manifold/CURRENT). +A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](https://cljdoc.org/d/manifold/manifold). ```clojure @@ -112,13 +112,17 @@ nil Manifold can use any transducer, which are applied via `transform`. It also provides stream-specific transforms, including `zip`, `reduce`, `buffer`, `batch`, and `throttle`. [To learn more about streams, go here](/docs/stream.md). -### Java 8 extensions +### Clojurescript + +A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs). + +### Older Java support Manifold includes support for a few classes introduced in Java 8: `java.util.concurrent.CompletableFuture` and `java.util.stream.BaseStream`. -Support for Java 8 is detected automatically at compile time; if you are -AOT compiling Manifold on Java 8 or newer, and will be running the compiled -jar with a Java 7 or older JRE, you will need to disable this feature, by +Support for Java 8+ is detected automatically at compile time; if you are +AOT compiling Manifold on Java 8 or newer, but will be running the compiled +jar with a Java 7 or older JRE, you will need to disable them, by setting the JVM option `"manifold.disable-jvm8-primitives"`, either at the command line with @@ -130,9 +134,6 @@ or by adding to your application's project.clj. -### Clojurescript - -A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs). ### License diff --git a/docs/deferred.md b/docs/deferred.md index 6c39eff1..013a28b0 100644 --- a/docs/deferred.md +++ b/docs/deferred.md @@ -1,8 +1,8 @@ -### deferreds +### Deferreds A deferred in Manifold is similar to a Clojure promise: -```clj +```clojure > (require '[manifold.deferred :as d]) nil @@ -18,7 +18,7 @@ true However, similar to Clojure's futures, deferreds in Manifold can also represent errors. Crucially, they also allow for callbacks to be registered, rather than simply blocking on dereferencing. -```clj +```clojure > (def d (d/deferred)) #'d @@ -29,7 +29,7 @@ true Exception: boom ``` -```clj +```clojure > (def d (d/deferred)) #'d @@ -43,13 +43,13 @@ success! :foo true ``` -### composing with deferreds +### Composing with deferreds Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. In practice, no one should ever use `on-realized`. Instead, they should use `manifold.deferred/chain`, which chains together callbacks, left to right: -```clj +```clojure > (def d (d/deferred)) #'d @@ -65,7 +65,7 @@ true Values that can be coerced into a deferred include Clojure futures, Java futures, and Clojure promises. -```clj +```clojure > (def d (d/deferred)) #'d @@ -81,7 +81,7 @@ true If any stage in `chain` throws an exception or returns a deferred that yields an error, all subsequent stages are skipped, and the deferred returned by `chain` yields that same error. To handle these cases, you can use `manifold.deferred/catch`: -```clj +```clojure > (def d (d/deferred)) #p @@ -99,14 +99,14 @@ Using the `->` threading operator, `chain` and `catch` can be easily and arbitra To combine multiple deferrable values into a single deferred that yields all their results, we can use `manifold.deferred/zip`: -```clj +```clojure > @(d/zip (future 1) (future 2) (future 3)) (1 2 3) ``` Finally, we can use `manifold.deferred/timeout!` to register a timeout on the deferred which will yield either a specified timeout value or a `TimeoutException` if the deferred is not realized within `n` milliseconds. -```clj +```clojure > @(d/timeout! (d/future (Thread/sleep 1000) :foo) 100 @@ -126,7 +126,7 @@ Wherever possible, use `manifold.deferred/deferred` instead of `promise`, and `m Let's say that we have two services which provide us numbers, and want to get their sum. By using `zip` and `chain` together, this is relatively straightforward: -```clj +```clojure (defn deferred-sum [] (let [a (call-service-a) b (call-service-b)] @@ -137,7 +137,7 @@ Let's say that we have two services which provide us numbers, and want to get th However, this isn't a very direct expression of what we're doing. For more complex relationships between deferred values, our code will become even more difficult to understand. In these cases, it's often best to use `let-flow`. -```clj +```clojure (defn deferred-sum [] (let-flow [a (call-service-a) b (call-service-b)] @@ -146,7 +146,7 @@ However, this isn't a very direct expression of what we're doing. For more comp In `let-flow`, we can treat deferred values as if they're realized. This is only true of values declared within or closed over by `let-flow`, however. So we can do this: -```clj +```clojure (let [a (future 1)] (let-flow [b (future (+ a 1)) c (+ b 1)] @@ -155,7 +155,7 @@ In `let-flow`, we can treat deferred values as if they're realized. This is onl but not this: -```clj +```clojure (let-flow [a (future 1) b (let [c (future 1)] (+ a c))] @@ -170,7 +170,7 @@ It can be helpful to think of `let-flow` as similar to Prismatic's [Graph](https Manifold also provides a `loop` macro, which allows for asynchronous loops to be defined. Consider `manifold.stream/consume`, which allows a function to be invoked with each new message from a stream. We can implement similar behavior like so: -```clj +```clojure (require '[manifold.deferred :as d] '[manifold.stream :as s]) @@ -196,6 +196,6 @@ Here we define a loop which takes messages one at a time from `stream`, and pass While Manifold doesn't provide anything as general purpose as core.async's `go` macro, the combination of `loop` and `let-flow` can allow for the specification of highly intricate asynchronous workflows. -### custom execution models +### Custom execution models Both deferreds and streams allow for custom execution models to be specified. To learn more, [go here](/docs/execution.md). diff --git a/docs/execution.md b/docs/execution.md index 407d4bef..373108c9 100644 --- a/docs/execution.md +++ b/docs/execution.md @@ -1,33 +1,32 @@ -Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmers wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hardcoded, making interop between different stream representations much harder than necessary. +Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmers wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hard-coded, making interop between different stream representations much harder than necessary. Manifold tries to make its execution model as configurable as possible, while still remaining functional for users who don't want to fiddle with the low-level details. Under different circumstances, Manifold will lazily construct three different pools: -* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`. +* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or when `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`. * *execute-pool* - Used to execute `manifold.deferred/future` bodies, and only created if that macro is used. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-execute-pool-stats-callback`. * *scheduler-pool* - Used to execute delayed tasks, periodic tasks, or timeouts. Only created when `manifold.time/in`, `manifold.time/every`, `manifold.stream/periodically`, or take/put timeouts are used. -However, by default messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks. +However, by default, messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks. -However, this also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations. +This also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations. In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor. -```clj -(require - '[manifold.deferred :as d] - '[manifold.stream :as s]) +```clojure +(require '[manifold.deferred :as d] + '[manifold.stream :as s]) (def executor (fixed-thread-executor 42)) (-> (d/future 1) - (d/onto executor) - (d/chain inc inc inc)) + (d/onto executor) + (d/chain inc inc inc)) (->> (s/->source (range 1e3)) - (s/onto executor) - (s/map inc)) + (s/onto executor) + (s/map inc)) ``` -If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artifically inflate (or at least it shouldn't be). +If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artificially inflate (or at least it shouldn't be). This configurability is necessary given Manifold's goal of interop with other stream representations, but is only meant to be used by those who need it. Most can, and should, ignore it. diff --git a/docs/stream.md b/docs/stream.md index 8d5ff759..ec44c1e0 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -1,8 +1,8 @@ -### stream basics +### Stream basics A Manifold stream can be created using `manifold.stream/stream`: -```clj +```clojure > (require '[manifold.stream :as s]) nil > (def s (s/stream)) @@ -11,7 +11,7 @@ nil A stream can be thought of as two separate halves: a **sink** which consumes messages, and a **source** which produces them. We can `put!` messages into the sink, and `take!` them from the source: -```clj +```clojure > (s/put! s 1) << ... >> > (s/take! s) @@ -22,7 +22,7 @@ Notice that both `put!` and `take!` return [deferred values](/docs/deferred.md). Sinks can be **closed** by calling `close!`, which means they will no longer accept messages. -```clj +```clojure > (s/close! s) nil > @(s/put! s 1) @@ -31,18 +31,18 @@ false We can check if a sink is closed by calling `closed?`, and register a no-arg callback using `on-closed` to be notified when the sink is closed. -Sources that will never produce any more messages (often because the corresponding sink is closed) are said to be **drained**. We may check whether a source is drained via `drained?` and `on-drained`. +Sources that will never produce any more messages (often because the corresponding sink is closed) are said to be **drained**. We may check whether a source is drained via `drained?` and register callbacks with `on-drained`. By default, calling `take!` on a drained source will yield a message of `nil`. However, if `nil` is a valid message, we may want to specify some other return value to denote that the source is drained: -```clj +```clojure > @(s/take! s ::drained) ::drained ``` We may also want to put a time limit on how long we're willing to wait on our put or take to complete. For this, we can use `try-put!` and `try-take!`: -```clj +```clojure > (def s (s/stream)) #'s > @(s/try-put! s :foo 1000 ::timeout) @@ -51,18 +51,18 @@ We may also want to put a time limit on how long we're willing to wait on our pu Here we try to put a message into the stream, but since there are no consumers, it will fail after waiting for 1000ms. Here we've specified `::timeout` as our special timeout value, otherwise it would simply return `false`. -```clj +```clojure > @(s/try-take! s ::drained 1000 ::timeout) ::timeout ``` Again, we specify the timeout and special timeout value. When using `try-take!`, we must specify return values for both the drained and timeout outcomes. -### stream operators +### Stream operators The simplest thing we can do with a stream is consume every message that comes into it: -```clj +```clojure > (s/consume #(prn 'message! %) s) nil > @(s/put! s 1) @@ -72,36 +72,36 @@ true However, we can also create derivative streams using operators analogous to Clojure's sequence operators, a full list of which [can be found here](http://ideolalia.com/manifold/): -```clj +```clojure > (->> [1 2 3] - s/->source - (s/map inc) - s/stream->seq) + s/->source + (s/map inc) + s/stream->seq) (2 3 4) ``` Here, we've mapped `inc` over a stream, transforming from a sequence to a stream and then back to a sequence for the sake of a concise example. Note that calling `manifold.stream/map` on a sequence will automatically call `->source`, so we can actually omit that, leaving just: -```clj +```clojure > (->> [1 2 3] - (s/map inc) - s/stream->seq) + (s/map inc) + s/stream->seq) (2 3 4) ``` Since streams are not immutable, in order to treat it as a sequence we must do an explicit transformation via `stream->seq`: -```clj +```clojure > (->> [1 2 3] - s/->source - s/stream->seq - (map inc)) + s/->source + s/stream->seq + (map inc)) (2 3 4) ``` Note that we can create multiple derived streams from the same source: -```clj +```clojure > (def s (s/stream)) #'s > (def a (s/map inc s)) @@ -122,21 +122,21 @@ If `s` is closed, both `a` and `b` will be closed, as will any other downstream For any Clojure operation that doesn't have an equivalent in `manifold.stream`, we can use `manifold.stream/transform` with a transducer: -```clj +```clojure > (->> [1 2 3] - (s/transform (map inc)) - s/stream->seq) + (s/transform (map inc)) + s/stream->seq) (2 3 4) ``` There's also `(periodically period f)`, which behaves like `(repeatedly f)`, but will emit the result of `(f)` every `period` milliseconds. -### connecting streams +### Connecting streams Having created an event source through composition of operators, we will often want to feed all messages into a sink. This can be accomplished via `connect`: -```clj +```clojure > (def a (s/stream)) #'a > (def b (s/stream)) @@ -151,16 +151,16 @@ true Again, we see that our message is immediately accepted into `a`, and can be read from `b`. We may also pass an options map into `connect`, with any of the following keys: -| field | description | +| Field | Description | |-------|-------------| | `downstream?` | whether the source closing will close the sink, defaults to `true` | | `upstream?` | whether the sink closing will close the source, *even if there are other sinks downstream of the source*, defaults to `false` | | `timeout` | the maximum time that will be spent waiting to convey a message into the sink before the connection is severed, defaults to `nil` | | `description` | a description of the connection between the source and sink, useful for introspection purposes | -Upon connecting two streams, we can inspect any of the streams using `description`, and follow the flow of data using `downstream`: +After connecting two streams, we can inspect any of the streams using `description`, and follow the flow of data using `downstream`: -```clj +```clojure > (def a (s/stream)) #'a > (def b (s/stream)) @@ -173,9 +173,11 @@ nil (["a connection" << stream: ... >>]) ``` -We can recursively apply `downstream` to traverse the entire topology of our streams. This can be a powerful way to reason about the structure of our running processes, but sometimes we want to change the message from the source before it's placed into the sink. For this, we can use `connect-via`: +We can recursively apply `downstream` to traverse the entire topology of our streams. This can be a powerful way to reason about the structure of our running processes. -```clj +Sometimes we want to change the message from the source before it's placed into the sink. For this, we can use `connect-via`: + +```clojure > (def a (s/stream)) #'a > (def b (s/stream)) @@ -186,7 +188,7 @@ nil Note that `connect-via` takes an argument between the source and sink, which is a single-argument callback. This callback will be invoked with messages from the source, under the assumption that they will be propagated to the sink. This is the underlying mechanism for `map`, `filter`, and other stream operators; it allow us to create complex operations that are visible via `downstream`: -```clj +```clojure > (def a (s/stream)) #'a > (s/map inc a) @@ -199,7 +201,7 @@ Each element returned by `downstream` is a 2-tuple, the first element describing The value returned by the callback for `connect-via` provides backpressure - if a deferred value is returned, further messages will not be passed in until the deferred value is realized. -### buffers and backpressure +### Buffers and backpressure We saw above that if we attempt to put a message into a stream, it won't succeed until the value is taken out. This is because the default stream has no buffer; it simply conveys messages from producers to consumers. If we want to create a stream with a buffer, we can simply call `(stream buffer-size)`. We can also call `(buffer size stream)` to create a buffer downstream of an existing stream. @@ -207,7 +209,7 @@ We may also call `(buffer metric limit stream)`, if we don't want to measure our To limit the rate of messages from a stream, we can use `(throttle max-rate stream)`. -### event buses and publish/subscribe models +### Event buses and publish/subscribe models Manifold provides a simple publish/subscribe mechanism in the `manifold.bus` namespace. To create an event bus, we can use `(event-bus)`. To publish to a particular topic on that bus, we use `(publish! bus topic msg)`. To get a stream representing all messages on a topic, we can call `(subscribe bus topic)`.