Skip to content

Commit 98a0cbb

Browse files
authored
Merge pull request #152 from calibr/master
add messageQueueSizeLimit option
2 parents a045b8c + d81e981 commit 98a0cbb

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ Set flush interval in milliseconds. This option has no effect in `Message` mode.
367367
The logger stores emitted events in buffer and flush events for each interval.
368368
Default `100`.
369369
370+
**messageQueueSizeLimit**
371+
372+
Maximum number of messages that can be in queue at the same time. If a new message is received and it overflows the queue then the oldest message will be removed before adding the new item. This option has effect only in `Message` mode. No limit by default.
373+
370374
**security.clientHostname**
371375
372376
Set hostname of this logger. Use this value for hostname based authentication.

lib/sender.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class FluentSender {
3737
if (this._eventMode === 'Message') {
3838
this._sendQueue = []; // queue for items waiting for being sent.
3939
this._flushInterval = 0;
40+
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
4041
} else {
4142
this._sendQueue = new Map();
4243
this._flushInterval = options.flushInterval || 100;
@@ -186,6 +187,9 @@ class FluentSender {
186187
// Message mode
187188
const item = this._makePacketItem(tag, time, data);
188189
item.callback = callback;
190+
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
191+
this._sendQueue.shift();
192+
}
189193
this._sendQueue.push(item);
190194
} else {
191195
// PackedForward mode

test/test.sender.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,6 +947,26 @@ let doTest = (tls) => {
947947
});
948948
});
949949
});
950+
951+
it('should limit messages stored in queue if server is not available', (done) => {
952+
runServer({}, serverOptions, (server, finish) => {
953+
finish(() => {
954+
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
955+
port: server.port,
956+
messageQueueSizeLimit: 3
957+
}));
958+
s.emit('message1', {});
959+
s.emit('message2', {});
960+
s.emit('message3', {});
961+
s.emit('message4', {});
962+
expect(s._sendQueue.length).to.be.equal(3);
963+
expect(s._sendQueue[0].tag).to.be.equal('debug.message2');
964+
expect(s._sendQueue[1].tag).to.be.equal('debug.message3');
965+
expect(s._sendQueue[2].tag).to.be.equal('debug.message4');
966+
done();
967+
});
968+
});
969+
});
950970
};
951971

952972
describe('FluentSender', () => {

0 commit comments

Comments
 (0)