From 4aa9d9d1add0a469d3c3c16ad70e574830158db2 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 14 Jul 2022 10:13:02 -0400 Subject: [PATCH] refactor(NODE-4415): clean up cmap implementation (#3320) --- src/cmap/connection_pool.ts | 375 ++++++++++++++++++++---------------- src/index.ts | 1 + 2 files changed, 208 insertions(+), 168 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index e8888ccc54..9adae0a3f5 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -62,6 +62,8 @@ const kCancelled = Symbol('cancelled'); const kMetrics = Symbol('metrics'); /** @internal */ const kProcessingWaitQueue = Symbol('processingWaitQueue'); +/** @internal */ +const kPoolState = Symbol('poolState'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -86,6 +88,13 @@ export interface WaitQueueMember { [kCancelled]?: boolean; } +/** @internal */ +export const PoolState = Object.freeze({ + paused: 'paused', + ready: 'ready', + closed: 'closed' +} as const); + /** @public */ export interface CloseOptions { force?: boolean; @@ -110,9 +119,10 @@ export type ConnectionPoolEvents = { * @internal */ export class ConnectionPool extends TypedEventEmitter { - closed: boolean; options: Readonly; /** @internal */ + [kPoolState]: typeof PoolState[keyof typeof PoolState]; + /** @internal */ [kLogger]: Logger; /** @internal */ [kConnections]: Denque; @@ -197,7 +207,6 @@ export class ConnectionPool extends TypedEventEmitter { constructor(options: ConnectionPoolOptions) { super(); - this.closed = false; this.options = Object.freeze({ ...options, connectionType: Connection, @@ -216,6 +225,7 @@ export class ConnectionPool extends TypedEventEmitter { ); } + this[kPoolState] = PoolState.paused; this[kLogger] = new Logger('ConnectionPool'); this[kConnections] = new Denque(); this[kPending] = 0; @@ -232,7 +242,7 @@ export class ConnectionPool extends TypedEventEmitter { process.nextTick(() => { this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); - ensureMinPoolSize(this); + this.ensureMinPoolSize(); }); } @@ -241,6 +251,15 @@ export class ConnectionPool extends TypedEventEmitter { return this.options.hostAddress.toString(); } + /** + * Check if the pool has been closed + * + * TODO(NODE-3263): We can remove this property once shell no longer needs it + */ + get closed(): boolean { + return this[kPoolState] === PoolState.closed; + } + /** An integer representing the SDAM generation of the pool */ get generation(): number { return this[kGeneration]; @@ -285,6 +304,13 @@ export class ConnectionPool extends TypedEventEmitter { return this[kMetrics].info(this.options.maxPoolSize); } + /** + * Set the pool state to "ready" + */ + ready(): void { + this[kPoolState] = PoolState.ready; + } + /** * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or @@ -328,7 +354,7 @@ export class ConnectionPool extends TypedEventEmitter { } this[kWaitQueue].push(waitQueueMember); - process.nextTick(processWaitQueue, this); + process.nextTick(() => this.processWaitQueue()); } /** @@ -338,7 +364,7 @@ export class ConnectionPool extends TypedEventEmitter { */ checkIn(connection: Connection): void { const poolClosed = this.closed; - const stale = connectionIsStale(this, connection); + const stale = this.connectionIsStale(connection); const willDestroy = !!(poolClosed || stale || connection.closed); if (!willDestroy) { @@ -351,10 +377,10 @@ export class ConnectionPool extends TypedEventEmitter { if (willDestroy) { const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; - destroyConnection(this, connection, reason); + this.destroyConnection(connection, reason); } - process.nextTick(processWaitQueue, this); + process.nextTick(() => this.processWaitQueue()); } /** @@ -380,7 +406,10 @@ export class ConnectionPool extends TypedEventEmitter { this[kGeneration] += 1; } - this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this, serviceId)); + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, serviceId) + ); } /** Close the pool */ @@ -427,7 +456,7 @@ export class ConnectionPool extends TypedEventEmitter { } // mark the pool as closed immediately - this.closed = true; + this[kPoolState] = PoolState.closed; eachAsync( this[kConnections].toArray(), (conn, cb) => { @@ -496,201 +525,211 @@ export class ConnectionPool extends TypedEventEmitter { }); }); } -} -function ensureMinPoolSize(pool: ConnectionPool) { - const minPoolSize = pool.options.minPoolSize; - if (pool.closed || minPoolSize === 0) { - return; - } + private destroyConnection(connection: Connection, reason: string) { + this.emit( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, connection, reason) + ); - if ( - pool.totalConnectionCount < minPoolSize && - pool.pendingConnectionCount < pool.options.maxConnecting - ) { - // NOTE: ensureMinPoolSize should not try to get all the pending - // connection permits because that potentially delays the availability of - // the connection to a checkout request - createConnection(pool, (err, connection) => { - pool[kPending]--; - if (!err && connection) { - pool[kConnections].push(connection); - process.nextTick(processWaitQueue, pool); - } - pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10); - }); - } else { - pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 100); + // destroy the connection + process.nextTick(() => connection.destroy()); } -} -function connectionIsStale(pool: ConnectionPool, connection: Connection) { - const serviceId = connection.serviceId; - if (pool.loadBalanced && serviceId) { - const sid = serviceId.toHexString(); - const generation = pool.serviceGenerations.get(sid); - return connection.generation !== generation; - } + private connectionIsStale(connection: Connection) { + const serviceId = connection.serviceId; + if (this.loadBalanced && serviceId) { + const sid = serviceId.toHexString(); + const generation = this.serviceGenerations.get(sid); + return connection.generation !== generation; + } - return connection.generation !== pool[kGeneration]; -} + return connection.generation !== this[kGeneration]; + } -function connectionIsIdle(pool: ConnectionPool, connection: Connection) { - return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS); -} + private connectionIsIdle(connection: Connection) { + return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS); + } -function createConnection(pool: ConnectionPool, callback: Callback) { - const connectOptions: ConnectionOptions = { - ...pool.options, - id: pool[kConnectionCounter].next().value, - generation: pool[kGeneration], - cancellationToken: pool[kCancellationToken] - }; - - pool[kPending]++; - // This is our version of a "virtual" no-I/O connection as the spec requires - pool.emit( - ConnectionPool.CONNECTION_CREATED, - new ConnectionCreatedEvent(pool, { id: connectOptions.id }) - ); - - connect(connectOptions, (err, connection) => { - if (err || !connection) { - pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); - callback(err); - return; + private connectionIsPerished(connection: Connection) { + const isStale = this.connectionIsStale(connection); + const isIdle = this.connectionIsIdle(connection); + if (!isStale && !isIdle && !connection.closed) { + return false; } + const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; + this.destroyConnection(connection, reason); + return true; + } - // The pool might have closed since we started trying to create a connection - if (pool.closed) { - pool[kPending]--; - connection.destroy({ force: true }); - return; - } + private createConnection(callback: Callback) { + const connectOptions: ConnectionOptions = { + ...this.options, + id: this[kConnectionCounter].next().value, + generation: this[kGeneration], + cancellationToken: this[kCancellationToken] + }; - // forward all events from the connection to the pool - for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) { - connection.on(event, (e: any) => pool.emit(event, e)); - } + this[kPending]++; + // This is our version of a "virtual" no-I/O connection as the spec requires + this.emit( + ConnectionPool.CONNECTION_CREATED, + new ConnectionCreatedEvent(this, { id: connectOptions.id }) + ); - if (pool.loadBalanced) { - connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType)); - connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType)); + connect(connectOptions, (err, connection) => { + if (err || !connection) { + this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); + callback(err); + return; + } - const serviceId = connection.serviceId; - if (serviceId) { - let generation; - const sid = serviceId.toHexString(); - if ((generation = pool.serviceGenerations.get(sid))) { - connection.generation = generation; - } else { - pool.serviceGenerations.set(sid, 0); - connection.generation = 0; - } + // The pool might have closed since we started trying to create a connection + if (this.closed) { + this[kPending]--; + connection.destroy({ force: true }); + return; } - } - connection.markAvailable(); - pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection)); + // forward all events from the connection to the pool + for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) { + connection.on(event, (e: any) => this.emit(event, e)); + } - callback(undefined, connection); - return; - }); -} + if (this.loadBalanced) { + connection.on(Connection.PINNED, pinType => this[kMetrics].markPinned(pinType)); + connection.on(Connection.UNPINNED, pinType => this[kMetrics].markUnpinned(pinType)); -function destroyConnection(pool: ConnectionPool, connection: Connection, reason: string) { - pool.emit(ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(pool, connection, reason)); + const serviceId = connection.serviceId; + if (serviceId) { + let generation; + const sid = serviceId.toHexString(); + if ((generation = this.serviceGenerations.get(sid))) { + connection.generation = generation; + } else { + this.serviceGenerations.set(sid, 0); + connection.generation = 0; + } + } + } - // destroy the connection - process.nextTick(() => connection.destroy()); -} + connection.markAvailable(); + this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); -function processWaitQueue(pool: ConnectionPool) { - if (pool.closed || pool[kProcessingWaitQueue]) { - return; + callback(undefined, connection); + return; + }); } - pool[kProcessingWaitQueue] = true; - while (pool.waitQueueSize) { - const waitQueueMember = pool[kWaitQueue].peekFront(); - if (!waitQueueMember) { - pool[kWaitQueue].shift(); - continue; + private ensureMinPoolSize() { + const minPoolSize = this.options.minPoolSize; + if (this.closed || minPoolSize === 0) { + return; } - if (waitQueueMember[kCancelled]) { - pool[kWaitQueue].shift(); - continue; + if ( + this.totalConnectionCount < minPoolSize && + this.pendingConnectionCount < this.options.maxConnecting + ) { + // NOTE: ensureMinPoolSize should not try to get all the pending + // connection permits because that potentially delays the availability of + // the connection to a checkout request + this.createConnection((err, connection) => { + this[kPending]--; + if (!err && connection) { + this[kConnections].push(connection); + process.nextTick(() => this.processWaitQueue()); + } + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + }); + } else { + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); } + } - if (!pool.availableConnectionCount) { - break; + private processWaitQueue() { + if (this.closed || this[kProcessingWaitQueue]) { + return; } - const connection = pool[kConnections].shift(); - if (!connection) { - break; - } + this[kProcessingWaitQueue] = true; - const isStale = connectionIsStale(pool, connection); - const isIdle = connectionIsIdle(pool, connection); - if (!isStale && !isIdle && !connection.closed) { - pool[kCheckedOut]++; - pool.emit( - ConnectionPool.CONNECTION_CHECKED_OUT, - new ConnectionCheckedOutEvent(pool, connection) - ); - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); + while (this.waitQueueSize) { + const waitQueueMember = this[kWaitQueue].peekFront(); + if (!waitQueueMember) { + this[kWaitQueue].shift(); + continue; } - pool[kWaitQueue].shift(); - waitQueueMember.callback(undefined, connection); - } else { - const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; - destroyConnection(pool, connection, reason); - } - } - - const { maxPoolSize, maxConnecting } = pool.options; - while ( - pool.waitQueueSize > 0 && - pool.pendingConnectionCount < maxConnecting && - (maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize) - ) { - const waitQueueMember = pool[kWaitQueue].shift(); - if (!waitQueueMember || waitQueueMember[kCancelled]) { - continue; - } - createConnection(pool, (err, connection) => { - pool[kPending]--; if (waitQueueMember[kCancelled]) { - if (!err && connection) { - pool[kConnections].push(connection); - } - } else { - if (err) { - pool.emit( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(pool, err) - ); - } else if (connection) { - pool[kCheckedOut]++; - pool.emit( - ConnectionPool.CONNECTION_CHECKED_OUT, - new ConnectionCheckedOutEvent(pool, connection) - ); - } + this[kWaitQueue].shift(); + continue; + } + + if (!this.availableConnectionCount) { + break; + } + + const connection = this[kConnections].shift(); + if (!connection) { + break; + } + if (!this.connectionIsPerished(connection)) { + this[kCheckedOut]++; + this.emit( + ConnectionPool.CONNECTION_CHECKED_OUT, + new ConnectionCheckedOutEvent(this, connection) + ); if (waitQueueMember.timer) { clearTimeout(waitQueueMember.timer); } - waitQueueMember.callback(err, connection); + + this[kWaitQueue].shift(); + waitQueueMember.callback(undefined, connection); } - process.nextTick(processWaitQueue, pool); - }); + } + + const { maxPoolSize, maxConnecting } = this.options; + while ( + this.waitQueueSize > 0 && + this.pendingConnectionCount < maxConnecting && + (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize) + ) { + const waitQueueMember = this[kWaitQueue].shift(); + if (!waitQueueMember || waitQueueMember[kCancelled]) { + continue; + } + this.createConnection((err, connection) => { + this[kPending]--; + if (waitQueueMember[kCancelled]) { + if (!err && connection) { + this[kConnections].push(connection); + } + } else { + if (err) { + this.emit( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, err) + ); + } else if (connection) { + this[kCheckedOut]++; + this.emit( + ConnectionPool.CONNECTION_CHECKED_OUT, + new ConnectionCheckedOutEvent(this, connection) + ); + } + + if (waitQueueMember.timer) { + clearTimeout(waitQueueMember.timer); + } + waitQueueMember.callback(err, connection); + } + process.nextTick(() => this.processWaitQueue()); + }); + } + this[kProcessingWaitQueue] = false; } - pool[kProcessingWaitQueue] = false; } /** diff --git a/src/index.ts b/src/index.ts index 24a819ba21..6f8fa3d227 100644 --- a/src/index.ts +++ b/src/index.ts @@ -232,6 +232,7 @@ export type { ConnectionPool, ConnectionPoolEvents, ConnectionPoolOptions, + PoolState, WaitQueueMember, WithConnectionCallback } from './cmap/connection_pool';