diff --git a/packages/livedata/DDP.md b/packages/livedata/DDP.md index 655f1ca1058..1bb201ee371 100644 --- a/packages/livedata/DDP.md +++ b/packages/livedata/DDP.md @@ -71,6 +71,19 @@ connections. The client can rely on the server sending a `failed` message if a better version is possible as a result of the client or the server having been upgraded. +## Heartbeats + +### Messages: + + * `ping` + - `id`: optional string (identifier used to correlate with response) + * `pong` + - `id`: optional string (same as received in the `ping` message) + +### Procedure: + +At any time after the connection is established either side may send a `ping` message. The sender may chose to include an `id` field in the `ping` message. When the other side receives a `ping` it must immediately respond with a `pong` message. If the received `ping` message includes an `id` field, the `pong` message must include the same `id` field. + ## Managing Data: ### Messages: diff --git a/packages/livedata/heartbeat.js b/packages/livedata/heartbeat.js new file mode 100644 index 00000000000..0eaafd9cdc4 --- /dev/null +++ b/packages/livedata/heartbeat.js @@ -0,0 +1,102 @@ +// Heartbeat options: +// heartbeatInterval: interval to send pings, in milliseconds. +// heartbeatTimeout: timeout to close the connection if a reply isn't +// received, in milliseconds. +// sendPing: function to call to send a ping on the connection. +// onTimeout: function to call to close the connection. + +Heartbeat = function (options) { + var self = this; + + self.heartbeatInterval = options.heartbeatInterval; + self.heartbeatTimeout = options.heartbeatTimeout; + self._sendPing = options.sendPing; + self._onTimeout = options.onTimeout; + + self._heartbeatIntervalHandle = null; + self._heartbeatTimeoutHandle = null; +}; + +_.extend(Heartbeat.prototype, { + stop: function () { + var self = this; + self._clearHeartbeatIntervalTimer(); + self._clearHeartbeatTimeoutTimer(); + }, + + start: function () { + var self = this; + self.stop(); + self._startHeartbeatIntervalTimer(); + }, + + _startHeartbeatIntervalTimer: function () { + var self = this; + self._heartbeatIntervalHandle = Meteor.setTimeout( + _.bind(self._heartbeatIntervalFired, self), + self.heartbeatInterval + ); + }, + + _startHeartbeatTimeoutTimer: function () { + var self = this; + self._heartbeatTimeoutHandle = Meteor.setTimeout( + _.bind(self._heartbeatTimeoutFired, self), + self.heartbeatTimeout + ); + }, + + _clearHeartbeatIntervalTimer: function () { + var self = this; + if (self._heartbeatIntervalHandle) { + Meteor.clearTimeout(self._heartbeatIntervalHandle); + self._heartbeatIntervalHandle = null; + } + }, + + _clearHeartbeatTimeoutTimer: function () { + var self = this; + if (self._heartbeatTimeoutHandle) { + Meteor.clearTimeout(self._heartbeatTimeoutHandle); + self._heartbeatTimeoutHandle = null; + } + }, + + // The heartbeat interval timer is fired when we should send a ping. + _heartbeatIntervalFired: function () { + var self = this; + self._heartbeatIntervalHandle = null; + self._sendPing(); + // Wait for a pong. + self._startHeartbeatTimeoutTimer(); + }, + + // The heartbeat timeout timer is fired when we sent a ping, but we + // timed out waiting for the pong. + _heartbeatTimeoutFired: function () { + var self = this; + self._heartbeatTimeoutHandle = null; + self._onTimeout(); + }, + + pingReceived: function () { + var self = this; + // We know the connection is alive if we receive a ping, so we + // don't need to send a ping ourselves. Reset the interval timer. + if (self._heartbeatIntervalHandle) { + self._clearHeartbeatIntervalTimer(); + self._startHeartbeatIntervalTimer(); + } + }, + + pongReceived: function () { + var self = this; + + // Receiving a pong means we won't timeout, so clear the timeout + // timer and start the interval again. + if (self._heartbeatTimeoutHandle) { + self._clearHeartbeatTimeoutTimer(); + self._startHeartbeatIntervalTimer(); + } + } +}); diff --git a/packages/livedata/livedata_common.js b/packages/livedata/livedata_common.js index f9a873c6f71..9c59d0e5108 100644 --- a/packages/livedata/livedata_common.js +++ b/packages/livedata/livedata_common.js @@ -1,6 +1,6 @@ DDP = {}; -SUPPORTED_DDP_VERSIONS = [ 'pre1' ]; +SUPPORTED_DDP_VERSIONS = [ 'pre2', 'pre1' ]; LivedataTest.SUPPORTED_DDP_VERSIONS = SUPPORTED_DDP_VERSIONS; diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index be5b79e15c4..ed1e8acccae 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -31,10 +31,13 @@ var Connection = function (url, options) { onDDPVersionNegotiationFailure: function (description) { Meteor._debug(description); }, + heartbeatInterval: 35000, + heartbeatTimeout: 15000, // These options are only for testing. reloadWithOutstanding: false, supportedDDPVersions: SUPPORTED_DDP_VERSIONS, - retry: true + retry: true, + respondToPings: true }, options); // If set, called when we reconnect, queuing method calls _before_ the @@ -64,6 +67,9 @@ var Connection = function (url, options) { self._nextMethodId = 1; self._supportedDDPVersions = options.supportedDDPVersions; + self._heartbeatInterval = options.heartbeatInterval; + self._heartbeatTimeout = options.heartbeatTimeout; + // Tracks methods which the user has tried to call but which have not yet // called their user callback (ie, they are waiting on their result or for all // of their writes to be written to the local cache). Map from method ID to @@ -224,6 +230,17 @@ var Connection = function (url, options) { options.onDDPVersionNegotiationFailure(description); } } + else if (msg.msg === 'ping') { + if (options.respondToPings) + self._send({msg: "pong", id: msg.id}); + if (self._heartbeat) + self._heartbeat.pingReceived(); + } + else if (msg.msg === 'pong') { + if (self._heartbeat) { + self._heartbeat.pongReceived(); + } + } else if (_.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg)) self._livedata_data(msg); else if (msg.msg === 'nosub') @@ -291,12 +308,21 @@ var Connection = function (url, options) { }); }; + var onDisconnect = function () { + if (self._heartbeat) { + self._heartbeat.stop() + self._heartbeat = null; + } + }; + if (Meteor.isServer) { self._stream.on('message', Meteor.bindEnvironment(onMessage, Meteor._debug)); self._stream.on('reset', Meteor.bindEnvironment(onReset, Meteor._debug)); + self._stream.on('disconnect', Meteor.bindEnvironment(onDisconnect, Meteor._debug)); } else { self._stream.on('message', onMessage); self._stream.on('reset', onReset); + self._stream.on('disconnect', onDisconnect); } }; @@ -834,6 +860,14 @@ _.extend(Connection.prototype, { self._stream.send(stringifyDDP(obj)); }, + // We detected via DDP-level heartbeats that we've lost the + // connection. Unlike `disconnect` or `close`, a lost connection + // will be automatically retried. + _lostConnection: function () { + var self = this; + self._stream._lostConnection(); + }, + status: function (/*passthrough args*/) { var self = this; return self._stream.status.apply(self._stream, arguments); @@ -893,6 +927,28 @@ _.extend(Connection.prototype, { _livedata_connected: function (msg) { var self = this; + if (self._version !== 'pre1' && self._heartbeatInterval !== 0) { + self._heartbeat = new Heartbeat({ + heartbeatInterval: self._heartbeatInterval, + heartbeatTimeout: self._heartbeatTimeout, + onTimeout: function () { + if (Meteor.isClient) { + // only print on the client. this message is useful on the + // browser console to see that we've lost connection. on the + // server (eg when doing server-to-server DDP), it gets + // kinda annoying. also this matches the behavior with + // sockjs timeouts. + Meteor._debug("Connection timeout. No DDP heartbeat received."); + } + self._lostConnection(); + }, + sendPing: function () { + self._send({msg: 'ping'}); + } + }); + self._heartbeat.start(); + } + // If this is a reconnect, we'll have to reset all stores. if (self._lastSessionId) self._resetStores = true; diff --git a/packages/livedata/livedata_connection_tests.js b/packages/livedata/livedata_connection_tests.js index b37e3d97a74..ea84baa2987 100644 --- a/packages/livedata/livedata_connection_tests.js +++ b/packages/livedata/livedata_connection_tests.js @@ -1285,6 +1285,25 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with ]); }); +Tinytest.add("livedata connection - ping without id", function (test) { + var stream = new StubStream(); + var conn = newConnection(stream); + startAndConnect(test, stream); + + stream.receive({msg: 'ping'}); + testGotMessage(test, stream, {msg: 'pong'}); +}); + +Tinytest.add("livedata connection - ping with id", function (test) { + var stream = new StubStream(); + var conn = newConnection(stream); + startAndConnect(test, stream); + + var id = Random.id(); + stream.receive({msg: 'ping', id: id}); + testGotMessage(test, stream, {msg: 'pong', id: id}); +}); + var getSelfConnectionUrl = function () { if (Meteor.isClient) { return Meteor._relativeToSiteRootUrl("/"); diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index cbdd29ae63a..d133e648c01 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -216,7 +216,7 @@ _.extend(SessionCollectionView.prototype, { /* Session */ /******************************************************************************/ -var Session = function (server, version, socket) { +var Session = function (server, version, socket, options) { var self = this; self.id = Random.id(); @@ -262,13 +262,16 @@ var Session = function (server, version, socket) { // temporary and will go away in the near future. self._socketUrl = socket.url; + // Allow tests to disable responding to pings. + self._respondToPings = options.respondToPings; + // This object is the public interface to the session. In the public // API, it is called the `connection` object. Internally we call it // a `connectionHandle` to avoid ambiguity. self.connectionHandle = { id: self.id, close: function () { - self.server._closeSession(self); + self.close(); }, onClose: function (fn) { var cb = Meteor.bindEnvironment(fn, "connection onClose callback"); @@ -290,6 +293,20 @@ var Session = function (server, version, socket) { self.startUniversalSubs(); }).run(); + if (version !== 'pre1' && options.heartbeatInterval !== 0) { + self.heartbeat = new Heartbeat({ + heartbeatInterval: options.heartbeatInterval, + heartbeatTimeout: options.heartbeatTimeout, + onTimeout: function () { + self.destroy(); + }, + sendPing: function () { + self.send({msg: 'ping'}); + } + }); + self.heartbeat.start(); + } + Package.facts && Package.facts.Facts.incrementServerFact( "livedata", "sessions", 1); }; @@ -391,6 +408,15 @@ _.extend(Session.prototype, { destroy: function () { var self = this; + // Already destroyed. + if (!self.inQueue) + return; + + if (self.heartbeat) { + self.heartbeat.stop(); + self.heartbeat = null; + } + if (self.socket) { self.socket.close(); self.socket._meteorSession = null; @@ -417,6 +443,19 @@ _.extend(Session.prototype, { }); }, + // Destroy this session and unregister it at the server. + close: function () { + var self = this; + + // Unconditionally destroy this session, even if it's not + // registered at the server. + self.destroy(); + + // Unregister the session. This will also call `destroy`, but + // that's OK because `destroy` is idempotent. + self.server._closeSession(self); + }, + // Send a message (doing nothing if no socket is connected right now.) // It should be a JSON object (it will be stringified.) send: function (msg) { @@ -457,6 +496,32 @@ _.extend(Session.prototype, { if (!self.inQueue) // we have been destroyed. return; + // Respond to ping and pong messages immediately without queuing. + // If the negotiated DDP version is "pre1" which didn't support + // pings, preserve the "pre1" behavior of responding with a "bad + // request" for the unknown messages. + // + // Fibers are needed because heartbeat uses Meteor.setTimeout, which + // needs a Fiber. We could actually use regular setTimeout and avoid + // these new fibers, but it is easier to just make everything use + // Meteor.setTimeout and not think too hard. + if (self.version !== 'pre1' && msg_in.msg === 'ping') { + if (self._respondToPings) + self.send({msg: "pong", id: msg_in.id}); + if (self.heartbeat) + Fiber(function () { + self.heartbeat.pingReceived(); + }).run(); + return; + } + if (self.version !== 'pre1' && msg_in.msg === 'pong') { + if (self.heartbeat) + Fiber(function () { + self.heartbeat.pongReceived(); + }).run(); + return; + } + self.inQueue.push(msg_in); if (self.workerRunning) return; @@ -1047,9 +1112,20 @@ _.extend(Subscription.prototype, { /* Server */ /******************************************************************************/ -Server = function () { +Server = function (options) { var self = this; + // The default heartbeat interval is 30 seconds on the server and 35 + // seconds on the client. Since the client doesn't need to send a + // ping as long as it is receiving pings, this means that pings + // normally go from the server to the client. + self.options = _.defaults(options || {}, { + heartbeatInterval: 30000, + heartbeatTimeout: 15000, + // For testing, allow responding to pings to be disabled. + respondToPings: true + }); + // Map of callbacks to call when a new connection comes in to the // server and completes DDP version negotiation. Use an object instead // of an array so we can safely remove one from the list while @@ -1120,7 +1196,7 @@ Server = function () { socket.on('close', function () { if (socket._meteorSession) { Fiber(function () { - self._closeSession(socket._meteorSession); + socket._meteorSession.close(); }).run(); } }); @@ -1142,7 +1218,7 @@ _.extend(Server.prototype, { if (msg.version === version) { // Creating a new session - socket._meteorSession = new Session(self, version, socket); + socket._meteorSession = new Session(self, version, socket, self.options); self.sessions[socket._meteorSession.id] = socket._meteorSession; self.onConnectionHook.each(function (callback) { if (socket._meteorSession) diff --git a/packages/livedata/package.js b/packages/livedata/package.js index b3c5f33cb36..3fe9b835e4a 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -49,6 +49,7 @@ Package.on_use(function (api) { // _idParse, _idStringify. api.use('minimongo', ['client', 'server']); + api.add_files('heartbeat.js', ['client', 'server']); api.add_files('livedata_server.js', 'server'); diff --git a/packages/livedata/stream_client_common.js b/packages/livedata/stream_client_common.js index 6f551ca424d..cdc90928112 100644 --- a/packages/livedata/stream_client_common.js +++ b/packages/livedata/stream_client_common.js @@ -87,7 +87,7 @@ _.extend(LivedataTest.ClientStream.prototype, { on: function (name, callback) { var self = this; - if (name !== 'message' && name !== 'reset') + if (name !== 'message' && name !== 'reset' && name !== 'disconnect') throw new Error("unknown event type: " + name); if (!self.eventCallbacks[name]) diff --git a/packages/livedata/stream_client_nodejs.js b/packages/livedata/stream_client_nodejs.js index a3bc996d27f..718ee3fe417 100644 --- a/packages/livedata/stream_client_nodejs.js +++ b/packages/livedata/stream_client_nodejs.js @@ -96,6 +96,8 @@ _.extend(LivedataTest.ClientStream.prototype, { self.client = null; client.close(); } + + _.each(self.eventCallbacks.disconnect, function (callback) { callback(); }); }, _clearConnectionTimer: function () { diff --git a/packages/livedata/stream_client_sockjs.js b/packages/livedata/stream_client_sockjs.js index 3f42a76371c..b7e39a13a45 100644 --- a/packages/livedata/stream_client_sockjs.js +++ b/packages/livedata/stream_client_sockjs.js @@ -12,13 +12,15 @@ LivedataTest.ClientStream = function (url, options) { // how long between hearing heartbeat from the server until we declare - // the connection dead. heartbeats come every 25s (stream_server.js) + // the connection dead. heartbeats come every 45s (stream_server.js) // - // NOTE: this is a workaround until sockjs detects heartbeats on the - // client automatically. - // https://github.com/sockjs/sockjs-client/issues/67 - // https://github.com/sockjs/sockjs-node/issues/68 - self.HEARTBEAT_TIMEOUT = 60000; + // NOTE: this is a older timeout mechanism. We now send heartbeats at + // the DDP level (https://github.com/meteor/meteor/pull/1865), and + // expect those timeouts to kill a non-responsive connection before + // this timeout fires. This is kept around for compatibility (when + // talking to a server that doesn't support DDP heartbeats) and can be + // removed later. + self.HEARTBEAT_TIMEOUT = 100*1000; self.rawUrl = url; self.socket = null; @@ -88,6 +90,8 @@ _.extend(LivedataTest.ClientStream.prototype, { self.socket.close(); self.socket = null; } + + _.each(self.eventCallbacks.disconnect, function (callback) { callback(); }); }, _clearConnectionAndHeartbeatTimers: function () { @@ -104,7 +108,7 @@ _.extend(LivedataTest.ClientStream.prototype, { _heartbeat_timeout: function () { var self = this; - Meteor._debug("Connection timeout. No heartbeat received."); + Meteor._debug("Connection timeout. No sockjs heartbeat received."); self._lostConnection(); }, diff --git a/packages/livedata/stream_server.js b/packages/livedata/stream_server.js index f3440853806..83dffadb597 100644 --- a/packages/livedata/stream_server.js +++ b/packages/livedata/stream_server.js @@ -23,7 +23,7 @@ StreamServer = function () { log: function() {}, // this is the default, but we code it explicitly because we depend // on it in stream_client:HEARTBEAT_TIMEOUT - heartbeat_delay: 25000, + heartbeat_delay: 45000, // The default disconnect_delay is 5 seconds, but if the server ends up CPU // bound for that much time, SockJS might not notice that the user has // reconnected because the timer (of disconnect_delay ms) can fire before diff --git a/packages/livedata/stub_stream.js b/packages/livedata/stub_stream.js index 14420248196..54e75c83673 100644 --- a/packages/livedata/stub_stream.js +++ b/packages/livedata/stub_stream.js @@ -30,6 +30,9 @@ _.extend(StubStream.prototype, { // no-op }, + _lostConnection: function () { + // no-op + }, // Methods for tests receive: function (data) { diff --git a/tools/tests/apps/ddp-heartbeat/.meteor/.gitignore b/tools/tests/apps/ddp-heartbeat/.meteor/.gitignore new file mode 100644 index 00000000000..40830374235 --- /dev/null +++ b/tools/tests/apps/ddp-heartbeat/.meteor/.gitignore @@ -0,0 +1 @@ +local diff --git a/tools/tests/apps/ddp-heartbeat/.meteor/packages b/tools/tests/apps/ddp-heartbeat/.meteor/packages new file mode 100644 index 00000000000..60221cd338e --- /dev/null +++ b/tools/tests/apps/ddp-heartbeat/.meteor/packages @@ -0,0 +1,7 @@ +# Meteor packages used by this project, one per line. +# +# 'meteor add' and 'meteor remove' will edit this file for you, +# but you can also edit it by hand. + +standard-app-packages +underscore diff --git a/tools/tests/apps/ddp-heartbeat/.meteor/release b/tools/tests/apps/ddp-heartbeat/.meteor/release new file mode 100644 index 00000000000..621e94f0ec9 --- /dev/null +++ b/tools/tests/apps/ddp-heartbeat/.meteor/release @@ -0,0 +1 @@ +none diff --git a/tools/tests/apps/ddp-heartbeat/server/heartbeat_test.js b/tools/tests/apps/ddp-heartbeat/server/heartbeat_test.js new file mode 100644 index 00000000000..f6e7bb49a75 --- /dev/null +++ b/tools/tests/apps/ddp-heartbeat/server/heartbeat_test.js @@ -0,0 +1,96 @@ +var Fiber = Npm.require("fibers"); +var Future = Npm.require("fibers/future"); + +// XXX Deps isn't supported on the server... but we need a way to +// capture client connection status transitions. + +var waitReactive = function (fn) { + var future = new Future(); + var timeoutHandle = Meteor.setTimeout( + function () { + future.throw(new Error("timeout")); + }, + 60000 + ); + Deps.autorun(function (c) { + var ret = fn(); + if (ret) { + c.stop(); + Meteor.clearTimeout(timeoutHandle); + + // We need to run in a fiber for `defer`. + Fiber(function () { + // Use `defer` because yields are blocked inside of autorun. + Meteor.defer(function () { + future.return(ret); + }) + }).run(); + } + }); + return future.wait(); +}; + +var waitForClientConnectionStatus = function (connection, status) { + waitReactive(function () { + return connection.status().status === status; + }); +}; + + +// Expect to connect, and then to reconnect (presumably because of a +// timeout). + +var expectConnectAndReconnect = function (clientConnection) { + console.log("client is connecting"); + waitForClientConnectionStatus(clientConnection, "connected"); + + console.log("client is connected, expecting ping timeout and reconnect"); + waitForClientConnectionStatus(clientConnection, "connecting"); + + console.log("client is reconnecting"); +}; + + +var testClientTimeout = function () { + console.log("Test client timeout"); + + var savedServerOptions = _.clone(Meteor.server.options); + Meteor.server.options.heartbeatInterval = 0; + Meteor.server.options.respondToPings = false; + + var clientConnection = DDP.connect(Meteor.absoluteUrl()); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + + Meteor.server.options = savedServerOptions; + + console.log("test successful\n"); +}; + + +var testServerTimeout = function () { + console.log("Test server timeout"); + + var clientConnection = DDP.connect( + Meteor.absoluteUrl(), + { + heartbeatInterval: 0, + respondToPings: false + } + ); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + console.log("test successful\n"); +}; + +Fiber(function () { + Meteor._printReceivedDDP = true; + Meteor._printSentDDP = true; + testClientTimeout(); + testServerTimeout(); + process.exit(0); +}).run(); diff --git a/tools/tests/ddp-heartbeat.js b/tools/tests/ddp-heartbeat.js new file mode 100644 index 00000000000..643947ae7d4 --- /dev/null +++ b/tools/tests/ddp-heartbeat.js @@ -0,0 +1,18 @@ +var selftest = require('../selftest.js'); +var Sandbox = selftest.Sandbox; + +var MONGO_LISTENING = + { stdout: " [initandlisten] waiting for connections on port" }; + +selftest.define("ddp-heartbeat", ["slow"], function () { + var s = new Sandbox({ fakeMongo: true }); + var run; + + s.createApp("ddpapp", "ddp-heartbeat"); + s.cd("ddpapp"); + + var run = s.run("--once", "--raw-logs"); + run.tellMongo(MONGO_LISTENING); + run.waitSecs(120); + run.expectExit(0); +});