Skip to content

Commit

Permalink
Merge pull request pinojs#17 from mcollina/fixed
Browse files Browse the repository at this point in the history
Do not currupt data
  • Loading branch information
mcollina authored Nov 7, 2018
2 parents 2a1adc9 + 4ed395c commit b5ec5ae
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 6 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ node_js:
- '6'
- '8'
- '10'
- '11'
2 changes: 1 addition & 1 deletion bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var fs = require('fs')
var core = fs.createWriteStream('/dev/null')
var fd = fs.openSync('/dev/null', 'w')
var sonic = new SonicBoom(fd)
var sonic4k = new SonicBoom(fd, 10000)
var sonic4k = new SonicBoom(fd, 4096)

setTimeout(doBench, 100)

Expand Down
22 changes: 22 additions & 0 deletions fixtures/firehose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const SonicBoom = require('..')

const out = new SonicBoom(process.stdout.fd)
const str = Buffer.alloc(1000).fill('a').toString()

let i = 0

function write () {
if (i++ === 10) {
return
}

if (out.write(str)) {
write()
} else {
out.once('drain', write)
}
}

write()
21 changes: 16 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@ function SonicBoom (fd, minLength) {
throw new Error('SonicBoom supports only file descriptors and files')
}

this.release = (err) => {
this.release = (err, n) => {
if (err) {
if (err.code === 'EAGAIN') {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
// let's give the destination some time to process the chunk
setTimeout(() => {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
}, 100)
return
}

this.emit('error', err)
return
}

if (this._writingBuf.length !== n) {
this._writingBuf = this._writingBuf.slice(n)
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
return
}

this._writingBuf = ''

if (this.destroyed) {
Expand Down Expand Up @@ -189,10 +199,11 @@ SonicBoom.prototype.destroy = function () {

function actualWrite (sonic) {
sonic._writing = true
flatstr(sonic._buf)
fs.write(sonic.fd, sonic._buf, 'utf8', sonic.release)
sonic._writingBuf = sonic._buf
var buf = sonic._buf
sonic._buf = ''
flatstr(buf)
sonic._writingBuf = buf
fs.write(sonic.fd, buf, 'utf8', sonic.release)
}

function actualClose (sonic) {
Expand Down
60 changes: 60 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const tap = require('tap')
const test = tap.test
const tearDown = tap.tearDown
const { join } = require('path')
const { fork } = require('child_process')
const fs = require('fs')
const os = require('os')
const path = require('path')
Expand Down Expand Up @@ -410,3 +412,61 @@ test('retry on EAGAIN', (t) => {
t.pass('close emitted')
})
})

test('chunk data accordingly', (t) => {
t.plan(2)

const child = fork(join(__dirname, 'fixtures', 'firehose.js'), { silent: true })
const str = Buffer.alloc(10000).fill('a').toString()

let data = ''

child.stdout.on('data', function (chunk) {
data += chunk.toString()
})

child.stdout.on('end', function () {
t.is(data, str)
})

child.on('close', function (code) {
t.is(code, 0)
})
})

test('write buffers that are not totally written', (t) => {
t.plan(7)

const fakeFs = Object.create(fs)
fakeFs.write = function (fd, buf, enc, cb) {
t.pass('fake fs.write called')
fakeFs.write = fs.write
process.nextTick(cb, null, 0)
}
const SonicBoom = proxyquire('.', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom(fd)

stream.on('ready', () => {
t.pass('ready emitted')
})

t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))

stream.end()

stream.on('finish', () => {
fs.readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
})

0 comments on commit b5ec5ae

Please sign in to comment.