Commit 3f051a7

Introduce chunking in the logs.
In the background logger we introduce chunking: we read a number of messages from the queue, process that chunk, and flush afterwards. This approach can lead to better performance; for example, if you use hLogStdout together with hFlush as the flush action, you can still use a buffered handle instead of a line-buffered one, without introducing delays in writing logs. Currently there are three ways to consume log messages, selected via the limit parameter of the Capacity:
1. `Just 1` - the old behaviour; each message is written and processed one by one.
2. `Nothing` - unbounded (actually bounded by the TBQueue size).
3. `Just i` - bounded chunks of at most i messages.
The default choice is `Just 32`; it is an arbitrary decision that works in practice for our company, but we have not spent much time investigating it. A small configuration sketch is shown below.
1 parent 8d73fb3 commit 3f051a7
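For illustration only (not part of the commit itself), a minimal sketch of selecting these modes through the mkCapacity function introduced in this release; the module and value names are invented, only Capacity, mkCapacity, and the limit semantics come from the diff below:

module CapacityModes where

import Colog.Concurrent.Internal (Capacity, mkCapacity)

-- Old behaviour: each message is read and processed one by one.
perMessage :: Capacity
perMessage = mkCapacity 4096 (Just 1)

-- Chunks bounded only by whatever currently sits in the queue.
unboundedChunks :: Capacity
unboundedChunks = mkCapacity 4096 Nothing

-- At most 32 messages per chunk, mirroring the new defCapacity default.
boundedChunks :: Capacity
boundedChunks = mkCapacity 4096 (Just 32)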

File tree

4 files changed: +47 -16 lines changed

CHANGELOG.markdown

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,8 @@
 
 - Introduce mkCapacity function, as previously it was not possible
   to create custom capacity.
+- Fork background logger now takes an explicit flush action and
+  reads logs in chunks.
 
 ## 0.4.0.0 -- 2020-04-18
 

co-log-concurrent.cabal

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 cabal-version: 2.4
 name: co-log-concurrent
-version: 0.4.0.0
+version: 0.5.0.0
 synopsis: Asynchronous backend for co-log library
 description: Building block for writing asynchronous logger pipelines.
 homepage: https://github.com/qnikst/co-log-concurrent/

src/Colog/Concurrent.hs

Lines changed: 32 additions & 11 deletions
@@ -53,14 +53,15 @@ module Colog.Concurrent
   -- $worker-thread-usage
   ) where
 
-import Control.Applicative (many)
+import Control.Applicative (many, (<|>), some)
 import Control.Concurrent (forkFinally, killThread)
-import Control.Concurrent.STM (atomically, check, newTVarIO, readTVar, writeTVar)
+import Control.Concurrent.STM (STM, atomically, check, newTVarIO, readTVar, writeTVar)
 import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue)
 import Control.Exception (bracket, finally)
 import Control.Monad (forever, join)
 import Control.Monad.IO.Class (MonadIO (..))
 import Data.Foldable (for_)
+import Numeric.Natural (Natural)
 
 import Colog.Concurrent.Internal (BackgroundWorker (..), Capacity (..), mkCapacity)
 import Colog.Core.Action (LogAction (..))
@@ -140,12 +141,16 @@ See 'forkBackgroundLogger' for more details.
 application state or thread info, so you should only pass methods that serialize
 and dump data there.
 
+@IO ()@ - flush provides a function to flush all the logs; flushing happens
+once per chunk, so the @LogAction@ itself does not need to care about flushing.
+
 @
 main :: IO ()
 main =
   'withBackgroundLogger'
      'defCapacity'
      'Colog.Actions.logByteStringStdout'
+     (pure ())
      (\log -> 'Colog.Monad.usingLoggerT' log $ __do__
         'Colog.Monad.logMsg' \@ByteString "Starting application..."
         'Colog.Monad.logMsg' \@ByteString "Finishing application..."
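To make the buffering point from the commit message concrete, here is a hedged sketch of a complete program that passes a real flush action while keeping stdout block-buffered. It assumes the co-log package's Colog.Actions.logByteStringStdout and Colog.Monad helpers referenced in the Haddock example above; the handle setup and message text are illustrative only:

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module Main where

import Data.ByteString (ByteString)
import System.IO (BufferMode (BlockBuffering), hFlush, hSetBuffering, stdout)

import Colog.Actions (logByteStringStdout)
import Colog.Concurrent (defCapacity, withBackgroundLogger)
import Colog.Monad (logMsg, usingLoggerT)

main :: IO ()
main = do
  -- Keep stdout block-buffered: the background worker flushes once per chunk,
  -- so we avoid a flush per message while still not losing log lines.
  hSetBuffering stdout (BlockBuffering Nothing)
  withBackgroundLogger
    defCapacity
    logByteStringStdout
    (hFlush stdout)       -- flush action, run after every chunk of messages
    (\logger -> usingLoggerT logger $ do
        logMsg @ByteString "Starting application..."
        logMsg @ByteString "Finishing application...")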
@@ -156,16 +161,17 @@ withBackgroundLogger
   :: MonadIO m
   => Capacity -- ^ Capacity of messages to handle; bounded channel size
   -> LogAction IO msg -- ^ Action that will be used in a forked thread
+  -> IO () -- ^ Action to flush logs
   -> (LogAction m msg -> IO a) -- ^ Continuation action
   -> IO a
-withBackgroundLogger cap logger action =
-  bracket (forkBackgroundLogger cap logger)
+withBackgroundLogger cap logger flush action =
+  bracket (forkBackgroundLogger cap logger flush)
     killBackgroundLogger
     (action . convertToLogAction)
 
 -- | Default capacity size, (4096)
 defCapacity :: Capacity
-defCapacity = Capacity 4096
+defCapacity = Capacity 4096 (Just 32)
 
 
 {- $extended-api
@@ -218,19 +224,30 @@ __N.B.__ On exit, even in case of exception thread will dump all values
 that are in the queue, but it will stop doing that if another
 exception happens.
 -}
-forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO (BackgroundWorker msg)
-forkBackgroundLogger (Capacity cap) logAction = do
+forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO () -> IO (BackgroundWorker msg)
+forkBackgroundLogger (Capacity cap lim) logAction flush = do
   queue <- newTBQueueIO cap
   isAlive <- newTVarIO True
   tid <- forkFinally
     (forever $ do
-      msg <- atomically $ readTBQueue queue
-      unLogAction logAction msg)
+      msgs <- atomically $ fetch $ readTBQueue queue
+      for_ msgs $ unLogAction logAction
+      flush)
     (\_ ->
       (do msgs <- atomically $ many $ readTBQueue queue
-          for_ msgs $ unLogAction logAction)
+          for_ msgs $ unLogAction logAction
+          flush)
       `finally` atomically (writeTVar isAlive False))
   pure $ BackgroundWorker tid (writeTBQueue queue) isAlive
+  where
+    fetch
+      | Just n <- lim = someN n
+      | otherwise = some
+    someN :: Natural -> STM a -> STM [a]
+    someN 0 _ = pure []
+    someN n f = (:) <$> f <*> go (n - 1) where
+      go 0 = pure []
+      go k = ((:) <$> f <*> go (k-1)) <|> pure []
 
 
 {- | Convert a given 'BackgroundWorker msg' into a 'LogAction msg'
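The chunked read above boils down to one blocking readTBQueue followed by opportunistic reads guarded by (<|>). A self-contained sketch of that idea, kept separate from the library code (takeChunk and demo are invented names, not part of this commit):

module ChunkedRead where

import Control.Applicative ((<|>))
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Monad (forM_)
import Numeric.Natural (Natural)

-- Blocks until at least one element is available, then keeps reading
-- without retrying until the queue is empty or the limit is reached.
takeChunk :: Natural -> TBQueue a -> STM [a]
takeChunk 0 _     = pure []
takeChunk limit q = (:) <$> readTBQueue q <*> go (limit - 1)
  where
    go 0 = pure []
    go k = ((:) <$> readTBQueue q <*> go (k - 1)) <|> pure []

demo :: IO ()
demo = do
  q <- newTBQueueIO 16
  atomically $ forM_ ["a", "b", "c", "d", "e"] (writeTBQueue q)
  chunk <- atomically (takeChunk 3 q)
  print chunk   -- ["a","b","c"]; "d" and "e" wait for the next chunk

With the background worker, the flush action runs once after each such chunk rather than after every message.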
@@ -280,9 +297,13 @@ happening.
 When closed it will dump all pending messages, unless
 another asynchronous exception arrives, or a synchronous
 exception happens during the logging.
+
+Note: the limit parameter of the capacity is ignored here, as the function
+performs IO actions and does not seem to benefit from chunking.
+However, this may change in future versions if that proves to be wrong.
 -}
 mkBackgroundThread :: Capacity -> IO (BackgroundWorker (IO ()))
-mkBackgroundThread (Capacity cap) = do
+mkBackgroundThread (Capacity cap _lim) = do
   queue <- newTBQueueIO cap
   isAlive <- newTVarIO True
   tid <- forkFinally

src/Colog/Concurrent/Internal.hs

Lines changed: 12 additions & 4 deletions
@@ -24,16 +24,24 @@ import Numeric.Natural (Natural)
 different for the different GHC versions.
 -}
 #if MIN_VERSION_stm(2,5,0)
-newtype Capacity = Capacity Natural
+data Capacity = Capacity Natural (Maybe Natural)
 #else
-newtype Capacity = Capacity Int
+data Capacity = Capacity Int (Maybe Natural)
 #endif
 
 -- | Create new capacity.
 --
 -- @since 0.5.0.0
-mkCapacity :: Natural -> Capacity
-mkCapacity = Capacity . fromIntegral
+mkCapacity
+  :: Natural -- ^ Size of the queue; the number of messages in flight.
+  -> Maybe Natural -- ^ Maximum number of messages the logger reads in one chunk.
+  -> Capacity
+mkCapacity n = Capacity (mk n) where
+#if MIN_VERSION_stm(2,5,0)
+  mk = id
+#else
+  mk = fromIntegral
+#endif
 
 {- | Wrapper for the background thread that may receive messages to
 process.
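One consequence of the CPP above: the first field of Capacity is a Natural on stm >= 2.5 and an Int otherwise, so user code that wants to stay portable should go through mkCapacity rather than the bare constructor. A tiny hypothetical sketch (the value name is made up):

import Colog.Concurrent.Internal (Capacity, mkCapacity)

-- mkCapacity converts the queue size to whichever representation the
-- installed stm version uses, so no CPP is needed in user code.
smallChunks :: Capacity
smallChunks = mkCapacity 1024 (Just 8)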
