-
Notifications
You must be signed in to change notification settings - Fork 126
/
ProtocolPipelining.hs
145 lines (128 loc) · 5.67 KB
/
ProtocolPipelining.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
-- |A module for automatic, optimal protocol pipelining.
--
-- Protocol pipelining is a technique in which multiple requests are written
-- out to a single socket without waiting for the corresponding responses.
-- The pipelining of requests results in a dramatic improvement in protocol
-- performance.
--
-- [Optimal Pipelining] uses the least number of network packets possible
--
-- [Automatic Pipelining] means that requests are implicitly pipelined as much
-- as possible, i.e. as long as a request's response is not used before any
-- subsequent requests.
--
module Database.Redis.ProtocolPipelining (
Connection,
connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, fromCtx
) where
import Prelude
import Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import Data.IORef
import qualified Network.Socket as NS
import qualified Network.TLS as TLS
import System.IO.Unsafe
import Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC
-- |A connection to Redis, bundling the transport handle with the
-- bookkeeping needed for automatic pipelining.
data Connection = Conn
  { connCtx :: CC.ConnectionContext -- ^ Connection socket-handle.
  , connReplies :: IORef [Reply] -- ^ Reply thunks for unsent requests.
  , connPending :: IORef [Reply]
    -- ^ Reply thunks for requests "in the pipeline". Refers to the same list as
    --   'connReplies', but can have an offset: entries are dropped here as
    --   replies arrive from the network (see 'connGetReplies').
  , connPendingCnt :: IORef Int
    -- ^ Number of pending replies and thus the difference length between
    --   'connReplies' and 'connPending'.
    --   length connPending - pendingCount = length connReplies
  }
-- | Wrap an already-established 'CC.ConnectionContext' in a fresh
-- 'Connection': empty reply queues and a zero pending-reply counter.
fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx ctx = do
  replies <- newIORef []
  pending <- newIORef []
  pendingCnt <- newIORef 0
  return (Conn ctx replies pending pendingCnt)
-- | Open a network connection to the given host and port (optionally
-- bounded by a connect timeout) and wrap it in a fresh 'Connection'.
connect :: NS.HostName -> CC.PortID -> Maybe Int -> IO Connection
connect host port timeoutOpt = CC.connect host port timeoutOpt >>= fromCtx
-- | Upgrade the connection's transport to TLS, returning the updated
-- 'Connection'. All reply bookkeeping is carried over unchanged.
enableTLS :: TLS.ClientParams -> Connection -> IO Connection
enableTLS params conn = do
  tlsCtx <- CC.enableTLS params (connCtx conn)
  return conn { connCtx = tlsCtx }
-- | Initialise the lazy stream of future replies and install it as both
-- the user-facing reply queue and the pending-reply queue. Must run
-- before the first 'recv'.
beginReceiving :: Connection -> IO ()
beginReceiving conn = do
  futureReplies <- connGetReplies conn
  writeIORef (connReplies conn) futureReplies
  writeIORef (connPending conn) futureReplies
-- | Close the underlying socket/TLS context. The 'Connection' must not
-- be used afterwards.
disconnect :: Connection -> IO ()
disconnect = CC.disconnect . connCtx
-- |Write the request to the socket output buffer, without actually sending.
-- The 'Handle' is 'hFlush'ed when reading replies from the 'connCtx'.
send :: Connection -> S.ByteString -> IO ()
send Conn{..} s = do
  CC.send connCtx s
  -- Signal that we expect one more reply from Redis.
  n <- atomicModifyIORef' connPendingCnt $ \n -> let n' = n+1 in (n', n')
  -- Limit the "pipeline length". This is necessary in long pipelines, to avoid
  -- thunk build-up, and thus space-leaks.
  -- TODO find smallest max pending with good-enough performance.
  when (n >= 1000) $ do
    -- Force oldest pending reply. A total 'case' replaces the previous
    -- partial pattern (r:_ <- ...), which would have crashed with a
    -- pattern-match failure should the pending list ever be empty here
    -- (the counter and the list are updated independently).
    pending <- readIORef connPending
    case pending of
      r:_ -> r `seq` return ()
      []  -> return ()
-- |Take the next reply-thunk from the queue of future replies.
-- 'beginReceiving' must have populated the queue beforehand.
recv :: Connection -> IO Reply
recv conn = do
  (next:rest) <- readIORef (connReplies conn)
  writeIORef (connReplies conn) rest
  return next
-- | Flush the socket. Normally, the socket is flushed in 'recv' (actually 'conGetReplies'), but
-- for the multithreaded pub/sub code, the sending thread needs to explicitly flush the subscription
-- change requests.
flush :: Connection -> IO ()
flush = CC.flush . connCtx
-- |Pipeline a single request: enqueue it on the socket buffer and
-- immediately take the corresponding reply-thunk.
request :: Connection -> S.ByteString -> IO Reply
request conn req = do
  send conn req
  recv conn
-- |A list of all future 'Reply's of the 'Connection'.
--
-- The spine of the list can be evaluated without forcing the replies.
--
-- Evaluating/forcing a 'Reply' from the list will 'unsafeInterleaveIO' the
-- reading and parsing from the 'connCtx'. To ensure correct ordering, each
-- Reply first evaluates (and thus reads from the network) the previous one.
--
-- 'unsafeInterleaveIO' only evaluates its result once, making this function
-- thread-safe. 'Handle' as implemented by GHC is also threadsafe, it is safe
-- to call 'hFlush' here. The list constructor '(:)' must be called from
-- /within/ unsafeInterleaveIO, to keep the replies in correct order.
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn@Conn{..} = go S.empty (SingleLine "previous of first")
  where
    -- 'rest' is unconsumed scanner input; 'previous' is the reply whose
    -- forcing must precede this one's network read.
    go rest previous = do
      -- lazy pattern match to actually delay the receiving
      ~(r, rest') <- unsafeInterleaveIO $ do
        -- Force previous reply for correct order.
        previous `seq` return ()
        scanResult <- Scanner.scanWith readMore reply rest
        case scanResult of
          Scanner.Fail{} -> CC.errConnClosed
          Scanner.More{} -> error "Hedis: parseWith returned Partial"
          Scanner.Done rest' r -> do
            -- r is the same as 'head' of 'connPending'. Since we just
            -- received r, we remove it from the pending list.
            atomicModifyIORef' connPending $ \case
              (_:rs) -> (rs, ())
              [] -> error "Hedis: impossible happened parseWith missing value that it just received"
            -- We now expect one less reply from Redis. We don't count to
            -- negative, which would otherwise occur during pubsub.
            atomicModifyIORef' connPendingCnt $ \n -> (max 0 (n-1), ())
            return (r, rest')
      rs <- unsafeInterleaveIO (go rest' r)
      return (r:rs)
    -- Flush any buffered outgoing requests before blocking on the socket,
    -- mapping I/O errors to a "connection lost" failure.
    readMore = CC.ioErrorToConnLost $ do
      flush conn
      CC.recv connCtx