Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ouroboros-consensus-cardano/app/snapshot-converter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ load config@Config{inpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS), pa
pure (forgetLedgerTables st, projectLedgerTables st)
Mem -> do
checkSnapshotFileStructure Mem path fs
(ls, _) <- withExceptT SnapshotError $ V2.loadSnapshot rr ccfg fs ds
(ls, _) <- withExceptT SnapshotError $ V2.loadSnapshot nullTracer rr ccfg fs ds
let h = V2.currentHandle ls
(V2.state h,) <$> Trans.lift (V2.readAll (V2.tables h))
LMDB -> do
Expand Down Expand Up @@ -237,7 +237,7 @@ store config@Config{outpath = pathToDiskSnapshot -> Just (fs@(SomeHasFS hasFS),
withFile hasFS (path <.> "checksum") (WriteMode MustBeNew) $ \h ->
Monad.void $ hPutAll hasFS h . BS.toLazyByteString . BS.word32HexFixed $ getCRC crc
Mem -> do
lseq <- V2.empty state tbs $ V2.newInMemoryLedgerTablesHandle fs
lseq <- V2.empty state tbs $ V2.newInMemoryLedgerTablesHandle nullTracer fs
let h = V2.currentHandle lseq
Monad.void $ V2.takeSnapshot ccfg nullTracer fs suffix h
LMDB -> do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Patch

- Changed the V2 LedgerDB `LedgerTablesHandle`s to actually be closed in all
cases. With the current (only) backend (in-memory), this doesn't matter, but
on-disk backends (like LSM trees) need this.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Breaking

- LedgerDB: added new trace events (enabling new tests).
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ data TestInternals m l blk = TestInternals
, reapplyThenPushNOW :: blk -> m ()
, truncateSnapshots :: m ()
, closeLedgerDB :: m ()
, getNumLedgerTablesHandles :: m Word64
-- ^ Get the number of referenced 'LedgerTablesHandle's for V2. For V1, this
-- always returns 0.
}
deriving NoThunks via OnlyCheckWhnfNamed "TestInternals" (TestInternals m l blk)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ mkInternals h =
, wipeLedgerDB = getEnv h $ void . destroySnapshots . snapshotsFs . ldbHasFS
, closeLedgerDB = getEnv h $ bsClose . ldbBackingStore
, truncateSnapshots = getEnv h $ void . implIntTruncateSnapshots . ldbHasFS
, getNumLedgerTablesHandles = pure 0
}

-- | Testing only! Truncate all snapshots in the DB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ mkInitDb args flavArgs getBlock =
, closeDb = closeLedgerSeq
, initReapplyBlock = \a b c -> do
(x, y) <- reapplyThenPush lgrRegistry a b c
closeLedgerSeq x
x
pure y
, currentTip = ledgerState . current
, pruneDb = \lseq -> do
let (LedgerSeq rel, dbPrunedToImmDBTip) = pruneToImmTipOnly lseq
mapM_ (close . tables) (AS.toOldestFirst rel)
let (rel, dbPrunedToImmDBTip) = pruneToImmTipOnly lseq
rel
pure dbPrunedToImmDBTip
, mkLedgerDb = \lseq -> do
varDB <- newTVarIO lseq
Expand Down Expand Up @@ -128,12 +128,15 @@ mkInitDb args flavArgs getBlock =

bss = case flavArgs of V2Args bss0 -> bss0

v2Tracer :: Tracer m V2.FlavorImplSpecificTrace
v2Tracer = LedgerDBFlavorImplEvent . FlavorImplSpecificTraceV2 >$< lgrTracer

emptyF ::
ExtLedgerState blk ValuesMK ->
m (LedgerSeq' m blk)
emptyF st =
empty' st $ case bss of
InMemoryHandleArgs -> InMemory.newInMemoryLedgerTablesHandle lgrHasFS
InMemoryHandleArgs -> InMemory.newInMemoryLedgerTablesHandle v2Tracer lgrHasFS
LSMHandleArgs x -> absurd x

loadSnapshot ::
Expand All @@ -142,7 +145,7 @@ mkInitDb args flavArgs getBlock =
DiskSnapshot ->
m (Either (SnapshotFailure blk) (LedgerSeq' m blk, RealPoint blk))
loadSnapshot ccfg fs ds = case bss of
InMemoryHandleArgs -> runExceptT $ InMemory.loadSnapshot lgrRegistry ccfg fs ds
InMemoryHandleArgs -> runExceptT $ InMemory.loadSnapshot v2Tracer lgrRegistry ccfg fs ds
LSMHandleArgs x -> absurd x

implMkLedgerDb ::
Expand Down Expand Up @@ -228,6 +231,10 @@ mkInternals bss h =
let LDBHandle tvar = h
in atomically (writeTVar tvar LedgerDBClosed)
, truncateSnapshots = getEnv h $ implIntTruncateSnapshots . ldbHasFS
, getNumLedgerTablesHandles = getEnv h $ \env -> do
l <- readTVarIO (ldbSeq env)
-- We always have a state at the anchor.
pure $ 1 + maxRollback l
}
where
takeSnapshot ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ data HandleArgs
deriving (Generic, NoThunks)

data FlavorImplSpecificTrace
= FlavorImplSpecificTraceInMemory
| FlavorImplSpecificTraceOnDisk
= -- | Created a new 'LedgerTablesHandle', potentially by duplicating an
-- existing one.
TraceLedgerTablesHandleCreate
| -- | Closed a 'LedgerTablesHandle'.
TraceLedgerTablesHandleClose
deriving (Show, Eq)
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ implForkerCommit env = do
LedgerSeq lseq <- readTVar foeLedgerSeq
let intersectionSlot = getTipSlot $ state $ AS.anchor lseq
let predicate = (== getTipHash (state (AS.anchor lseq))) . getTipHash . state
(discardedBySelection, LedgerSeq discardedByPruning) <- do
closeDiscarded <- do
stateTVar
foeSwitchVar
( \(LedgerSeq olddb) -> fromMaybe theImpossible $ do
Expand All @@ -153,17 +153,25 @@ implForkerCommit env = do
-- Join the prefix of the selection with the sequence in the forker
newdb <- AS.join (const $ const True) olddb' lseq
-- Prune the resulting sequence to keep @k@ states
let (l, s) = prune (LedgerDbPruneKeeping (foeSecurityParam env)) (LedgerSeq newdb)
pure ((toClose, l), s)
let (closePruned, s) = prune (LedgerDbPruneKeeping (foeSecurityParam env)) (LedgerSeq newdb)
closeDiscarded = do
closePruned
-- Do /not/ close the anchor of @toClose@, as that is also the
-- tip of @olddb'@ which will be used in @newdb@.
case toClose of
AS.Empty _ -> pure ()
_ AS.:< closeOld' -> closeLedgerSeq (LedgerSeq closeOld')
-- Finally, close the anchor of @lseq@ (which is a duplicate of
-- the head of @olddb'@).
close $ tables $ AS.anchor lseq
Comment on lines +157 to +166
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct, but it is rather non-obvious.

It would be "nicer" if newdb would use the anchor state of lseq instead of the head state of olddb' (but we would need a special purpose AS.join for this as this isn't possible in general when the anchor is "strictly smaller" than the element type). In that case, we could simply write

closeDiscarded = closePruned *> closeLedgerSeq (LedgerSeq closeOld)

instead of what I have here.

pure (closeDiscarded, s)
)

-- We are discarding the previous value in the TVar because we had accumulated
-- actions for closing the states pushed to the forker. As we are committing
-- those we have to close the ones discarded in this function and forget about
-- those releasing actions.
writeTVar foeResourcesToRelease $
mapM_ (close . tables) $
AS.toOldestFirst discardedBySelection ++ AS.toOldestFirst discardedByPruning
writeTVar foeResourcesToRelease closeDiscarded
where
ForkerEnv
{ foeLedgerSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import qualified Ouroboros.Consensus.Ledger.Tables.Diff as Diff
import Ouroboros.Consensus.Storage.LedgerDB.API
import Ouroboros.Consensus.Storage.LedgerDB.Snapshots
import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.Args as V2
import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
import Ouroboros.Consensus.Util.CBOR (readIncremental)
import Ouroboros.Consensus.Util.CRC
Expand Down Expand Up @@ -85,18 +86,25 @@ newInMemoryLedgerTablesHandle ::
, CanUpgradeLedgerTables l
, SerializeTablesWithHint l
) =>
Tracer m V2.FlavorImplSpecificTrace ->
SomeHasFS m ->
LedgerTables l ValuesMK ->
m (LedgerTablesHandle m l)
newInMemoryLedgerTablesHandle someFS@(SomeHasFS hasFS) l = do
newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do
!tv <- newTVarIO (LedgerTablesHandleOpen l)
traceWith tracer V2.TraceLedgerTablesHandleCreate
pure
LedgerTablesHandle
{ close =
atomically $ writeTVar tv LedgerTablesHandleClosed
{ close = do
-- Temporarily a no-op until
-- https://github.com/IntersectMBO/ouroboros-consensus/issues/1551 has
-- been fixed.

-- atomically $ writeTVar tv LedgerTablesHandleClosed
traceWith tracer V2.TraceLedgerTablesHandleClose
, duplicate = do
hs <- readTVarIO tv
!x <- guardClosed hs $ newInMemoryLedgerTablesHandle someFS
!x <- guardClosed hs $ newInMemoryLedgerTablesHandle tracer someFS
pure x
, read = \keys -> do
hs <- readTVarIO tv
Expand Down Expand Up @@ -208,12 +216,13 @@ loadSnapshot ::
, IOLike m
, LedgerSupportsInMemoryLedgerDB blk
) =>
Tracer m V2.FlavorImplSpecificTrace ->
ResourceRegistry m ->
CodecConfig blk ->
SomeHasFS m ->
DiskSnapshot ->
ExceptT (SnapshotFailure blk) m (LedgerSeq' m blk, RealPoint blk)
loadSnapshot _rr ccfg fs ds = do
loadSnapshot tracer _rr ccfg fs ds = do
snapshotMeta <-
withExceptT (InitFailureRead . ReadMetadataError (snapshotToMetadataPath ds)) $
loadSnapshotMetadata fs ds
Expand Down Expand Up @@ -242,4 +251,4 @@ loadSnapshot _rr ccfg fs ds = do
throwE $
InitFailureRead $
ReadSnapshotDataCorruption
(,pt) <$> lift (empty extLedgerSt values (newInMemoryLedgerTablesHandle fs))
(,pt) <$> lift (empty extLedgerSt values (newInMemoryLedgerTablesHandle tracer fs))
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}

-- | The data structure that holds the cached ledger states.
module Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
Expand Down Expand Up @@ -54,7 +56,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq

import Cardano.Ledger.BaseTypes
import Control.ResourceRegistry
import qualified Data.Bifunctor as B
import Data.Function (on)
import Data.Word
import GHC.Generics
Expand Down Expand Up @@ -183,8 +184,11 @@ empty' ::
m (LedgerSeq m l)
empty' st = empty (forgetLedgerTables st) (ltprj st)

-- | Close all 'LedgerTablesHandle' in this 'LedgerSeq', in particular that on
-- the anchor.
closeLedgerSeq :: Monad m => LedgerSeq m l -> m ()
closeLedgerSeq = mapM_ (close . tables) . toOldestFirst . getLedgerSeq
closeLedgerSeq (LedgerSeq l) =
mapM_ (close . tables) $ AS.anchor l : AS.toOldestFirst l

{-------------------------------------------------------------------------------
Apply blocks
Expand All @@ -193,15 +197,14 @@ closeLedgerSeq = mapM_ (close . tables) . toOldestFirst . getLedgerSeq
-- | Apply a block on top of the ledger state and extend the LedgerSeq with
-- the result ledger state.
--
-- The @fst@ component of the result should be closed as it contains the pruned
-- states.
-- The @fst@ component of the result should be run to close the pruned states.
reapplyThenPush ::
(IOLike m, ApplyBlock l blk) =>
ResourceRegistry m ->
LedgerDbCfg l ->
blk ->
LedgerSeq m l ->
m (LedgerSeq m l, LedgerSeq m l)
m (m (), LedgerSeq m l)
reapplyThenPush rr cfg ap db =
(\current' -> prune (LedgerDbPruneKeeping (ledgerDbCfgSecParam cfg)) $ extend current' db)
<$> reapplyBlock (ledgerDbCfgComputeLedgerEvents cfg) (ledgerDbCfg cfg) ap rr db
Expand Down Expand Up @@ -229,30 +232,38 @@ reapplyBlock evs cfg b _rr db = do
-- | Prune older ledger states until at we have at most @k@ volatile states in
-- the LedgerDB, plus the one stored at the anchor.
--
-- The @fst@ component of the returned value has to be @close@ed.
-- The @fst@ component of the returned value is an action closing the pruned
-- ledger states.
--
-- >>> ldb = LedgerSeq $ AS.fromOldestFirst l0 [l1, l2, l3]
-- >>> ldb' = LedgerSeq $ AS.fromOldestFirst l1 [l2, l3]
-- >>> snd (prune (LedgerDbPruneKeeping (SecurityParam (unsafeNonZero 2))) ldb) == ldb'
-- True
prune ::
GetTip l =>
(Monad m, GetTip l) =>
LedgerDbPrune ->
LedgerSeq m l ->
(LedgerSeq m l, LedgerSeq m l)
prune (LedgerDbPruneKeeping (SecurityParam k)) (LedgerSeq ldb) =
if toEnum nvol <= unNonZero k
then (LedgerSeq $ Empty (AS.anchor ldb), LedgerSeq ldb)
else
-- We remove the new anchor from the @fst@ component so that its handle is
-- not closed.
B.bimap (LedgerSeq . dropNewest 1) LedgerSeq $ AS.splitAt (nvol - fromEnum (unNonZero k)) ldb
where
nvol = AS.length ldb
prune LedgerDbPruneAll (LedgerSeq ldb) =
B.bimap (LedgerSeq . dropNewest 1) LedgerSeq $ AS.splitAt nvol ldb
(m (), LedgerSeq m l)
prune howToPrune (LedgerSeq ldb) = case howToPrune of
LedgerDbPruneKeeping (SecurityParam (fromEnum . unNonZero -> k))
| nvol <= k -> (pure (), LedgerSeq ldb)
| otherwise -> (closeButHead before, LedgerSeq after)
where
nvol = AS.length ldb
(before, after) = AS.splitAt (nvol - k) ldb
LedgerDbPruneAll ->
(closeButHead before, LedgerSeq after)
where
(before, after) = (ldb, AS.Empty (AS.headAnchor ldb))
where
nvol = AS.length ldb
-- Above, we split @ldb@ into two sequences @before@ and @after@ such that
-- @AS.headAnchor before == AS.anchor after@. We want to close all handles of
-- @ldb@ not present in @after@, which are none if @before@ is empty, and all
-- (in particular the anchor) of @before@ apart from the the head of @before@
-- if @before@ is non-empty.
closeButHead = \case
AS.Empty _ -> pure ()
toPrune AS.:> _ -> closeLedgerSeq (LedgerSeq toPrune)

-- NOTE: we must inline 'prune' otherwise we get unexplained thunks in
-- 'LedgerSeq' and thus a space leak. Alternatively, we could disable the
Expand Down Expand Up @@ -296,9 +307,9 @@ extend newState =
-- >>> AS.anchor ldb' == l3 && AS.toOldestFirst ldb' == []
-- True
pruneToImmTipOnly ::
GetTip l =>
(Monad m, GetTip l) =>
LedgerSeq m l ->
(LedgerSeq m l, LedgerSeq m l)
(m (), LedgerSeq m l)
pruneToImmTipOnly = prune LedgerDbPruneAll

{-------------------------------------------------------------------------------
Expand Down
Loading
Loading