@@ -160,16 +160,20 @@ import Prelude hiding (catch)
160
160
#endif
161
161
162
162
import Control.Monad.IO.Class (liftIO )
163
- import Control.Applicative ((<$>) , (<*) )
163
+ import Control.Applicative ((<$>) )
164
164
import Control.Monad.Reader (ask )
165
- import Control.Concurrent (myThreadId )
166
165
import Control.Concurrent.MVar
167
- ( newMVar
168
- , newEmptyMVar
169
- , tryTakeMVar
166
+ ( newEmptyMVar
170
167
, takeMVar
171
168
, putMVar
172
169
)
170
+ import Control.Concurrent.STM (atomically )
171
+ import Control.Concurrent.STM.TMVar
172
+ ( TMVar
173
+ , newEmptyTMVarIO
174
+ , putTMVar
175
+ , takeTMVar
176
+ )
173
177
import Control.Distributed.Static
174
178
( Closure
175
179
, closure
@@ -310,8 +314,7 @@ import Control.Distributed.Process.Internal.Spawn
310
314
, spawnSupervised
311
315
, call
312
316
)
313
- import Control.Exception (SomeException , throwIO , throwTo )
314
- import qualified Control.Exception as Exception
317
+ import Control.Exception (SomeException , throwIO )
315
318
316
319
-- INTERNAL NOTES
317
320
--
@@ -410,27 +413,29 @@ spawnChannelLocal proc = do
410
413
callLocal ::
411
414
Process a -- ^ Process to run
412
415
-> Process a -- ^ Value returned
413
- callLocal proc = mask $ \ release -> do
414
- lock <- liftIO $ newMVar ()
415
- tidMV <- liftIO $ newEmptyMVar
416
- mv <- liftIO newEmptyMVar
417
- _ <- spawnLocal $ mask $ \ release' -> do
418
- liftIO $ myThreadId >>= putMVar tidMV
419
- ep <- try $ release' $ proc <* liftIO (takeMVar lock)
420
- liftIO $ putMVar mv ep
421
- tid <- liftIO $ takeMVar tidMV
422
- let fetchResult =
423
- (takeMVar mv >>= either (throwIO :: SomeException -> IO a ) return )
424
- `Exception.catch`
425
- (\ e -> Exception. mask $ \ release' -> do
426
- ml <- tryTakeMVar lock
427
- case ml of
428
- Nothing -> -- lock already taked by worker so we need
429
- -- to rethrow exception now
430
- throwIO e
431
- Just {} -> do
432
- throwTo tid (e:: SomeException )
433
- putMVar lock ()
434
- release' fetchResult)
435
- release $ liftIO $ fetchResult
436
-
416
+ callLocal proc = do
417
+ parent <- getSelfPid
418
+ mv <- liftIO newEmptyTMVarIO :: Process (TMVar (Either SomeException a ))
419
+ child <- spawnLocal $ mask $ \ release -> do
420
+ link parent
421
+ ep <- try $ release $ proc
422
+ liftIO $ atomically $ putTMVar mv ep
423
+ withMonitor child $ do
424
+ fetchResult child mv `catch` (\ e -> do exit child (show (e:: SomeException ))
425
+ waitForExit child
426
+ liftIO $ throwIO e)
427
+ where
428
+ waitForExit child =
429
+ receiveWait
430
+ [ matchIf (\ (ProcessMonitorNotification _ ch _) -> child == ch)
431
+ (\ _ -> return () )
432
+ ]
433
+ fetchResult child mv = do
434
+ receiveWait
435
+ [ matchSTM (takeTMVar mv)
436
+ (\ rs -> do waitForExit child -- avoid monitor events leak
437
+ liftIO $ either throwIO return rs)
438
+ , matchIf (\ (ProcessMonitorNotification _ ch _) -> child == ch)
439
+ (\ (ProcessMonitorNotification _ _ reason) ->
440
+ fail $ " callLocal: remote process died: " ++ show reason)
441
+ ]
0 commit comments