server(events): utilize proper backpressure scheme (close #3839) (#4013)
* Test working through a backlog of change events

* Use a slightly more performant threaded http server in eventing pytests

This helped locally but not on CI it seems...

* Rework event processing for backpressure. Closes #3839

With too low a `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL`, and/or slow webhooks,
and/or too small a `HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE`, we could
previously check out events from the DB faster than we could service them,
leading to space leaks, weirdness, etc.

Other changes:
- avoid fetch interval sleep latency when we previously did a non-empty
  fetch
- prefetch event batch while http pool is working
- warn when it appears we can't keep up with events being generated
- make some effort to process events in creation order so we don't
  starve older ones.

ALSO NOTE: HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL changes semantics
slightly, since it only comes into play after an empty fetch. The old
semantics weren't documented in detail, so I think this is fine.
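
For reference, the new scheme boils down to a counting semaphore kept in a
`TVar Int`: a worker slot is claimed in STM before each webhook thread is
forked and returned when the thread finishes, so a saturated pool blocks the
fetch loop instead of piling up events in memory. A minimal sketch, assuming
only the stm and async packages (`forEachBounded` and `main` are illustrative
names, not part of this commit):

    import Control.Concurrent (threadDelay)
    import Control.Concurrent.Async (async, link)
    import Control.Concurrent.STM
    import Control.Exception (finally, mask_)
    import Control.Monad (forM_)

    -- Run one action per item, with at most 'maxWorkers' in flight at a time.
    -- When the pool is saturated the STM transaction blocks: backpressure.
    forEachBounded :: Int -> [a] -> (a -> IO ()) -> IO ()
    forEachBounded maxWorkers items work = do
      capacity <- newTVarIO maxWorkers
      forM_ items $ \item ->
        mask_ $ do                       -- don't let 'link' fire mid-acquire
          atomically $ do                -- block until a worker slot frees up
            n <- readTVar capacity
            check (n > 0)
            writeTVar capacity (n - 1)
          let restore = atomically $ modifyTVar' capacity (+ 1)
          t <- async (work item `finally` restore)
          link t                         -- re-raise stray worker exceptions

    main :: IO ()
    main = forEachBounded 2 [1 .. 6 :: Int] $ \i ->
      threadDelay 200000 >> print i      -- stand-in for a webhook HTTP call

The real processEventQueue additionally prefetches the next batch with
withAsync while the current batch drains.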
jberryman authored Mar 11, 2020
1 parent db724f7 commit c425b55
Showing 13 changed files with 175 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -64,3 +64,5 @@
- fix casting citext column type (fix #2818) (#3861)
- Add downgrade command (close #1156) (#3760)
- persist mix files only when coverage is enabled (#3844)
- `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL` changes semantics slightly: we only sleep for the interval
when there were previously no events to process. Potential space leak fixed. (#3839)
@@ -132,7 +132,8 @@ For the ``serve`` sub-command these are the available flags and ENV variables:

* - N/A
- ``HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL``
- Postgres events polling interval
- Interval in milliseconds to sleep before trying to fetch events again after a fetch
returned no events from postgres

* - ``-s, --stripes <NO_OF_STRIPES>``
- ``HASURA_GRAPHQL_PG_STRIPES``
5 changes: 5 additions & 0 deletions docs/graphql/manual/event-triggers/index.rst
@@ -24,6 +24,11 @@ Events can be of the following types:
- DELETE: When a row is deleted from a table
- MANUAL: Using the console or API, an event can be triggered manually on a row

.. note::

Event webhook notifications will be delivered at least once, and may arrive out of order with
respect to the underlying event.
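
Given at-least-once delivery, webhook handlers should be idempotent, e.g. by
deduplicating on the event `id` carried in the payload. A minimal sketch,
assuming only base and containers (the `Event` type and `handleOnce` are
illustrative, not Hasura's API; a real handler would persist the seen ids
rather than keep them in an IORef):

    import Data.IORef (IORef, atomicModifyIORef', newIORef)
    import qualified Data.Set as Set

    data Event = Event { evId :: String, evPayload :: String }

    -- Run the real handler at most once per event id, even if the same
    -- event is delivered twice.
    handleOnce :: IORef (Set.Set String) -> (Event -> IO ()) -> Event -> IO ()
    handleOnce seenRef handler ev = do
      fresh <- atomicModifyIORef' seenRef $ \seen ->
        if Set.member (evId ev) seen
          then (seen, False)                     -- duplicate delivery: skip
          else (Set.insert (evId ev) seen, True)
      if fresh then handler ev else pure ()

    main :: IO ()
    main = do
      seen <- newIORef Set.empty
      let ev = Event "evt-1" "row changed"
      mapM_ (handleOnce seen (putStrLn . evPayload)) [ev, ev]  -- prints once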

**See:**

.. toctree::
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/App.hs
@@ -252,8 +252,8 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
(_pushEventsThread, _consumeEventsThread) <- liftIO $
forkEventQueueProcessors logger logEnvHeaders
_eventQueueThread <- C.forkImmortal "processEventQueue" logger $ liftIO $
processEventQueue logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx

-- start a backgroud thread to handle async actions
189 changes: 110 additions & 79 deletions server/src-lib/Hasura/Events/Lib.hs
@@ -1,17 +1,18 @@
{-# LANGUAGE StrictData #-} -- TODO project-wide, maybe. See #3941
{-# LANGUAGE RecordWildCards #-}
module Hasura.Events.Lib
( initEventEngineCtx
, forkEventQueueProcessors
, processEventQueue
, unlockAllEvents
, defaultMaxEventThreads
, defaultFetchIntervalMilliSec
, Event(..)
) where

import Control.Concurrent.Extended (sleep, forkImmortal)
import Control.Concurrent.Async (async, link)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.Async (wait, withAsync, async, link)
import Control.Concurrent.STM.TVar
import Control.Exception.Lifted (mask_, try, bracket_)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Exception.Lifted (finally, mask_, try)
import Control.Monad.STM
import Data.Aeson
import Data.Aeson.Casing
@@ -20,6 +21,7 @@ import Data.Has
import Data.Int (Int64)
import Data.String
import Data.Time.Clock
import Data.Word
import Hasura.Events.HTTP
import Hasura.HTTP
import Hasura.Prelude
@@ -28,8 +30,6 @@ import Hasura.RQL.Types
import Hasura.Server.Version (HasVersion)
import Hasura.SQL.Types

import qualified Control.Concurrent.STM.TQueue as TQ
import qualified Control.Immortal as Immortal
import qualified Data.ByteString as BS
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
@@ -72,6 +72,9 @@ data DeliveryInfo

$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo)

-- | Change data for a particular row
--
-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
data Event
= Event
{ eId :: EventId
@@ -94,6 +97,7 @@ instance ToJSON QualifiedTableStrict where
, "name" .= tn
]

-- | See 'Event'.
data EventPayload
= EventPayload
{ epId :: EventId
@@ -149,9 +153,7 @@ data Invocation

data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
{ _eeCtxEventThreadsCapacity :: TVar Int
, _eeCtxFetchInterval :: DiffTime
}

@@ -165,50 +167,91 @@ retryAfterHeader :: CI.CI T.Text
retryAfterHeader = "Retry-After"

initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
initEventEngineCtx maxT fetchI = do
q <- TQ.newTQueue
c <- newTVar 0
return $ EventEngineCtx q c maxT fetchI

forkEventQueueProcessors
initEventEngineCtx maxT _eeCtxFetchInterval = do
_eeCtxEventThreadsCapacity <- newTVar maxT
return $ EventEngineCtx{..}

-- | Service events from our in-DB queue.
--
-- There are a few competing concerns and constraints here; we want to...
-- - fetch events in batches for lower DB pressure
-- - don't fetch more than N at a time (since that can mean: space leak, less
-- effective scale out, possible double sends for events we've checked out
-- on exit (TODO clean shutdown procedure))
-- - try not to cause webhook workers to stall waiting on DB fetch
-- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
processEventQueue
:: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager -> Q.PGPool
-> IO SchemaCache -> EventEngineCtx
-> IO (Immortal.Thread, Immortal.Thread)
-- ^ returns: (pushEvents handle, consumeEvents handle)
forkEventQueueProcessors logger logenv httpMgr pool getSchemaCache eectx = do
(,) <$> forkImmortal "pushEvents" logger pushEvents
<*> forkImmortal "consumeEvents" logger consumeEvents
-> IO void
processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = do
events0 <- popEventsBatch
go events0 0 False
where
-- FIXME proper backpressure. See: #3839
pushEvents = forever $ do
let EventEngineCtx q _ _ fetchI = eectx
eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents
case eventsOrError of
Left err -> L.unLogger logger $ EventInternalErr err
Right events -> atomically $ mapM_ (TQ.writeTQueue q) events
sleep fetchI

-- TODO this has all events race. How do we know this is correct? Document.
consumeEvents = forever $
-- ensure async exceptions from link only raised between iterations of forever block:
mask_ $ do
event <- atomically $ do
let EventEngineCtx q _ _ _ = eectx
TQ.readTQueue q
-- FIXME proper backpressure. See: #3839
t <- async $ runReaderT (processEvent event) (logger, httpMgr, eectx)
-- Make sure any stray exceptions are at least logged via 'forkImmortal':
link t

-- NOTE: Blocks in tryWebhook if >= _eeCtxMaxEventThreads invocations active.
fetchBatchSize = 100
popEventsBatch = do
let run = runExceptT . Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite)
run (fetchEvents fetchBatchSize) >>= \case
Left err -> do
L.unLogger logger $ EventInternalErr err
return []
Right events ->
return events

-- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size.
go :: [Event] -> Int -> Bool -> IO void
go events !fullFetchCount !alreadyWarned = do
-- process events ASAP until we've caught up; only then can we sleep
when (null events) $ sleep _eeCtxFetchInterval

-- Prefetch next events payload while concurrently working through our current batch.
-- NOTE: we probably don't need to prefetch so early, but probably not
-- worth the effort for something more fine-tuned
eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \event ->
mask_ $ do
atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
capacity <- readTVar _eeCtxEventThreadsCapacity
check $ capacity > 0
writeTVar _eeCtxEventThreadsCapacity (capacity - 1)
-- since there is some capacity in our worker threads, we can launch another:
let restoreCapacity = liftIO $ atomically $
modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
t <- async $ flip runReaderT (logger, httpMgr) $
processEvent event `finally` restoreCapacity
link t

-- return when next batch ready; some 'processEvent' threads may be running.
wait eventsNextA

let lenEvents = length events
if | lenEvents == fetchBatchSize -> do
-- If we've seen N fetches in a row from the DB come back full (i.e. only limited
-- by our LIMIT clause), then we say we're clearly falling behind:
let clearlyBehind = fullFetchCount >= 3
unless alreadyWarned $
when clearlyBehind $
L.unLogger logger $ L.UnstructuredLog L.LevelWarn $ fromString $
"Events processor may not be keeping up with events generated in postgres, " <>
"or we're working on a backlog of events. Consider increasing " <>
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
go eventsNext (fullFetchCount+1) (alreadyWarned || clearlyBehind)

| otherwise -> do
when (lenEvents /= fetchBatchSize && alreadyWarned) $
-- emit as warning in case users are only logging warning severity and saw above
L.unLogger logger $ L.UnstructuredLog L.LevelWarn $ fromString $
"It looks like the events processor is keeping up again."
go eventsNext 0 False

processEvent
:: ( HasVersion
, MonadReader r m
, Has HTTP.Manager r
, Has (L.Logger L.Hasura) r
, Has EventEngineCtx r
, MonadIO m
, MonadBaseControl IO m
)
=> Event -> m ()
processEvent e = do
@@ -228,10 +271,10 @@ forkEventQueueProcessors logger logenv httpMgr pool getSchemaCache eectx = do
ep = createEventPayload retryConf e
res <- runExceptT $ tryWebhook headers responseTimeout ep webhook
let decodedHeaders = map (decodeHeader logenv headerInfos) headers
finally <- either
either
(processError pool e retryConf decodedHeaders ep)
(processSuccess pool e decodedHeaders ep) res
either logQErr return finally
>>= flip onLeft logQErr

createEventPayload :: RetryConf -> Event -> EventPayload
createEventPayload retryConf e = EventPayload
@@ -389,21 +432,17 @@ logHTTPErr err = do
logger :: L.Logger L.Hasura <- asks getter
L.unLogger logger $ err

-- NOTE: Blocks if >= _eeCtxMaxEventThreads invocations active, though we
-- expect this to be bounded by responseTimeout.
-- These run concurrently on their respective EventPayloads
tryWebhook
:: ( Has (L.Logger L.Hasura) r
, Has HTTP.Manager r
, Has EventEngineCtx r
, MonadReader r m
, MonadBaseControl IO m
, MonadIO m
, MonadError HTTPErr m
)
=> [HTTP.Header] -> HTTP.ResponseTimeout -> EventPayload -> String
-> m HTTPResp
tryWebhook headers responseTimeout ep webhook = do
logger :: L.Logger L.Hasura <- asks getter
let context = ExtraContext (epCreatedAt ep) (epId ep)
initReqE <- liftIO $ try $ HTTP.parseRequest webhook
case initReqE of
@@ -415,36 +454,26 @@ tryWebhook headers responseTimeout ep webhook = do
, HTTP.requestBody = HTTP.RequestBodyLBS (encode ep)
, HTTP.responseTimeout = responseTimeout
}
EventEngineCtx _ c maxT _ <- asks getter
-- wait for counter and then increment beforing making http request
let haveCapacity = do
countThreads <- readTVar c
pure $ countThreads < maxT
waitForCapacity = do
haveCapacity >>= check
modifyTVar' c (+1)
release = modifyTVar' c (subtract 1)

-- we could also log after we block, but that's actually even more awkward:
likelyHaveCapacity <- liftIO $ atomically haveCapacity
unless likelyHaveCapacity $ do
L.unLogger logger $ L.UnstructuredLog L.LevelWarn $
fromString $ "In event queue webhook: exceeded HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE " <>
"and likely about to block for: "<> show context

-- ensure we don't leak capacity and become totally broken in the
-- presence of unexpected exceptions:
bracket_ (liftIO $ atomically waitForCapacity) (liftIO $ atomically release) $ do
eitherResp <- runHTTP req (Just context)
onLeft eitherResp throwError

eitherResp <- runHTTP req (Just context)
onLeft eitherResp throwError

getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo
getEventTriggerInfoFromEvent sc e = let table = eTable e
tableInfo = M.lookup table $ scTables sc
in M.lookup ( tmName $ eTrigger e) =<< (_tiEventTriggerInfoMap <$> tableInfo)

fetchEvents :: Q.TxE QErr [Event]
fetchEvents =
---- DATABASE QUERIES ---------------------
--
-- The API for our in-database work queue:
-------------------------------------------

-- | Lock and return events not yet being processed or completed, up to some
-- limit. Process events approximately in created_at order, but we make no
-- ordering guarantees; events can and will race. Nevertheless we want to
-- ensure newer change events don't starve older ones.
fetchEvents :: Int -> Q.TxE QErr [Event]
fetchEvents limitI =
map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
@@ -453,10 +482,11 @@
WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
FOR UPDATE SKIP LOCKED
LIMIT 100 )
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
|] () True
|] (Identity limit) True
where uncurryEvent (id', sn, tn, trn, Q.AltJ payload, tries, created) =
Event
{ eId = id'
@@ -466,6 +496,7 @@ fetchEvents =
, eTries = tries
, eCreatedAt = created
}
limit = fromIntegral limitI :: Word64

insertInvocation :: Invocation -> Q.TxE QErr ()
insertInvocation invo = do
2 changes: 2 additions & 0 deletions server/src-lib/Hasura/RQL/Types/EventTrigger.hs
@@ -38,6 +38,7 @@ import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Text.Regex.TDFA as TDFA

-- | Unique name for event trigger.
newtype TriggerName = TriggerName { unTriggerName :: NonEmptyText }
deriving (Show, Eq, Hashable, Lift, DQuote, FromJSON, ToJSON, ToJSONKey, Q.FromCol, Q.ToPrepArg, Generic, Arbitrary, NFData, Cacheable)

@@ -172,6 +173,7 @@ instance FromJSON CreateEventTriggerQuery where

$(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''CreateEventTriggerQuery)

-- | The table operations on which the event trigger will be invoked.
data TriggerOpsDef
= TriggerOpsDef
{ tdInsert :: !(Maybe SubscribeOpSpec)
6 changes: 6 additions & 0 deletions server/src-lib/Hasura/RQL/Types/Table.hs
@@ -274,7 +274,13 @@ data EventTriggerInfo
, etiOpsDef :: !TriggerOpsDef
, etiRetryConf :: !RetryConf
, etiWebhookInfo :: !WebhookConfInfo
-- ^ The HTTP(s) URL which will be called with the event payload on configured operation.
-- Must be a POST handler. This URL can be entered manually or can be picked up from an
-- environment variable (the environment variable needs to be set before using it for
-- this configuration).
, etiHeaders :: ![EventHeaderInfo]
-- ^ Custom headers can be added to an event trigger. Each webhook request will have these
-- headers added.
} deriving (Show, Eq, Generic)
instance NFData EventTriggerInfo
$(deriveToJSON (aesonDrop 3 snakeCase) ''EventTriggerInfo)
3 changes: 2 additions & 1 deletion server/src-lib/Hasura/Server/Init.hs
@@ -487,7 +487,8 @@ serveCmdFooter =
, "Max event threads"
)
, ( "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
, "Postgres events polling interval in milliseconds"
, "Interval in milliseconds to sleep before trying to fetch events again after a "
<> "fetch returned no events from postgres."
)
]

2 changes: 1 addition & 1 deletion server/src-rsr/catalog_version.txt
@@ -1 +1 @@
32
33
1 change: 1 addition & 0 deletions server/src-rsr/migrations/32_to_33.sql
@@ -0,0 +1 @@
CREATE INDEX event_log_created_at_idx ON hdb_catalog.event_log (created_at);
1 change: 1 addition & 0 deletions server/src-rsr/migrations/33_to_32.sql
@@ -0,0 +1 @@
DROP INDEX IF EXISTS hdb_catalog."event_log_created_at_idx";