diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index f75d0fba917241..c317f3b9202af9 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -13,8 +13,10 @@ const { const { ERR_INVALID_ARG_TYPE, - ERR_OUT_OF_RANGE, ERR_METHOD_NOT_IMPLEMENTED, + ERR_OUT_OF_RANGE, + ERR_STREAM_DESTROYED, + ERR_SYSTEM_ERROR, } = require('internal/errors').codes; const { deprecate, @@ -392,9 +394,67 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; +function writeAll(data, size, pos, cb, retries = 0) { + this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => { + // No data currently available and operation should be retried later. + if (er?.code === 'EAGAIN') { + er = null; + bytesWritten = 0; + } + + if (this.destroyed || er) { + return cb(er || new ERR_STREAM_DESTROYED('write')); + } + + this.bytesWritten += bytesWritten; + + retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; + + // Try writing non-zero number of bytes up to 5 times. + if (retries > 5) { + cb(new ERR_SYSTEM_ERROR('write failed')); + } else if (size) { + writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries); + } else { + cb(); + } + }); +} + +function writevAll(chunks, size, pos, cb, retries = 0) { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { + // No data currently available and operation should be retried later. + if (er?.code === 'EAGAIN') { + er = null; + bytesWritten = 0; + } + + if (this.destroyed || er) { + return cb(er || new ERR_STREAM_DESTROYED('writev')); + } + + this.bytesWritten += bytesWritten; + + retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; + + // Try writing non-zero number of bytes up to 5 times. + if (retries > 5) { + cb(new ERR_SYSTEM_ERROR('writev failed')); + } else if (size) { + writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); + } else { + cb(); + } + }); +} + WriteStream.prototype._write = function(data, encoding, cb) { this[kIsPerformingIO] = true; - this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + writeAll.call(this, data, data.length, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -402,12 +462,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined) @@ -427,7 +482,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { + writevAll.call(this, chunks, size, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -435,12 +490,7 @@ WriteStream.prototype._writev = function(data, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined) diff --git a/test/parallel/test-fs-write-stream-eagain.mjs b/test/parallel/test-fs-write-stream-eagain.mjs new file mode 100644 index 00000000000000..b9de54218fb735 --- /dev/null +++ b/test/parallel/test-fs-write-stream-eagain.mjs @@ -0,0 +1,39 @@ +import * as common from '../common/index.mjs'; +import tmpdir from '../common/tmpdir.js'; +import assert from 'node:assert'; +import fs from 'node:fs'; +import { describe, it, mock } from 'node:test'; +import { finished } from 'node:stream/promises'; + +tmpdir.refresh(); +const file = tmpdir.resolve('writeStreamEAGAIN.txt'); +const errorWithEAGAIN = (fd, buffer, offset, length, position, callback) => { + callback(Object.assign(new Error(), { code: 'EAGAIN' }), 0, buffer); +}; + +describe('WriteStream EAGAIN', { concurrency: true }, () => { + it('_write', async () => { + const mockWrite = mock.fn(fs.write); + mockWrite.mock.mockImplementationOnce(errorWithEAGAIN); + const stream = fs.createWriteStream(file, { + fs: { + open: common.mustCall(fs.open), + write: mockWrite, + close: common.mustCall(fs.close), + } + }); + stream.end('foo'); + stream.on('close', common.mustCall()); + stream.on('error', common.mustNotCall()); + await finished(stream); + assert.strictEqual(mockWrite.mock.callCount(), 2); + assert.strictEqual(fs.readFileSync(file, 'utf8'), 'foo'); + }); + + it('_write', async () => { + const stream = fs.createWriteStream(file); + mock.getter(stream, 'destroyed', () => true); + stream.end('foo'); + await finished(stream).catch(common.mustCall()); + }); +});