Skip to content

Commit

Permalink
Merge pull request #405 from IntersectMBO/jdral/merge-refactor
Browse files Browse the repository at this point in the history
Small refactorings for the `Merge` module
  • Loading branch information
jorisdral authored Sep 30, 2024
2 parents 590471d + ea8fc61 commit bbc4025
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 62 deletions.
7 changes: 1 addition & 6 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,7 @@ merge fs hbio Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) mergeLevel f targetPaths runs
go m
where
go m =
Merge.steps fs hbio m stepSize >>= \case
(_, Merge.MergeComplete run) -> return run
(_, Merge.MergeInProgress) -> go m
Merge.stepsToCompletion m stepSize

outputRunPaths :: Run.RunFsPaths
outputRunPaths = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
Expand Down
3 changes: 3 additions & 0 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
deriving stock instance Generic Merge.Level
deriving anyclass instance NoThunks Merge.Level

deriving stock instance Generic Merge.StepResult
deriving anyclass instance NoThunks Merge.StepResult

{-------------------------------------------------------------------------------
Readers
-------------------------------------------------------------------------------}
Expand Down
136 changes: 97 additions & 39 deletions src/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
-- | The 'Merge' type and its functions are not intended for concurrent use.
-- Concurrent access should therefore be sequentialised using a suitable
-- concurrency primitive, such as an 'MVar'.
module Database.LSMTree.Internal.Merge (
Merge (..)
, Level (..)
, Mappend
, new
, close
, complete
, stepsToCompletion
, stepsToCompletionCounted
, StepResult (..)
, steps
) where
Expand All @@ -13,6 +19,7 @@ import Control.Monad (when)
import Control.Monad.Primitive (PrimState, RealWorld)
import Control.RefCount (RefCount (..))
import Data.Coerce (coerce)
import Data.Primitive.MutVar
import Data.Traversable (for)
import qualified Data.Vector as V
import Database.LSMTree.Internal.BlobRef (BlobRef)
Expand All @@ -23,6 +30,7 @@ import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunBuilder (RunBuilder)
import qualified Database.LSMTree.Internal.RunBuilder as Builder
import qualified Database.LSMTree.Internal.RunReader as Reader
import Database.LSMTree.Internal.RunReaders (Readers)
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise
import qualified System.FS.API as FS
Expand All @@ -36,13 +44,18 @@ import System.FS.BlockIO.API (HasBlockIO)
--
-- TODO: Reference counting will have to be done somewhere, either here or in
-- the layer above.
data Merge m fhandle = Merge {
mergeLevel :: !Level
, mergeMappend :: !Mappend
, mergeReaders :: {-# UNPACK #-} !(Readers.Readers m fhandle)
, mergeBuilder :: !(RunBuilder (PrimState m) fhandle)
, mergeCaching :: !RunDataCaching
-- ^ The caching policy to use for the Run in the 'MergeComplete'.
data Merge m h = Merge {
mergeLevel :: !Level
, mergeMappend :: !Mappend
, mergeReaders :: {-# UNPACK #-} !(Readers m (FS.Handle h))
, mergeBuilder :: !(RunBuilder (PrimState m) (FS.Handle h))
-- | The caching policy to use for the Run in the 'MergeComplete'.
, mergeCaching :: !RunDataCaching
-- | The result of the latest call to 'steps'. This is used to determine
-- whether a merge can be 'complete'd.
, mergeLastStepResult :: !(MutVar (PrimState m) StepResult)
, mergeHasFS :: !(HasFS m h)
, mergeHasBlockIO :: !(HasBlockIO m h)
}

data Level = MidLevel | LastLevel
Expand All @@ -61,33 +74,74 @@ new ::
-> Mappend
-> Run.RunFsPaths
-> V.Vector (Run IO (FS.Handle h))
-> IO (Maybe (Merge IO (FS.Handle h)))
-> IO (Maybe (Merge IO h))
new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
-- no offset, no write buffer
mreaders <- Readers.new fs hbio Readers.NoOffsetKey Nothing runs
for mreaders $ \mergeReaders -> do
-- calculate upper bounds based on input runs
let numEntries = coerce (sum @V.Vector @Int) (fmap Run.runNumEntries runs)
mergeBuilder <- Builder.new fs targetPaths numEntries alloc
return Merge {..}
mergeLastStepResult <- newMutVar $! MergeInProgress
return Merge {
mergeHasFS = fs
, mergeHasBlockIO = hbio
, ..
}


-- | This function should be called when discarding a 'Merge' before it
-- was done (i.e. returned 'MergeComplete'). This removes the incomplete files
-- created for the new run so far and avoids leaking file handles.
--
-- Once it has been called, do not use the 'Merge' any more!
close ::
HasFS IO h
-> HasBlockIO IO h
-> Merge IO (FS.Handle h)
-> IO ()
close fs hbio Merge {..} = do
Builder.close fs mergeBuilder
Readers.close fs hbio mergeReaders
close :: Merge IO h -> IO ()
close Merge {..} = do
Builder.close mergeHasFS mergeBuilder
Readers.close mergeHasFS mergeHasBlockIO mergeReaders

-- | Finalise a 'Merge' and turn its output into a new 'Run'.
--
-- This never performs merging work itself. It only inspects the result
-- recorded by the most recent call to 'steps': if that result is still
-- 'MergeInProgress' (i.e. not enough 'steps' were performed to exhaust the
-- input 'Readers'), an error is thrown. Otherwise the run is built from the
-- merge's builder with a reference count of 1.
--
-- Note: new 'Run' resources are created here, so it is recommended to call
-- this with async exceptions masked. Otherwise, these resources can leak.
complete :: Merge IO h -> IO (Run IO (FS.Handle h))
complete Merge{..} = do
    lastResult <- readMutVar mergeLastStepResult
    case lastResult of
      -- The inputs have not been fully consumed yet: refuse to build a run.
      MergeInProgress -> error "complete: Merge is not yet completed!"
      -- All inputs were consumed: convert the builder into the final run.
      MergeComplete   ->
        Run.fromMutable mergeHasFS mergeHasBlockIO mergeCaching
                        (RefCount 1) mergeBuilder

-- | Call 'steps' repeatedly until it reports 'MergeComplete', then call
-- 'complete' and return the resulting run. The step counts reported by the
-- individual calls to 'steps' are discarded.
--
-- A non-positive @stepBatchSize@ is treated as 1. When asked for fewer than
-- one step, 'steps' returns 'MergeInProgress' straight away without consuming
-- any input, so passing such a value through unchanged would make this loop
-- spin forever.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletion :: Merge IO h -> Int -> IO (Run IO (FS.Handle h))
stepsToCompletion m stepBatchSize = go
  where
    -- Clamp so that every call to 'steps' is asked for at least one entry.
    batchSize = max 1 stepBatchSize

    go = do
      steps m batchSize >>= \case
        (_, MergeInProgress) -> go
        (_, MergeComplete)   -> complete m

data StepResult m fhandle = MergeInProgress | MergeComplete !(Run m fhandle)
-- | Call 'steps' repeatedly until it reports 'MergeComplete', then call
-- 'complete'. Returns the resulting run together with the total number of
-- steps performed, i.e. the sum of the counts reported by the individual
-- calls to 'steps'.
--
-- A non-positive @stepBatchSize@ is treated as 1. When asked for fewer than
-- one step, 'steps' returns 'MergeInProgress' straight away without consuming
-- any input, so passing such a value through unchanged would make this loop
-- spin forever.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletionCounted :: Merge IO h -> Int -> IO (Int, Run IO (FS.Handle h))
stepsToCompletionCounted m stepBatchSize = go 0
  where
    -- Clamp so that every call to 'steps' is asked for at least one entry.
    batchSize = max 1 stepBatchSize

    -- The accumulator is kept strict so no chain of additions builds up
    -- across iterations.
    go !stepsSum = do
      steps m batchSize >>= \case
        (n, MergeInProgress) -> go (stepsSum + n)
        (n, MergeComplete)   -> let !stepsSum' = stepsSum + n
                                in (stepsSum',) <$> complete m

data StepResult = MergeInProgress | MergeComplete

stepsInvariant :: Int -> (Int, StepResult IO a) -> Bool
stepsInvariant :: Int -> (Int, StepResult) -> Bool
stepsInvariant requestedSteps = \case
(n, MergeInProgress) -> n >= requestedSteps
_ -> True
Expand All @@ -99,19 +153,26 @@ stepsInvariant requestedSteps = \case
--
-- Returns the number of input entries read, which is guaranteed to be at least
-- as many as requested (unless the merge is complete).
--
-- If this returns 'MergeComplete', do not use the `Merge` any more!
--
-- The resulting run has a reference count of 1.
steps ::
HasFS IO h
-> HasBlockIO IO h
-> Merge IO (FS.Handle h)
Merge IO h
-> Int -- ^ How many input entries to consume (at least)
-> IO (Int, StepResult IO (FS.Handle h))
steps fs hbio Merge {..} requestedSteps =
(\res -> assert (stepsInvariant requestedSteps res) res) <$> go 0
-> IO (Int, StepResult)
steps Merge {..} requestedSteps = assertStepsInvariant <$> do
-- TODO: ideally, we would not check whether the merge was already done on
-- every call to @steps@. It is important for correctness, however, that we
-- do not call @steps@ on a merge when it was already done. It is not yet
-- clear whether our (upcoming) implementation of scheduled merges is going
-- to satisfy this precondition when it calls @steps@, so for now we do the
-- check.
readMutVar mergeLastStepResult >>= \case
MergeComplete -> pure (0, MergeComplete)
MergeInProgress -> go 0
where
assertStepsInvariant res = assert (stepsInvariant requestedSteps res) res

fs = mergeHasFS
hbio = mergeHasBlockIO

go !n
| n >= requestedSteps =
return (n, MergeInProgress)
Expand All @@ -123,7 +184,8 @@ steps fs hbio Merge {..} requestedSteps =
Readers.Drained -> do
-- no future entries, no previous entry to resolve, just write!
writeReaderEntry fs mergeLevel mergeBuilder key entry
completeMerge (n + 1)
writeMutVar mergeLastStepResult $! MergeComplete
pure (n + 1, MergeComplete)

handleEntry !n !key (Reader.Entry (Mupdate v)) =
-- resolve small mupsert vals with the following entries of the same key
Expand Down Expand Up @@ -161,20 +223,16 @@ steps fs hbio Merge {..} requestedSteps =
dropRemaining (n + 1) key
Readers.Drained -> do
writeSerialisedEntry fs mergeLevel mergeBuilder key resolved
completeMerge (n + 1)
writeMutVar mergeLastStepResult $! MergeComplete
pure (n + 1, MergeComplete)

dropRemaining !n !key = do
(dropped, hasMore) <- Readers.dropWhileKey fs hbio mergeReaders key
case hasMore of
Readers.HasMore -> go (n + dropped)
Readers.Drained -> completeMerge (n + dropped)

completeMerge !n = do
-- All Readers have been drained, the builder finalised.
-- No further cleanup required.
run <- Run.fromMutable fs hbio mergeCaching
(RefCount 1) mergeBuilder
return (n, MergeComplete run)
Readers.Drained -> do
writeMutVar mergeLastStepResult $! MergeComplete
pure (n + dropped, MergeComplete)


writeReaderEntry ::
Expand Down
7 changes: 1 addition & 6 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,4 @@ mergeRuns ::
mergeRuns resolve hfs hbio caching alloc runPaths mergeLevel runs = do
Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs >>= \case
Nothing -> error "mergeRuns: no inputs"
Just merge -> go merge
where
go m =
Merge.steps hfs hbio m 1024 >>= \case
(_, Merge.MergeInProgress) -> go m
(_, Merge.MergeComplete run) -> return run
Just m -> Merge.stepsToCompletion m 1024
16 changes: 5 additions & 11 deletions test/Test/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ prop_CloseMerge fs hbio level (Positive stepSize) (fmap unTypedWriteBuffer -> Sm
let path0 = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
runs <- fmap V.fromList $ sequenceA $ zipWith (flush . RunNumber) [10..] wbs
mergeToClose <- makeInProgressMerge path0 runs
traverse_ (Merge.close fs hbio) mergeToClose
traverse_ Merge.close mergeToClose

filesExist <- traverse (FS.doesFileExist fs) (pathsForRunFiles path0)

Expand All @@ -147,9 +147,9 @@ prop_CloseMerge fs hbio level (Positive stepSize) (fmap unTypedWriteBuffer -> Sm
Nothing -> return Nothing -- not in progress
Just merge -> do
-- just do a few steps once, ideally not completing the merge
Merge.steps fs hbio merge stepSize >>= \case
(_, Merge.MergeComplete run) -> do
Run.removeReference run -- run not needed, close
Merge.steps merge stepSize >>= \case
(_, Merge.MergeComplete) -> do
Merge.close merge -- run not needed, close
return Nothing -- not in progress
(_, Merge.MergeInProgress) ->
return (Just merge)
Expand All @@ -173,13 +173,7 @@ mergeRuns fs hbio level runNumber runs (Positive stepSize) = do
(RunFsPaths (FS.mkFsPath []) runNumber) runs >>= \case
Nothing -> (,) 0 <$> mkRunFromSerialisedKOps fs hbio
(RunFsPaths (FS.mkFsPath []) runNumber) Map.empty
Just m -> go 0 m
where
go !steps m =
Merge.steps fs hbio m stepSize >>= \case
(n, Merge.MergeComplete run) -> return (steps + n, run)
(n, Merge.MergeInProgress) -> go (steps + n) m

Just m -> Merge.stepsToCompletionCounted m stepSize

type SerialisedEntry = Entry.Entry SerialisedValue SerialisedBlob

Expand Down

0 comments on commit bbc4025

Please sign in to comment.