Merge branch 'master' of github.com:hasura/graphql-engine into issue-…
…4306
Rishichandra Wawhal committed Apr 14, 2020
2 parents 2250014 + a23c633 commit 8d05d9d
Showing 6 changed files with 116 additions and 45 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,6 @@

## Next release


### console: persist columns state in data browser

The order and collapsed state of columns is now persisted across page navigation
@@ -18,13 +17,16 @@ The order and collapsed state of columns is now persisted across page navigation
- cli: clean up migration files created during a failed migrate api (close #4312) (#4319)
- cli: add support for multiple versions of plugin (close #4105)
- cli: template assets path in console HTML for unversioned builds
- console: recover from SDL parse in actions type definition editor (fix #4385) (#4389)
- console: allow customising graphql field names for columns of views (close #3689) (#4255)
- console: fix clone permission migrations (close #3985) (#4277)
- console: decouple data rows and count fetch in data browser to account for really large tables (close #3793) (#4269)
- console: update cookie policy for API calls to "same-origin"
- console: redirect to /:table/browse from /:table (close #4330) (#4374)
- docs: add One-Click Render deployment guide (close #3683) (#4209)
- server: reserved keywords in column references break parser (fix #3597) #3927
- server: fix postgres specific error message that exposed database type on invalid query parameters (#4294)
- server: manage inflight events when HGE instance is gracefully shutdown (close #3548)
- server: fix an edge case where some events wouldn't be processed because of internal errors (#4213)
- server: fix downgrade not working to version v1.1.1 (#4354)
- server: `type` field is not required if `jwk_url` is provided in JWT config
16 changes: 1 addition & 15 deletions cli/CONTRIBUTING.md
@@ -37,21 +37,7 @@ follow the instructions below to make sure the import paths are correct:

## Development workflow

We suggest using [realize](https://github.com/oxequa/realize) for faster dev
workflow. The `.realize.yaml` config is already included in the repo.

- Install realize
```bash
go get github.com/oxequa/realize
```
- Start realize
```bash
realize start
```

`realize` watches the directory for changes and rebuilds the cli whenever a new
change happens. The cli is installed to `$GOPATH/bin/hasura`, which should
already be in your `PATH`. The config is located at `.realize/realize.yaml`.
For a faster development workflow, you may use tools that watch the directory for changes and rebuild the cli whenever a change happens. [realize](https://github.com/oxequa/realize) and [watchrun](https://github.com/loov/watchrun) are two such tools. The configuration file for `realize` is already included in the repo at `.realize/realize.yaml`.

## Tests

@@ -35,7 +35,7 @@ const ActionDefinitionEditor = ({

const parseDebounceTimer = setTimeout(() => {
if (v === '') {
- return;
+ return onChange(v, null, null, null);
}
let _e = null;
let ast = null;
7 changes: 5 additions & 2 deletions console/src/components/Services/Data/DataRouter.js
@@ -65,9 +65,12 @@ const makeDataRouter = (
<Route path="permissions" component={PermissionCustomFunction} />
</Route>
<Route
- path=":schema/tables/:table/browse"
+ path=":schema/tables/:table"
component={viewTableConnector(connect)}
- />
+ >
+ <IndexRedirect to="browse" />
+ <Route path="browse" component={viewTableConnector(connect)} />
+ </Route>
<Route
path=":schema/tables/:table/edit"
component={editItemConnector(connect)}
59 changes: 46 additions & 13 deletions server/src-lib/Hasura/App.hs
@@ -6,6 +6,7 @@ module Hasura.App where
import Control.Monad.Base
import Control.Monad.Stateless
import Control.Monad.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.Aeson ((.=))
import Data.Time.Clock (UTCTime)
@@ -28,6 +29,7 @@ import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import qualified System.Posix.Signals as Signals
import qualified Text.Mustache.Compile as M
import qualified Data.Set as Set

import Hasura.Db
import Hasura.EncJSON
@@ -236,12 +238,6 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
startSchemaSyncThreads sqlGenCtx _icPgPool logger _icHttpManager
cacheRef _icInstanceId cacheInitTime

let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler (shutdownHandler logger shutdownApp)
$ Warp.defaultSettings

maxEvThrds <- liftIO $ getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
fetchI <- fmap milliseconds $ liftIO $
getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
@@ -275,14 +271,48 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
unLogger logger $
mkGenericLog LevelInfo "server" $ StartupTimeInfo "starting API server" apiInitTime
let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler (shutdownHandler logger shutdownApp eventEngineCtx _icPgPool)
$ Warp.defaultSettings
liftIO $ Warp.runSettings warpSettings app

where
-- | prepareEvents unlocks all the events that are locked and
-- unprocessed. It is called when hasura starts.
-- Locked and unprocessed events can occur in 2 ways:
-- 1.
-- Hasura's shutdown was not graceful, in which case all the fetched
-- events remain in the locked and unprocessed state
-- (TODO: clean shutdown).
-- 2.
-- Another hasura instance is processing events and has locked
-- them in order to process them.
-- So, unlocking all the locked events might re-deliver an event (due to #2).
prepareEvents pool (Logger logger) = do
liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "preparing data"
- res <- runUnlockTx pool unlockAllEvents
+ res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllEvents
either printErrJExit return res

-- | shutdownEvents is triggered when a graceful shutdown has been initiated. It gets the
-- locked events from the event engine context and unlocks all of them.
-- An event may be processed more than once: an event that has already been
-- processed but not yet marked as delivered in the db will be unlocked by `shutdownEvents`
-- and will be processed again the next time events are processed.
shutdownEvents :: Q.PGPool -> Logger Hasura -> EventEngineCtx -> IO ()
shutdownEvents pool (Logger logger) EventEngineCtx {..} = do
liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "unlocking events that are locked by the HGE"
lockedEvents <- readTVarIO _eeCtxLockedEvents
liftIO $ do
when (not $ Set.null $ lockedEvents) $ do
res <- runTx pool (Q.ReadCommitted, Nothing) (unlockEvents $ toList lockedEvents)
case res of
Left err -> logger $ mkGenericStrLog
LevelWarn "event_triggers" ("Error in unlocking the events " ++ (show err))
Right count -> logger $ mkGenericStrLog
LevelInfo "event_triggers" ((show count) ++ " events were updated")

getFromEnv :: (Read a) => a -> String -> IO a
getFromEnv defaults env = do
mEnv <- lookupEnv env
@@ -292,21 +322,24 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
eRes = maybe (Left $ "Wrong expected type for environment variable: " <> env) Right mRes
either printErrExit return eRes

- runUnlockTx pool tx =
- liftIO $ runExceptT $ Q.runTx pool (Q.ReadCommitted, Nothing) tx
+ runTx :: Q.PGPool -> Q.TxMode -> Q.TxE QErr a -> IO (Either QErr a)
+ runTx pool txLevel tx =
+ liftIO $ runExceptT $ Q.runTx pool txLevel tx

-- | Catches the SIGTERM signal and initiates a graceful shutdown. Graceful shutdown for regular HTTP
-- requests is already implemented in Warp, and is triggered by invoking the 'closeSocket' callback.
- -- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C once again, we terminate
- -- the process immediately.
- shutdownHandler :: Logger Hasura -> IO () -> IO () -> IO ()
- shutdownHandler (Logger logger) shutdownApp closeSocket =
+ -- We only catch the SIGTERM signal once, that is, if we catch another SIGTERM signal then the process
+ -- is terminated immediately.
+ -- If the user hits CTRL-C (SIGINT), then the process is terminated immediately.
+ shutdownHandler :: Logger Hasura -> IO () -> EventEngineCtx -> Q.PGPool -> IO () -> IO ()
+ shutdownHandler (Logger logger) shutdownApp eeCtx pool closeSocket =
void $ Signals.installHandler
Signals.sigTERM
(Signals.CatchOnce shutdownSequence)
Nothing
where
shutdownSequence = do
shutdownEvents pool (Logger logger) eeCtx
closeSocket
shutdownApp
logShutdown
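
The shutdown ordering in this hunk (unlock events, close the socket, stop the app) hangs off a one-shot SIGTERM handler. The following is a minimal sketch of that pattern, assuming the `unix` package's `System.Posix.Signals`. The names `installShutdownHandler`, `recordStep`, and `runShutdownDemo` are hypothetical, and the steps only record their names instead of touching a database or a socket.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import qualified System.Posix.Signals as Signals

-- | Install a one-shot SIGTERM handler that runs the given steps in order.
-- 'Signals.CatchOnce' restores the default action after the first delivery,
-- so a second SIGTERM terminates the process immediately.
installShutdownHandler :: [IO ()] -> IO ()
installShutdownHandler steps = do
  _ <- Signals.installHandler
         Signals.sigTERM
         (Signals.CatchOnce (sequence_ steps))
         Nothing
  pure ()

-- | Hypothetical stand-in for a real shutdown step: append its name to a log.
recordStep :: IORef [String] -> String -> IO ()
recordStep ref name = modifyIORef' ref (++ [name])

-- | Demo: install the handler, send SIGTERM to this process, and return the
-- order in which the steps ran.
runShutdownDemo :: IO [String]
runShutdownDemo = do
  ref  <- newIORef []
  done <- newEmptyMVar
  installShutdownHandler
    [ recordStep ref "unlockEvents"  -- release events this instance still holds
    , recordStep ref "closeSocket"   -- stop accepting new HTTP requests
    , recordStep ref "shutdownApp"   -- stop background threads
    , putMVar done ()                -- tell the demo that the handler finished
    ]
  Signals.raiseSignal Signals.sigTERM
  takeMVar done
  readIORef ref
```

Unlocking first means the events are released even if a later step blocks until the graceful shutdown timeout expires. That reading of the ordering is an inference from the diff, not something the commit states.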
73 changes: 60 additions & 13 deletions server/src-lib/Hasura/Events/Lib.hs
@@ -7,6 +7,8 @@ module Hasura.Events.Lib
, defaultMaxEventThreads
, defaultFetchIntervalMilliSec
, Event(..)
, unlockEvents
, EventEngineCtx(..)
) where

import Control.Concurrent.Extended (sleep)
@@ -30,6 +32,10 @@ import Hasura.RQL.Types
import Hasura.Server.Version (HasVersion)
import Hasura.SQL.Types

-- remove these when array encoding is merged
import qualified Database.PG.Query.PTI as PTI
import qualified PostgreSQL.Binary.Encoding as PE

import qualified Data.ByteString as BS
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
@@ -43,6 +49,7 @@ import qualified Database.PG.Query as Q
import qualified Hasura.Logging as L
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Data.Set as Set

type Version = T.Text

@@ -74,7 +81,7 @@ $(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo)

-- | Change data for a particular row
--
-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
data Event
= Event
{ eId :: EventId
@@ -155,6 +162,7 @@ data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventThreadsCapacity :: TVar Int
, _eeCtxFetchInterval :: DiffTime
, _eeCtxLockedEvents :: TVar (Set.Set EventId)
}

defaultMaxEventThreads :: Int
@@ -169,20 +177,21 @@ retryAfterHeader = "Retry-After"
initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
initEventEngineCtx maxT _eeCtxFetchInterval = do
_eeCtxEventThreadsCapacity <- newTVar maxT
_eeCtxLockedEvents <- newTVar Set.empty
return $ EventEngineCtx{..}

-- | Service events from our in-DB queue.
--
-- There are a few competing concerns and constraints here; we want to...
-- - fetch events in batches for lower DB pressure
-- - don't fetch more than N at a time (since that can mean: space leak, less
-- effective scale out, possible double sends for events we've checked out
-- effective scale out, possible double sends for events we've checked out
-- on exit (TODO clean shutdown procedure))
-- - try not to cause webhook workers to stall waiting on DB fetch
-- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
processEventQueue
:: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager-> Q.PGPool
-> IO SchemaCache -> EventEngineCtx
-> IO SchemaCache -> EventEngineCtx
-> IO void
processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = do
events0 <- popEventsBatch
@@ -192,12 +201,25 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} =
popEventsBatch = do
let run = runExceptT . Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite)
run (fetchEvents fetchBatchSize) >>= \case
Left err -> do
Left err -> do
L.unLogger logger $ EventInternalErr err
return []
- Right events ->
+ Right events -> do
+ saveLockedEvents events
return events

-- After the events are fetched from the DB, we store the locked events
-- in a set (order doesn't matter and lookups are fast) in the
-- event engine context
saveLockedEvents :: [Event] -> IO ()
saveLockedEvents evts =
liftIO $ atomically $ do
lockedEvents <- readTVar _eeCtxLockedEvents
let evtsIds = map eId evts
let newLockedEvents = Set.union lockedEvents (Set.fromList evtsIds)
writeTVar _eeCtxLockedEvents newLockedEvents


-- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size.
go :: [Event] -> Int -> Bool -> IO void
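
The locked-event bookkeeping added in this hunk can be sketched on its own, assuming the `stm` and `containers` packages. This is an illustration, not the engine's code: `EventId` is simplified to `String`, and `saveLocked`, `releaseLocked`, `pendingUnlocks`, and `demoLockedEvents` are hypothetical names.

```haskell
import Control.Concurrent.STM
  (STM, TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import qualified Data.Set as Set

-- | Hypothetical stand-in for the engine's event identifier.
type EventId = String

-- | Record a freshly fetched batch as locked by this instance.
saveLocked :: TVar (Set.Set EventId) -> [EventId] -> STM ()
saveLocked locked ids = modifyTVar' locked (Set.union (Set.fromList ids))

-- | Forget an event once its worker has finished with it.
releaseLocked :: TVar (Set.Set EventId) -> EventId -> STM ()
releaseLocked locked eid = modifyTVar' locked (Set.delete eid)

-- | The IDs a graceful shutdown would still have to unlock in the database.
pendingUnlocks :: TVar (Set.Set EventId) -> IO [EventId]
pendingUnlocks locked = Set.toAscList <$> readTVarIO locked

-- | Demo: fetch a batch, finish one event, fetch an overlapping batch.
demoLockedEvents :: IO [EventId]
demoLockedEvents = do
  locked <- newTVarIO Set.empty
  atomically (saveLocked locked ["a", "b", "c"])
  atomically (releaseLocked locked "b")
  atomically (saveLocked locked ["c", "d"])
  pendingUnlocks locked
```

Using `modifyTVar'` inside a single transaction is equivalent to the `readTVar` / `writeTVar` pair in the diff. Because the set de-duplicates, an ID that appears in two batches is stored only once.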
@@ -210,25 +232,30 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} =
-- worth the effort for something more fine-tuned
eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \event ->
forM_ events $ \event ->
mask_ $ do
atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
capacity <- readTVar _eeCtxEventThreadsCapacity
check $ capacity > 0
writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1)
writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1)
-- since there is some capacity in our worker threads, we can launch another:
- let restoreCapacity = liftIO $ atomically $
- modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
- t <- async $ flip runReaderT (logger, httpMgr) $
- processEvent event `finally` restoreCapacity
+ let restoreCapacity evt =
+ liftIO $ atomically $
+ do
+ modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
+ -- After the event has been processed, remove it from the
+ -- locked events cache
+ modifyTVar' _eeCtxLockedEvents (Set.delete (eId evt))
+ t <- async $ flip runReaderT (logger, httpMgr) $
+ processEvent event `finally` (restoreCapacity event)
link t

-- return when next batch ready; some 'processEvent' threads may be running.
wait eventsNextA

let lenEvents = length events
if | lenEvents == fetchBatchSize -> do
-- If we've seen N fetches in a row from the DB come back full (i.e. only limited
-- If we've seen N fetches in a row from the DB come back full (i.e. only limited
-- by our LIMIT clause), then we say we're clearly falling behind:
let clearlyBehind = fullFetchCount >= 3
unless alreadyWarned $
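
The worker-capacity pattern in this hunk (block until a slot is free, fork a worker, give the slot back in `finally`) can be sketched in isolation. The version below is a simplified illustration, assuming the `stm` package and using `forkIOWithUnmask` from `base` in place of the `async` library that the diff uses. All function names are hypothetical.

```haskell
import Control.Concurrent (forkIOWithUnmask)
import Control.Concurrent.STM
  (TVar, atomically, check, modifyTVar', newTVarIO, readTVar, readTVarIO, writeTVar)
import Control.Exception (finally, mask_)
import Control.Monad (forM_)

-- | Block until a worker slot is free, then take it.
acquireSlot :: TVar Int -> IO ()
acquireSlot capacity = atomically $ do
  free <- readTVar capacity
  check (free > 0)               -- retries the transaction until a slot frees up
  writeTVar capacity $! free - 1

-- | Give a slot back.
releaseSlot :: TVar Int -> IO ()
releaseSlot capacity = atomically (modifyTVar' capacity (+ 1))

-- | Fork one thread per job, never running more jobs than there are slots.
-- 'mask_' keeps an asynchronous exception from landing between taking the
-- slot and forking the worker that is responsible for returning it.
runBounded :: TVar Int -> [IO ()] -> IO ()
runBounded capacity jobs =
  forM_ jobs $ \job -> mask_ $ do
    acquireSlot capacity
    _ <- forkIOWithUnmask $ \unmask ->
           unmask job `finally` releaseSlot capacity
    pure ()

-- | Block until every slot has been returned.
waitIdle :: Int -> TVar Int -> IO ()
waitIdle maxSlots capacity = atomically $ do
  free <- readTVar capacity
  check (free == maxSlots)

-- | Demo: run some trivial jobs and report (peak concurrency, jobs completed).
demoPeak :: Int -> Int -> IO (Int, Int)
demoPeak maxSlots jobCount = do
  capacity  <- newTVarIO maxSlots
  running   <- newTVarIO (0 :: Int)
  peak      <- newTVarIO (0 :: Int)
  completed <- newTVarIO (0 :: Int)
  let job = do
        atomically $ do
          modifyTVar' running (+ 1)
          now <- readTVar running
          modifyTVar' peak (max now)
        atomically $ do
          modifyTVar' running (subtract 1)
          modifyTVar' completed (+ 1)
  runBounded capacity (replicate jobCount job)
  waitIdle maxSlots capacity
  (,) <$> readTVarIO peak <*> readTVarIO completed
```

In the diff, the same `finally` block also removes the event's ID from the locked-events set, so a slot and a lock entry are always released together.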
@@ -549,7 +576,27 @@ unlockAllEvents =
UPDATE hdb_catalog.event_log
SET locked = 'f'
WHERE locked = 't'
- |] () False
+ |] () True

toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral

-- EventIdArray is only used for PG array encoding
newtype EventIdArray = EventIdArray { unEventIdArray :: [EventId]} deriving (Show, Eq)

instance Q.ToPrepArg EventIdArray where
toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l
where
-- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html
encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict)

unlockEvents :: [EventId] -> Q.TxE QErr Int
unlockEvents eventIds =
(runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
[Q.sql|
WITH "cte" AS
(UPDATE hdb_catalog.event_log
SET locked = 'f'
WHERE id = ANY($1::text[]) RETURNING *)
SELECT count(*) FROM "cte"
|] (Identity $ EventIdArray eventIds) True
