Skip to content

Commit 02344aa

Browse files
committed
add aborted$ observable to KibanaRequest
1 parent 81a7f89 commit 02344aa

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
import { Stream } from 'stream';
20+
import Boom from 'boom';
21+
import supertest from 'supertest';
22+
import { schema } from '@kbn/config-schema';
23+
24+
import { HttpService } from '../http_service';
25+
26+
import { contextServiceMock } from '../../context/context_service.mock';
27+
import { loggingServiceMock } from '../../logging/logging_service.mock';
28+
import { createHttpServer } from '../test_utils';
29+
30+
let server: HttpService;
31+
32+
let logger: ReturnType<typeof loggingServiceMock.create>;
33+
const contextSetup = contextServiceMock.createSetupContract();
34+
35+
const setupDeps = {
36+
context: contextSetup,
37+
};
38+
39+
beforeEach(() => {
40+
logger = loggingServiceMock.create();
41+
42+
server = createHttpServer({ logger });
43+
});
44+
45+
afterEach(async () => {
46+
await server.stop();
47+
});
48+
49+
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
50+
describe('KibanaRequest', () => {
51+
describe('events', () => {
52+
describe('aborted$', () => {
53+
it('emits once and competes when request aborted', async done => {
54+
const { server: innerServer, createRouter } = await server.setup(setupDeps);
55+
const router = createRouter('/');
56+
57+
const nextSpy = jest.fn();
58+
router.get({ path: '/', validate: false }, async (context, request, res) => {
59+
request.events.aborted$.subscribe({
60+
next: nextSpy,
61+
complete: () => {
62+
expect(nextSpy).toHaveBeenCalledTimes(1);
63+
done();
64+
},
65+
});
66+
67+
// prevents the server to respond
68+
await delay(30000);
69+
return res.ok({ body: 'ok' });
70+
});
71+
72+
await server.start();
73+
74+
const incomingRequest = supertest(innerServer.listener)
75+
.get('/')
76+
// end required to send request
77+
.end();
78+
79+
setTimeout(() => incomingRequest.abort(), 50);
80+
});
81+
it('does not emit when request handled', async () => {
82+
const { server: innerServer, createRouter } = await server.setup(setupDeps);
83+
const router = createRouter('/');
84+
85+
const nextSpy = jest.fn();
86+
const completeSpy = jest.fn();
87+
router.get({ path: '/', validate: false }, async (context, request, res) => {
88+
request.events.aborted$.subscribe({
89+
next: nextSpy,
90+
complete: completeSpy,
91+
});
92+
93+
return res.ok({ body: 'ok' });
94+
});
95+
96+
await server.start();
97+
98+
await supertest(innerServer.listener).get('/');
99+
await delay(50);
100+
101+
expect(nextSpy).toHaveBeenCalledTimes(0);
102+
expect(completeSpy).toHaveBeenCalledTimes(0);
103+
});
104+
});
105+
});
106+
});

src/core/server/http/router/request.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import { Url } from 'url';
2121
import { Request } from 'hapi';
22+
import { Observable, fromEvent } from 'rxjs';
23+
import { first } from 'rxjs/operators';
2224

2325
import { deepFreeze, RecursiveReadonly } from '../../../utils';
2426
import { Headers } from './headers';
@@ -46,6 +48,17 @@ export interface KibanaRequestRoute<Method extends RouteMethod> {
4648
options: KibanaRequestRouteOptions<Method>;
4749
}
4850

51+
/**
52+
* KibanaRequest events
53+
* @public
54+
*/
55+
export interface KibanaRequestEvents {
56+
/**
57+
* emits once & completes when the request has been aborted.
58+
*/
59+
aborted$: Observable<void>;
60+
}
61+
4962
/**
5063
* @deprecated
5164
* `hapi` request object, supported during migration process only for backward compatibility.
@@ -116,6 +129,7 @@ export class KibanaRequest<
116129
public readonly headers: Headers;
117130

118131
public readonly socket: IKibanaSocket;
132+
public readonly events: KibanaRequestEvents;
119133

120134
/** @internal */
121135
protected readonly [requestSymbol]: Request;
@@ -130,6 +144,9 @@ export class KibanaRequest<
130144
private readonly withoutSecretHeaders: boolean
131145
) {
132146
this.url = request.url;
147+
this.events = {
148+
aborted$: fromEvent<void>(request.raw.req, 'aborted').pipe(first()),
149+
};
133150
this.headers = deepFreeze({ ...request.headers });
134151

135152
// prevent Symbol exposure via Object.getOwnPropertySymbols()

0 commit comments

Comments
 (0)