Skip to content

Commit d9d9997

Browse files
authored
Merge pull request #132 from kjin/grpc-js-core-work-3
grpc-js-core: prevent callback before 'end' event and handle eos headers as trailers
2 parents 78f6458 + 22438ae commit d9d9997

File tree

4 files changed

+125
-54
lines changed

4 files changed

+125
-54
lines changed

packages/grpc-js-core/src/call-stream.ts

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream {
105105
// Status code mapped from :status. To be used if grpc-status is not received
106106
private mappedStatusCode: Status = Status.UNKNOWN;
107107

108+
// Promise objects that are re-assigned to resolving promises when headers
109+
// or trailers received. Processing headers/trailers is asynchronous, so we
110+
// can use these objects to await their completion. This helps us establish
111+
// order of precedence when obtaining the status of the call.
112+
private handlingHeaders = Promise.resolve();
113+
private handlingTrailers = Promise.resolve();
114+
108115
// This is populated (non-null) if and only if the call has ended
109116
private finalStatus: StatusObject|null = null;
110117

@@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream {
116123
this.filterStack = filterStackFactory.createFilter(this);
117124
}
118125

126+
/**
127+
* On first call, emits a 'status' event with the given StatusObject.
128+
* Subsequent calls are no-ops.
129+
* @param status The status of the call.
130+
*/
119131
private endCall(status: StatusObject): void {
120132
if (this.finalStatus === null) {
121133
this.finalStatus = status;
@@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream {
135147
return canPush;
136148
}
137149

150+
private handleTrailers(headers: http2.IncomingHttpHeaders) {
151+
let code: Status = this.mappedStatusCode;
152+
let details = '';
153+
let metadata: Metadata;
154+
try {
155+
metadata = Metadata.fromHttp2Headers(headers);
156+
} catch (e) {
157+
metadata = new Metadata();
158+
}
159+
let status: StatusObject = {code, details, metadata};
160+
this.handlingTrailers = (async () => {
161+
let finalStatus;
162+
try {
163+
// Attempt to assign final status.
164+
finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status));
165+
} catch (error) {
166+
await this.handlingHeaders;
167+
// This is a no-op if the call was already ended when handling headers.
168+
this.endCall({
169+
code: Status.INTERNAL,
170+
details: 'Failed to process received status',
171+
metadata: new Metadata()
172+
});
173+
return;
174+
}
175+
// It's possible that headers were received but not fully handled yet.
176+
// Give the headers handler an opportunity to end the call first,
177+
// if an error occurred.
178+
await this.handlingHeaders;
179+
// This is a no-op if the call was already ended when handling headers.
180+
this.endCall(finalStatus);
181+
})();
182+
}
183+
138184
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
139185
if (this.finalStatus !== null) {
140186
stream.rstWithCancel();
141187
} else {
142188
this.http2Stream = stream;
143-
stream.on('response', (headers) => {
189+
stream.on('response', (headers, flags) => {
144190
switch (headers[HTTP2_HEADER_STATUS]) {
145191
// TODO(murgatroid99): handle 100 and 101
146192
case '400':
@@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream {
166212
}
167213
delete headers[HTTP2_HEADER_STATUS];
168214
delete headers[HTTP2_HEADER_CONTENT_TYPE];
169-
let metadata: Metadata;
170-
try {
171-
metadata = Metadata.fromHttp2Headers(headers);
172-
} catch (e) {
173-
this.cancelWithStatus(Status.UNKNOWN, e.message);
174-
return;
175-
}
176-
this.filterStack.receiveMetadata(Promise.resolve(metadata))
177-
.then(
178-
(finalMetadata) => {
179-
this.emit('metadata', finalMetadata);
180-
},
181-
(error) => {
182-
this.cancelWithStatus(Status.UNKNOWN, error.message);
183-
});
184-
});
185-
stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
186-
let code: Status = this.mappedStatusCode;
187-
let details = '';
188-
if (typeof headers['grpc-status'] === 'string') {
189-
let receivedCode = Number(headers['grpc-status']);
190-
if (receivedCode in Status) {
191-
code = receivedCode;
192-
} else {
193-
code = Status.UNKNOWN;
215+
if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
216+
this.handleTrailers(headers);
217+
} else {
218+
let metadata: Metadata;
219+
try {
220+
metadata = Metadata.fromHttp2Headers(headers);
221+
} catch (error) {
222+
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
223+
return;
194224
}
195-
delete headers['grpc-status'];
196-
}
197-
if (typeof headers['grpc-message'] === 'string') {
198-
details = decodeURI(headers['grpc-message'] as string);
225+
this.handlingHeaders =
226+
this.filterStack.receiveMetadata(Promise.resolve(metadata))
227+
.then((finalMetadata) => {
228+
this.emit('metadata', finalMetadata);
229+
}).catch((error) => {
230+
this.destroyHttp2Stream();
231+
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
232+
});
199233
}
200-
let metadata: Metadata;
201-
try {
202-
metadata = Metadata.fromHttp2Headers(headers);
203-
} catch (e) {
204-
metadata = new Metadata();
205-
}
206-
let status: StatusObject = {code, details, metadata};
207-
this.filterStack.receiveTrailers(Promise.resolve(status))
208-
.then(
209-
(finalStatus) => {
210-
this.endCall(finalStatus);
211-
},
212-
(error) => {
213-
this.endCall({
214-
code: Status.INTERNAL,
215-
details: 'Failed to process received status',
216-
metadata: new Metadata()
217-
});
218-
});
219234
});
235+
stream.on('trailers', this.handleTrailers.bind(this));
220236
stream.on('data', (data) => {
221237
let readHead = 0;
222238
let canPush = true;
@@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream {
278294
this.unpushedReadMessages.push(null);
279295
}
280296
});
281-
stream.on('streamClosed', (errorCode) => {
297+
stream.on('close', async (errorCode) => {
282298
let code: Status;
283299
let details = '';
284300
switch (errorCode) {
@@ -302,6 +318,13 @@ export class Http2CallStream extends Duplex implements CallStream {
302318
default:
303319
code = Status.INTERNAL;
304320
}
321+
// This guarantees that if trailers were received, the value of the
322+
// 'grpc-status' header takes precedence for emitted status data.
323+
await this.handlingTrailers;
324+
// This is a no-op if trailers were received at all.
325+
// This is OK, because status codes emitted here correspond to more
326+
// catastrophic issues that prevent us from receiving trailers in the
327+
// first place.
305328
this.endCall({code: code, details: details, metadata: new Metadata()});
306329
});
307330
stream.on('error', (err: Error) => {
@@ -326,8 +349,7 @@ export class Http2CallStream extends Duplex implements CallStream {
326349
}
327350
}
328351

329-
cancelWithStatus(status: Status, details: string): void {
330-
this.endCall({code: status, details: details, metadata: new Metadata()});
352+
private destroyHttp2Stream() {
331353
// The http2 stream could already have been destroyed if cancelWithStatus
332354
// is called in response to an internal http2 error.
333355
if (this.http2Stream !== null && !this.http2Stream.destroyed) {
@@ -337,6 +359,16 @@ export class Http2CallStream extends Duplex implements CallStream {
337359
}
338360
}
339361

362+
cancelWithStatus(status: Status, details: string): void {
363+
this.destroyHttp2Stream();
364+
(async () => {
365+
// If trailers are currently being processed, the call should be ended
366+
// by handleTrailers instead.
367+
await this.handlingTrailers;
368+
this.endCall({code: status, details: details, metadata: new Metadata()});
369+
})();
370+
}
371+
340372
getDeadline(): Deadline {
341373
return this.options.deadline;
342374
}

packages/grpc-js-core/src/channel.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {Status} from './constants';
1212
import {DeadlineFilterFactory} from './deadline-filter';
1313
import {FilterStackFactory} from './filter-stack';
1414
import {Metadata, MetadataObject} from './metadata';
15+
import { MetadataStatusFilterFactory } from './metadata-status-filter';
1516

1617
const IDLE_TIMEOUT_MS = 300000;
1718

@@ -189,7 +190,9 @@ export class Http2Channel extends EventEmitter implements Channel {
189190
}
190191
this.filterStackFactory = new FilterStackFactory([
191192
new CompressionFilterFactory(this),
192-
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this)
193+
new CallCredentialsFilterFactory(this),
194+
new DeadlineFilterFactory(this),
195+
new MetadataStatusFilterFactory(this)
193196
]);
194197
this.currentBackoffDeadline = new Date();
195198
/* The only purpose of these lines is to ensure that this.backoffTimerId has
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import {CallStream} from './call-stream';
2+
import {Channel} from './channel';
3+
import {BaseFilter, Filter, FilterFactory} from './filter';
4+
import {StatusObject} from './call-stream';
5+
import {Status} from './constants';
6+
7+
export class MetadataStatusFilter extends BaseFilter implements Filter {
8+
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
9+
let { code, details, metadata } = await status;
10+
if (code !== Status.UNKNOWN) {
11+
// we already have a known status, so don't assign a new one.
12+
return { code, details, metadata };
13+
}
14+
const metadataMap = metadata.getMap();
15+
if (typeof metadataMap['grpc-status'] === 'string') {
16+
let receivedCode = Number(metadataMap['grpc-status']);
17+
if (receivedCode in Status) {
18+
code = receivedCode;
19+
}
20+
metadata.remove('grpc-status');
21+
}
22+
if (typeof metadataMap['grpc-message'] === 'string') {
23+
details = decodeURI(metadataMap['grpc-message'] as string);
24+
metadata.remove('grpc-message');
25+
}
26+
return { code, details, metadata };
27+
}
28+
}
29+
30+
export class MetadataStatusFilterFactory implements
31+
FilterFactory<MetadataStatusFilter> {
32+
constructor(private readonly channel: Channel) {}
33+
createFilter(callStream: CallStream): MetadataStatusFilter {
34+
return new MetadataStatusFilter();
35+
}
36+
}

packages/grpc-js-core/src/metadata.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ function isLegalKey(key: string): boolean {
2626
}
2727

2828
function isLegalNonBinaryValue(value: string): boolean {
29-
return !!value.match(/^[ -~]+$/);
29+
return !!value.match(/^[ -~]*$/);
3030
}
3131

3232
function isBinaryKey(key: string): boolean {

0 commit comments

Comments
 (0)