|
| 1 | +{-# LANGUAGE ConstraintKinds #-} |
| 2 | +{-# LANGUAGE GADTs #-} |
| 3 | +{-# LANGUAGE RecordWildCards #-} |
| 4 | +{-# LANGUAGE ScopedTypeVariables #-} |
| 5 | +{-# LANGUAGE StandaloneDeriving #-} |
| 6 | +{-# LANGUAGE StandaloneKindSignatures #-} |
| 7 | +{-# LANGUAGE TupleSections #-} |
| 8 | +{-# LANGUAGE TypeApplications #-} |
| 9 | + |
| 10 | +-- Model's 'open' and 'snapshot' have redundant constraints. |
| 11 | +{-# OPTIONS_GHC -Wno-redundant-constraints #-} |
| 12 | + |
| 13 | +-- | IO-based model implementation. |
| 14 | +-- |
| 15 | +-- Differences from the (current) real API: |
| 16 | +-- |
| 17 | +-- * `newSession` doesn't take file-system arguments. |
| 18 | +-- |
| 19 | +-- * `snapshot` and `open` require `Typeable` constraints |
| 20 | +-- |
| 21 | +module Database.LSMTree.ModelIO.Normal ( |
| 22 | + -- * Temporary placeholder types |
| 23 | + Model.SomeSerialisationConstraint (..) |
| 24 | + -- * Utility types |
| 25 | + , IOLike |
| 26 | + -- * Sessions |
| 27 | + , Session |
| 28 | + , newSession |
| 29 | + , closeSession |
| 30 | + -- * Tables |
| 31 | + , TableHandle |
| 32 | + , TableConfig (..) |
| 33 | + , new |
| 34 | + , close |
| 35 | + -- * Table querying and updates |
| 36 | + -- ** Queries |
| 37 | + , Model.Range (..) |
| 38 | + , Model.LookupResult (..) |
| 39 | + , lookups |
| 40 | + , Model.RangeLookupResult (..) |
| 41 | + , rangeLookup |
| 42 | + -- ** Updates |
| 43 | + , Model.Update (..) |
| 44 | + , updates |
| 45 | + , inserts |
| 46 | + , deletes |
| 47 | + -- ** Blobs |
| 48 | + , Model.BlobRef |
| 49 | + , retrieveBlobs |
| 50 | + -- * Snapshots |
| 51 | + , SnapshotName |
| 52 | + , snapshot |
| 53 | + , open |
| 54 | + -- * Multiple writable table handles |
| 55 | + , duplicate |
| 56 | + ) where |
| 57 | + |
| 58 | +import Control.Concurrent.Class.MonadSTM |
| 59 | +import Control.Monad (unless, void) |
| 60 | +import Data.Kind (Type) |
| 61 | +import Data.Map (Map) |
| 62 | +import qualified Data.Map.Strict as Map |
| 63 | +import Data.Typeable (Typeable, cast) |
| 64 | +import Database.LSMTree.Common (IOLike, SnapshotName, |
| 65 | + SomeSerialisationConstraint) |
| 66 | +import qualified Database.LSMTree.Model.Normal as Model |
| 67 | +import Database.LSMTree.Normal (LookupResult (..), |
| 68 | + RangeLookupResult (..), Update (..)) |
| 69 | +import GHC.IO.Exception (IOErrorType (..), IOException (..)) |
| 70 | + |
| 71 | +{------------------------------------------------------------------------------- |
| 72 | + Sessions |
| 73 | +-------------------------------------------------------------------------------} |
| 74 | + |
| 75 | +type Session :: (Type -> Type) -> Type |
| 76 | +data Session m = Session |
| 77 | + { session_open :: !(TVar m Bool) |
| 78 | + -- ^ technically we don't need to track whether session is open. |
| 79 | + -- When session is closed, all the table handles are closed as well, |
| 80 | + -- So there shouldn't be situation where we have an open table handle |
| 81 | + -- pointing to closed session. |
| 82 | + -- |
| 83 | + -- In a model we track this anyway, to catch bugs. |
| 84 | + |
| 85 | + , snapshots :: !(TVar m (Map SnapshotName SomeModelTable)) |
| 86 | + -- ^ model session keeps snapshots in memory. |
| 87 | + |
| 88 | + , counter :: !(TVar m Int) |
| 89 | + -- ^ counter for new handles. |
| 90 | + , openHandles :: !(TVar m (Map Int (STM m ()))) |
| 91 | + -- ^ actions to close each open handle |
| 92 | + } |
| 93 | + |
| 94 | +new_handle :: MonadSTM m => Session m -> TMVar m b -> STM m Int |
| 95 | +new_handle Session {..} ref = do |
| 96 | + i <- readTVar counter |
| 97 | + writeTVar counter (i + 1) |
| 98 | + modifyTVar' openHandles $ Map.insert i $ void $ tryTakeTMVar ref |
| 99 | + return i |
| 100 | + |
| 101 | +close_handle :: MonadSTM m => Session m -> Int -> STM m () |
| 102 | +close_handle Session {..} i = do |
| 103 | + modifyTVar' openHandles $ Map.delete i |
| 104 | + |
| 105 | +data SomeModelTable where |
| 106 | + SomeModelTable :: (Typeable k, Typeable v, Typeable blob) |
| 107 | + => !(Model.Table k v blob) |
| 108 | + -> SomeModelTable |
| 109 | + |
| 110 | +-- | Create either a new empty table session. |
| 111 | +newSession :: IOLike m => m (Session m) |
| 112 | +newSession = atomically $ do |
| 113 | + session_open <- newTVar True |
| 114 | + snapshots <- newTVar Map.empty |
| 115 | + counter <- newTVar 0 |
| 116 | + openHandles <- newTVar Map.empty |
| 117 | + return Session {..} |
| 118 | + |
| 119 | +-- | Close the table session. |
| 120 | +-- |
| 121 | +-- This also closes any open table handles in the session. |
| 122 | +-- |
| 123 | +closeSession :: IOLike m => Session m -> m () |
| 124 | +closeSession Session {..} = atomically $ do |
| 125 | + writeTVar session_open False |
| 126 | + hdls <- readTVar openHandles |
| 127 | + sequence_ hdls |
| 128 | + |
| 129 | +{------------------------------------------------------------------------------- |
| 130 | + Tables |
| 131 | +-------------------------------------------------------------------------------} |
| 132 | + |
| 133 | +-- | A handle to a table. |
| 134 | +type TableHandle :: (Type -> Type) -> Type -> Type -> Type -> Type |
| 135 | +data TableHandle m k v blob = TableHandle { |
| 136 | + thSession :: !(Session m) |
| 137 | + , thId :: !Int |
| 138 | + , thRef :: !(TMVar m (Model.Table k v blob)) |
| 139 | + } |
| 140 | + |
| 141 | +data TableConfig = TableConfig |
| 142 | + |
| 143 | +-- | Configs should be comparable, because only tables with the same config |
| 144 | +-- options are __compatible__. |
| 145 | +deriving instance Eq TableConfig |
| 146 | + |
| 147 | +-- | Create a new table referenced by a table handle. |
| 148 | +new :: |
| 149 | + IOLike m |
| 150 | + => Session m |
| 151 | + -> TableConfig |
| 152 | + -> m (TableHandle m k v blob) |
| 153 | +new session _config = atomically $ do |
| 154 | + ref <- newTMVar Model.empty |
| 155 | + i <- new_handle session ref |
| 156 | + return TableHandle {thSession = session, thId = i, thRef = ref } |
| 157 | + |
| 158 | +-- | Close a table handle. |
| 159 | +close :: |
| 160 | + IOLike m |
| 161 | + => TableHandle m k v blob |
| 162 | + -> m () |
| 163 | +close TableHandle {..} = atomically $ do |
| 164 | + close_handle thSession thId |
| 165 | + void $ tryTakeTMVar thRef |
| 166 | + |
| 167 | +{------------------------------------------------------------------------------- |
| 168 | + Table querying and updates |
| 169 | +-------------------------------------------------------------------------------} |
| 170 | + |
| 171 | +-- | Perform a batch of lookups. |
| 172 | +lookups :: |
| 173 | + (IOLike m, SomeSerialisationConstraint k, SomeSerialisationConstraint v) |
| 174 | + => [k] |
| 175 | + -> TableHandle m k v blob |
| 176 | + -> m [LookupResult k v (Model.BlobRef blob)] |
| 177 | +lookups ks TableHandle {..} = atomically $ |
| 178 | + withModel "lookups" thSession thRef $ \tbl -> |
| 179 | + return $ Model.lookups ks tbl |
| 180 | + |
| 181 | +-- | Perform a range lookup. |
| 182 | +rangeLookup :: |
| 183 | + (IOLike m, SomeSerialisationConstraint k, SomeSerialisationConstraint v) |
| 184 | + => Model.Range k |
| 185 | + -> TableHandle m k v blob |
| 186 | + -> m [RangeLookupResult k v (Model.BlobRef blob)] |
| 187 | +rangeLookup r TableHandle {..} = atomically $ |
| 188 | + withModel "rangeLookup" thSession thRef $ \tbl -> |
| 189 | + return $ Model.rangeLookup r tbl |
| 190 | + |
| 191 | +-- | Perform a mixed batch of inserts and deletes. |
| 192 | +updates :: |
| 193 | + ( IOLike m |
| 194 | + , SomeSerialisationConstraint k |
| 195 | + , SomeSerialisationConstraint v |
| 196 | + , SomeSerialisationConstraint blob |
| 197 | + ) |
| 198 | + => [(k, Update v blob)] |
| 199 | + -> TableHandle m k v blob |
| 200 | + -> m () |
| 201 | +updates ups TableHandle {..} = atomically $ |
| 202 | + withModel "updates" thSession thRef $ \tbl -> |
| 203 | + writeTMVar thRef $ Model.updates ups tbl |
| 204 | + |
| 205 | + |
| 206 | +-- | Perform a batch of inserts. |
| 207 | +inserts :: |
| 208 | + ( IOLike m |
| 209 | + , SomeSerialisationConstraint k |
| 210 | + , SomeSerialisationConstraint v |
| 211 | + , SomeSerialisationConstraint blob |
| 212 | + ) |
| 213 | + => [(k, v, Maybe blob)] |
| 214 | + -> TableHandle m k v blob |
| 215 | + -> m () |
| 216 | +inserts = updates . fmap (\(k, v, blob) -> (k, Model.Insert v blob)) |
| 217 | + |
| 218 | +-- | Perform a batch of deletes. |
| 219 | +deletes :: |
| 220 | + ( IOLike m |
| 221 | + , SomeSerialisationConstraint k |
| 222 | + , SomeSerialisationConstraint v |
| 223 | + , SomeSerialisationConstraint blob |
| 224 | + ) |
| 225 | + => [k] |
| 226 | + -> TableHandle m k v blob |
| 227 | + -> m () |
| 228 | +deletes = updates . fmap (,Model.Delete) |
| 229 | + |
| 230 | +-- | Perform a batch of blob retrievals. |
| 231 | +retrieveBlobs :: |
| 232 | + (IOLike m, SomeSerialisationConstraint blob) |
| 233 | + => TableHandle m k v blob |
| 234 | + -> [Model.BlobRef blob] |
| 235 | + -> m [blob] |
| 236 | +retrieveBlobs TableHandle {..} brefs = atomically $ |
| 237 | + withModel "retrieveBlobs" thSession thRef $ \tbl -> |
| 238 | + return $ Model.retrieveBlobs tbl brefs |
| 239 | + |
| 240 | +{------------------------------------------------------------------------------- |
| 241 | + Snapshots |
| 242 | +-------------------------------------------------------------------------------} |
| 243 | + |
| 244 | +-- | Take a snapshot. |
| 245 | +snapshot :: |
| 246 | + ( IOLike m |
| 247 | + , SomeSerialisationConstraint k |
| 248 | + , SomeSerialisationConstraint v |
| 249 | + , SomeSerialisationConstraint blob |
| 250 | + , Typeable k |
| 251 | + , Typeable v |
| 252 | + , Typeable blob |
| 253 | + ) |
| 254 | + => SnapshotName |
| 255 | + -> TableHandle m k v blob |
| 256 | + -> m () |
| 257 | +snapshot n TableHandle {..} = atomically $ |
| 258 | + withModel "snapshot" thSession thRef $ \tbl -> |
| 259 | + modifyTVar' (snapshots thSession) (Map.insert n (SomeModelTable tbl)) |
| 260 | + |
| 261 | +-- | Open a table through a snapshot, returning a new table handle. |
| 262 | +open :: |
| 263 | + ( IOLike m |
| 264 | + , SomeSerialisationConstraint k |
| 265 | + , SomeSerialisationConstraint v |
| 266 | + , SomeSerialisationConstraint blob |
| 267 | + , Typeable k |
| 268 | + , Typeable v |
| 269 | + , Typeable blob |
| 270 | + ) |
| 271 | + => Session m |
| 272 | + -> SnapshotName |
| 273 | + -> m (TableHandle m k v blob) |
| 274 | +open s n = atomically $ do |
| 275 | + ss <- readTVar (snapshots s) |
| 276 | + case Map.lookup n ss of |
| 277 | + Nothing -> throwSTM IOError |
| 278 | + { ioe_handle = Nothing |
| 279 | + , ioe_type = NoSuchThing |
| 280 | + , ioe_location = "open" |
| 281 | + , ioe_description = "no such snapshotd" |
| 282 | + , ioe_errno = Nothing |
| 283 | + , ioe_filename = Nothing |
| 284 | + } |
| 285 | + |
| 286 | + Just (SomeModelTable tbl) -> case cast tbl of |
| 287 | + Nothing -> throwSTM IOError |
| 288 | + { ioe_handle = Nothing |
| 289 | + , ioe_type = InappropriateType |
| 290 | + , ioe_location = "open" |
| 291 | + , ioe_description = "table type mismatch" |
| 292 | + , ioe_errno = Nothing |
| 293 | + , ioe_filename = Nothing |
| 294 | + } |
| 295 | + |
| 296 | + Just tbl' -> do |
| 297 | + ref <- newTMVar tbl' |
| 298 | + i <- new_handle s ref |
| 299 | + return TableHandle { thRef = ref, thId = i, thSession = s } |
| 300 | + |
| 301 | +{------------------------------------------------------------------------------- |
| 302 | + Mutiple writable table handles |
| 303 | +-------------------------------------------------------------------------------} |
| 304 | + |
| 305 | +-- | Create a cheap, independent duplicate of a table handle. This returns a new |
| 306 | +-- table handle. |
| 307 | +duplicate :: |
| 308 | + IOLike m |
| 309 | + => TableHandle m k v blob |
| 310 | + -> m (TableHandle m k v blob) |
| 311 | +duplicate TableHandle {..} = atomically $ |
| 312 | + withModel "duplicate" thSession thRef $ \tbl -> do |
| 313 | + thRef' <- newTMVar tbl |
| 314 | + i <- new_handle thSession thRef' |
| 315 | + return TableHandle { thRef = thRef', thId = i, thSession = thSession } |
| 316 | + |
| 317 | +{------------------------------------------------------------------------------- |
| 318 | + Internal helpers |
| 319 | +-------------------------------------------------------------------------------} |
| 320 | + |
| 321 | +withModel :: IOLike m => String -> Session m -> TMVar m a -> (a -> STM m r) -> STM m r |
| 322 | +withModel fun s ref kont = do |
| 323 | + m <- tryReadTMVar ref |
| 324 | + case m of |
| 325 | + Nothing -> throwSTM IOError |
| 326 | + { ioe_handle = Nothing |
| 327 | + , ioe_type = IllegalOperation |
| 328 | + , ioe_location = fun |
| 329 | + , ioe_description = "table handle closed" |
| 330 | + , ioe_errno = Nothing |
| 331 | + , ioe_filename = Nothing |
| 332 | + } |
| 333 | + Just m' -> do |
| 334 | + sok <- readTVar (session_open s) |
| 335 | + unless sok $ throwSTM IOError |
| 336 | + { ioe_handle = Nothing |
| 337 | + , ioe_type = IllegalOperation |
| 338 | + , ioe_location = fun |
| 339 | + , ioe_description = "session closed" |
| 340 | + , ioe_errno = Nothing |
| 341 | + , ioe_filename = Nothing |
| 342 | + } |
| 343 | + |
| 344 | + kont m' |
| 345 | + |
| 346 | +writeTMVar :: MonadSTM m => TMVar m a -> a -> STM m () |
| 347 | +writeTMVar t n = tryTakeTMVar t >> putTMVar t n |
0 commit comments