Skip to content

Commit

Permalink
feat: Include lastReadPosition in the flush event
Browse files Browse the repository at this point in the history
Users will sometimes want to track the current position
that `tail-file` is reading from. This could be useful
for handing off reading between two separate `tail-file`
instances on the same file. Since the `flush` event represents being "caught up"
when reading from the source file, emit a `lastReadPosition`
in the `flush` event payload for informative purposes. This
value represents the `startPos` value at the time of flush.

Semver: minor
Fixes: #19
  • Loading branch information
darinspivey committed Jun 8, 2021
1 parent 165c5c9 commit 91762e0
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 7 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,17 @@ superclass. Additionally, it will emit the following custom events.

### Event: `'flush'`

* No payload
* [`<Object>`][]
* `lastReadPosition` [`<Number>`][] - The current file position at `flush` time

This event is emitted when the underlying stream is done being read.
If backpressure is in effect, then `_read()` may be called multiple
times until it's flushed, so this event signals the end of the process.
It is used primarily in shutdown to make sure the data is exhausted,
and there should be no reason for the user to listen to this event.
but users may listen for this event if the relative "read position" in the
file is of interest. For example, the `lastReadPosition` may be persisted to memory
or a database for resuming `tail-file` in a separate execution without missing
or duplicating any lines.

### Event: `'renamed'`

Expand Down
9 changes: 8 additions & 1 deletion lib/tail-file.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ interface RetryEventPayload extends EventPayload {
attempts: number
}

interface FlushEventPayload {
  /**
   * The file position (byte offset) that reading had reached when the `flush`
   * event was emitted. It represents the internal `startPos` value at the time
   * of flushing, and may be persisted and supplied back as `startPos` to resume
   * tailing the same file in a later execution without missing or duplicating lines.
   */
  lastReadPosition: number
}

interface TailErrorEventPayload {
/**
* The error message as written by `TailFile`.
Expand Down Expand Up @@ -59,7 +66,7 @@ interface ReadableEvents {
}

interface TailFileEvents extends ReadableEvents {
flush: () => void
flush: (payload: FlushEventPayload) => void
renamed: (payload: EventPayload) => void
retry: (payload: RetryEventPayload) => void
tail_error: (payload: TailErrorEventPayload) => void
Expand Down
8 changes: 6 additions & 2 deletions lib/tail-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class TailFile extends Readable {
this._scheduleTimer(this[kPollFileIntervalMs])
}
this[kStream] = null
setImmediate(this.emit.bind(this), 'flush')
setImmediate(this.emit.bind(this), 'flush', {
lastReadPosition: this[kStartPos]
})
return
}

Expand Down Expand Up @@ -209,7 +211,9 @@ class TailFile extends Readable {
await this._streamFileChanges()
if (this[kStream]) return // Pause polling if backpressure is on
} else {
setImmediate(this.emit.bind(this), 'flush')
setImmediate(this.emit.bind(this), 'flush', {
lastReadPosition: this[kStartPos]
})
}

this._scheduleTimer(this[kPollFileIntervalMs])
Expand Down
97 changes: 95 additions & 2 deletions test/tail-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const fs = require('fs')
const path = require('path')
const {pipeline} = require('stream')
const {promisify} = require('util')
const {once} = require('events')
const {Transform} = require('stream')
Expand Down Expand Up @@ -360,6 +361,97 @@ test('Success: startPos can be provided to start tailing at a given place', (t)
})
})

// End-to-end check that the `flush` event carries an accurate `lastReadPosition`:
// the position is captured on a first pass, the file grows while no tail is
// running, and a second tail resumes from the saved position. Cloning the
// consumed bytes to a second file proves no lines were missed or duplicated.
test('Success: lastReadPosition is emitted with "flush" and is accurate', async (t) => {
  const name = 'logfile.txt'
  let lineCounter = 0        // monotonically increasing across BOTH passes of getLotsOfLines()
  let lastReadPosition       // shared between subtests: saved in pass 1, used as startPos in pass 2
  let finalSize              // byte size of logfile.txt after the mid-test append

  function getLotsOfLines() {
    // Add a line marker just to give some uniqueness for human-readability
    let result = ''
    const longLine = 'X'.repeat(100) + '\n'
    for (let i = 0; i < 100; i++) {
      result += `Line Counter: ${++lineCounter}\t` + longLine
    }
    return result
  }

  // Populate a log file then 1) consume 2) add more 3) consume from lastReadPosition
  const testDir = t.testdir({
    [name]: getLotsOfLines()
  })
  const logFile = path.join(testDir, name)
  const clonedFile = path.join(testDir, 'cloned-logfile.txt')

  t.test('Read some lines, then quit. Collect the `lastReadPosition`', (t) => {
    const tail = new TailFile(logFile, {
      startPos: 0 // Read the WHOLE file, not start from the end
    })
    const clonedStream = fs.createWriteStream(clonedFile)

    tail
      .on('flush', (evt) => {
        lastReadPosition = evt.lastReadPosition
        if (lastReadPosition > 0) {
          tail.removeAllListeners('flush')
          // Some data has been read and its position recorded, so this
          // first pass is done; quit so the file can grow while untailed
          tail.quit()
        }
      })
      .start()

    // Piping to clonedStream persists everything this pass consumed;
    // the callback fires once `quit()` ends the tail stream
    pipeline(tail, clonedStream, (err) => {
      t.error(err, 'No error from piping between tail and the cloned file')
      t.type(lastReadPosition, 'number', '"flush" emitted the `lastReadPosition`')
      t.pass('The first pass of reading logfile.txt is complete!')
      t.end()
    })
  })

  t.test('Add more data to logfile.txt before tail starts', async (t) => {
    // Since `tail-file` starts at the *end* of the file, append this data FIRST, then
    // we will start from the last startPos and read it forward. This is different than
    // starting the tail first, then adding data.

    await fs.promises.appendFile(logFile, getLotsOfLines())
    const {size} = await fs.promises.stat(logFile)
    finalSize = size
  })

  t.test('Consume from logfile.txt from the `lastReadPosition`', (t) => {
    // Start from the last-known position
    const tail = new TailFile(logFile, {
      startPos: lastReadPosition
    })
    const clonedStream = fs.createWriteStream(clonedFile, {flags: 'a'}) // append

    tail
      .on('flush', (evt) => {
        lastReadPosition = evt.lastReadPosition // Save for asserting below
        // Flush position matching the post-append file size proves this pass
        // read exactly to the end of the file — no bytes skipped or re-read
        if (lastReadPosition === finalSize) {
          tail.removeAllListeners('flush')
          // Done reading, we can quit
          t.pass('lastReadPosition is === the size of the file')
          tail.quit()
        }
      })
      .start()

    pipeline(tail, clonedStream, (err) => {
      t.error(err, 'No error from piping between tail and the cloned file')
      t.pass('The second pass of reading logfile.txt is complete!')
      t.end()
    })
  })

  t.test('Verify the file is cloned (no duplicates)', async (t) => {
    // If the two passes overlapped or left a gap, the clone's contents
    // would differ from the original — equality is the end-to-end proof
    const original = await fs.promises.readFile(logFile, 'utf8')
    const cloned = await fs.promises.readFile(clonedFile, 'utf8')
    t.same(original, cloned, 'The files have the same contents')
  })
})

test('Error: filename provided does not exist (throws on start)', async (t) => {
const tail = new TailFile('THISFILEDOSNOTEXIST')
await t.rejects(tail.start(), {
Expand Down Expand Up @@ -530,7 +622,7 @@ test('Handled: Error reading old FH emits tail_error after inode changes', async
})

test('Quitting destroys any open tail file stream', (t) => {
t.plan(4)
t.plan(5)

const tail = new TailFile(__filename)
const symbols = getSymbols(tail)
Expand All @@ -540,7 +632,8 @@ test('Quitting destroys any open tail file stream', (t) => {
t.pass('TailFile emitted \'end\' event')
t.ok(tail[symbols.stream]._readableState.destroyed, 'Underlying stream destroyed')
})
.on('flush', () => {
.on('flush', (evt) => {
t.type(evt.lastReadPosition, 'number', 'lastReadPosition is in the "flush" event')
t.pass('flush event received as expected')
})

Expand Down

0 comments on commit 91762e0

Please sign in to comment.