Skip to content

Commit c5be5c8

Browse files
committed
Improve exception handling in callLocal.
callLocal had few flaws, 1. it could leak worker process in parent process received an exception but handles it. Then process was not killed, so linking worker to parent didn't help 2. it was not possible to handle asynchronous exception in worker process. This this patch both things are solved alltogether, this patch as a correct mechanism for exceptin handling, so if worker was not finished then all exceptions are rethrown to that process. In this case worker is capable for exception handling. However if worker already finished, then exception in thrown from the callLocal code. The resulting semantics is the same as wheb code is executed in a single process.
1 parent db5aa53 commit c5be5c8

File tree

1 file changed

+34
-11
lines changed

1 file changed

+34
-11
lines changed

src/Control/Distributed/Process.hs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,16 @@ import Prelude hiding (catch)
160160
#endif
161161

162162
import Control.Monad.IO.Class (liftIO)
163-
import Control.Applicative ((<$>))
163+
import Control.Applicative ((<$>), (<*))
164164
import Control.Monad.Reader (ask)
165-
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
165+
import Control.Concurrent (myThreadId)
166+
import Control.Concurrent.MVar
167+
( newMVar
168+
, newEmptyMVar
169+
, tryTakeMVar
170+
, takeMVar
171+
, putMVar
172+
)
166173
import Control.Distributed.Static
167174
( Closure
168175
, closure
@@ -303,7 +310,8 @@ import Control.Distributed.Process.Internal.Spawn
303310
, spawnSupervised
304311
, call
305312
)
306-
import Control.Exception (SomeException, throwIO)
313+
import Control.Exception (SomeException, throwIO, throwTo)
314+
import qualified Control.Exception as Exception
307315

308316
-- INTERNAL NOTES
309317
--
@@ -402,12 +410,27 @@ spawnChannelLocal proc = do
402410
callLocal ::
403411
Process a -- ^ Process to run
404412
-> Process a -- ^ Value returned
405-
callLocal proc = do
406-
mv <- liftIO newEmptyMVar
407-
self <- getSelfPid
408-
_ <- spawnLocal $ do
409-
link self
410-
try proc >>= liftIO . putMVar mv
411-
liftIO $ takeMVar mv >>=
412-
either (throwIO :: SomeException -> IO a) return
413+
callLocal proc = mask $ \release -> do
414+
lock <- liftIO $ newMVar ()
415+
tidMV <- liftIO $ newEmptyMVar
416+
mv <- liftIO newEmptyMVar
417+
_ <- spawnLocal $ mask $ \release' -> do
418+
liftIO $ myThreadId >>= putMVar tidMV
419+
ep <- try $ release' $ proc <* liftIO (takeMVar lock)
420+
liftIO $ putMVar mv ep
421+
tid <- liftIO $ takeMVar tidMV
422+
let fetchResult =
423+
(takeMVar mv >>= either (throwIO :: SomeException -> IO a) return)
424+
`Exception.catch`
425+
(\e -> Exception.mask $ \release' -> do
426+
ml <- tryTakeMVar lock
427+
case ml of
428+
Nothing -> -- lock already taked by worker so we need
429+
-- to rethrow exception now
430+
throwIO e
431+
Just{} -> do
432+
throwTo tid (e::SomeException)
433+
putMVar lock ()
434+
release' fetchResult)
435+
release $ liftIO $ fetchResult
413436

0 commit comments

Comments
 (0)