Skip to content

Commit

Permalink
server: generalize subscriptions
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 464e80a
  • Loading branch information
Antoine Leblanc authored and hasura-bot committed Feb 20, 2021
1 parent da1a7a5 commit 377425f
Show file tree
Hide file tree
Showing 28 changed files with 919 additions and 818 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ have select permissions to the target table of the function.
- server: fix issue with mapping session variables to standard JWT claims (fix #6449)
- server: support tracking of functions that return a single row (fix #4299)
- server: reduce memory usage consumption of the schema cache structures, and fix a memory leak
- server: add source name in livequery logs
- console: allow user to cascade Postgres dependencies when dropping Postgres objects (close #5109) (#5248)
- console: mark inconsistent remote schemas in the UI (close #5093) (#5181)
- console: remove ONLY as default for ALTER TABLE in column alter operations (close #5512) #5706
Expand Down
36 changes: 19 additions & 17 deletions server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ library
, Hasura.Metadata.Class

, Hasura.Backends.Postgres.Connection
, Hasura.Backends.Postgres.Execute.LiveQuery
, Hasura.Backends.Postgres.Execute.Mutation
, Hasura.Backends.Postgres.Execute.RemoteJoin
, Hasura.Backends.Postgres.Execute.Types
Expand Down Expand Up @@ -359,15 +360,6 @@ library
, Hasura.Server.Telemetry.Counters
, Hasura.Server.Auth.JWT
, Hasura.GC
, Hasura.GraphQL.Execute
, Hasura.GraphQL.Execute.LiveQuery
, Hasura.GraphQL.Transport.Backend
, Hasura.GraphQL.Transport.Postgres
, Hasura.GraphQL.Transport.HTTP
, Hasura.GraphQL.Transport.HTTP.Protocol
, Hasura.GraphQL.Transport.WebSocket
, Hasura.GraphQL.Transport.WebSocket.Protocol
, Hasura.GraphQL.Transport.WebSocket.Server
, Hasura.Incremental.Internal.Cache
, Hasura.Incremental.Internal.Dependency
, Hasura.Incremental.Internal.Rule
Expand Down Expand Up @@ -457,26 +449,28 @@ library
, Hasura.RQL.IR.Returning
, Hasura.RQL.IR.Select
, Hasura.RQL.IR.Update
, Hasura.GraphQL.Explain
, Hasura.GraphQL.Context
, Hasura.GraphQL.Execute
, Hasura.GraphQL.Execute.Action
, Hasura.GraphQL.Execute.Backend
, Hasura.GraphQL.Execute.Common
, Hasura.GraphQL.Execute.Inline
, Hasura.GraphQL.Execute.Insert
, Hasura.GraphQL.Execute.LiveQuery.Explain
, Hasura.GraphQL.Execute.LiveQuery.Options
, Hasura.GraphQL.Execute.LiveQuery.Plan
, Hasura.GraphQL.Execute.LiveQuery.Poll
, Hasura.GraphQL.Execute.LiveQuery.State
, Hasura.GraphQL.Execute.LiveQuery.TMap
, Hasura.GraphQL.Execute.Mutation
, Hasura.GraphQL.Execute.Plan
, Hasura.GraphQL.Execute.Postgres
, Hasura.GraphQL.Execute.Prepare
, Hasura.GraphQL.Execute.Remote
, Hasura.GraphQL.Execute.RemoteJoin
, Hasura.GraphQL.Execute.Resolve
, Hasura.GraphQL.Execute.Types
, Hasura.GraphQL.Execute.LiveQuery.Options
, Hasura.GraphQL.Execute.LiveQuery.Plan
, Hasura.GraphQL.Execute.LiveQuery.Poll
, Hasura.GraphQL.Execute.LiveQuery.State
, Hasura.GraphQL.Execute.LiveQuery.TMap
, Hasura.GraphQL.RemoteServer
, Hasura.GraphQL.Context
, Hasura.GraphQL.Explain
, Hasura.GraphQL.Parser
, Hasura.GraphQL.Parser.Class
, Hasura.GraphQL.Parser.Class.Parse
Expand All @@ -486,6 +480,7 @@ library
, Hasura.GraphQL.Parser.Internal.Types
, Hasura.GraphQL.Parser.Monad
, Hasura.GraphQL.Parser.Schema
, Hasura.GraphQL.RemoteServer
, Hasura.GraphQL.Schema
, Hasura.GraphQL.Schema.Action
, Hasura.GraphQL.Schema.Backend
Expand All @@ -499,6 +494,13 @@ library
, Hasura.GraphQL.Schema.Remote
, Hasura.GraphQL.Schema.Select
, Hasura.GraphQL.Schema.Table
, Hasura.GraphQL.Transport.Backend
, Hasura.GraphQL.Transport.HTTP
, Hasura.GraphQL.Transport.HTTP.Protocol
, Hasura.GraphQL.Transport.Postgres
, Hasura.GraphQL.Transport.WebSocket
, Hasura.GraphQL.Transport.WebSocket.Protocol
, Hasura.GraphQL.Transport.WebSocket.Server
, Hasura.Eventing.HTTP
, Hasura.Eventing.EventTrigger
, Hasura.Eventing.ScheduledTrigger
Expand Down
182 changes: 182 additions & 0 deletions server/src-lib/Hasura/Backends/Postgres/Execute/LiveQuery.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
module Hasura.Backends.Postgres.Execute.LiveQuery
( MultiplexedQuery (..)
, QueryParametersInfo (..)
, mkMultiplexedQuery
, resolveMultiplexedValue
, validateVariables
, executeMultiplexedQuery
, executeQuery
) where

import Hasura.Prelude

import qualified Data.ByteString as B
import qualified Data.HashMap.Strict as Map
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.HashSet as Set
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G

import Control.Lens
import Data.Semigroup.Generic
import Data.Text.Extended

import qualified Hasura.Backends.Postgres.SQL.DML as S
import qualified Hasura.Backends.Postgres.Translate.Select as DS
import qualified Hasura.GraphQL.Parser.Schema as PS

import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.Error
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Backends.Postgres.SQL.Value
import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
import Hasura.GraphQL.Context
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Parser
import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.Session


----------------------------------------------------------------------------------------------------
-- Variables

-- | Internal: Used to collect information about various parameters
-- of a subscription field's AST as we resolve them to SQL expressions.
data QueryParametersInfo
= QueryParametersInfo
{ _qpiReusableVariableValues :: !(HashMap G.Name (ColumnValue 'Postgres))
, _qpiSyntheticVariableValues :: !(Seq (ColumnValue 'Postgres))
, _qpiReferencedSessionVariables :: !(Set.HashSet SessionVariable)
-- ^ The session variables that are referenced in the query root fld's AST.
-- This information is used to determine a cohort's required session
-- variables
} deriving (Generic)
deriving (Semigroup, Monoid) via (GenericSemigroupMonoid QueryParametersInfo)

makeLenses ''QueryParametersInfo

-- | Checks if the provided arguments are valid values for their corresponding types.
-- | Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..."
validateVariables
:: (Traversable f, MonadError QErr m, MonadIO m)
=> PGExecCtx
-> f (ColumnValue 'Postgres)
-> m (ValidatedVariables f)
validateVariables pgExecCtx variableValues = do
let valSel = mkValidationSel $ toList variableValues
Q.Discard () <- runQueryTx_ $ liftTx $
Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False
pure . ValidatedVariables $ fmap (txtEncodedPGVal . cvValue) variableValues
where
mkExtr = flip S.Extractor Nothing . toTxtValue
mkValidationSel vars =
S.mkSelect { S.selExtr = map mkExtr vars }
runQueryTx_ tx = do
res <- liftIO $ runExceptT (runQueryTx pgExecCtx tx)
liftEither res

-- Explicitly look for the class of errors raised when the format of a value
-- provided for a type is incorrect.
dataExnErrHandler = mkTxErrorHandler (has _PGDataException)


----------------------------------------------------------------------------------------------------
-- Multiplexed queries

newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query }
deriving (Eq, Hashable)

instance ToTxt MultiplexedQuery where
toTxt = Q.getQueryText . unMultiplexedQuery


toSQLFromItem :: S.Alias -> QueryDB 'Postgres S.SQLExp -> S.FromItem
toSQLFromItem = flip \case
QDBSingleRow s -> S.mkSelFromItem $ DS.mkSQLSelect JASSingleObject s
QDBMultipleRows s -> S.mkSelFromItem $ DS.mkSQLSelect JASMultipleRows s
QDBAggregation s -> S.mkSelFromItem $ DS.mkAggregateSelect s
QDBConnection s -> S.mkSelectWithFromItem $ DS.mkConnectionSelect s

mkMultiplexedQuery :: OMap.InsOrdHashMap G.Name (QueryDB 'Postgres S.SQLExp) -> MultiplexedQuery
mkMultiplexedQuery rootFields = MultiplexedQuery . Q.fromBuilder . toSQL $ S.mkSelect
{ S.selExtr =
-- SELECT _subs.result_id, _fld_resp.root AS result
[ S.Extractor (mkQualifiedIdentifier (Identifier "_subs") (Identifier "result_id")) Nothing
, S.Extractor (mkQualifiedIdentifier (Identifier "_fld_resp") (Identifier "root")) (Just . S.Alias $ Identifier "result") ]
, S.selFrom = Just $ S.FromExp [S.FIJoin $
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)]
}
where
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
subsInputFromItem = S.FIUnnest
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
(S.Alias $ Identifier "_subs")
[S.SEIdentifier $ Identifier "result_id", S.SEIdentifier $ Identifier "result_vars"]

-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
responseLateralFromItem = S.mkLateralFromItem selectRootFields (S.Alias $ Identifier "_fld_resp")
selectRootFields = S.mkSelect
{ S.selExtr = [S.Extractor rootFieldsJsonAggregate (Just . S.Alias $ Identifier "root")]
, S.selFrom = Just . S.FromExp $
OMap.toList rootFields <&> \(fieldAlias, resolvedAST) ->
toSQLFromItem (S.Alias $ aliasToIdentifier fieldAlias) resolvedAST
}

-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
rootFieldsJsonAggregate = S.SEFnApp "json_build_object" rootFieldsJsonPairs Nothing
rootFieldsJsonPairs = flip concatMap (OMap.keys rootFields) $ \fieldAlias ->
[ S.SELit (G.unName fieldAlias)
, mkQualifiedIdentifier (aliasToIdentifier fieldAlias) (Identifier "root") ]

mkQualifiedIdentifier prefix = S.SEQIdentifier . S.QIdentifier (S.QualifiedIdentifier prefix Nothing) -- TODO fix this Nothing of course
aliasToIdentifier = Identifier . G.unName

-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL
-- expressions that refer to the @result_vars@ input object, collecting information
-- about various parameters of the query along the way.
resolveMultiplexedValue
:: (MonadState QueryParametersInfo m)
=> UnpreparedValue 'Postgres -> m S.SQLExp
resolveMultiplexedValue = \case
UVParameter varM colVal -> do
varJsonPath <- case fmap PS.getName varM of
Just varName -> do
modifying qpiReusableVariableValues $ Map.insert varName colVal
pure ["query", G.unName varName]
Nothing -> do
syntheticVarIndex <- use (qpiSyntheticVariableValues . to length)
modifying qpiSyntheticVariableValues (|> colVal)
pure ["synthetic", tshow syntheticVarIndex]
pure $ fromResVars (CollectableTypeScalar $ unsafePGColumnToBackend $ cvType colVal) varJsonPath
UVSessionVar ty sessVar -> do
modifying qpiReferencedSessionVariables (Set.insert sessVar)
pure $ fromResVars ty ["session", sessionVariableToText sessVar]
UVLiteral sqlExp -> pure sqlExp
UVSession -> pure $ fromResVars (CollectableTypeScalar PGJSON) ["session"]
where
fromResVars pgType jPath = addTypeAnnotation pgType $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIdentifier $ S.QIdentifier (S.QualifiedIdentifier (Identifier "_subs") Nothing) (Identifier "result_vars")
, S.SEArray $ map S.SELit jPath
]
addTypeAnnotation pgType = flip S.SETyAnn (S.mkTypeAnn pgType) . case pgType of
CollectableTypeScalar scalarType -> withConstructorFn scalarType
CollectableTypeArray _ -> id


----------------------------------------------------------------------------------------------------
-- Execution

executeMultiplexedQuery
:: (MonadTx m)
=> MultiplexedQuery -> [(CohortId, CohortVariables)] -> m [(CohortId, B.ByteString)]
executeMultiplexedQuery (MultiplexedQuery query) = executeQuery query

-- | Internal; used by both 'executeMultiplexedQuery' and 'explainLiveQueryPlan'.
executeQuery
:: (MonadTx m, Q.FromRow a)
=> Q.Query -> [(CohortId, CohortVariables)] -> m [a]
executeQuery query cohorts =
let (cohortIds, cohortVars) = unzip cohorts
preparedArgs = (CohortIdArray cohortIds, CohortVariablesArray cohortVars)
in liftTx $ Q.listQE defaultTxErrorHandler query preparedArgs True
Loading

0 comments on commit 377425f

Please sign in to comment.