Skip to content

Commit

Permalink
chore: give liveSubscribe access to options/pluginHook (#1482)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored May 26, 2021
1 parent c583026 commit 95a3451
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 97 deletions.
206 changes: 110 additions & 96 deletions src/postgraphile/http/liveSubscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,111 +18,125 @@ import {
GraphQLError,
ExecutionResult,
} from 'graphql';
import type { subscribe } from 'graphql';
import mapAsyncIterator from './mapAsyncIterator';
import { isAsyncIterable } from 'iterall';
import { CreateRequestHandlerOptions } from '../../interfaces';
import { PluginHookFn } from '../pluginHook';

type mixed = any;

export default function liveSubscribe(
argsOrSchema: any | GraphQLSchema,
document?: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: { [key: string]: any },
operationName?: string,
fieldResolver?: GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: GraphQLFieldResolver<any, any>,
) {
/* eslint-enable no-redeclare */
// Extract arguments from object args if provided.
return arguments.length === 1
? liveSubscribeImpl(
argsOrSchema.schema,
argsOrSchema.document,
argsOrSchema.rootValue,
argsOrSchema.contextValue,
argsOrSchema.variableValues,
argsOrSchema.operationName,
argsOrSchema.fieldResolver,
argsOrSchema.subscribeFieldResolver,
)
: liveSubscribeImpl(
argsOrSchema,
document as DocumentNode,
rootValue,
contextValue,
variableValues,
operationName,
fieldResolver,
subscribeFieldResolver,
);
}
/**
* This method returns a function compatible with the `subscribe` function from
* GraphQL.js, but with enhancements to support live queries.
*
* @internal
*/
export function makeLiveSubscribe(_params: {
options: CreateRequestHandlerOptions;
pluginHook: PluginHookFn;
}): typeof subscribe {
return function liveSubscribe(
argsOrSchema: any | GraphQLSchema,
document?: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: { [key: string]: any },
operationName?: string,
fieldResolver?: GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: GraphQLFieldResolver<any, any>,
) {
/* eslint-enable no-redeclare */
// Extract arguments from object args if provided.
return arguments.length === 1
? liveSubscribeImpl(
argsOrSchema.schema,
argsOrSchema.document,
argsOrSchema.rootValue,
argsOrSchema.contextValue,
argsOrSchema.variableValues,
argsOrSchema.operationName,
argsOrSchema.fieldResolver,
argsOrSchema.subscribeFieldResolver,
)
: liveSubscribeImpl(
argsOrSchema,
document as DocumentNode,
rootValue,
contextValue,
variableValues,
operationName,
fieldResolver,
subscribeFieldResolver,
);
};

function liveSubscribeImpl(
schema: GraphQLSchema,
document: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: { [key: string]: any },
operationName?: string,
fieldResolver?: GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: GraphQLFieldResolver<any, any>,
) {
const sourcePromise = createSourceEventStream(
schema,
document,
rootValue,
contextValue,
variableValues,
operationName,
subscribeFieldResolver,
);
function liveSubscribeImpl(
schema: GraphQLSchema,
document: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: { [key: string]: any },
operationName?: string,
fieldResolver?: GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: GraphQLFieldResolver<any, any>,
) {
const sourcePromise = createSourceEventStream(
schema,
document,
rootValue,
contextValue,
variableValues,
operationName,
subscribeFieldResolver,
);

// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = async (payload: any) => {
/*
* GRAPHILE FORK
*
* We need to tell Graphile Engine when the execution has completed
* (because we cannot detect this from inside the GraphQL execution) so
* that it can clean up old listeners; we do this with the `finally` block.
*/
try {
return await execute(
schema,
document,
payload,
contextValue,
variableValues,
operationName,
fieldResolver,
);
} finally {
if (payload && typeof payload.release === 'function') {
payload.release();
// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = async (payload: any) => {
/*
* GRAPHILE FORK
*
* We need to tell Graphile Engine when the execution has completed
* (because we cannot detect this from inside the GraphQL execution) so
* that it can clean up old listeners; we do this with the `finally` block.
*/
try {
return await execute(
schema,
document,
payload,
contextValue,
variableValues,
operationName,
fieldResolver,
);
} finally {
if (payload && typeof payload.release === 'function') {
payload.release();
}
}
}
};
};

// Resolve the Source Stream, then map every source value to a
// ExecutionResult value as described above.
return sourcePromise.then(
resultOrStream =>
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
isAsyncIterable(resultOrStream)
? mapAsyncIterator(
(resultOrStream as any) as AsyncIterable<mixed>,
mapSourceToResponse,
reportGraphQLError,
)
: ((resultOrStream as any) as ExecutionResult),
reportGraphQLError,
);
// Resolve the Source Stream, then map every source value to a
// ExecutionResult value as described above.
return sourcePromise.then(
resultOrStream =>
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
isAsyncIterable(resultOrStream)
? mapAsyncIterator(
(resultOrStream as any) as AsyncIterable<mixed>,
mapSourceToResponse,
reportGraphQLError,
)
: ((resultOrStream as any) as ExecutionResult),
reportGraphQLError,
);
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/postgraphile/http/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { useServer } from 'graphql-ws/lib/use/ws';
import parseUrl = require('parseurl');
import { pluginHookFromOptions } from '../pluginHook';
import { isEmpty } from './createPostGraphileHttpRequestHandler';
import liveSubscribe from './liveSubscribe';
import { makeLiveSubscribe } from './liveSubscribe';

interface Deferred<T> extends Promise<T> {
resolve: (input?: T | PromiseLike<T> | undefined) => void;
Expand Down Expand Up @@ -70,6 +70,7 @@ export async function enhanceHttpServerWithWebSockets<
handleErrors,
} = postgraphileMiddleware;
const pluginHook = pluginHookFromOptions(options);
const liveSubscribe = makeLiveSubscribe({ pluginHook, options });
const graphqlRoute =
(subscriptionServerOptions && subscriptionServerOptions.graphqlRoute) ||
(options.externalUrlBase || '') + (options.graphqlRoute || '/graphql');
Expand Down

0 comments on commit 95a3451

Please sign in to comment.