Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/API/ServerOutput.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ instance IsChainState tx => ToJSON (TimedServerOutput tx) where
case toJSON output of
Object o ->
Object $ o <> KeyMap.fromList [("seq", toJSON seq), ("timestamp", toJSON time)]
_NotAnObject -> error "expected ServerOutput to serialize to an Object"
_NotAnObject -> error $ "expected ServerOutput to serialize to an Object: " <> show _NotAnObject

instance IsChainState tx => FromJSON (TimedServerOutput tx) where
parseJSON v = flip (withObject "TimedServerOutput") v $ \o ->
Expand Down
8 changes: 5 additions & 3 deletions hydra-node/src/Hydra/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import Control.Concurrent.Class.MonadSTM (
readTVarIO,
writeTBQueue,
)
import Control.Monad.Class.MonadAsync (link)
import Control.Monad.Class.MonadFork (myThreadId)
import Control.Monad.Class.MonadSay (MonadSay, say)
import Control.Tracer (
Expand Down Expand Up @@ -84,7 +85,7 @@ defaultQueueSize = 500
-- is wrapping 'msg' into an 'Envelope' with metadata.
withTracer ::
forall m msg a.
(MonadIO m, MonadFork m, MonadTime m, ToJSON msg) =>
(HasCallStack, MonadIO m, MonadFork m, MonadTime m, ToJSON msg) =>
Verbosity ->
(Tracer m msg -> IO a) ->
IO a
Expand All @@ -96,14 +97,15 @@ withTracer (Verbose namespace) = withTracerOutputTo stdout namespace
-- with metadata.
withTracerOutputTo ::
forall m msg a.
(MonadIO m, MonadFork m, MonadTime m, ToJSON msg) =>
(HasCallStack, MonadIO m, MonadFork m, MonadTime m, ToJSON msg) =>
Handle ->
Text ->
(Tracer m msg -> IO a) ->
IO a
withTracerOutputTo hdl namespace action = do
msgQueue <- newTBQueueIO @_ @(Envelope msg) defaultQueueSize
withAsync (writeLogs msgQueue) $ \_ ->
withAsync (writeLogs msgQueue) $ \t -> do
-- link t
action (tracer msgQueue) `finally` flushLogs msgQueue
where
tracer queue =
Expand Down
6 changes: 3 additions & 3 deletions hydra-node/src/Hydra/Logging/Monitoring.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ module Hydra.Logging.Monitoring (
withMonitoring,
) where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
import Control.Monad.Class.MonadAsync (link)
import Control.Tracer (Tracer (Tracer))
import Data.Map.Strict as Map
import Hydra.HeadLogic (
Expand All @@ -24,6 +23,7 @@ import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (PortNumber)
import Hydra.Network.Message (Message (ReqTx), NetworkEvent (..))
import Hydra.Node (HydraNodeLog (..))
import Hydra.Prelude
import Hydra.Tx (IsTx (TxIdType), Snapshot (..), txId)
import System.Metrics.Prometheus.Http.Scrape (serveMetrics)
import System.Metrics.Prometheus.Metric (Metric (CounterMetric, GaugeMetric, HistogramMetric))
Expand All @@ -46,7 +46,7 @@ withMonitoring ::
withMonitoring Nothing tracer action = action tracer
withMonitoring (Just monitoringPort) (Tracer tracer) action = do
(traceMetric, registry) <- prepareRegistry
withAsync (serveMetrics (fromIntegral monitoringPort) ["metrics"] (sample registry)) $ \_ ->
withAsync (serveMetrics (fromIntegral monitoringPort) ["metrics"] (sample registry)) $ \t ->
let wrappedTracer = Tracer $ \msg -> do
traceMetric msg
tracer msg
Expand Down
3 changes: 2 additions & 1 deletion hydra-node/src/Hydra/Network/Etcd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import Control.Concurrent.Class.MonadSTM (
)
import Control.Exception (IOException)
import Control.Lens ((^.), (^..), (^?))
import Control.Monad.Class.MonadAsync (link)
import Data.Aeson (decodeFileStrict', encodeFile)
import Data.Aeson qualified as Aeson
import Data.Aeson.Lens qualified as Aeson
Expand Down Expand Up @@ -147,7 +148,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
-- first rpc call will block until the connection has been established.
withConnection (connParams doneVar) grpcServer $ \conn -> do
-- REVIEW: checkVersion blocks if used on main thread - why?
withAsync (checkVersion tracer conn protocolVersion callback) $ \_ -> do
withAsync (checkVersion tracer conn protocolVersion callback) $ \t -> do
race_ (pollConnectivity tracer conn advertise callback) $
race_ (waitMessages tracer conn persistenceDir callback) $ do
queue <- newPersistentQueue (persistenceDir </> "pending-broadcast") 100
Expand Down
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ hydrate ::
[EventSink (StateEvent tx) m] ->
m (DraftHydraNode tx m)
hydrate tracer env ledger initialChainState EventStore{eventSource, eventSink} eventSinks = do
let allSinks = eventSink : eventSinks
let allSinks = eventSinks
traceWith tracer LoadingState
(lastEventId, (headState, chainStateHistory)) <-
runConduitRes $
Expand Down Expand Up @@ -215,7 +215,7 @@ hydrate tracer env ledger initialChainState EventStore{eventSource, eventSink} e
, nodeState
, inputQueue
, eventSource
, eventSinks = allSinks
, eventSinks = eventSink : allSinks
, chainStateHistory
}
where
Expand Down
Loading