Skip to content

Commit

Permalink
Merge pull request #309 from IntersectMBO/jdral/fadvise
Browse files Browse the repository at this point in the history
Use `hAdvise` for reading and writing runs
  • Loading branch information
jorisdral authored Aug 1, 2024
2 parents a4ba65b + a8cee0f commit 0b2ade1
Show file tree
Hide file tree
Showing 18 changed files with 349 additions and 140 deletions.
2 changes: 1 addition & 1 deletion bench/macro/lsm-tree-bench-lookups.hs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ benchmarks !caching = withFS $ \hfs hbio -> do

traceMarkerIO "Cleaning up"
putStrLn "Cleaning up"
V.mapM_ (Run.removeReference hfs) runs
V.mapM_ (Run.removeReference hfs hbio) runs

traceMarkerIO "Computing statistics for prepLookups results"
putStr "<Computing statistics for prepLookups>"
Expand Down
2 changes: 1 addition & 1 deletion bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ lookupsInBatchesCleanup ::
-> IO ()
lookupsInBatchesCleanup (tmpDir, _arenaManager, hasFS, hasBlockIO, rs, _) = do
FS.close hasBlockIO
forM_ rs $ Run.removeReference hasFS
forM_ rs $ Run.removeReference hasFS hasBlockIO
removeDirectoryRecursive tmpDir

-- | Generate keys to store and keys to lookup
Expand Down
6 changes: 3 additions & 3 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ benchMerge conf@Config{name} =
-- Make sure to immediately close resulting runs so we don't run
-- out of file handles. Ideally this would not be measured, but at
-- least it's pretty cheap.
Run.removeReference hasFS run
Run.removeReference hasFS hasBlockIO run
]
where
withEnv =
Expand All @@ -229,7 +229,7 @@ merge ::
merge fs hbio Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Merge.new fs Run.CacheRunData (RunAllocFixed 10) mergeLevel f targetPaths runs
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) mergeLevel f targetPaths runs
go m
where
go m =
Expand Down Expand Up @@ -336,7 +336,7 @@ mergeEnvCleanup ::
)
-> IO ()
mergeEnvCleanup (tmpDir, hasFS, hasBlockIO, runs) = do
traverse_ (Run.removeReference hasFS) runs
traverse_ (Run.removeReference hasFS hasBlockIO) runs
removeDirectoryRecursive tmpDir
FS.close hasBlockIO

Expand Down
4 changes: 2 additions & 2 deletions bench/micro/Bench/Database/LSMTree/Internal/WriteBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ benchWriteBuffer conf@Config{name} =
bench "flush" $
Cr.perRunEnvWithCleanup (getPaths hasFS) (const (cleanupPaths hasFS)) $ \p -> do
!run <- flush hasFS hasBlockIO p wb
Run.removeReference hasFS run
Run.removeReference hasFS hasBlockIO run
, bench "insert+flush" $
-- To make sure the WriteBuffer really gets recomputed on every run,
-- we'd like to do: `whnfAppIO (kops' -> ...) kops`.
Expand All @@ -135,7 +135,7 @@ benchWriteBuffer conf@Config{name} =
-- Make sure to immediately close runs so we don't run out of
-- file handles. Ideally this would not be measured, but at
-- least it's pretty cheap.
Run.removeReference hasFS run
Run.removeReference hasFS hasBlockIO run
]
where
withEnv =
Expand Down
12 changes: 12 additions & 0 deletions blockio-api/src/System/FS/BlockIO/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ module System.FS.BlockIO.API (
, ioopBufferOffset
, ioopByteCount
, IOResult (..)
-- * Advice
, Advice (..)
, hAdviseAll
, hDropCacheAll
-- * Re-exports
, ByteCount
, FileOffset
Expand Down Expand Up @@ -157,3 +160,12 @@ data Advice =
| AdviceDontNeed
| AdviceNoReuse
deriving stock (Show, Eq)

-- | Apply 'Advice' to all bytes of a file, which is referenced by a 'Handle'.
hAdviseAll :: HasBlockIO m h -> Handle h -> Advice -> m ()
hAdviseAll hbio h advice = hAdvise hbio h 0 0 advice -- len=0 implies until the end of file

-- | Drop the full file referenced by a 'Handle' from the OS page cache, if
-- present.
hDropCacheAll :: HasBlockIO m h -> Handle h -> m ()
hDropCacheAll hbio h = hAdviseAll hbio h AdviceDontNeed
21 changes: 11 additions & 10 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,14 @@ runsInLevels levels = flip V.concatMap levels $ \(Level mr rs) ->
SingleRun r -> r `V.cons` rs
MergingRun (CompletedMerge r) -> r `V.cons` rs

{-# SPECIALISE closeLevels :: HasFS IO h -> Levels (Handle h) -> IO () #-}
{-# SPECIALISE closeLevels :: HasFS IO h -> HasBlockIO IO h -> Levels (Handle h) -> IO () #-}
closeLevels ::
m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation.
=> HasFS m h
-> HasBlockIO m h
-> Levels (Handle h)
-> m ()
closeLevels hfs levels = V.mapM_ (Run.removeReference hfs) (runsInLevels levels)
closeLevels hfs hbio levels = V.mapM_ (Run.removeReference hfs hbio) (runsInLevels levels)

-- | Flattend cache of the runs that referenced by a table handle.
--
Expand Down Expand Up @@ -610,7 +611,7 @@ close th = RW.withWriteAccess_ (tableHandleState th) $ \case
-- forget about this table.
tableSessionUntrackTable thEnv
RW.withWriteAccess_ (tableContent thEnv) $ \lvls -> do
closeLevels (tableHasFS thEnv) (tableLevels lvls)
closeLevels (tableHasFS thEnv) (tableHasBlockIO thEnv) (tableLevels lvls)
pure emptyTableContent
pure TableHandleClosed

Expand Down Expand Up @@ -773,7 +774,7 @@ flushWriteBuffer conf@TableConfig{confDiskCachePolicy, confBloomFilterAlloc}
(bloomFilterAllocForLevel confBloomFilterAlloc (LevelNo 1))
(Paths.runPath root n)
(tableWriteBuffer tc))
(Run.removeReference hfs)
(Run.removeReference hfs hbio)
levels' <- addRunToLevels conf hfs hbio root uniqC r reg (tableLevels tc)
pure $! TableContent {
tableWriteBuffer = WB.empty
Expand Down Expand Up @@ -941,8 +942,8 @@ addRunToLevels conf@TableConfig{..} hfs hbio root uniqC r0 reg levels = do
alloc = bloomFilterAllocForLevel confBloomFilterAlloc ln
r <- allocateTemp reg
(mergeRuns conf hfs hbio root uniqC caching alloc mergepolicy mergelast rs)
(Run.removeReference hfs)
V.mapM_ (freeTemp reg . Run.removeReference hfs) rs
(Run.removeReference hfs hbio)
V.mapM_ (freeTemp reg . Run.removeReference hfs hbio) rs
pure $! MergingRun (CompletedMerge r)

data MergePolicyForLevel = LevelTiering | LevelLevelling
Expand Down Expand Up @@ -1018,7 +1019,7 @@ mergeRuns ::
mergeRuns conf hfs hbio root uniqC caching alloc _ mergeLevel runs = do
n <- incrUniqCounter uniqC
let runPaths = Paths.runPath root n
Merge.new hfs caching alloc mergeLevel resolve runPaths (toList runs) >>= \case
Merge.new hfs hbio caching alloc mergeLevel resolve runPaths (toList runs) >>= \case
Nothing -> error "mergeRuns: no inputs"
Just merge -> go merge
where
Expand Down Expand Up @@ -1181,11 +1182,11 @@ openLevels hfs hbio diskCachePolicy levels =
caching = diskCachePolicyForLevel diskCachePolicy ln
!r <- Managed $ bracketOnError
(Run.openFromDisk hfs hbio caching (snd mrPath))
(Run.removeReference hfs)
(Run.removeReference hfs hbio)
!rs <- flip V.mapMStrict rsPaths $ \run ->
Managed $ bracketOnError
(Run.openFromDisk hfs hbio caching run)
(Run.removeReference hfs)
(Run.removeReference hfs hbio)
let !mr = if fst mrPath then SingleRun r else MergingRun (CompletedMerge r)
pure $! Level mr rs

Expand Down Expand Up @@ -1247,7 +1248,7 @@ duplicate th = withOpenTable th $ \thEnv -> do
V.forM_ (runsInLevels (tableLevels content)) $ \r -> do
allocateTemp reg
(Run.addReference (tableHasFS thEnv) r)
(\_ -> Run.removeReference (tableHasFS thEnv) r)
(\_ -> Run.removeReference (tableHasFS thEnv) (tableHasBlockIO thEnv) r)
pure content
newWith
(tableSession thEnv)
Expand Down
96 changes: 87 additions & 9 deletions src/Database/LSMTree/Internal/CRC32C.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

Expand All @@ -24,33 +25,42 @@ module Database.LSMTree.Internal.CRC32C (
hPutAllChunksCRC32C,
readFileCRC32C,

ChunkSize (..),
defaultChunkSize,
hGetExactlyCRC32C_SBS,
hGetAllCRC32C',

-- * Checksum files
-- $checksum-files
ChecksumsFile,
ChecksumsFileName(..),
readChecksumsFile,
writeChecksumsFile,
writeChecksumsFile',
) where

import Data.Digest.CRC32C as CRC

import Control.Monad
import Control.Monad.Class.MonadThrow
import Control.Monad.Primitive
import Data.Bits
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Internal as BS.Internal
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Short as SBS
import Data.Char (ord)
import Data.Digest.CRC32C as CRC
import Data.Either (partitionEithers)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

import Data.Bits
import Data.Char (ord)
import Data.Primitive
import Data.Word

import Control.Monad
import Control.Monad.Class.MonadThrow

import GHC.Exts
import qualified GHC.ForeignPtr as Foreign
import System.FS.API
import System.FS.API.Lazy
import System.FS.BlockIO.API (ByteCount)


newtype CRC32C = CRC32C Word32
Expand Down Expand Up @@ -151,6 +161,70 @@ readFileCRC32C fs file =
then return crc
else go h (updateCRC32C bs crc)

newtype ChunkSize = ChunkSize ByteCount

defaultChunkSize :: ChunkSize
defaultChunkSize = ChunkSize 65504 -- 2^16 - 4 words overhead

{-# SPECIALISE hGetExactlyCRC32C_SBS :: HasFS IO h -> Handle h -> ByteCount -> CRC32C -> IO (SBS.ShortByteString, CRC32C) #-}
-- | Reads exactly as many bytes as requested, returning a 'ShortByteString' and
-- updating a given 'CRC32C' value.
--
-- If EOF is found before the requested number of bytes is read, an FsError
-- exception is thrown.
--
-- The returned 'ShortByteString' is backed by pinned memory.
hGetExactlyCRC32C_SBS ::
forall m h. (MonadThrow m, PrimMonad m)
=> HasFS m h
-> Handle h
-> ByteCount -- ^ Number of bytes to read
-> CRC32C
-> m (SBS.ShortByteString, CRC32C)
hGetExactlyCRC32C_SBS hfs h !c !crc = do
buf@(MutableByteArray !mba#) <- newPinnedByteArray (fromIntegral c)
void $ hGetBufExactly hfs h buf 0 c
(ByteArray !ba#) <- unsafeFreezeByteArray buf
let fp = Foreign.ForeignPtr (byteArrayContents# ba#) (Foreign.PlainPtr (unsafeCoerce# mba#))
!bs = BS.Internal.BS fp (fromIntegral c)
!crc' = updateCRC32C bs crc
pure (SBS.SBS ba#, crc')

{-# SPECIALISE hGetAllCRC32C' :: HasFS IO h -> Handle h -> ChunkSize -> CRC32C -> IO CRC32C #-}
-- | Reads all bytes, updating a given 'CRC32C' value without returning the
-- bytes.
hGetAllCRC32C' ::
forall m h. PrimMonad m
=> HasFS m h
-> Handle h
-> ChunkSize -- ^ Chunk size, must be larger than 0
-> CRC32C
-> m CRC32C
hGetAllCRC32C' hfs h (ChunkSize !chunkSize) !crc0
| chunkSize <= 0
= error "hGetAllCRC32C_SBS: chunkSize must be >0"
| otherwise
= do
buf@(MutableByteArray !mba#) <- newPinnedByteArray (fromIntegral chunkSize)
(ByteArray !ba#) <- unsafeFreezeByteArray buf
let fp = Foreign.ForeignPtr (byteArrayContents# ba#) (Foreign.PlainPtr (unsafeCoerce# mba#))
!bs = BS.Internal.BS fp (fromIntegral chunkSize)
go bs buf crc0
where
-- In particular, note that the "immutable" bs :: BS.ByteString aliases the
-- mutable buf :: MutableByteArray. This is a bit hairy but we need to do
-- something like this because the CRC code only works with ByteString.
-- We thus have to be very careful about when bs is used.
go :: BS.ByteString -> MutableByteArray (PrimState m) -> CRC32C -> m CRC32C
go !bs buf !crc = do
!n <- hGetBufSome hfs h buf 0 chunkSize
if n == 0
then return crc
else do
-- compute the update CRC value before reading the next bytes
let !crc' = updateCRC32C (BS.take (fromIntegral n) bs) crc
go bs buf crc'


{- | $checksum-files
We use @.checksum@ files to help verify the integrity of on disk snapshots.
Expand Down Expand Up @@ -202,6 +276,10 @@ writeChecksumsFile fs path checksums =
_ <- hPutAll fs h (formatChecksumsFile checksums)
return ()

writeChecksumsFile' :: MonadThrow m
=> HasFS m h -> Handle h -> ChecksumsFile -> m ()
writeChecksumsFile' fs h checksums = void $ hPutAll fs h (formatChecksumsFile checksums)

parseChecksumsFile :: BSC.ByteString -> Either BSC.ByteString ChecksumsFile
parseChecksumsFile content =
case partitionEithers (parseLines content) of
Expand Down
16 changes: 9 additions & 7 deletions src/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue
-- The list of runs should be sorted from new to old.
new ::
HasFS IO h
-> HasBlockIO IO h
-> RunDataCaching
-> RunBloomFilterAlloc
-> Level
-> Mappend
-> Run.RunFsPaths
-> [Run (FS.Handle h)]
-> IO (Maybe (Merge (FS.Handle h)))
new fs mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
mreaders <- Readers.new fs runs
new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
mreaders <- Readers.new fs hbio runs
for mreaders $ \mergeReaders -> do
-- calculate upper bounds based on input runs
let numEntries = coerce (sum @[] @Int) (map Run.runNumEntries runs)
Expand All @@ -73,11 +74,12 @@ new fs mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
-- Once it has been called, do not use the 'Merge' any more!
close ::
HasFS IO h
-> HasBlockIO IO h
-> Merge (FS.Handle h)
-> IO ()
close fs Merge {..} = do
close fs hbio Merge {..} = do
Builder.close fs mergeBuilder
Readers.close fs mergeReaders
Readers.close fs hbio mergeReaders

data StepResult fhandle = MergeInProgress | MergeComplete !(Run fhandle)

Expand Down Expand Up @@ -110,7 +112,7 @@ steps fs hbio Merge {..} requestedSteps =
| n >= requestedSteps =
return (n, MergeInProgress)
| otherwise = do
(key, entry, hasMore) <- Readers.pop fs mergeReaders
(key, entry, hasMore) <- Readers.pop fs hbio mergeReaders
case hasMore of
Readers.HasMore ->
handleEntry (n + 1) key entry
Expand Down Expand Up @@ -139,7 +141,7 @@ steps fs hbio Merge {..} requestedSteps =
writeSerialisedEntry fs mergeLevel mergeBuilder key (Mupdate v)
go n
else do
(_, nextEntry, hasMore) <- Readers.pop fs mergeReaders
(_, nextEntry, hasMore) <- Readers.pop fs hbio mergeReaders
-- for resolution, we need the full second value to be present
let resolved = combine mergeMappend
(Mupdate v)
Expand All @@ -158,7 +160,7 @@ steps fs hbio Merge {..} requestedSteps =
completeMerge (n + 1)

dropRemaining !n !key = do
(dropped, hasMore) <- Readers.dropWhileKey fs mergeReaders key
(dropped, hasMore) <- Readers.dropWhileKey fs hbio mergeReaders key
case hasMore of
Readers.HasMore -> go (n + dropped)
Readers.Drained -> completeMerge (n + dropped)
Expand Down
Loading

0 comments on commit 0b2ade1

Please sign in to comment.