Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ point in time:
client.events.get({stream_position: '1408838928446360'}, callback);
```

### Destroying the Stream

If you ever need to *stop* long-polling, use:

```js
client.events.destroy();
```

This *will not* cancel in-process network requests. It *will* ensure no further long-polling nor event fetching takes place.

Enterprise Events
-----------------

Expand Down
64 changes: 61 additions & 3 deletions lib/event-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ util.inherits(EventStream, Readable);
* @private
*/
EventStream.prototype.getLongPollInfo = function() {
if (this.destroyed) {
return Promise.resolve(false);
}

return this._client.events.getLongPollInfo()
.then(longPollInfo => {
Expand All @@ -108,7 +111,7 @@ EventStream.prototype.getLongPollInfo = function() {

// Only retry on resolvable errors
if (!err.authExpired) {
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
this.retryPollInfo();
}
});
};
Expand All @@ -121,6 +124,9 @@ EventStream.prototype.getLongPollInfo = function() {
* @private
*/
EventStream.prototype.doLongPoll = function() {
if (this.destroyed) {
return Promise.resolve(false);
}

// If we're over the max number of retries, reset
if (this._longPollRetries > this._longPollInfo.max_retries) {
Expand All @@ -147,6 +153,9 @@ EventStream.prototype.doLongPoll = function() {
this._longPollRetries += 1;
return this._client.wrapWithDefaultHandler(this._client.get)(url, options)
.then(data => {
if (this.destroyed) {
return false;
}

if (data.message === 'reconnect') {
return this.getLongPollInfo();
Expand All @@ -161,17 +170,34 @@ EventStream.prototype.doLongPoll = function() {
return this.fetchEvents();
})
.catch(() => {
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
this.retryPollInfo();
});
};

/**
* Retries long-polling after a delay.
* Does not attempt if stream is already destroyed.
* @returns {void}
* @private
*/
EventStream.prototype.retryPollInfo = function() {

if (!this.destroyed) {
this._retryTimer = setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);
}
};

/**
* Fetch the latest group of events and push them into the stream
* @returns {Promise} Promise for testing purposes
* @private
*/
EventStream.prototype.fetchEvents = function() {

if (this.destroyed) {
return Promise.resolve(false);
}

var eventParams = {
stream_position: this._streamPosition,
limit: 500
Expand Down Expand Up @@ -229,7 +255,8 @@ EventStream.prototype.fetchEvents = function() {
.catch(err => {

this.emit('error', err);
setTimeout(() => this.getLongPollInfo(), this._options.retryDelay);

this.retryPollInfo();
})
);
};
Expand Down Expand Up @@ -269,4 +296,35 @@ EventStream.prototype._read = function() {
this.getLongPollInfo();
};

/**
* Implementation of stream-internal `_destroy` function (v8.0.0 and later).
* Called by stream consumers to effectively stop polling via the public
* `destroy()`.
* @returns {void}
* @private
*/
EventStream.prototype._destroy = function() {
clearTimeout(this._retryTimer);
delete this._retryTimer;
};

// backwards-compat for Node.js pre-v8.0.0
/* istanbul ignore if */
if (typeof Readable.prototype.destroy !== 'function') {
/**
* Destroys the stream. Rough polyfill for `Readable#destroy`.
* @returns {void}
* @public
*/
EventStream.prototype.destroy = function() {
if (!this.destroyed) {
process.nextTick(() => {
this.emit('close');
});
this.destroyed = true;
this._destroy();
}
};
}

module.exports = EventStream;
136 changes: 136 additions & 0 deletions tests/lib/event-stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,25 @@ describe('EventStream', function() {
clock.tick(1000);
});
});

describe('when stream is destroyed', function() {

beforeEach(function() {
eventStream.destroy();
});

it('should do nothing and resolve `false`', function() {

sandbox.mock(eventStream._client.events)
.expects('getLongPollInfo')
.never();

return eventStream.getLongPollInfo()
.then(result => {
assert.strictEqual(result, false);
});
});
});
});

describe('doLongPoll()', function() {
Expand Down Expand Up @@ -274,6 +293,54 @@ describe('EventStream', function() {
return eventStream.doLongPoll();
});

describe('when stream is destroyed', function() {

describe('before client fetch', function() {

beforeEach(function() {
eventStream.destroy();
});

it('should do nothing and resolve `false`', function() {

sandbox.mock(eventStream)
.expects('getLongPollInfo')
.never();
sandbox.mock(boxClientFake)
.expects('wrapWithDefaultHandler')
.never();

return eventStream.doLongPoll()
.then(result => {
assert.strictEqual(result, false);
});
});
});

describe('after client fetch', function() {

it('should resolve `false`', function() {

sandbox.mock(eventStream)
.expects('getLongPollInfo')
.never();
sandbox.mock(boxClientFake).expects('wrapWithDefaultHandler')
.returnsArg(0);
sandbox.mock(boxClientFake).expects('get')
.returns(Promise.resolve({
message: 'reconnect'
}));

const promise = eventStream.doLongPoll();

eventStream.destroy();

return promise.then(result => {
assert.strictEqual(result, false);
});
});
});
});
});

describe('fetchEvents()', function() {
Expand Down Expand Up @@ -550,6 +617,25 @@ describe('EventStream', function() {
]);
});

describe('when stream is destroyed before fetch', function() {

beforeEach(function() {

eventStream.destroy();
});

it('should do nothing and resolve `false`', function() {

sandbox.mock(eventStream._rateLimiter)
.expects('then')
.never();

return eventStream.fetchEvents()
.then(result => {
assert.strictEqual(result, false);
});
});
});
});

describe('cleanupDedupFilter()', function() {
Expand Down Expand Up @@ -583,4 +669,54 @@ describe('EventStream', function() {
});
});

describe('destroy()', function() {

afterEach(function() {

eventStream.destroy();
});

it('should cancel any active polling retry timer', function() {

eventStream.retryPollInfo();
assert.property(eventStream, '_retryTimer');

eventStream.destroy();
assert.notProperty(eventStream, '_retryTimer');
});

it('should set `destroyed` prop if not already set', function() {

eventStream.destroy();
assert.propertyVal(eventStream, 'destroyed', true);
});
});

describe('retryPollInfo()', function() {

describe('when not destroyed', function() {

afterEach(function() {

eventStream.destroy();
});

it('should create a `_retryTimer` property', function() {

assert.notProperty(eventStream, '_retryTimer');
eventStream.retryPollInfo();
assert.property(eventStream, '_retryTimer');
});
});

describe('when destroyed', function() {

it('should not create a `_retryTimer` property', function() {

eventStream.destroy();
eventStream.retryPollInfo();
assert.notProperty(eventStream, '_retryTimer');
});
});
});
});