From 8a534bb074122cf8910f13c9ef4b6298317bd73d Mon Sep 17 00:00:00 2001
From: Matt Broadstone
Date: Mon, 28 Oct 2019 09:53:46 -0400
Subject: [PATCH] fix(sdam): don't lose servers when they fail monitoring

For legacy reasons the unified topology forced the connection pool into
auto-reconnect mode by default. This caused failed server checks to keep
emitting errors on the server, so the server lost track of its monitoring
state and never returned the node to the pool of selectable servers.
Client-side, this surfaces as a server selection timeout error.

NODE-2274
---
 lib/core/connection/pool.js | 11 ++-----
 lib/core/sdam/server.js     | 34 ++++++-------------
 lib/core/sdam/topology.js   | 66 ++++++++++++++++++++++++++++++-------
 3 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/lib/core/connection/pool.js b/lib/core/connection/pool.js
index a020840a4b7..a35e10c9a59 100644
--- a/lib/core/connection/pool.js
+++ b/lib/core/connection/pool.js
@@ -636,6 +636,7 @@ function destroy(self, connections, options, callback) {
  */
 Pool.prototype.destroy = function(force, callback) {
   var self = this;
+  // Do not try again if the pool is already dead
   if (this.state === DESTROYED || self.state === DESTROYING) {
     if (typeof callback === 'function') callback(null, null);
@@ -958,15 +959,6 @@ function createConnection(pool, callback) {
       pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
     }
 
-    if (pool.options.legacyCompatMode === false) {
-      // The unified topology uses the reported `error` from a pool to track what error
-      // reason is returned to the user during selection timeout. We only want to emit
-      // this if the pool is active because the listeners are removed on destruction.
-      if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
-        pool.emit('error', err);
-      }
-    }
-
     // check if reconnect is enabled, and attempt retry if so
     if (!pool.reconnectId && pool.options.reconnect) {
       if (pool.state === CONNECTING && pool.options.legacyCompatMode) {
@@ -1044,6 +1036,7 @@ function _execute(self) {
       // operations
       if (self.connectingConnections > 0) {
         self.executing = false;
+        setTimeout(() => _execute(self)(), 10);
         return;
       }
diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js
index 19f94cd84bc..32767a2f5bf 100644
--- a/lib/core/sdam/server.js
+++ b/lib/core/sdam/server.js
@@ -148,16 +148,13 @@ class Server extends EventEmitter {
       { bson: this.s.bson }
     );
 
-    // NOTE: this should only be the case if we are connecting to a single server
-    poolOptions.reconnect = true;
+    // NOTE: reconnect is explicitly false because of the server selection loop
+    poolOptions.reconnect = false;
     poolOptions.legacyCompatMode = false;
 
     this.s.pool = new Pool(this, poolOptions);
 
     // setup listeners
-    this.s.pool.on('connect', connectEventHandler(this));
-    this.s.pool.on('close', errorEventHandler(this));
-    this.s.pool.on('error', errorEventHandler(this));
     this.s.pool.on('parseError', parseErrorEventHandler(this));
 
     // it is unclear whether consumers should even know about these events
@@ -169,14 +166,7 @@
     relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']);
 
     stateTransition(this, STATE_CONNECTING);
-
-    // If auth settings have been provided, use them
-    if (options.auth) {
-      this.s.pool.connect.apply(this.s.pool, options.auth);
-      return;
-    }
-
-    this.s.pool.connect();
+    this.s.pool.connect(connectEventHandler(this));
   }
 
   /**
@@ -474,7 +464,13 @@ function executeWriteOperation(args, options, callback) {
 }
 
 function connectEventHandler(server) {
-  return function(pool, conn) {
+  return function(err, conn) {
+    if (err) {
+      server.emit('error', new MongoNetworkError(err));
+      server.emit('close');
+      return;
+    }
+
     const ismaster = conn.ismaster;
     server.s.lastIsMasterMS = conn.lastIsMasterMS;
     if (conn.agreedCompressor) {
@@ -506,16 +502,6 @@ function connectEventHandler(server) {
   };
 }
 
-function errorEventHandler(server) {
-  return function(err) {
-    if (err) {
-      server.emit('error', new MongoNetworkError(err));
-    }
-
-    server.emit('close');
-  };
-}
-
 function parseErrorEventHandler(server) {
   return function(err) {
     stateTransition(this, STATE_CLOSED);
diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js
index e8f1dc93dde..343db2e707b 100644
--- a/lib/core/sdam/topology.js
+++ b/lib/core/sdam/topology.js
@@ -894,15 +894,6 @@ function selectServers(topology, selector, timeout, start, callback) {
       topology.s.monitorTimers.push(timer);
     });
 
-    const descriptionChangedHandler = () => {
-      // successful iteration, clear the check timer
-      clearTimeout(iterationTimer);
-      topology.s.iterationTimers.splice(timerIndex, 1);
-
-      // topology description has changed due to monitoring, reattempt server selection
-      selectServers(topology, selector, timeout, start, callback);
-    };
-
     const iterationTimer = setTimeout(() => {
       topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
       callback(
@@ -913,8 +904,17 @@
       );
     }, timeout - duration);
 
+    const descriptionChangedHandler = () => {
+      // successful iteration, clear the check timer
+      removeTimerFrom(iterationTimer, topology.s.iterationTimers);
+      clearTimeout(iterationTimer);
+
+      // topology description has changed due to monitoring, reattempt server selection
+      selectServers(topology, selector, timeout, start, callback);
+    };
+
     // track this timer in case we need to clean it up outside this loop
-    const timerIndex = topology.s.iterationTimers.push(iterationTimer);
+    topology.s.iterationTimers.push(iterationTimer);
     topology.once('topologyDescriptionChanged', descriptionChangedHandler);
   };
 
@@ -922,7 +922,7 @@
   retrySelection();
 }
 
-function createAndConnectServer(topology, serverDescription) {
+function createAndConnectServer(topology, serverDescription, connectDelay) {
   topology.emit(
     'serverOpening',
     new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address)
@@ -934,10 +934,45 @@
   server.once('connect', serverConnectEventHandler(server, topology));
   server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
   server.on('error', serverErrorEventHandler(server, topology));
+
+  if (connectDelay) {
+    const connectTimer = setTimeout(() => {
+      removeTimerFrom(connectTimer, topology.s.iterationTimers);
+      server.connect();
+    }, connectDelay);
+
+    topology.s.iterationTimers.push(connectTimer);
+    return server;
+  }
+
   server.connect();
   return server;
 }
 
+function resetServer(topology, serverDescription) {
+  if (!topology.s.servers.has(serverDescription.address)) {
+    return;
+  }
+
+  // first remove the old server
+  const server = topology.s.servers.get(serverDescription.address);
+  destroyServer(server, topology);
+
+  // add the new server, and attempt connection after a delay
+  const newServer = createAndConnectServer(
+    topology,
+    serverDescription,
+    topology.s.heartbeatFrequencyMS
+  );
+
+  topology.s.servers.set(serverDescription.address, newServer);
+}
+
+function removeTimerFrom(timer, timers) {
+  const idx = timers.findIndex(t => t === timer);
+  timers.splice(idx, 1);
+}
+
 /**
  * Create `Server` instances for all initially known servers, connect them, and assign
  * them to the passed in `Topology`.
@@ -954,6 +989,15 @@ function connectServers(topology, serverDescriptions) {
 }
 
 function updateServers(topology, incomingServerDescription) {
+  // if the server was reset internally because of an error, we need to replace the
+  // `Server` instance for it so we can attempt a reconnect.
+  //
+  // TODO: this logic can change once CMAP is put in place
+  if (incomingServerDescription && incomingServerDescription.error) {
+    resetServer(topology, incomingServerDescription);
+    return;
+  }
+
   // update the internal server's description
   if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) {
     const server = topology.s.servers.get(incomingServerDescription.address);
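
Review note (not part of the patch): the core of the server.js change is that
connection failures now reach the server through a single connect callback
instead of pool-level 'connect'/'close'/'error' listeners, so a failed monitor
check produces exactly one error/close notification and reconnection is left
to the topology. A minimal event-emitter sketch of that flow, under simplified
stand-ins (`SketchServer` and `connectImpl` are illustrative, not driver APIs):

    'use strict';
    const { EventEmitter } = require('events');

    // Illustrative stand-in for the driver's Server; not the real class.
    class SketchServer extends EventEmitter {
      constructor(connectImpl) {
        super();
        this.connectImpl = connectImpl; // hypothetical pluggable pool connect
      }

      connect() {
        // One callback reports success or failure exactly once, mirroring
        // `this.s.pool.connect(connectEventHandler(this))` in the patch.
        this.connectImpl((err, conn) => {
          if (err) {
            this.emit('error', err);
            this.emit('close');
            return; // no pool-driven auto-reconnect; the topology decides
          }
          this.emit('connect', conn);
        });
      }
    }

    // usage: a monitor check that fails once
    const server = new SketchServer(cb => cb(new Error('connection refused')));
    server.on('error', err => console.log('check failed:', err.message));
    server.on('close', () => console.log('topology will reset this server'));
    server.connect();

With the old event-based wiring, a pool left in auto-reconnect mode kept
re-emitting errors into a server whose monitoring state had already moved on,
which is exactly the lost-server symptom described in the commit message.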
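A companion sketch for the topology.js side: when a server description arrives
with an error, the old `Server` instance is destroyed and a replacement is
connected after a heartbeat's delay, with the timer tracked so later cleanup
can cancel it. The `state` object and `makeServer` factory below are
placeholders for `topology.s` and `createAndConnectServer`, not driver APIs:

    // Sketch of resetServer's scheduling under the assumptions named above.
    function resetServerSketch(state, address, makeServer) {
      const old = state.servers.get(address);
      if (!old) return;
      old.destroy(); // first remove the old server

      // reconnect only after heartbeatFrequencyMS, tracking the timer so it
      // can be removed later (cf. removeTimerFrom in the patch)
      const fresh = makeServer(address);
      const timer = setTimeout(() => {
        const idx = state.iterationTimers.indexOf(timer);
        if (idx !== -1) state.iterationTimers.splice(idx, 1);
        fresh.connect();
      }, state.heartbeatFrequencyMS);

      state.iterationTimers.push(timer);
      state.servers.set(address, fresh);
    }

    // usage with stub servers
    const state = { servers: new Map(), iterationTimers: [], heartbeatFrequencyMS: 10 };
    const stub = addr => ({ destroy() {}, connect() { console.log(`reconnecting ${addr}`); } });
    state.servers.set('db0:27017', stub('db0:27017'));
    resetServerSketch(state, 'db0:27017', stub);

Delaying the retry by a heartbeat keeps a flapping server from being hammered
with immediate reconnects while still returning it to the selectable pool once
a check succeeds.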