Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/Streamly/Internal/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,8 +1261,14 @@ toParallelSVar svar winfo = Fold step initial extract
step () x = liftIO $ do
-- XXX we can have a separate fold for unlimited buffer case to avoid a
-- branch in the step here.
decrementBufferLimit svar
void $ send svar (ChildYield x)
case maxBufferLimit svar of
BufferUnlimited ->
void $ send svar (ChildYield x)
BufferLast ->
void $ sendReplace svar (ChildYield x)
BufferLimited _ policy -> do
decrementBufferLimit svar policy
void $ send svar (ChildYield x)

extract () = liftIO $ do
sendStop svar winfo
Expand All @@ -1279,8 +1285,14 @@ toParallelSVarLimited svar winfo = Fold step initial extract
yieldLimitOk <- decrementYieldLimit svar
if yieldLimitOk
then do
decrementBufferLimit svar
void $ send svar (ChildYield x)
case maxBufferLimit svar of
BufferUnlimited ->
void $ send svar (ChildYield x)
BufferLast ->
void $ sendReplace svar (ChildYield x)
BufferLimited _ policy -> do
decrementBufferLimit svar policy
void $ send svar (ChildYield x)
return True
else do
cleanupSVarFromWorker svar
Expand Down
127 changes: 89 additions & 38 deletions src/Streamly/Internal/Data/SVar.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ module Streamly.Internal.Data.SVar

-- State threaded around the stream
, Limit (..)
, BufferStyle (..)
, State (streamVar)
, defState
, adaptState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, getBufferStyle
, setBufferStyle
, setMaxBuffer
, getStreamRate
, setStreamRate
Expand All @@ -62,6 +65,7 @@ module Streamly.Internal.Data.SVar
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendReplace
, sendToProducer
, sendYield
, sendStop
Expand Down Expand Up @@ -387,11 +391,27 @@ data SVarStopStyle =
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
--
-- | What a producer does when it has an element to push but a size-limited
-- buffer has no space left. This is the policy carried by the
-- 'BufferLimited' constructor of 'BufferStyle'.
data BufferOverflowPolicy =
      PushBufferDropNew
        -- ^ Drop the latest element and continue.
    | PushBufferDropOld
        -- ^ Drop the oldest element and continue.
    | PushBufferBlock
        -- ^ Block the thread until space becomes available.
    | PushBufferToFile String
        -- ^ Append the buffer to a file on disk. The 'String' is the filename
        -- prefix; two files are used, @\<filename\>1@ and @\<filename\>2@.
        -- While the consumer is consuming from one file the producers are
        -- writing to the other file. The current index ownership is
        -- maintained in the SVar.
    deriving (Show)

-- | How the output buffer of an SVar is bounded.
--
-- XXX More generally, instead of keeping just the last event we could keep
-- the last N events in the SVar, i.e. add a @BufferLastN@ case.
data BufferStyle =
      BufferUnlimited
        -- ^ No limit is applied to the buffer.
    | BufferLast
        -- ^ Buffer only the latest element.
    | BufferLimited Word BufferOverflowPolicy
        -- ^ Buffer at most the given number of elements; the policy says what
        -- a producer does when no space is left.
    deriving Show

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
Expand Down Expand Up @@ -441,10 +461,9 @@ data SVar t m a = SVar
-- potentially each worker may yield one value to the buffer in the worst
-- case exceeding the requested buffer size.
, maxWorkerLimit :: Limit
, maxBufferLimit :: Limit
-- These two are valid and used only when maxBufferLimit is Limited.
, maxBufferLimit :: BufferStyle
-- This is valid and used only when maxBufferLimit is BufferLimited.
, pushBufferSpace :: IORef Count
, pushBufferPolicy :: PushBufferPolicy
-- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
-- block on it when the buffer becomes full. No overhead unless the buffer
-- becomes full.
Expand Down Expand Up @@ -505,7 +524,7 @@ data State t m a = State
-- persistent configuration, state that remains valid until changed by
-- an explicit setting via a combinator.
, _threadsHigh :: Limit
, _bufferHigh :: Limit
, _bufferHigh :: BufferStyle
-- XXX these two can be collapsed into a single type
, _streamLatency :: Maybe NanoSecond64 -- bootstrap latency
, _maxStreamRate :: Maybe Rate
Expand All @@ -523,9 +542,11 @@ data State t m a = State
-- | The single tuning constant from which both the default thread limit and
-- the default buffer limit below are derived.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- | Default upper bound on the number of worker threads.
defaultMaxThreads :: Limit
defaultMaxThreads = Limited magicMaxBuffer

-- | Default buffering: a buffer limited to 'magicMaxBuffer' elements whose
-- producers block when it is full.
defaultMaxBuffer :: BufferStyle
defaultMaxBuffer = BufferLimited magicMaxBuffer PushBufferBlock

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
Expand Down Expand Up @@ -592,18 +613,27 @@ setMaxThreads n st =
getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh

-- | Return a copy of the state whose buffer configuration ('_bufferHigh') is
-- replaced by the given 'BufferStyle'. All other fields are left untouched.
setBufferStyle :: BufferStyle -> State t m a -> State t m a
setBufferStyle bufStyle state =
    state
        { _bufferHigh = bufStyle
        }

-- | Configure the buffer from a plain 'Int', expressed in terms of
-- 'setBufferStyle':
--
-- * a negative value selects 'BufferUnlimited',
-- * zero selects 'defaultMaxBuffer',
-- * a positive value @n@ selects a buffer limited to @n@ elements whose
--   producers block when it is full ('PushBufferBlock').
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n = setBufferStyle style

    where

    style
        | n < 0 = BufferUnlimited
        | n == 0 = defaultMaxBuffer
        -- n is strictly positive here, so the conversion to Word cannot wrap.
        | otherwise = BufferLimited (fromIntegral n) PushBufferBlock

-- | Read the buffer configuration ('_bufferHigh') stored in the state.
getBufferStyle :: State t m a -> BufferStyle
getBufferStyle state = _bufferHigh state

-- | View the configured buffer style as a plain 'Limit'.
--
-- Only 'BufferLimited' carries a size, which is returned as 'Limited'. Both
-- 'BufferUnlimited' and 'BufferLast' are reported as 'Unlimited'. The
-- overflow policy is not visible through this accessor; use 'getBufferStyle'
-- when it is needed.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer st =
    case getBufferStyle st of
        BufferLimited n _ -> Limited n
        -- Spelled out rather than using a wildcard so that adding a new
        -- 'BufferStyle' constructor is flagged by incomplete-pattern warnings.
        BufferUnlimited -> Unlimited
        BufferLast -> Unlimited

setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }
Expand Down Expand Up @@ -1000,18 +1030,18 @@ incrementYieldLimit sv =

-- XXX Only yields should be counted in the buffer limit and not the Stop
-- events.
--
-- XXX we can parameterize the SVar with a buffer type to reduce the runtime
-- overhead of determining the buffer type before queuing the elements.

{-# INLINE decrementBufferLimit #-}
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
case maxBufferLimit sv of
Unlimited -> return ()
Limited _ -> do
decrementBufferLimit :: SVar t m a -> BufferOverflowPolicy -> IO ()
decrementBufferLimit sv policy = do
let ref = pushBufferSpace sv
old <- atomicModifyIORefCAS ref $ \x ->
(if x >= 1 then x - 1 else x, x)
when (old <= 0) $
case pushBufferPolicy sv of
case policy of
PushBufferBlock -> blockAndRetry
PushBufferDropNew -> do
-- We just drop one item and proceed. It is possible
Expand All @@ -1031,6 +1061,7 @@ decrementBufferLimit sv =
when block blockAndRetry
-- XXX need a dequeue or ring buffer for this
PushBufferDropOld -> undefined
PushBufferToFile _ -> undefined

where

Expand All @@ -1053,19 +1084,19 @@ decrementBufferLimit sv =
-- | Give one unit of buffer space back to the producers.
--
-- For a 'BufferLimited' SVar this adds one to 'pushBufferSpace' and then
-- tries to fill 'pushBufferMVar', the MVar that workers block on when the
-- buffer is full. For every other buffer style it does nothing.
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        BufferLimited _ _ -> do
            atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
            -- The barrier sits between the counter update and the MVar put,
            -- so the wake-up is issued only after the store.
            writeBarrier
            void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()
        BufferUnlimited -> return ()
        BufferLast -> return ()

-- | Restore the available buffer space to the full configured size.
--
-- For a 'BufferLimited' SVar this overwrites 'pushBufferSpace' with the
-- configured limit. For every other buffer style it does nothing.
{-# INLINE resetBufferLimit #-}
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        BufferLimited n _ ->
            atomicModifyIORefCAS_ (pushBufferSpace sv) (const (fromIntegral n))
        BufferUnlimited -> return ()
        BufferLast -> return ()

{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
Expand All @@ -1092,6 +1123,27 @@ sendWithDoorBell q bell msg = do
-- | Push an event onto the SVar's 'outputQueue', using 'outputDoorBell' as
-- the bell, by delegating to 'sendWithDoorBell'. The 'Int' result is whatever
-- 'sendWithDoorBell' returns.
--
-- NOTE(review): 'sendYield' binds this result as @oldlen@, so it is presumably
-- the queue length before this event was added - confirm against
-- 'sendWithDoorBell'.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv msg =
    let queue = outputQueue sv
        bell = outputDoorBell sv
     in sendWithDoorBell queue bell msg

-- | Overwrite the contents of the SVar's 'outputQueue' with the single given
-- event, setting the stored count to 1, and ring 'outputDoorBell' if the
-- count seen before the overwrite was zero or less.
--
-- NOTE(review): whatever was in the queue before the call is discarded, not
-- just the most recent element - confirm that no event that must reach the
-- consumer can be pending there when this is used.
--
-- XXX We can use a non-list buffer to make this faster; a tuple is not needed
-- here. For Prim/Storable streams we can also avoid the IORef and just use an
-- unboxed reference.
sendReplace :: SVar t m a -> ChildEvent a -> IO ()
sendReplace sv msg = do
    prevLen <- atomicModifyIORefCAS (outputQueue sv) overwrite
    when (prevLen <= 0) ringDoorBell

    where

    -- Install the one-element buffer and hand back the previous count.
    overwrite (_, prevCount) = (([msg], 1), prevCount)

    ringDoorBell = do
        -- The wake-up must be issued only after the store has completed,
        -- otherwise a wake-up can be lost.
        writeBarrier
        -- Several workers may get here at the same time, so the MVar may be
        -- filled spuriously after the consumer has already seen the output.
        -- That is harmless: at worst the consumer reads the queue again and
        -- finds it empty. What matters is that the consumer is guaranteed a
        -- doorbell if something was added after it emptied the queue.
        void $ tryPutMVar (outputDoorBell sv) ()

-- There is no bound implemented on the buffer, this is assumed to be low
-- traffic.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
Expand Down Expand Up @@ -1206,10 +1258,10 @@ sendYield sv mwinfo msg = do
oldlen <- send sv msg
let limit = maxBufferLimit sv
bufferSpaceOk <- case limit of
Unlimited -> return True
Limited lim -> do
BufferLimited lim _ -> do
active <- readIORef (workerCount sv)
return $ (oldlen + 1) < (fromIntegral lim - active)
_ -> return True
rateLimitOk <-
case mwinfo of
Just winfo ->
Expand Down Expand Up @@ -2256,9 +2308,8 @@ getAheadSVar st f mrun = do
{ outputQueue = outQ
, outputQueueFromConsumer = undefined
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxBufferLimit = getBufferStyle st
, pushBufferSpace = undefined
, pushBufferPolicy = undefined
, pushBufferMVar = undefined
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
, yieldRateInfo = rateInfo
Expand Down Expand Up @@ -2334,11 +2385,12 @@ getParallelSVar ss st mrun = do
Nothing -> return Nothing
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
let bufLim =
case getMaxBuffer st of
Unlimited -> undefined
Limited x -> (fromIntegral x)
remBuf <- newIORef bufLim
let bufSpace =
case getBufferStyle st of
BufferUnlimited -> undefined
BufferLast -> undefined
BufferLimited x _ -> fromIntegral x
remBuf <- newIORef bufSpace
pbMVar <- newMVar ()

stats <- newSVarStats
Expand All @@ -2353,9 +2405,8 @@ getParallelSVar ss st mrun = do
SVar { outputQueue = outQ
, outputQueueFromConsumer = outQRev
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxBufferLimit = getBufferStyle st
, pushBufferSpace = remBuf
, pushBufferPolicy = PushBufferBlock
, pushBufferMVar = pbMVar
, maxWorkerLimit = Unlimited
-- Used only for diagnostics
Expand Down
4 changes: 2 additions & 2 deletions src/Streamly/Internal/Data/Stream/Ahead.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ underMaxHeap sv hp = do

-- XXX simplify this
let maxHeap = case maxBufferLimit sv of
Limited lim -> Limited $
BufferLimited lim _ -> Limited $
max 0 (lim - fromIntegral len)
Unlimited -> Unlimited
_ -> Unlimited

case maxHeap of
Limited lim -> do
Expand Down
6 changes: 2 additions & 4 deletions src/Streamly/Internal/Data/Stream/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,8 @@ getLifoSVar st mrun = do
{ outputQueue = outQ
, outputQueueFromConsumer = undefined
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxBufferLimit = getBufferStyle st
, pushBufferSpace = undefined
, pushBufferPolicy = undefined
, pushBufferMVar = undefined
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
, yieldRateInfo = rateInfo
Expand Down Expand Up @@ -392,9 +391,8 @@ getFifoSVar st mrun = do
{ outputQueue = outQ
, outputQueueFromConsumer = undefined
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxBufferLimit = getBufferStyle st
, pushBufferSpace = undefined
, pushBufferPolicy = undefined
, pushBufferMVar = undefined
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
, yieldRateInfo = rateInfo
Expand Down
9 changes: 9 additions & 0 deletions src/Streamly/Internal/Data/Stream/Combinators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- XXX The actual buffer size can be up to double the specified value because
-- the consumer thread takes the whole buffer in one go and decrements the used
-- buffer space to 0. Since the full buffer space is then available to the
-- producers again, they can refill it even though the consumer may not yet
-- have actually consumed any of the previously taken items. So the actual
-- buffer size lies between n and 2n, where n is the buffer size specified by
-- the user. We can make this precise by having the consumer also modify the
-- buffer count, but then there will be more lock contention.
--
-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
Expand Down
Loading