Skip to content

Commit

Permalink
feat(NODE-3132): Add TypedEventEmitter (#2785)
Browse files Browse the repository at this point in the history
Using a mapped type of event names to function type can now provide
annotations and completion for event argument types.
  • Loading branch information
nbbeeken authored May 11, 2021
1 parent 8bb92f9 commit f4d40a4
Show file tree
Hide file tree
Showing 19 changed files with 1,865 additions and 1,559 deletions.
2,775 changes: 1,417 additions & 1,358 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
"@microsoft/api-extractor": "^7.13.1",
"@microsoft/tsdoc-config": "^0.14.0",
"@microsoft/api-extractor": "^7.14.0",
"@microsoft/tsdoc-config": "^0.15.2",
"@types/aws4": "^1.5.1",
"@types/bl": "^2.1.0",
"@types/chai": "^4.2.14",
"@types/chai-subset": "^1.3.3",
"@types/kerberos": "^1.1.0",
"@types/mocha": "^8.2.0",
"@types/node": "^14.14.31",
"@types/node": "^14.14.41",
"@types/saslprep": "^1.0.0",
"@types/semver": "^7.3.4",
"@typescript-eslint/eslint-plugin": "^4.15.1",
Expand All @@ -62,7 +62,7 @@
"mocha-sinon": "^2.1.0",
"mongodb-mock-server": "^2.0.1",
"nyc": "^15.1.0",
"prettier": "^2.0.5",
"prettier": "2.1.1",
"rimraf": "^3.0.2",
"semver": "^5.5.0",
"sinon": "^4.3.0",
Expand All @@ -72,8 +72,8 @@
"standard-version": "^8.0.2",
"through2": "^3.0.1",
"ts-node": "^9.1.1",
"typedoc": "^0.20.25",
"typescript": "^4.1.5",
"typedoc": "^0.20.35",
"typescript": "^4.2.4",
"typescript-cached-transpile": "^0.0.6",
"worker-farm": "^1.5.0",
"wtfnode": "^0.8.2",
Expand Down
50 changes: 39 additions & 11 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import {
relayEvents,
maxWireVersion,
calculateDurationInMs,
now,
Expand All @@ -22,11 +20,13 @@ import { Collection } from './collection';
import type { Readable } from 'stream';
import {
AbstractCursor,
AbstractCursorEvents,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { ClientSession } from './sessions';
import { executeOperation, ExecutionResult } from './operations/execute_operation';
import { TypedEventEmitter } from './mongo_types';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
Expand Down Expand Up @@ -96,7 +96,8 @@ export interface ChangeStreamOptions extends AggregateOptions {
batchSize?: number;
}

interface ChangeStreamDocument {
/** @public */
export interface ChangeStreamDocument {
/**
* The id functions as an opaque token for use when resuming an interrupted
* change stream.
Expand Down Expand Up @@ -157,7 +158,8 @@ interface ChangeStreamDocument {
fullDocument?: Document;
}

interface UpdateDescription {
/** @public */
export interface UpdateDescription {
/**
* A document containing key:value pairs of names of the fields that were
* changed, and the new value for those fields.
Expand All @@ -170,11 +172,22 @@ interface UpdateDescription {
removedFields: string[];
}

/** @public */
export type ChangeStreamEvents = {
resumeTokenChanged(token: ResumeToken): void;
init(response: Document): void;
more(response?: Document | undefined): void;
response(): void;
end(): void;
error(error: Error): void;
change(change: ChangeStreamDocument): void;
} & AbstractCursorEvents;

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
*/
export class ChangeStream extends EventEmitter {
export class ChangeStream extends TypedEventEmitter<ChangeStreamEvents> {
pipeline: Document[];
options: ChangeStreamOptions;
parent: MongoClient | Db | Collection;
Expand All @@ -190,6 +203,12 @@ export class ChangeStream extends EventEmitter {
/** @internal */
[kClosed]: boolean;

/** @event */
static readonly RESPONSE = 'response' as const;
/** @event */
static readonly MORE = 'more' as const;
/** @event */
static readonly INIT = 'init' as const;
/** @event */
static readonly CLOSE = 'close' as const;
/**
Expand Down Expand Up @@ -357,7 +376,7 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
}

/** @internal */
export class ChangeStreamCursor extends AbstractCursor {
export class ChangeStreamCursor extends AbstractCursor<ChangeStreamEvents> {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
Expand Down Expand Up @@ -476,8 +495,8 @@ export class ChangeStreamCursor extends AbstractCursor {

this._processBatch('firstBatch', response);

this.emit('init', response);
this.emit('response');
this.emit(ChangeStream.INIT, response);
this.emit(ChangeStream.RESPONSE);

// TODO: NODE-2882
callback(undefined, { server, session, response });
Expand All @@ -492,13 +511,19 @@ export class ChangeStreamCursor extends AbstractCursor {

this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
this.emit(ChangeStream.MORE, response);
this.emit(ChangeStream.RESPONSE);
callback(err, response);
});
}
}

const CHANGE_STREAM_EVENTS = [
ChangeStream.RESUME_TOKEN_CHANGED,
ChangeStream.END,
ChangeStream.CLOSE
];

/**
* Create a new change stream cursor based on self's configuration
* @internal
Expand All @@ -525,7 +550,10 @@ function createChangeStreamCursor(
cursorOptions
);

relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
for (const event of CHANGE_STREAM_EVENTS) {
changeStreamCursor.on(event, e => changeStream.emit(event, e));
}

if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
streamEvents(changeStream, changeStreamCursor);
}
Expand Down
29 changes: 25 additions & 4 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from 'events';
import { MessageStream, OperationDescription } from './message_stream';
import { StreamDescription, StreamDescriptionOptions } from './stream_description';
import {
Expand Down Expand Up @@ -43,6 +42,7 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import { isTransactionCommand } from '../transactions';
import type { W, WriteConcern, WriteConcernOptions } from '../write_concern';
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -121,7 +121,7 @@ export interface ConnectionOptions
keepAliveInitialDelay?: number;
noDelay?: boolean;
socketTimeoutMS?: number;
cancellationToken?: EventEmitter;
cancellationToken?: CancellationToken;

metadata: ClientMetadata;
}
Expand All @@ -133,7 +133,17 @@ export interface DestroyOptions {
}

/** @public */
export class Connection extends EventEmitter {
export type ConnectionEvents = {
[Connection.COMMAND_STARTED](event: CommandStartedEvent): void;
[Connection.COMMAND_SUCCEEDED](event: CommandSucceededEvent): void;
[Connection.COMMAND_FAILED](event: CommandFailedEvent): void;
[Connection.CLUSTER_TIME_RECEIVED](clusterTime: Document): void;
[Connection.CLOSE](): void;
[Connection.MESSAGE](message: any): void;
};

/** @public */
export class Connection extends TypedEventEmitter<ConnectionEvents> {
id: number | '<monitor>';
address: string;
socketTimeoutMS: number;
Expand Down Expand Up @@ -167,6 +177,10 @@ export class Connection extends EventEmitter {
static readonly COMMAND_FAILED = 'commandFailed' as const;
/** @event */
static readonly CLUSTER_TIME_RECEIVED = 'clusterTimeReceived' as const;
/** @event */
static readonly CLOSE = 'close' as const;
/** @event */
static readonly MESSAGE = 'message' as const;

constructor(stream: Stream, options: ConnectionOptions) {
super();
Expand Down Expand Up @@ -266,7 +280,7 @@ export class Connection extends EventEmitter {
}

this[kQueue].clear();
this.emit('close');
this.emit(Connection.CLOSE);
}

destroy(): void;
Expand Down Expand Up @@ -586,6 +600,13 @@ export class Connection extends EventEmitter {
}
}

/** @public */
export const APM_EVENTS = [
Connection.COMMAND_STARTED,
Connection.COMMAND_SUCCEEDED,
Connection.COMMAND_FAILED
];

/** @internal */
export class CryptoConnection extends Connection {
/** @internal */
Expand Down
47 changes: 29 additions & 18 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { Logger } from '../logger';
import { Connection, ConnectionOptions } from './connection';
import { APM_EVENTS, Connection, ConnectionEvents, ConnectionOptions } from './connection';
import { connect } from './connect';
import { eachAsync, relayEvents, makeCounter, Callback } from '../utils';
import { eachAsync, makeCounter, Callback } from '../utils';
import { MongoError } from '../error';
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
import {
Expand All @@ -18,6 +17,7 @@ import {
ConnectionCheckedInEvent,
ConnectionPoolClearedEvent
} from './connection_pool_events';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';

const kLogger = Symbol('logger');
const kConnections = Symbol('connections');
Expand Down Expand Up @@ -53,11 +53,25 @@ export interface CloseOptions {
force?: boolean;
}

/** @public */
export type ConnectionPoolEvents = {
[ConnectionPool.CONNECTION_POOL_CREATED](event: ConnectionPoolCreatedEvent): void;
[ConnectionPool.CONNECTION_POOL_CLOSED](event: ConnectionPoolClosedEvent): void;
[ConnectionPool.CONNECTION_POOL_CLEARED](event: ConnectionPoolClearedEvent): void;
[ConnectionPool.CONNECTION_CREATED](event: ConnectionCreatedEvent): void;
[ConnectionPool.CONNECTION_READY](event: ConnectionReadyEvent): void;
[ConnectionPool.CONNECTION_CLOSED](event: ConnectionClosedEvent): void;
[ConnectionPool.CONNECTION_CHECK_OUT_STARTED](event: ConnectionCheckOutStartedEvent): void;
[ConnectionPool.CONNECTION_CHECK_OUT_FAILED](event: ConnectionCheckOutFailedEvent): void;
[ConnectionPool.CONNECTION_CHECKED_OUT](event: ConnectionCheckedOutEvent): void;
[ConnectionPool.CONNECTION_CHECKED_IN](event: ConnectionCheckedInEvent): void;
} & Omit<ConnectionEvents, 'close' | 'message'>;

/**
* A pool of connections which dynamically resizes, and emit events related to pool activity
* @public
*/
export class ConnectionPool extends EventEmitter {
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
closed: boolean;
options: Readonly<ConnectionPoolOptions>;
/** @internal */
Expand All @@ -79,7 +93,7 @@ export class ConnectionPool extends EventEmitter {
/** @internal */
[kConnectionCounter]: Generator<number>;
/** @internal */
[kCancellationToken]: EventEmitter;
[kCancellationToken]: CancellationToken;
/** @internal */
[kWaitQueue]: Denque<WaitQueueMember>;

Expand All @@ -93,6 +107,11 @@ export class ConnectionPool extends EventEmitter {
* @event
*/
static readonly CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
/**
* Emitted each time the connection pool is cleared and it's generation incremented
* @event
*/
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
/**
* Emitted when a connection is created.
* @event
Expand Down Expand Up @@ -128,11 +147,6 @@ export class ConnectionPool extends EventEmitter {
* @event
*/
static readonly CONNECTION_CHECKED_IN = 'connectionCheckedIn' as const;
/**
* Emitted each time the connection pool is cleared and it's generation incremented
* @event
*/
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;

constructor(options: ConnectionPoolOptions) {
super();
Expand Down Expand Up @@ -161,7 +175,7 @@ export class ConnectionPool extends EventEmitter {
this[kMinPoolSizeTimer] = undefined;
this[kGeneration] = 0;
this[kConnectionCounter] = makeCounter(1);
this[kCancellationToken] = new EventEmitter();
this[kCancellationToken] = new CancellationToken();
this[kCancellationToken].setMaxListeners(Infinity);
this[kWaitQueue] = new Denque();

Expand Down Expand Up @@ -409,12 +423,9 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
}

// forward all events from the connection to the pool
relayEvents(connection, pool, [
Connection.COMMAND_STARTED,
Connection.COMMAND_FAILED,
Connection.COMMAND_SUCCEEDED,
Connection.CLUSTER_TIME_RECEIVED
]);
for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) {
connection.on(event, (e: any) => pool.emit(event, e));
}

pool.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionCreatedEvent(pool, connection));

Expand Down Expand Up @@ -522,7 +533,7 @@ function processWaitQueue(pool: ConnectionPool) {
}
}

export const CMAP_EVENT_NAMES = [
export const CMAP_EVENTS = [
ConnectionPool.CONNECTION_POOL_CREATED,
ConnectionPool.CONNECTION_POOL_CLOSED,
ConnectionPool.CONNECTION_CREATED,
Expand Down
Loading

0 comments on commit f4d40a4

Please sign in to comment.