Skip to content

Commit

Permalink
Merge branch 'ddp-heartbeats' into devel
Browse files Browse the repository at this point in the history
  • Loading branch information
n1mmy committed Mar 31, 2014
2 parents 7dab23c + a690cac commit c918bae
Show file tree
Hide file tree
Showing 17 changed files with 415 additions and 16 deletions.
13 changes: 13 additions & 0 deletions packages/livedata/DDP.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ connections. The client can rely on the server sending a `failed`
message if a better version is possible as a result of the client or
the server having been upgraded.

## Heartbeats

### Messages:

* `ping`
- `id`: optional string (identifier used to correlate with response)
* `pong`
- `id`: optional string (same as received in the `ping` message)

### Procedure:

At any time after the connection is established either side may send a `ping` message. The sender may chose to include an `id` field in the `ping` message. When the other side receives a `ping` it must immediately respond with a `pong` message. If the received `ping` message includes an `id` field, the `pong` message must include the same `id` field.

## Managing Data:

### Messages:
Expand Down
102 changes: 102 additions & 0 deletions packages/livedata/heartbeat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Heartbeat options:
// heartbeatInterval: interval to send pings, in milliseconds.
// heartbeatTimeout: timeout to close the connection if a reply isn't
// received, in milliseconds.
// sendPing: function to call to send a ping on the connection.
// onTimeout: function to call to close the connection.

Heartbeat = function (options) {
var self = this;

self.heartbeatInterval = options.heartbeatInterval;
self.heartbeatTimeout = options.heartbeatTimeout;
self._sendPing = options.sendPing;
self._onTimeout = options.onTimeout;

self._heartbeatIntervalHandle = null;
self._heartbeatTimeoutHandle = null;
};

_.extend(Heartbeat.prototype, {
stop: function () {
var self = this;
self._clearHeartbeatIntervalTimer();
self._clearHeartbeatTimeoutTimer();
},

start: function () {
var self = this;
self.stop();
self._startHeartbeatIntervalTimer();
},

_startHeartbeatIntervalTimer: function () {
var self = this;
self._heartbeatIntervalHandle = Meteor.setTimeout(
_.bind(self._heartbeatIntervalFired, self),
self.heartbeatInterval
);
},

_startHeartbeatTimeoutTimer: function () {
var self = this;
self._heartbeatTimeoutHandle = Meteor.setTimeout(
_.bind(self._heartbeatTimeoutFired, self),
self.heartbeatTimeout
);
},

_clearHeartbeatIntervalTimer: function () {
var self = this;
if (self._heartbeatIntervalHandle) {
Meteor.clearTimeout(self._heartbeatIntervalHandle);
self._heartbeatIntervalHandle = null;
}
},

_clearHeartbeatTimeoutTimer: function () {
var self = this;
if (self._heartbeatTimeoutHandle) {
Meteor.clearTimeout(self._heartbeatTimeoutHandle);
self._heartbeatTimeoutHandle = null;
}
},

// The heartbeat interval timer is fired when we should send a ping.
_heartbeatIntervalFired: function () {
var self = this;
self._heartbeatIntervalHandle = null;
self._sendPing();
// Wait for a pong.
self._startHeartbeatTimeoutTimer();
},

// The heartbeat timeout timer is fired when we sent a ping, but we
// timed out waiting for the pong.
_heartbeatTimeoutFired: function () {
var self = this;
self._heartbeatTimeoutHandle = null;
self._onTimeout();
},

pingReceived: function () {
var self = this;
// We know the connection is alive if we receive a ping, so we
// don't need to send a ping ourselves. Reset the interval timer.
if (self._heartbeatIntervalHandle) {
self._clearHeartbeatIntervalTimer();
self._startHeartbeatIntervalTimer();
}
},

pongReceived: function () {
var self = this;

// Receiving a pong means we won't timeout, so clear the timeout
// timer and start the interval again.
if (self._heartbeatTimeoutHandle) {
self._clearHeartbeatTimeoutTimer();
self._startHeartbeatIntervalTimer();
}
}
});
2 changes: 1 addition & 1 deletion packages/livedata/livedata_common.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
DDP = {};

SUPPORTED_DDP_VERSIONS = [ 'pre1' ];
SUPPORTED_DDP_VERSIONS = [ 'pre2', 'pre1' ];

LivedataTest.SUPPORTED_DDP_VERSIONS = SUPPORTED_DDP_VERSIONS;

Expand Down
58 changes: 57 additions & 1 deletion packages/livedata/livedata_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ var Connection = function (url, options) {
onDDPVersionNegotiationFailure: function (description) {
Meteor._debug(description);
},
heartbeatInterval: 35000,
heartbeatTimeout: 15000,
// These options are only for testing.
reloadWithOutstanding: false,
supportedDDPVersions: SUPPORTED_DDP_VERSIONS,
retry: true
retry: true,
respondToPings: true
}, options);

// If set, called when we reconnect, queuing method calls _before_ the
Expand Down Expand Up @@ -64,6 +67,9 @@ var Connection = function (url, options) {
self._nextMethodId = 1;
self._supportedDDPVersions = options.supportedDDPVersions;

self._heartbeatInterval = options.heartbeatInterval;
self._heartbeatTimeout = options.heartbeatTimeout;

// Tracks methods which the user has tried to call but which have not yet
// called their user callback (ie, they are waiting on their result or for all
// of their writes to be written to the local cache). Map from method ID to
Expand Down Expand Up @@ -224,6 +230,17 @@ var Connection = function (url, options) {
options.onDDPVersionNegotiationFailure(description);
}
}
else if (msg.msg === 'ping') {
if (options.respondToPings)
self._send({msg: "pong", id: msg.id});
if (self._heartbeat)
self._heartbeat.pingReceived();
}
else if (msg.msg === 'pong') {
if (self._heartbeat) {
self._heartbeat.pongReceived();
}
}
else if (_.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg))
self._livedata_data(msg);
else if (msg.msg === 'nosub')
Expand Down Expand Up @@ -291,12 +308,21 @@ var Connection = function (url, options) {
});
};

var onDisconnect = function () {
if (self._heartbeat) {
self._heartbeat.stop()
self._heartbeat = null;
}
};

if (Meteor.isServer) {
self._stream.on('message', Meteor.bindEnvironment(onMessage, Meteor._debug));
self._stream.on('reset', Meteor.bindEnvironment(onReset, Meteor._debug));
self._stream.on('disconnect', Meteor.bindEnvironment(onDisconnect, Meteor._debug));
} else {
self._stream.on('message', onMessage);
self._stream.on('reset', onReset);
self._stream.on('disconnect', onDisconnect);
}
};

Expand Down Expand Up @@ -834,6 +860,14 @@ _.extend(Connection.prototype, {
self._stream.send(stringifyDDP(obj));
},

// We detected via DDP-level heartbeats that we've lost the
// connection. Unlike `disconnect` or `close`, a lost connection
// will be automatically retried.
_lostConnection: function () {
var self = this;
self._stream._lostConnection();
},

status: function (/*passthrough args*/) {
var self = this;
return self._stream.status.apply(self._stream, arguments);
Expand Down Expand Up @@ -893,6 +927,28 @@ _.extend(Connection.prototype, {
_livedata_connected: function (msg) {
var self = this;

if (self._version !== 'pre1' && self._heartbeatInterval !== 0) {
self._heartbeat = new Heartbeat({
heartbeatInterval: self._heartbeatInterval,
heartbeatTimeout: self._heartbeatTimeout,
onTimeout: function () {
if (Meteor.isClient) {
// only print on the client. this message is useful on the
// browser console to see that we've lost connection. on the
// server (eg when doing server-to-server DDP), it gets
// kinda annoying. also this matches the behavior with
// sockjs timeouts.
Meteor._debug("Connection timeout. No DDP heartbeat received.");
}
self._lostConnection();
},
sendPing: function () {
self._send({msg: 'ping'});
}
});
self._heartbeat.start();
}

// If this is a reconnect, we'll have to reset all stores.
if (self._lastSessionId)
self._resetStores = true;
Expand Down
19 changes: 19 additions & 0 deletions packages/livedata/livedata_connection_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,25 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with
]);
});

Tinytest.add("livedata connection - ping without id", function (test) {
var stream = new StubStream();
var conn = newConnection(stream);
startAndConnect(test, stream);

stream.receive({msg: 'ping'});
testGotMessage(test, stream, {msg: 'pong'});
});

Tinytest.add("livedata connection - ping with id", function (test) {
var stream = new StubStream();
var conn = newConnection(stream);
startAndConnect(test, stream);

var id = Random.id();
stream.receive({msg: 'ping', id: id});
testGotMessage(test, stream, {msg: 'pong', id: id});
});

var getSelfConnectionUrl = function () {
if (Meteor.isClient) {
return Meteor._relativeToSiteRootUrl("/");
Expand Down
Loading

0 comments on commit c918bae

Please sign in to comment.