Skip to content

Commit

Permalink
Server: Add Event Trigger Metrics
Browse files Browse the repository at this point in the history
- [x] **Event Triggers Metrics**
  - [x] Distribution of size of event trigger fetches / Number of events fetched in the last `event trigger fetch`
  - [x] Event Triggers: Number of event trigger HTTP workers in process
  - [x]  Event Triggers: Avg event trigger lock time (if an event has been fetched but not processed because http worker is not free)

#### Sample response
The metrics can be viewed from the `/dev/ekg` endpoint

```json
{
"num_events_fetched":{
      "max":0,
      "mean":0,
      "count":1,
      "min":0,
      "variance":null,
      "type":"d",
      "sum":0
   },
 "num_event_trigger_http_workers":{
      "type":"g",
      "val":0
   },
"event_lock_time":{
      "max":0,
      "mean":0,
      "count":0,
      "min":0,
      "variance":0,
      "type":"d",
      "sum":0
   }
}
```

#### Todo
- [ ] Group similar metrics together (e.g. group all the metrics related to event triggers — how do we do it?)

Closes: hasura/graphql-engine-mono#202
GitOrigin-RevId: bada11d
  • Loading branch information
Naveenaidu authored and hasura-bot committed Mar 7, 2021
1 parent 5a1f71b commit f55df22
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Bug fixes and improvements

(Add entries here in the order of: server, console, cli, docs, others)

- server/mssql: support tracking and querying from views
- cli: add support for rest endpoints

Expand Down
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ runHGEServer setupHook env ServeOptions{..} ServeCtx{..} initTime postPollHook s

_eventQueueThread <- C.forkManagedT "processEventQueue" logger $
processEventQueue logger logEnvHeaders
_scHttpManager (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx
_scHttpManager (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx serverMetrics

-- start a backgroud thread to handle async actions
_asyncActionsThread <- C.forkManagedT "asyncActionsProcessor" logger $
Expand Down
41 changes: 33 additions & 8 deletions server/src-lib/Hasura/Eventing/EventTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import qualified Database.PG.Query as Q
import qualified Database.PG.Query.PTI as PTI
import qualified Network.HTTP.Client as HTTP
import qualified PostgreSQL.Binary.Encoding as PE
import qualified System.Metrics.Distribution as EKG.Distribution
import qualified System.Metrics.Gauge as EKG.Gauge

import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
Expand All @@ -77,6 +79,7 @@ import Hasura.Eventing.HTTP
import Hasura.HTTP
import Hasura.RQL.DDL.Headers
import Hasura.RQL.Types
import Hasura.Server.Init.Config
import Hasura.Server.Version (HasVersion)

data TriggerMetadata
Expand Down Expand Up @@ -155,7 +158,7 @@ initEventEngineCtx maxT _eeCtxFetchInterval = do
_eeCtxEventThreadsCapacity <- newTVar maxT
return $ EventEngineCtx{..}

type EventWithSource = (Event, SourceConfig 'Postgres)
type EventWithSource = (Event, SourceConfig 'Postgres, Time.UTCTime)

-- | Service events from our in-DB queue.
--
Expand All @@ -181,9 +184,12 @@ processEventQueue
-> IO SchemaCache
-> EventEngineCtx
-> LockedEventsCtx
-> ServerMetrics
-> m void
processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} = do
processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics = do
events0 <- popEventsBatch
-- Track number of events fetched in EKG
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
go events0 0 False
where
fetchBatchSize = 100
Expand All @@ -208,8 +214,10 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
liftIO $ L.unLogger logger $ EventInternalErr err
return []
Right events -> do
-- The time when the events were fetched. This is used to calculate the average lock time of an event.
eventsFetchedTime <- liftIO getCurrentTime
saveLockedEvents (map eId events) leEvents
return $ map (, sourceConfig) events
return $ map (, sourceConfig, eventsFetchedTime) events

-- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size.
Expand All @@ -223,8 +231,8 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
-- worth the effort for something more fine-tuned
eventsNext <- LA.withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \(event, sourceConfig) -> do
t <- processEvent event sourceConfig
forM_ events $ \(event, sourceConfig, eventFetchedTime) -> do
t <- processEvent event sourceConfig eventFetchedTime
& withEventEngineCtx eeCtx
& flip runReaderT (logger, httpMgr)
& LA.async
Expand Down Expand Up @@ -262,9 +270,20 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
, Has HTTP.Manager r
, Has (L.Logger L.Hasura) r
, Tracing.HasReporter io
, MonadMask io
)
=> Event -> SourceConfig 'Postgres -> io ()
processEvent e sourceConfig = do
=> Event
-> SourceConfig 'Postgres
-> Time.UTCTime
-- ^ Time when the event was fetched from DB. Used to calculate Event Lock time
-> io ()
processEvent e sourceConfig eventFetchedTime= do
-- Track Lock Time of Event
-- Lock Time = Time when the event starts being processed - Time when the event was fetched from DB
eventProcessTime <- liftIO getCurrentTime
let eventLockTime = realToFrac $ diffUTCTime eventProcessTime eventFetchedTime
_ <- liftIO $ EKG.Distribution.add (smEventLockTime serverMetrics) eventLockTime

cache <- liftIO getSchemaCache

tracingCtx <- liftIO (Tracing.extractEventContext (eEvent e))
Expand Down Expand Up @@ -297,7 +316,13 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
payload = encode $ toJSON ep
extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call
requestDetails = RequestDetails $ LBS.length payload
res <- runExceptT $ tryWebhook headers responseTimeout payload webhook

-- Track the number of active HTTP workers using EKG.
res <- bracket_
(liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics)
(liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics)
(runExceptT $ tryWebhook headers responseTimeout payload webhook)

logHTTPForET res extraLogCtx requestDetails
let decodedHeaders = map (decodeHeader logenv headerInfos) headers
either
Expand Down
15 changes: 13 additions & 2 deletions server/src-lib/Hasura/Server/Init/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import qualified Hasura.GraphQL.Execute.LiveQuery.Options as LQ
import qualified Hasura.GraphQL.Execute.Plan as E
import qualified Hasura.Logging as L
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as EKG.Distribution
import qualified System.Metrics.Gauge as EKG.Gauge


import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Auth
Expand Down Expand Up @@ -338,12 +338,23 @@ type WithEnv a = ReaderT Env (ExceptT String Identity) a
runWithEnv :: Env -> WithEnv a -> Either String a
runWithEnv env m = runIdentity $ runExceptT $ runReaderT m env

-- | Collection of various server metrics
data ServerMetrics
= ServerMetrics
{ smWarpThreads :: !EKG.Gauge.Gauge
{ smWarpThreads :: !EKG.Gauge.Gauge
-- ^ Current Number of warp threads
, smNumEventsFetched :: !EKG.Distribution.Distribution
-- ^ Total Number of events fetched from last 'Event Trigger Fetch'
, smNumEventHTTPWorkers :: !EKG.Gauge.Gauge
-- ^ Current number of Event trigger's HTTP workers in process
, smEventLockTime :: !EKG.Distribution.Distribution
-- ^ Time between the 'Event Trigger Fetch' from DB and the processing of the event
}

createServerMetrics :: EKG.Store -> IO ServerMetrics
createServerMetrics store = do
smWarpThreads <- EKG.createGauge "warp_threads" store
smNumEventsFetched <- EKG.createDistribution "num_events_fetched" store
smNumEventHTTPWorkers <- EKG.createGauge "num_event_trigger_http_workers" store
smEventLockTime <- EKG.createDistribution "event_lock_time" store
pure ServerMetrics { .. }
6 changes: 4 additions & 2 deletions server/src-lib/Hasura/Tracing.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import qualified System.Random as Rand
import qualified Web.HttpApiData as HTTP

import Control.Lens ((^?))
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow)
import Control.Monad.Morph
import Control.Monad.Trans.Control
import Control.Monad.Unique
import Data.String (fromString)
import Network.URI (URI)



-- | Any additional human-readable key-value pairs relevant
-- to the execution of a block of code.
type TracingMetadata = [(Text, Text)]
Expand Down Expand Up @@ -89,7 +91,7 @@ data TraceContext = TraceContext
-- | The 'TraceT' monad transformer adds the ability to keep track of
-- the current trace context.
newtype TraceT m a = TraceT { unTraceT :: ReaderT (TraceContext, Reporter) (WriterT TracingMetadata m) a }
deriving (Functor, Applicative, Monad, MonadIO, MonadUnique)
deriving (Functor, Applicative, Monad, MonadIO, MonadUnique, MonadMask, MonadCatch, MonadThrow)

instance MonadTrans TraceT where
lift = TraceT . lift . lift
Expand Down Expand Up @@ -215,7 +217,7 @@ word64ToHex randNum = bsToTxt $ Hex.encode numInBytes
hexToWord64 :: Text -> Maybe Word64
hexToWord64 randText = do
case Hex.decode $ txtToBs randText of
Left _ -> Nothing
Left _ -> Nothing
Right decoded -> Just $ Bin.decode $ BL.fromStrict decoded

-- | Inject the trace context as a set of HTTP headers.
Expand Down

0 comments on commit f55df22

Please sign in to comment.