Skip to content

Implement MonadPar and MonadRace #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/.*
!/.gitignore
!/.travis.yml
/output/
/node_modules/
/bower_components/
/node_modules/
/output/
20 changes: 10 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
language: node_js
sudo: false
node_js: 5
dist: trusty
sudo: required
node_js: 6
install:
- npm install -g bower
- npm install
- npm install bower -g
- bower install
script:
- npm test
- bower install --production
- npm run -s build
- bower install
- npm -s test
after_success:
- >-
test $TRAVIS_TAG &&
node_modules/.bin/psc-publish > .pursuit.json &&
curl -X POST http://pursuit.purescript.org/packages \
-d @.pursuit.json \
-H 'Accept: application/json' \
-H "Authorization: token ${GITHUB_TOKEN}"
echo $GITHUB_TOKEN | pulp login &&
echo y | pulp publish --no-push
46 changes: 26 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@

An asynchronous effect monad for PureScript.

The moral equivalent of `ErrorT (ContT Unit (Eff e) a`, for effects `e`.
The moral equivalent of `ErrorT (ContT Unit (Eff e)) a`, for effects `e`.

`Aff` lets you say goodbye to monad transformers and callback hell!

# Example

```purescript
main = launchAff $
do response <- Ajax.get "http://foo.bar"
liftEff $ log response.body
main = launchAff do
response <- Ajax.get "http://foo.bar"
liftEff $ log response.body
```

See the [tests](https://github.com/slamdata/purescript-aff/blob/master/test/Test/Main.purs) for more examples.
Expand All @@ -33,10 +33,10 @@ bower install purescript-aff
An example of `Aff` is shown below:

```purescript
deleteBlankLines path =
do contents <- loadFile path
let contents' = S.join "\n" $ A.filter (\a -> S.length a > 0) (S.split "\n" contents)
saveFile path contents'
deleteBlankLines path = do
contents <- loadFile path
let contents' = S.join "\n" $ A.filter (\a -> S.length a > 0) (S.split "\n" contents)
saveFile path contents'
```

This looks like ordinary, synchronous, imperative code, but actually operates asynchronously without any callbacks. Error handling is baked in so you only deal with it when you want to.
Expand Down Expand Up @@ -149,23 +149,23 @@ Here's an example of how you can use them:

```purescript
do resp <- (Ajax.get "http://foo.com") `catchError` (const $ pure defaultResponse)
if resp.statusCode != 200 then throwError myErr
if resp.statusCode != 200 then throwError myErr
else pure resp.body
```

Thrown exceptions are propagated on the error channel, and can be recovered from using `attempt` or `catchError`.

## Forking

Using the `forkAff`, you can "fork" an asynchronous computation, which means
Using the `forkAff`, you can "fork" an asynchronous computation, which means
that its activities will not block the current thread of execution:

```purescript
forkAff myAff
```

Because Javascript is single-threaded, forking does not actually cause the
computation to be run in a separate thread. Forking just allows the subsequent
Because Javascript is single-threaded, forking does not actually cause the
computation to be run in a separate thread. Forking just allows the subsequent
actions to execute without waiting for the forked computation to complete.

If the asynchronous computation supports it, you can "kill" a forked computation
Expand All @@ -191,28 +191,34 @@ The `Control.Monad.Aff.AVar` module contains asynchronous variables, which are v
```purescript
do v <- makeVar
forkAff (later $ putVar v 1.0)
a <- takeVar v
a <- takeVar v
liftEff $ log ("Succeeded with " ++ show a)
```

You can use these constructs as one-sided blocking queues, which suspend (if
You can use these constructs as one-sided blocking queues, which suspend (if
necessary) on `take` operations, or as asynchronous, empty-or-full variables.

## Parallel Execution

If you only need the power of `Applicative`, then instead of using the monadic `Aff`, you can use the `Par` newtype wrapper defined in `Control.Monad.Aff.Par`.
There are `MonadPar` and `MonadRace` instances defined for `Aff`, allowing for parallel execution of `Aff` computations.

This provides parallel instances of `Apply` and `Alt`.
There are two ways of taking advantage of these instances - directly through the `par` and `race` functions from these classes, or by using the `Parallel` newtype wrapper that enables parallel behaviours through the `Applicative` and `Alternative` operators.

In the following example, two Ajax requests are initiated simultaneously (rather than in sequence, as they would be for `Aff`):
In the following example, using the newtype, two Ajax requests are initiated simultaneously (rather than in sequence, as they would be for `Aff`):

```purescript
runPar (f <$> Par (Ajax.get "http://foo.com") <*> Par (Ajax.get "http://foo.com"))
runParallel (f <$> parallel (Ajax.get "http://foo.com") <*> parallel (Ajax.get "http://foo.com"))
```

The `(<|>)` operator of the `Alt` instance of `Par` allows you to race two asynchronous computations, and use whichever value comes back first (or the first error, if both err).
And the equivalent using the `MonadPar` function directly:

The `runPar` function allows you to unwrap the `Aff` and return to normal monadic (sequential) composition.
```purescript
par f (Ajax.get "http://foo.com") (Ajax.get "http://foo.com")
```

The `race` function from `MonadPar` or the `(<|>)` operator of the `Alt` instance of `Parallel` allows you to race two asynchronous computations, and use whichever value comes back first (or the first error, if both err).

The `runParallel` function allows you to unwrap the `Aff` and return to normal monadic (sequential) composition.

A parallel computation can be canceled if both of its individual components can be canceled.

Expand Down
4 changes: 3 additions & 1 deletion bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
"purescript-console": "^1.0.0",
"purescript-exceptions": "^1.0.0",
"purescript-functions": "^1.0.0",
"purescript-transformers": "^1.0.0"
"purescript-parallel": "^1.0.0",
"purescript-transformers": "^1.0.0",
"purescript-unsafe-coerce": "^1.0.0"
},
"devDependencies": {
"purescript-partial": "^1.1.2"
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
"private": true,
"scripts": {
"clean": "rimraf output && rimraf .pulp-cache",
"build": "pulp build",
"build": "pulp build --censor-lib --strict",
"test": "pulp test"
},
"devDependencies": {
"purescript": "^0.9.1-rc.1",
"pulp": "^9.0.0",
"rimraf": "^2.5.2"
"purescript-psa": "^0.3.9",
"purescript": "^0.9.1",
"rimraf": "^2.5.0"
}
}
50 changes: 49 additions & 1 deletion src/Control/Monad/Aff.purs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@ module Control.Monad.Aff
where

import Prelude

import Control.Alt (class Alt)
import Control.Alternative (class Alternative)
import Control.Monad.Aff.Internal (AVBox, AVar, _killVar, _putVar, _takeVar, _makeVar)
import Control.Monad.Cont.Class (class MonadCont)
import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Class (class MonadEff)
import Control.Monad.Eff.Exception (Error, EXCEPTION, throwException, error)
import Control.Monad.Error.Class (class MonadError, throwError)
import Control.Monad.Rec.Class (class MonadRec)
import Control.MonadPlus (class MonadZero, class MonadPlus)
import Control.Parallel.Class (class MonadRace, class MonadPar)
import Control.Plus (class Plus)

import Data.Either (Either(..), either, isLeft)
import Data.Foldable (class Foldable, foldl)
import Data.Function.Uncurried (Fn2, Fn3, runFn2, runFn3)
import Data.Monoid (class Monoid, mempty)

import Unsafe.Coerce (unsafeCoerce)

-- | An asynchronous computation with effects `e`. The computation either
-- | errors or produces a value of type `a`.
-- |
Expand All @@ -54,7 +60,7 @@ type PureAff a = forall e. Aff e a
newtype Canceler e = Canceler (Error -> Aff e Boolean)

-- | Unwraps the canceler function from the newtype that wraps it.
cancel :: forall e. Canceler e -> Error -> Aff e Boolean
cancel :: forall e. Canceler e -> Error -> Aff e Boolean
cancel (Canceler f) = f

-- | This function allows you to attach a custom canceler to an asynchronous
Expand Down Expand Up @@ -202,6 +208,48 @@ instance semigroupCanceler :: Semigroup (Canceler e) where
instance monoidCanceler :: Monoid (Canceler e) where
mempty = Canceler (const (pure true))

instance monadParAff :: MonadPar (Aff e) where
par f ma mb = do
va <- makeVar
vb <- makeVar
c1 <- forkAff (putOrKill va =<< attempt ma)
c2 <- forkAff (putOrKill vb =<< attempt mb)
f <$> (takeVar va) <*> (takeVar vb)
where
putOrKill :: forall a. AVar a -> Either Error a -> Aff e Unit
putOrKill v = either (killVar v) (putVar v)

instance monadRaceAff :: MonadRace (Aff e) where
stall = throwError $ error "Stalled"
race a1 a2 = do
va <- makeVar -- the `a` value
ve <- makeVar -- the error count (starts at 0)
putVar ve 0
c1 <- forkAff $ either (maybeKill va ve) (putVar va) =<< attempt a1
c2 <- forkAff $ either (maybeKill va ve) (putVar va) =<< attempt a2
takeVar va `cancelWith` (c1 <> c2)
where
maybeKill :: forall a. AVar a -> AVar Int -> Error -> Aff e Unit
maybeKill va ve err = do
e <- takeVar ve
if e == 1 then killVar va err else pure unit
putVar ve (e + 1)

makeVar :: forall e a. Aff e (AVar a)
makeVar = fromAVBox $ _makeVar nonCanceler

takeVar :: forall e a. AVar a -> Aff e a
takeVar q = fromAVBox $ runFn2 _takeVar nonCanceler q

putVar :: forall e a. AVar a -> a -> Aff e Unit
putVar q a = fromAVBox $ runFn3 _putVar nonCanceler q a

killVar :: forall e a. AVar a -> Error -> Aff e Unit
killVar q e = fromAVBox $ runFn3 _killVar nonCanceler q e

fromAVBox :: forall a e. AVBox a -> Aff e a
fromAVBox = unsafeCoerce

foreign import _cancelWith :: forall e a. Fn3 (Canceler e) (Aff e a) (Canceler e) (Aff e a)

foreign import _setTimeout :: forall e a. Fn3 (Canceler e) Int (Aff e a) (Aff e a)
Expand Down
39 changes: 18 additions & 21 deletions src/Control/Monad/Aff/AVar.purs
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
-- | A low-level primitive for building asynchronous code.
module Control.Monad.Aff.AVar
( AffAVar()
, AVar()
, AVAR()
, killVar
( AffAVar
, AVAR
, makeVar
, makeVar'
, modifyVar
, putVar
, takeVar
, putVar
, modifyVar
, killVar
, module Exports
) where

import Prelude

import Control.Monad.Aff (Aff(), Canceler(), nonCanceler)
import Control.Monad.Aff (Aff, nonCanceler)
import Control.Monad.Aff.Internal (AVar) as Exports
import Control.Monad.Aff.Internal (AVBox, AVar, _killVar, _putVar, _takeVar, _makeVar)
import Control.Monad.Eff.Exception (Error())

import Data.Function.Uncurried (Fn2(), Fn3(), runFn2, runFn3)
import Data.Function.Uncurried (runFn3, runFn2)

foreign import data AVAR :: !
import Unsafe.Coerce (unsafeCoerce)

foreign import data AVar :: * -> *
foreign import data AVAR :: !

type AffAVar e a = Aff (avar :: AVAR | e) a

-- | Makes a new asynchronous avar.
makeVar :: forall e a. AffAVar e (AVar a)
makeVar = _makeVar nonCanceler
makeVar = fromAVBox $ _makeVar nonCanceler

-- | Makes a avar and sets it to some value.
makeVar' :: forall e a. a -> AffAVar e (AVar a)
Expand All @@ -37,25 +39,20 @@ makeVar' a = do

-- | Takes the next value from the asynchronous avar.
takeVar :: forall e a. AVar a -> AffAVar e a
takeVar q = runFn2 _takeVar nonCanceler q
takeVar q = fromAVBox $ runFn2 _takeVar nonCanceler q

-- | Puts a new value into the asynchronous avar. If the avar has
-- | been killed, this will result in an error.
putVar :: forall e a. AVar a -> a -> AffAVar e Unit
putVar q a = runFn3 _putVar nonCanceler q a
putVar q a = fromAVBox $ runFn3 _putVar nonCanceler q a

-- | Modifies the value at the head of the avar (will suspend until one is available).
modifyVar :: forall e a. (a -> a) -> AVar a -> AffAVar e Unit
modifyVar f v = takeVar v >>= (f >>> putVar v)

-- | Kills an asynchronous avar.
killVar :: forall e a. AVar a -> Error -> AffAVar e Unit
killVar q e = runFn3 _killVar nonCanceler q e

foreign import _makeVar :: forall e a. Canceler e -> AffAVar e (AVar a)

foreign import _takeVar :: forall e a. Fn2 (Canceler e) (AVar a) (AffAVar e a)

foreign import _putVar :: forall e a. Fn3 (Canceler e) (AVar a) a (AffAVar e Unit)
killVar q e = fromAVBox $ runFn3 _killVar nonCanceler q e

foreign import _killVar :: forall e a. Fn3 (Canceler e) (AVar a) Error (AffAVar e Unit)
fromAVBox :: forall a e. AVBox a -> AffAVar e a
fromAVBox = unsafeCoerce
File renamed without changes.
26 changes: 26 additions & 0 deletions src/Control/Monad/Aff/Internal.purs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module Control.Monad.Aff.Internal
( AVBox
, AVar
, _makeVar
, _takeVar
, _putVar
, _killVar
) where

import Prelude

import Control.Monad.Eff.Exception (Error)

import Data.Function.Uncurried (Fn2, Fn3)

foreign import data AVar :: * -> *

foreign import data AVBox :: * -> *

foreign import _makeVar :: forall c a. c -> AVBox (AVar a)

foreign import _takeVar :: forall c a. Fn2 c (AVar a) (AVBox a)

foreign import _putVar :: forall c a. Fn3 c (AVar a) a (AVBox Unit)

foreign import _killVar :: forall c a. Fn3 c (AVar a) Error (AVBox Unit)
Loading