Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3132): Add TypedEventEmitter #2785

Merged
merged 9 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,775 changes: 1,417 additions & 1,358 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
"@microsoft/api-extractor": "^7.13.1",
"@microsoft/tsdoc-config": "^0.14.0",
"@microsoft/api-extractor": "^7.14.0",
"@microsoft/tsdoc-config": "^0.15.2",
"@types/aws4": "^1.5.1",
"@types/bl": "^2.1.0",
"@types/chai": "^4.2.14",
"@types/chai-subset": "^1.3.3",
"@types/kerberos": "^1.1.0",
"@types/mocha": "^8.2.0",
"@types/node": "^14.14.31",
"@types/node": "^14.14.41",
"@types/saslprep": "^1.0.0",
"@types/semver": "^7.3.4",
"@typescript-eslint/eslint-plugin": "^4.15.1",
Expand All @@ -62,7 +62,7 @@
"mocha-sinon": "^2.1.0",
"mongodb-mock-server": "^2.0.1",
"nyc": "^15.1.0",
"prettier": "^2.0.5",
"prettier": "2.1.1",
"rimraf": "^3.0.2",
"semver": "^5.5.0",
"sinon": "^4.3.0",
Expand All @@ -72,8 +72,8 @@
"standard-version": "^8.0.2",
"through2": "^3.0.1",
"ts-node": "^9.1.1",
"typedoc": "^0.20.25",
"typescript": "^4.1.5",
"typedoc": "^0.20.35",
"typescript": "^4.2.4",
"typescript-cached-transpile": "^0.0.6",
"worker-farm": "^1.5.0",
"wtfnode": "^0.8.2",
Expand Down
50 changes: 39 additions & 11 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import {
relayEvents,
maxWireVersion,
calculateDurationInMs,
now,
Expand All @@ -22,11 +20,13 @@ import { Collection } from './collection';
import type { Readable } from 'stream';
import {
AbstractCursor,
AbstractCursorEvents,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { ClientSession } from './sessions';
import { executeOperation, ExecutionResult } from './operations/execute_operation';
import { TypedEventEmitter } from './mongo_types';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
Expand Down Expand Up @@ -96,7 +96,8 @@ export interface ChangeStreamOptions extends AggregateOptions {
batchSize?: number;
}

interface ChangeStreamDocument {
/** @public */
export interface ChangeStreamDocument {
/**
* The id functions as an opaque token for use when resuming an interrupted
* change stream.
Expand Down Expand Up @@ -157,7 +158,8 @@ interface ChangeStreamDocument {
fullDocument?: Document;
}

interface UpdateDescription {
/** @public */
export interface UpdateDescription {
/**
* A document containing key:value pairs of names of the fields that were
* changed, and the new value for those fields.
Expand All @@ -170,11 +172,22 @@ interface UpdateDescription {
removedFields: string[];
}

/** @public */
export type ChangeStreamEvents = {
resumeTokenChanged(token: ResumeToken): void;
init(response: Document): void;
more(response?: Document | undefined): void;
response(): void;
end(): void;
error(error: Error): void;
change(change: ChangeStreamDocument): void;
} & AbstractCursorEvents;

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
*/
export class ChangeStream extends EventEmitter {
export class ChangeStream extends TypedEventEmitter<ChangeStreamEvents> {
pipeline: Document[];
options: ChangeStreamOptions;
parent: MongoClient | Db | Collection;
Expand All @@ -190,6 +203,12 @@ export class ChangeStream extends EventEmitter {
/** @internal */
[kClosed]: boolean;

/** @event */
static readonly RESPONSE = 'response' as const;
/** @event */
static readonly MORE = 'more' as const;
/** @event */
static readonly INIT = 'init' as const;
/** @event */
static readonly CLOSE = 'close' as const;
/**
Expand Down Expand Up @@ -357,7 +376,7 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
}

/** @internal */
export class ChangeStreamCursor extends AbstractCursor {
export class ChangeStreamCursor extends AbstractCursor<ChangeStreamEvents> {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
Expand Down Expand Up @@ -476,8 +495,8 @@ export class ChangeStreamCursor extends AbstractCursor {

this._processBatch('firstBatch', response);

this.emit('init', response);
this.emit('response');
this.emit(ChangeStream.INIT, response);
this.emit(ChangeStream.RESPONSE);

// TODO: NODE-2882
callback(undefined, { server, session, response });
Expand All @@ -492,13 +511,19 @@ export class ChangeStreamCursor extends AbstractCursor {

this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
this.emit(ChangeStream.MORE, response);
this.emit(ChangeStream.RESPONSE);
callback(err, response);
});
}
}

const CHANGE_STREAM_EVENTS = [
ChangeStream.RESUME_TOKEN_CHANGED,
ChangeStream.END,
ChangeStream.CLOSE
];

/**
* Create a new change stream cursor based on self's configuration
* @internal
Expand All @@ -525,7 +550,10 @@ function createChangeStreamCursor(
cursorOptions
);

relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
for (const event of CHANGE_STREAM_EVENTS) {
changeStreamCursor.on(event, e => changeStream.emit(event, e));
}

if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
streamEvents(changeStream, changeStreamCursor);
}
Expand Down
29 changes: 25 additions & 4 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from 'events';
import { MessageStream, OperationDescription } from './message_stream';
import { StreamDescription, StreamDescriptionOptions } from './stream_description';
import {
Expand Down Expand Up @@ -43,6 +42,7 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import { isTransactionCommand } from '../transactions';
import type { W, WriteConcern, WriteConcernOptions } from '../write_concern';
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -121,7 +121,7 @@ export interface ConnectionOptions
keepAliveInitialDelay?: number;
noDelay?: boolean;
socketTimeoutMS?: number;
cancellationToken?: EventEmitter;
cancellationToken?: CancellationToken;

metadata: ClientMetadata;
}
Expand All @@ -133,7 +133,17 @@ export interface DestroyOptions {
}

/** @public */
export class Connection extends EventEmitter {
export type ConnectionEvents = {
[Connection.COMMAND_STARTED](event: CommandStartedEvent): void;
[Connection.COMMAND_SUCCEEDED](event: CommandSucceededEvent): void;
[Connection.COMMAND_FAILED](event: CommandFailedEvent): void;
[Connection.CLUSTER_TIME_RECEIVED](clusterTime: Document): void;
[Connection.CLOSE](): void;
[Connection.MESSAGE](message: any): void;
};

/** @public */
export class Connection extends TypedEventEmitter<ConnectionEvents> {
id: number | '<monitor>';
address: string;
socketTimeoutMS: number;
Expand Down Expand Up @@ -167,6 +177,10 @@ export class Connection extends EventEmitter {
static readonly COMMAND_FAILED = 'commandFailed' as const;
/** @event */
static readonly CLUSTER_TIME_RECEIVED = 'clusterTimeReceived' as const;
/** @event */
static readonly CLOSE = 'close' as const;
/** @event */
static readonly MESSAGE = 'message' as const;

constructor(stream: Stream, options: ConnectionOptions) {
super();
Expand Down Expand Up @@ -266,7 +280,7 @@ export class Connection extends EventEmitter {
}

this[kQueue].clear();
this.emit('close');
this.emit(Connection.CLOSE);
}

destroy(): void;
Expand Down Expand Up @@ -586,6 +600,13 @@ export class Connection extends EventEmitter {
}
}

/** @public */
export const APM_EVENTS = [
Connection.COMMAND_STARTED,
Connection.COMMAND_SUCCEEDED,
Connection.COMMAND_FAILED
];

/** @internal */
export class CryptoConnection extends Connection {
/** @internal */
Expand Down
47 changes: 29 additions & 18 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { Logger } from '../logger';
import { Connection, ConnectionOptions } from './connection';
import { APM_EVENTS, Connection, ConnectionEvents, ConnectionOptions } from './connection';
import { connect } from './connect';
import { eachAsync, relayEvents, makeCounter, Callback } from '../utils';
import { eachAsync, makeCounter, Callback } from '../utils';
import { MongoError } from '../error';
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
import {
Expand All @@ -18,6 +17,7 @@ import {
ConnectionCheckedInEvent,
ConnectionPoolClearedEvent
} from './connection_pool_events';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';

const kLogger = Symbol('logger');
const kConnections = Symbol('connections');
Expand Down Expand Up @@ -53,11 +53,25 @@ export interface CloseOptions {
force?: boolean;
}

/** @public */
export type ConnectionPoolEvents = {
[ConnectionPool.CONNECTION_POOL_CREATED](event: ConnectionPoolCreatedEvent): void;
[ConnectionPool.CONNECTION_POOL_CLOSED](event: ConnectionPoolClosedEvent): void;
[ConnectionPool.CONNECTION_POOL_CLEARED](event: ConnectionPoolClearedEvent): void;
[ConnectionPool.CONNECTION_CREATED](event: ConnectionCreatedEvent): void;
[ConnectionPool.CONNECTION_READY](event: ConnectionReadyEvent): void;
[ConnectionPool.CONNECTION_CLOSED](event: ConnectionClosedEvent): void;
[ConnectionPool.CONNECTION_CHECK_OUT_STARTED](event: ConnectionCheckOutStartedEvent): void;
[ConnectionPool.CONNECTION_CHECK_OUT_FAILED](event: ConnectionCheckOutFailedEvent): void;
[ConnectionPool.CONNECTION_CHECKED_OUT](event: ConnectionCheckedOutEvent): void;
[ConnectionPool.CONNECTION_CHECKED_IN](event: ConnectionCheckedInEvent): void;
} & Omit<ConnectionEvents, 'close' | 'message'>;

/**
* A pool of connections which dynamically resizes, and emit events related to pool activity
* @public
*/
export class ConnectionPool extends EventEmitter {
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
closed: boolean;
options: Readonly<ConnectionPoolOptions>;
/** @internal */
Expand All @@ -79,7 +93,7 @@ export class ConnectionPool extends EventEmitter {
/** @internal */
[kConnectionCounter]: Generator<number>;
/** @internal */
[kCancellationToken]: EventEmitter;
[kCancellationToken]: CancellationToken;
/** @internal */
[kWaitQueue]: Denque<WaitQueueMember>;

Expand All @@ -93,6 +107,11 @@ export class ConnectionPool extends EventEmitter {
* @event
*/
static readonly CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
/**
* Emitted each time the connection pool is cleared and it's generation incremented
* @event
*/
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
/**
* Emitted when a connection is created.
* @event
Expand Down Expand Up @@ -128,11 +147,6 @@ export class ConnectionPool extends EventEmitter {
* @event
*/
static readonly CONNECTION_CHECKED_IN = 'connectionCheckedIn' as const;
/**
* Emitted each time the connection pool is cleared and it's generation incremented
* @event
*/
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;

constructor(options: ConnectionPoolOptions) {
super();
Expand Down Expand Up @@ -161,7 +175,7 @@ export class ConnectionPool extends EventEmitter {
this[kMinPoolSizeTimer] = undefined;
this[kGeneration] = 0;
this[kConnectionCounter] = makeCounter(1);
this[kCancellationToken] = new EventEmitter();
this[kCancellationToken] = new CancellationToken();
this[kCancellationToken].setMaxListeners(Infinity);
this[kWaitQueue] = new Denque();

Expand Down Expand Up @@ -409,12 +423,9 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
}

// forward all events from the connection to the pool
relayEvents(connection, pool, [
Connection.COMMAND_STARTED,
Connection.COMMAND_FAILED,
Connection.COMMAND_SUCCEEDED,
Connection.CLUSTER_TIME_RECEIVED
]);
for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) {
connection.on(event, (e: any) => pool.emit(event, e));
}

pool.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionCreatedEvent(pool, connection));

Expand Down Expand Up @@ -522,7 +533,7 @@ function processWaitQueue(pool: ConnectionPool) {
}
}

export const CMAP_EVENT_NAMES = [
export const CMAP_EVENTS = [
ConnectionPool.CONNECTION_POOL_CREATED,
ConnectionPool.CONNECTION_POOL_CLOSED,
ConnectionPool.CONNECTION_CREATED,
Expand Down
Loading