@@ -11,6 +11,7 @@ const {
1111} = primordials ;
1212
1313const {
14+ ERR_INVALID_ARG_TYPE ,
1415 ERR_OUT_OF_RANGE ,
1516 ERR_STREAM_DESTROYED
1617} = require ( 'internal/errors' ) . codes ;
@@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone');
2829const kIsPerformingIO = Symbol ( 'kIsPerformingIO' ) ;
2930
3031const kMinPoolSpace = 128 ;
32+ const kFs = Symbol ( 'kFs' ) ;
3133
3234let pool ;
3335// It can happen that we expect to read a large chunk of data, and reserve
@@ -76,6 +78,23 @@ function ReadStream(path, options) {
7678 options . emitClose = false ;
7779 }
7880
81+ this [ kFs ] = options . fs || fs ;
82+
83+ if ( typeof this [ kFs ] . open !== 'function' ) {
84+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.open' , 'function' ,
85+ this [ kFs ] . open ) ;
86+ }
87+
88+ if ( typeof this [ kFs ] . read !== 'function' ) {
89+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.read' , 'function' ,
90+ this [ kFs ] . read ) ;
91+ }
92+
93+ if ( typeof this [ kFs ] . close !== 'function' ) {
94+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.close' , 'function' ,
95+ this [ kFs ] . close ) ;
96+ }
97+
7998 Readable . call ( this , options ) ;
8099
81100 // Path will be ignored when fd is specified, so it can be falsy
@@ -136,7 +155,7 @@ function _openReadFs(stream) {
136155 return ;
137156 }
138157
139- fs . open ( stream . path , stream . flags , stream . mode , ( er , fd ) => {
158+ stream [ kFs ] . open ( stream . path , stream . flags , stream . mode , ( er , fd ) => {
140159 if ( er ) {
141160 if ( stream . autoClose ) {
142161 stream . destroy ( ) ;
@@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) {
186205
187206 // the actual read.
188207 this [ kIsPerformingIO ] = true ;
189- fs . read ( this . fd , pool , pool . used , toRead , this . pos , ( er , bytesRead ) => {
190- this [ kIsPerformingIO ] = false ;
191- // Tell ._destroy() that it's safe to close the fd now.
192- if ( this . destroyed ) return this . emit ( kIoDone , er ) ;
193-
194- if ( er ) {
195- if ( this . autoClose ) {
196- this . destroy ( ) ;
197- }
198- this . emit ( 'error' , er ) ;
199- } else {
200- let b = null ;
201- // Now that we know how much data we have actually read, re-wind the
202- // 'used' field if we can, and otherwise allow the remainder of our
203- // reservation to be used as a new pool later.
204- if ( start + toRead === thisPool . used && thisPool === pool ) {
205- const newUsed = thisPool . used + bytesRead - toRead ;
206- thisPool . used = roundUpToMultipleOf8 ( newUsed ) ;
208+ this [ kFs ] . read (
209+ this . fd , pool , pool . used , toRead , this . pos , ( er , bytesRead ) => {
210+ this [ kIsPerformingIO ] = false ;
211+ // Tell ._destroy() that it's safe to close the fd now.
212+ if ( this . destroyed ) return this . emit ( kIoDone , er ) ;
213+
214+ if ( er ) {
215+ if ( this . autoClose ) {
216+ this . destroy ( ) ;
217+ }
218+ this . emit ( 'error' , er ) ;
207219 } else {
208- // Round down to the next lowest multiple of 8 to ensure the new pool
209- // fragment start and end positions are aligned to an 8 byte boundary.
210- const alignedEnd = ( start + toRead ) & ~ 7 ;
211- const alignedStart = roundUpToMultipleOf8 ( start + bytesRead ) ;
212- if ( alignedEnd - alignedStart >= kMinPoolSpace ) {
213- poolFragments . push ( thisPool . slice ( alignedStart , alignedEnd ) ) ;
220+ let b = null ;
221+ // Now that we know how much data we have actually read, re-wind the
222+ // 'used' field if we can, and otherwise allow the remainder of our
223+ // reservation to be used as a new pool later.
224+ if ( start + toRead === thisPool . used && thisPool === pool ) {
225+ const newUsed = thisPool . used + bytesRead - toRead ;
226+ thisPool . used = roundUpToMultipleOf8 ( newUsed ) ;
227+ } else {
228+ // Round down to the next lowest multiple of 8 to ensure the new pool
229+ // fragment start and end positions are aligned to an 8 byte boundary.
230+ const alignedEnd = ( start + toRead ) & ~ 7 ;
231+ const alignedStart = roundUpToMultipleOf8 ( start + bytesRead ) ;
232+ if ( alignedEnd - alignedStart >= kMinPoolSpace ) {
233+ poolFragments . push ( thisPool . slice ( alignedStart , alignedEnd ) ) ;
234+ }
214235 }
215- }
216236
217- if ( bytesRead > 0 ) {
218- this . bytesRead += bytesRead ;
219- b = thisPool . slice ( start , start + bytesRead ) ;
220- }
237+ if ( bytesRead > 0 ) {
238+ this . bytesRead += bytesRead ;
239+ b = thisPool . slice ( start , start + bytesRead ) ;
240+ }
221241
222- this . push ( b ) ;
223- }
224- } ) ;
242+ this . push ( b ) ;
243+ }
244+ } ) ;
225245
226246 // Move the pool positions, and internal position for reading.
227247 if ( this . pos !== undefined )
@@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) {
245265} ;
246266
247267function closeFsStream ( stream , cb , err ) {
248- fs . close ( stream . fd , ( er ) => {
268+ stream [ kFs ] . close ( stream . fd , ( er ) => {
249269 er = er || err ;
250270 cb ( er ) ;
251271 stream . closed = true ;
@@ -279,6 +299,40 @@ function WriteStream(path, options) {
279299 options . emitClose = false ;
280300 }
281301
302+ this [ kFs ] = options . fs || fs ;
303+ if ( typeof this [ kFs ] . open !== 'function' ) {
304+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.open' , 'function' ,
305+ this [ kFs ] . open ) ;
306+ }
307+
308+ if ( ! this [ kFs ] . write && ! this [ kFs ] . writev ) {
309+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.write' , 'function' ,
310+ this [ kFs ] . write ) ;
311+ }
312+
313+ if ( this [ kFs ] . write && typeof this [ kFs ] . write !== 'function' ) {
314+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.write' , 'function' ,
315+ this [ kFs ] . write ) ;
316+ }
317+
318+ if ( this [ kFs ] . writev && typeof this [ kFs ] . writev !== 'function' ) {
319+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.writev' , 'function' ,
320+ this [ kFs ] . writev ) ;
321+ }
322+
323+ if ( typeof this [ kFs ] . close !== 'function' ) {
324+ throw new ERR_INVALID_ARG_TYPE ( 'options.fs.close' , 'function' ,
325+ this [ kFs ] . close ) ;
326+ }
327+
328+ // It's enough to override either, in which case only one will be used.
329+ if ( ! this [ kFs ] . write ) {
330+ this . _write = null ;
331+ }
332+ if ( ! this [ kFs ] . writev ) {
333+ this . _writev = null ;
334+ }
335+
282336 Writable . call ( this , options ) ;
283337
284338 // Path will be ignored when fd is specified, so it can be falsy
@@ -335,7 +389,7 @@ function _openWriteFs(stream) {
335389 return ;
336390 }
337391
338- fs . open ( stream . path , stream . flags , stream . mode , ( er , fd ) => {
392+ stream [ kFs ] . open ( stream . path , stream . flags , stream . mode , ( er , fd ) => {
339393 if ( er ) {
340394 if ( stream . autoClose ) {
341395 stream . destroy ( ) ;
@@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
361415 if ( this . destroyed ) return cb ( new ERR_STREAM_DESTROYED ( 'write' ) ) ;
362416
363417 this [ kIsPerformingIO ] = true ;
364- fs . write ( this . fd , data , 0 , data . length , this . pos , ( er , bytes ) => {
418+ this [ kFs ] . write ( this . fd , data , 0 , data . length , this . pos , ( er , bytes ) => {
365419 this [ kIsPerformingIO ] = false ;
366420 // Tell ._destroy() that it's safe to close the fd now.
367421 if ( this . destroyed ) {
@@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) {
405459 }
406460
407461 this [ kIsPerformingIO ] = true ;
408- fs . writev ( this . fd , chunks , this . pos , ( er , bytes ) => {
462+ this [ kFs ] . writev ( this . fd , chunks , this . pos , ( er , bytes ) => {
409463 this [ kIsPerformingIO ] = false ;
410464 // Tell ._destroy() that it's safe to close the fd now.
411465 if ( this . destroyed ) {
0 commit comments