Skip to content

Commit

Permalink
Add n-lines flag
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagrulla committed Dec 23, 2020
1 parent cc4ceca commit 2fc35f9
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 34 deletions.
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ new Tail(fileToTail, options)

### Constructor parameters

* `separator`: the line separator token (default: `/[\r]{0,1}\n/` to handle linux/mac (9+)/windows). Pass null if your file is binary there's no line separator.
* `separator`: the line separator token (default: `/[\r]{0,1}\n/` to handle linux/mac (9+)/windows). Pass `null` for is binary files with no line separator.
* `fsWatchOptions`: the full set of options that can be passed to `fs.watch` as per node documentation (default: {}).
* `fromBeginning`: forces the tail of the file from the very beginning of it instead of from the first new line that will be appended (default: `false`).
* `follow`: simulate `tail -F` option. In the case the file is moved/renamed (or logrotated), if set to `true` `tail` will try to start tailing again after a 1 second delay, if set to `false` it will just emit an error event (default: `true`).
* `logger`: a logger object(default: no logger). The passed logger has to respond to two methods:
* `fromBeginning`: tail from the beginning of the file (default: `false`). If `fromBeginning` is true `nLines` will be ignored.
* `follow`: simulate `tail -F` option. In the case the file is moved/renamed/logrotated, if set to `true` will start tailing again after a 1 second delay; if set to `false` it will emit an error event (default: `true`).
* `logger`: a logger object(default: no logger). The passed logger should follow the folliwing signature:
* `info([data][, ...])`
* `error([data][, ...])`
* `useWatchFile`: if set to `true` will force the use of `fs.watchFile` rather than delegating to the library the choice between `fs.watch` and `fs.watchFile` (default: `false`).
* `encoding`: the encoding of the file to tail (default:`utf-8`).
* `flushAtEOF`: set to `true` if you want to force flush of content when end of file is reached. Particularly useful when there's no separator character at the end of the file (default: `false`).
* `nLines`: tail from the last n lines. (default: `undefined`). Ignored if `fromBeginning` is set to `true`.
* `useWatchFile`: if set to `true` will force the use of `fs.watchFile` over delegating to the library the choice between `fs.watch` and `fs.watchFile` (default: `false`).
* `encoding`: the file encoding (default:`utf-8`).
* `flushAtEOF`: set to `true` to force flush of content when end of file is reached. Useful when there's no separator character at the end of the file (default: `false`).

## Emitted events

Expand All @@ -105,12 +106,12 @@ tail.on('error', (err) => {

## How to contribute

Tail is written in plain ES6.Pull Requests are welcome.
Tail is written in ES6. Pull Requests are welcome.

## History

Tail was born as part of a data firehose. Read about it [here](https://www.lucagrulla.com/posts/building-a-firehose-with-nodejs/).
Tail original version was written in [CoffeeScript](https://coffeescript.org/). Since 2020 it's pure ES6.
Tail original version was written in [CoffeeScript](https://coffeescript.org/). Since December 2020 it's pure ES6.

## License

Expand Down
56 changes: 33 additions & 23 deletions src/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class Tail extends events.EventEmitter {
this.logger = options.logger || new devNull();
this.useWatchFile = options.useWatchFile || false;
this.flushAtEOF = options.flushAtEOF || false;
this.encoding = options.encoding || `utf-8`;
this.encoding = options.encoding || 'utf-8';
const fromBeginning = options.fromBeginning || false;

this.nLines = options.nLines || undefined;

this.logger.info(`Tail starting...`)
this.logger.info(`filename: ${this.filename}`);
Expand All @@ -48,14 +48,24 @@ class Tail extends events.EventEmitter {
});

this.logger.info(`fromBeginning: ${fromBeginning}`);
let startingPos = undefined;
let startingCursor;
if (fromBeginning) {
startingPos = 0;
//if fromBeginning triggers a check for content to flush the existing file
//without waiting for a new appended line
this.change(this.filename);
}
this.watch(startingPos);
startingCursor = 0;
} else if (this.nLines !== undefined) {
const data = fs.readFileSync(this.filename, {
flag: 'r',
encoding:this.encoding
});
const tokens = data.split(this.separator);
const dropLastToken = (tokens[tokens.length-1] === '') ? 1:0;//if the file end with empty line ignore line NL
const match = data.match(new RegExp(`(?:[^\r\n]*[\r]{0,1}\n){${tokens.length - this.nLines - dropLastToken}}`));
startingCursor = (match && match.length) ? Buffer.byteLength(match[0], this.encoding) : this.latestPosition();
} else {
startingCursor = this.latestPosition() ;
}
if (startingCursor === undefined) throw new Error("Tail can't initialize.");
const flush = fromBeginning || (this.nLines != undefined);
this.watch(startingCursor, flush);
}

latestPosition() {
Expand Down Expand Up @@ -103,28 +113,28 @@ class Tail extends events.EventEmitter {
}
}

change(filename) {
change() {
let p = this.latestPosition()
if (p < this.pos) {//scenario where text is not appended but it's actually a w+
this.pos = p
} else if (p > this.pos) {
this.queue.push({ start: this.pos, end: p});
this.pos = p
if (p < this.currentCursorPos) {//scenario where text is not appended but it's actually a w+
this.currentCursorPos = p
} else if (p > this.currentCursorPos) {
this.queue.push({ start: this.currentCursorPos, end: p});
this.currentCursorPos = p
if (this.queue.length == 1) {
this.internalDispatcher.emit("next");
}
}
}

watch(startingPos) {
if (this.isWatching) {
return
}
watch(startingCursor, flush) {
if (this.isWatching) return;
this.logger.info(`filesystem.watch present? ${fs.watch != undefined}`);
this.logger.info(`useWatchFile: ${this.useWatchFile}`);

this.isWatching = true;
this.pos = (startingPos === undefined) ? this.latestPosition() : startingPos;
this.currentCursorPos = startingCursor;
//force a file flush is either fromBegining or nLines flags were passed.
if (flush) this.change();

try {
if (!this.useWatchFile && fs.watch) {
Expand Down Expand Up @@ -156,7 +166,7 @@ class Tail extends events.EventEmitter {
if (this.follow) {
this.filename = path.join(this.absPath, filename);
this.rewatchId = setTimeout((() => {
this.watch(this.pos);
this.watch(this.currentCursorPos);
}), 1000);
} else {
this.logger.error(`'rename' event for ${this.filename}. File not available anymore.`);
Expand All @@ -169,15 +179,15 @@ class Tail extends events.EventEmitter {

watchEvent(e, evtFilename) {
if (e === 'change') {
this.change(this.filename);
this.change();
} else if (e === 'rename') {
this.rename(evtFilename);
}
}

watchFileEvent(curr, prev) {
if (curr.size > prev.size) {
this.pos = curr.size; //Update this.pos so that a consumer can determine if entire file has been handled
this.currentCursorPos = curr.size; //Update this.currentCursorPos so that a consumer can determine if entire file has been handled
this.queue.push({ start: prev.size, end: curr.size });
if (this.queue.length == 1) {
this.internalDispatcher.emit("next");
Expand Down
50 changes: 48 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ describe('Tail', function () {
fs.closeSync(fd);
});

it ('should not lose data between rename events', function(done) {
it('should not lose data between rename events', function(done) {
this.timeout(10000);
const fd = fs.openSync(fileToTest, 'w+');
const newName = path.join( __dirname, 'example2.txt');
Expand All @@ -212,12 +212,58 @@ describe('Tail', function () {

let writeNo=0;
let id = setInterval(() => {
fs.writeSync(fd, `${writeNo}${os.EOL}`);
fs.writeSync(fd, `${writeNo}\n`);
writeNo++;
}, 50);

setTimeout(() => {
exec(`mv ${fileToTest} ${newName}`);
}, 250);
});

it('should respect the nLines flag with newline', function(done) {
const fd = fs.openSync(fileToTest, 'w+');
let tokens = [1,2,3,4,5,6,7,8,9,10];
const input = tokens.reduce((acc,n) =>{ return `${acc}${n}${os.EOL}`},"");
fs.writeSync(fd, input);

const n = 3;
const tailedFile = new Tail(fileToTest, {nLines: n, flushAtEOF:true, fsWatchOptions: { interval: 100 }});
let counter = 1;
const toBePrinted = tokens.slice(tokens.length-n);
tailedFile.on('line', (l) => {
assert.equal(parseInt(l), toBePrinted[counter-1]);
if (counter == toBePrinted.length) {
done();
fs.closeSync(fd);
tailedFile.unwatch();
}
counter++;
})
})

it('should respect the nLines flag when file', function(done) {
const fd = fs.openSync(fileToTest, 'w+');
const tokens = [1,2,3,4,5,6,7,8,9,10];
const input = tokens.reduce((acc,n,i) => {
let t = (i == tokens.length-1) ? n : `${n}${os.EOL}`;
return `${acc}${t}`;
},"");
fs.writeSync(fd, input);

const n = 3;
let counter = 1;
const tailedFile = new Tail(fileToTest, {nLines: n, flushAtEOF:true, fsWatchOptions: { interval: 100 }});
const toBePrinted = tokens.slice(tokens.length-n);

tailedFile.on('line', (l) => {
assert.equal(parseInt(l), toBePrinted[counter-1]);
if (counter == toBePrinted.length) {
done();
fs.closeSync(fd);
tailedFile.unwatch();
}
counter++;
})
})
});

0 comments on commit 2fc35f9

Please sign in to comment.