-
Notifications
You must be signed in to change notification settings - Fork 133
/
pubsub.ts
196 lines (168 loc) · 7.06 KB
/
pubsub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//
// This is basically just event emitters wrapped with a function that filters messages.
//
import { EventEmitter } from 'events';
import graphql, {
GraphQLSchema,
GraphQLError,
validate,
execute,
parse,
specifiedRules,
OperationDefinition,
Field,
Variable,
IntValue,
} from 'graphql';
const valueFromAST = require('graphql').valueFromAST;
import {
subscriptionHasSingleRootField
} from './validation';
/**
 * Minimal contract a publish/subscribe backend has to fulfil so that it can
 * be plugged into the subscription manager.
 */
export interface PubSubEngine {
  /**
   * Sends `payload` to every listener registered for `triggerName`.
   *
   * @returns a boolean status flag reported by the engine.
   */
  publish(triggerName: string, payload: any): boolean;

  /**
   * Registers `onMessage` to be called with each payload published on
   * `triggerName`.
   *
   * @returns a promise for the numeric id that identifies this registration
   *          and can later be handed to `unsubscribe`.
   */
  subscribe(triggerName: string, onMessage: Function): Promise<number>;

  /**
   * Removes the registration previously created by `subscribe`.
   * The return value is not used by callers, hence `void`.
   */
  unsubscribe(subId: number): void;
}
/**
 * In-memory `PubSubEngine` backed by a single Node `EventEmitter`.
 *
 * Every trigger name is used as an event name on the emitter; subscription
 * ids are handed out from a simple incrementing counter.
 */
export class PubSub implements PubSubEngine {
  private ee: EventEmitter;
  // subscription id -> [trigger name, listener] so the listener can be removed later
  private subscriptions: { [subId: number]: [string, Function] };
  private subIdCounter: number;

  constructor() {
    // The emitter keeps Node's default of 10 listeners per event name
    // (exceeding it produces a warning, not an error).
    this.ee = new EventEmitter();
    this.subscriptions = {};
    this.subIdCounter = 0;
  }

  /**
   * Emits `payload` on the event named `triggerName`.
   *
   * @returns always `true`. The emitter's own return value is deliberately
   *          ignored because it is `false` whenever nobody is listening,
   *          which is not a failure here.
   */
  public publish(triggerName: string, payload: any): boolean {
    this.ee.emit(triggerName, payload);
    return true;
  }

  /**
   * Attaches `onMessage` as a listener for `triggerName`.
   *
   * @returns a promise that is already resolved with the new subscription id
   *          (ids start at 1 and increase by one per call).
   */
  public subscribe(triggerName: string, onMessage: Function): Promise<number> {
    this.ee.addListener(triggerName, onMessage as (...args: any[]) => void);
    this.subIdCounter = this.subIdCounter + 1;
    this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
    return Promise.resolve(this.subIdCounter);
  }

  /**
   * Detaches the listener registered under `subId`.
   *
   * Unknown or already-removed ids are ignored, so calling this twice for
   * the same id is safe.
   */
  public unsubscribe(subId: number) {
    const entry = this.subscriptions[subId];
    if (!entry) {
      // Nothing registered under this id (never existed or already removed).
      return;
    }
    const [triggerName, onMessage] = entry;
    delete this.subscriptions[subId];
    this.ee.removeListener(triggerName, onMessage as (...args: any[]) => void);
  }
}
/**
 * Error used to reject a subscription whose query document did not pass
 * GraphQL validation. The individual validation failures are available on
 * `errors`.
 */
export class ValidationError extends Error {
  /** The validation errors this instance was created with. */
  errors: Array<GraphQLError>;

  /**
   * @param errors the validation failures to attach to this error.
   */
  constructor(errors: Array<GraphQLError>) {
    // Pass the message to Error itself so it also shows up in the captured
    // stack trace, not only on the `message` property.
    super('Subscription query has validation errors');
    // When compiled down to ES5, `super()` on a built-in returns a plain
    // Error and the subclass prototype is lost; restore it so that
    // `instanceof ValidationError` keeps working.
    Object.setPrototypeOf(this, ValidationError.prototype);
    this.name = 'ValidationError';
    this.errors = errors;
  }
}
/**
 * Options accepted by `SubscriptionManager.subscribe`.
 */
export interface SubscriptionOptions {
// GraphQL document text; it is parsed and validated against the schema.
query: string;
// Forwarded to `execute` as the operation name.
operationName: string;
// Called as `callback(null, result)` with each execution result, or as
// `callback(error)` if running the query raises an error.
callback: Function;
// Variable values; used to resolve the root field's arguments and
// forwarded to `execute`.
variables?: { [key: string]: any };
// Forwarded to `execute` as the context value.
context?: any;
// NOTE(review): not referenced anywhere in this file — confirm who consumes it.
formatError?: Function;
// NOTE(review): not referenced anywhere in this file — confirm who consumes it.
formatResponse?: Function;
};
// This manages actual GraphQL subscriptions.
/**
 * Turns GraphQL subscription documents into pubsub registrations.
 *
 * For every subscription it validates the query, works out which pubsub
 * triggers to listen on (optionally via a per-subscription setup function),
 * and re-executes the query against the schema whenever a trigger fires and
 * its filter accepts the payload.
 */
export class SubscriptionManager {
  private pubsub: PubSubEngine;
  private schema: GraphQLSchema;
  private setupFunctions: { [subscriptionName: string]: Function };
  // external subscription id -> ids returned by the pubsub engine for each trigger
  private subscriptions: { [externalId: number]: Array<number> };
  private maxSubscriptionId: number;

  /**
   * @param options.schema         schema the subscription queries run against.
   * @param options.setupFunctions optional map from subscription (root field)
   *                               name to a function returning a trigger map;
   *                               defaults to an empty map.
   * @param options.pubsub         engine used to publish and listen.
   */
  constructor(options: { schema: GraphQLSchema,
                         setupFunctions: {[subscriptionName: string]: Function},
                         pubsub: PubSubEngine }) {
    this.pubsub = options.pubsub;
    this.schema = options.schema;
    this.setupFunctions = options.setupFunctions || {};
    this.subscriptions = {};
    this.maxSubscriptionId = 0;
  }

  /** Forwards `payload` to the pubsub engine under `triggerName`. */
  public publish(triggerName: string, payload: any) {
    this.pubsub.publish(triggerName, payload);
  }

  /**
   * Registers a subscription.
   *
   * @returns a promise for the external subscription id, resolved once the
   *          pubsub engine has confirmed every trigger registration. The
   *          promise is rejected with a `ValidationError` when the query
   *          fails validation.
   *
   * Note: `parse` runs outside any try/catch, so an exception raised while
   * parsing propagates synchronously instead of as a rejection.
   */
  public subscribe(options: SubscriptionOptions): Promise<number> {
    // 1. validate the query, operationName and variables
    const parsedQuery = parse(options.query);
    const errors = validate(
      this.schema,
      parsedQuery,
      [...specifiedRules, subscriptionHasSingleRootField]
    );

    // TODO: validate that all variables have been passed (and are of correct type)?
    if (errors.length) {
      // this error kills the subscription, so we reject with it.
      return Promise.reject<number>(new ValidationError(errors));
    }

    // Argument values of the root field, keyed by argument name.
    const args: { [argName: string]: any } = {};

    // subscriptionName is the name of the only root field in the subscription document
    let subscriptionName = '';
    parsedQuery.definitions.forEach( definition => {
      if (definition.kind === 'OperationDefinition') {
        // only one root field is allowed on subscription. No fragments for now.
        const rootField = (definition as OperationDefinition).selectionSet.selections[0] as Field;
        subscriptionName = rootField.name.value;

        const fields = this.schema.getSubscriptionType().getFields();
        rootField.arguments.forEach( arg => {
          // we have to get the one arg's definition from the schema
          const argDefinition = fields[subscriptionName].args.filter(
            argDef => argDef.name === arg.name.value
          )[0];
          args[argDefinition.name] = valueFromAST(arg.value, argDefinition.type, options.variables);
        });
      }
    });

    // if not provided, the triggerName will be the subscriptionName, and
    // the filter will always return true.
    let triggerMap: { [triggerName: string]: Function } = { [subscriptionName]: () => true };
    if (this.setupFunctions[subscriptionName]) {
      triggerMap = this.setupFunctions[subscriptionName](options, args, subscriptionName);
    }

    const externalSubscriptionId = this.maxSubscriptionId++;
    // Keep a direct reference so late engine confirmations never touch a
    // map entry that has been removed in the meantime.
    const internalIds: Array<number> = [];
    this.subscriptions[externalSubscriptionId] = internalIds;

    const subscriptionPromises: Array<Promise<number>> = [];
    Object.keys(triggerMap).forEach( triggerName => {
      // 2. generate the filter function and the handler function
      const onMessage = (rootValue: any) => {
        // rootValue is the payload sent by the event emitter / trigger
        // by convention this is the value returned from the mutation resolver
        let pending;
        try {
          pending = execute(
            this.schema,
            parsedQuery,
            rootValue,
            options.context,
            options.variables,
            options.operationName
          );
        } catch (e) {
          // this does not kill the subscription, it could be a temporary failure
          options.callback(e);
          return;
        }
        // Report asynchronous failures through the callback as well; without
        // the rejection handler they would surface only as unhandled
        // rejections and the subscriber would never hear about them.
        Promise.resolve(pending).then(
          data => options.callback(null, data),
          error => options.callback(error)
        );
      };

      // Will run the onMessage function only if the message passes the filter function.
      const shouldTrigger: Function = triggerMap[triggerName];
      const handler = (data: any) => shouldTrigger(data) && onMessage(data);

      // 3. subscribe and keep the subscription id
      const subsPromise = this.pubsub.subscribe(triggerName, handler);
      subsPromise.then(id => internalIds.push(id));
      subscriptionPromises.push(subsPromise);
    });

    // Resolve the promise with external sub id only after all subscriptions completed
    return Promise.all(subscriptionPromises).then(() => externalSubscriptionId);
  }

  /**
   * Cancels the subscription identified by the id that `subscribe` resolved
   * with: every pubsub registration made for it is removed and the
   * bookkeeping entry is dropped. Unknown or already-cancelled ids are
   * ignored.
   */
  public unsubscribe(subId: number) {
    const internalIds = this.subscriptions[subId];
    if (!internalIds) {
      return;
    }
    internalIds.forEach( internalId => {
      this.pubsub.unsubscribe(internalId);
    });
    // Forget the entry so the map does not grow without bound and a repeated
    // call cannot unsubscribe the same engine ids twice.
    delete this.subscriptions[subId];
  }
}