@@ -23,7 +23,7 @@ import {
 } from 'gaxios';
 import * as gaxios from 'gaxios';
 import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
-import {Readable, Writable} from 'stream';
+import {Readable, Writable, WritableOptions} from 'stream';
 import retry = require('async-retry');
 import {RetryOptions, PreconditionOptions} from './storage';
 import * as uuid from 'uuid';
@@ -62,7 +62,7 @@ export interface QueryParameters extends PreconditionOptions {
   userProject?: string;
 }

-export interface UploadConfig {
+export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
   /**
    * The API endpoint used for the request.
    * Defaults to `storage.googleapis.com`.
@@ -260,20 +260,22 @@ export class Upload extends Writable {
     uri: uuid.v4(),
     offset: uuid.v4(),
   };
-  private upstreamChunkBuffer: Buffer = Buffer.alloc(0);
-  private chunkBufferEncoding?: BufferEncoding = undefined;
+  /**
+   * A cache of buffers written to this instance, ready for consuming
+   */
+  private writeBuffers: Buffer[] = [];
   private numChunksReadInRequest = 0;
   /**
-   * A chunk used for caching the most recent upload chunk.
+   * An array of buffers used for caching the most recent upload chunk.
    * We should not assume that the server received all bytes sent in the request.
    * - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
    */
-  private lastChunkSent = Buffer.alloc(0);
+  private localWriteCache: Buffer[] = [];
+  private localWriteCacheByteLength = 0;
   private upstreamEnded = false;

   constructor(cfg: UploadConfig) {
-    super();
-
+    super(cfg);
     cfg = cfg || {};

     if (!cfg.bucket || !cfg.file) {
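With `UploadConfig` now extending `Pick<WritableOptions, 'highWaterMark'>` and the constructor forwarding `cfg` to `super()`, callers can tune the stream's internal buffering threshold. A minimal sketch, assuming `Upload` is exported from this module (the import path, bucket, and file names are placeholders):

```ts
import {Upload} from './resumable-upload';

// `highWaterMark` is standard stream.Writable behavior: once this many bytes
// are buffered internally, `write()` returns false to signal backpressure.
const upload = new Upload({
  bucket: 'my-bucket', // hypothetical bucket
  file: 'my-file.dat', // hypothetical object name
  highWaterMark: 32 * 1024 * 1024, // allow 32 MiB in flight before backpressure
});
```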
@@ -391,24 +393,73 @@ export class Upload extends Writable {
     // Backwards-compatible event
     this.emit('writing');

-    this.upstreamChunkBuffer = Buffer.concat([
-      this.upstreamChunkBuffer,
-      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk,
-    ]);
-    this.chunkBufferEncoding = encoding;
+    this.writeBuffers.push(
+      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk
+    );

     this.once('readFromChunkBuffer', readCallback);

     process.nextTick(() => this.emit('wroteToChunkBuffer'));
   }

+  #resetLocalBuffersCache() {
+    this.localWriteCache = [];
+    this.localWriteCacheByteLength = 0;
+  }
+
+  #addLocalBufferCache(buf: Buffer) {
+    this.localWriteCache.push(buf);
+    this.localWriteCacheByteLength += buf.byteLength;
+  }
+
   /**
-   * Prepends data back to the upstream chunk buffer.
+   * Prepends the local buffer cache to the write buffers and resets it.
    *
-   * @param chunk The data to prepend
+   * @param keepLastBytes The number of bytes to keep from the end of the local buffer cache.
    */
-  private unshiftChunkBuffer(chunk: Buffer) {
-    this.upstreamChunkBuffer = Buffer.concat([chunk, this.upstreamChunkBuffer]);
+  private prependLocalBufferToUpstream(keepLastBytes?: number) {
+    // Typically, the upstream write buffers should be smaller than the local
+    // cache, so we can save time by setting the local cache as the new
+    // upstream write buffer array and appending the old array to it
+    let initialBuffers: Buffer[] = [];
+
+    if (keepLastBytes) {
+      // we only want the last X bytes
+      let bytesKept = 0;
+
+      while (keepLastBytes > bytesKept) {
+        // load backwards because we want the last X bytes
+        // note: `localWriteCacheByteLength` is reset below
+        let buf = this.localWriteCache.pop();
+        if (!buf) break;
+
+        bytesKept += buf.byteLength;
+
+        if (bytesKept > keepLastBytes) {
+          // we have gone over the amount desired, let's keep the last X bytes
+          // of this buffer
+          const diff = bytesKept - keepLastBytes;
+          buf = buf.subarray(diff);
+          bytesKept -= diff;
+        }
+
+        initialBuffers.unshift(buf);
+      }
+    } else {
+      // we're keeping all of the local cache, simply use it as the initial buffer
+      initialBuffers = this.localWriteCache;
+    }
+
+    // Append the old upstream to the new
+    const append = this.writeBuffers;
+    this.writeBuffers = initialBuffers;
+
+    for (const buf of append) {
+      this.writeBuffers.push(buf);
+    }
+
+    // reset last buffers sent
+    this.#resetLocalBuffersCache();
   }

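Since `prependLocalBufferToUpstream` is the heart of the resend path, here is a standalone sketch of its `keepLastBytes` walk (the helper name is hypothetical, not part of this class): it pops buffers off the tail of the cache and trims the boundary buffer with a zero-copy `subarray`.

```ts
function keepLast(cache: Buffer[], keepLastBytes: number): Buffer[] {
  const kept: Buffer[] = [];
  let bytesKept = 0;

  while (keepLastBytes > bytesKept) {
    let buf = cache.pop(); // walk backwards from the end
    if (!buf) break;

    bytesKept += buf.byteLength;

    if (bytesKept > keepLastBytes) {
      // overshot: keep only the tail of this boundary buffer
      const diff = bytesKept - keepLastBytes;
      buf = buf.subarray(diff); // a view, not a copy
      bytesKept -= diff;
    }

    kept.unshift(buf);
  }

  return kept;
}

// keepLast([Buffer.from('abcd'), Buffer.from('efgh')], 6)
// -> ['cd', 'efgh'] - the last 6 bytes, with no buffer copied
```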
   /**
@@ -417,15 +468,28 @@ export class Upload extends Writable {
    * @param limit The maximum amount to return from the buffer.
    * @returns The data requested.
    */
-  private pullFromChunkBuffer(limit: number) {
-    const chunk = this.upstreamChunkBuffer.slice(0, limit);
-    this.upstreamChunkBuffer = this.upstreamChunkBuffer.slice(limit);
+  private *pullFromChunkBuffer(limit: number) {
+    while (limit) {
+      const buf = this.writeBuffers.shift();
+      if (!buf) break;
+
+      let bufToYield = buf;

-    // notify upstream we've read from the buffer so it can potentially
-    // send more data down.
-    process.nextTick(() => this.emit('readFromChunkBuffer'));
+      if (buf.byteLength > limit) {
+        bufToYield = buf.subarray(0, limit);
+        this.writeBuffers.unshift(buf.subarray(limit));
+        limit = 0;
+      } else {
+        limit -= buf.byteLength;
+      }
+
+      yield bufToYield;

-    return chunk;
+      // Notify upstream we've read from the buffer and we're able to consume
+      // more. It can also potentially send more data down as we're currently
+      // iterating.
+      this.emit('readFromChunkBuffer');
+    }
   }

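Compared to the old `slice`-and-reassign implementation, the generator never concatenates: each yield is either a whole queued buffer or a `subarray` view, and any excess is unshifted back for the next pull. A self-contained sketch of that splitting behavior (illustrative names):

```ts
function* pullUpTo(buffers: Buffer[], limit: number) {
  while (limit) {
    const buf = buffers.shift();
    if (!buf) break;

    if (buf.byteLength > limit) {
      yield buf.subarray(0, limit); // only what was asked for
      buffers.unshift(buf.subarray(limit)); // return the remainder
      limit = 0;
    } else {
      limit -= buf.byteLength;
      yield buf;
    }
  }
}

const queue = [Buffer.from('hello'), Buffer.from('world')];
const pulled = [...pullUpTo(queue, 7)].map(b => b.toString()); // ['hello', 'wo']
// queue now holds the remaining 'rld' for the next request
```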
   /**
@@ -436,7 +500,7 @@ export class Upload extends Writable {
   private async waitForNextChunk(): Promise<boolean> {
     const willBeMoreChunks = await new Promise<boolean>(resolve => {
       // There's data available - it should be digested
-      if (this.upstreamChunkBuffer.byteLength) {
+      if (this.writeBuffers.length) {
         return resolve(true);
       }

@@ -457,7 +521,7 @@ export class Upload extends Writable {
         removeListeners();

         // this should be the last chunk, if there's anything there
-        if (this.upstreamChunkBuffer.length) return resolve(true);
+        if (this.writeBuffers.length) return resolve(true);

         return resolve(false);
       };
@@ -483,35 +547,16 @@ export class Upload extends Writable {
    * Ends when the limit has reached or no data is expected to be pushed from upstream.
    *
    * @param limit The most amount of data this iterator should return. `Infinity` by default.
-   * @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator
    */
-  private async *upstreamIterator(limit = Infinity, oneChunkMode?: boolean) {
-    let completeChunk = Buffer.alloc(0);
-
+  private async *upstreamIterator(limit = Infinity) {
     // read from upstream chunk buffer
     while (limit && (await this.waitForNextChunk())) {
       // read until end or limit has been reached
-      const chunk = this.pullFromChunkBuffer(limit);
-
-      limit -= chunk.byteLength;
-      if (oneChunkMode) {
-        // return 1 chunk at the end of iteration
-        completeChunk = Buffer.concat([completeChunk, chunk]);
-      } else {
-        // return many chunks throughout iteration
-        yield {
-          chunk,
-          encoding: this.chunkBufferEncoding,
-        };
+      for (const chunk of this.pullFromChunkBuffer(limit)) {
+        limit -= chunk.byteLength;
+        yield chunk;
       }
     }
-
-    if (oneChunkMode) {
-      yield {
-        chunk: completeChunk,
-        encoding: this.chunkBufferEncoding,
-      };
-    }
   }

   createURI(): Promise<string>;
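With `oneChunkMode` removed, the iterator always yields plain `Buffer`s, and callers that need one contiguous chunk concatenate at the call site, paying for a single copy only when a whole chunk is actually required. A hedged, self-contained sketch of that contract (the names here are assumptions, not this module's API):

```ts
async function* upstreamLike(queue: Buffer[], limit = Infinity) {
  while (limit && queue.length) {
    const buf = queue.shift()!;
    const take = buf.byteLength > limit ? buf.subarray(0, limit) : buf;
    if (take.byteLength < buf.byteLength) {
      queue.unshift(buf.subarray(take.byteLength)); // return the excess
    }
    limit -= take.byteLength;
    yield take; // a plain Buffer - no {chunk, encoding} wrapper
  }
}

(async () => {
  const parts: Buffer[] = [];
  for await (const b of upstreamLike([Buffer.from('abc'), Buffer.from('def')], 4)) {
    parts.push(b);
  }
  console.log(Buffer.concat(parts).toString()); // 'abcd'
})();
```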
@@ -680,10 +725,7 @@ export class Upload extends Writable {
     }

     // A queue for the upstream data
-    const upstreamQueue = this.upstreamIterator(
-      expectedUploadSize,
-      multiChunkMode // multi-chunk mode should return 1 chunk per request
-    );
+    const upstreamQueue = this.upstreamIterator(expectedUploadSize);

     // The primary read stream for this request. This stream retrieves no more
     // than the exact requested amount from upstream.
@@ -696,15 +738,23 @@ export class Upload extends Writable {

         if (result.value) {
           this.numChunksReadInRequest++;
-          this.lastChunkSent = result.value.chunk;
-          this.numBytesWritten += result.value.chunk.byteLength;
+
+          if (multiChunkMode) {
+            // save every buffer used in the request in multi-chunk mode
+            this.#addLocalBufferCache(result.value);
+          } else {
+            this.#resetLocalBuffersCache();
+            this.#addLocalBufferCache(result.value);
+          }
+
+          this.numBytesWritten += result.value.byteLength;

           this.emit('progress', {
             bytesWritten: this.numBytesWritten,
             contentLength: this.contentLength,
           });

-          requestStream.push(result.value.chunk, result.value.encoding);
+          requestStream.push(result.value);
         }

         if (result.done) {
@@ -720,17 +770,21 @@ export class Upload extends Writable {
     // If using multiple chunk upload, set appropriate header
     if (multiChunkMode) {
       // We need to know how much data is available upstream to set the `Content-Range` header.
-      const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true);
-      const {value} = await oneChunkIterator.next();
+      // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
+      for await (const chunk of this.upstreamIterator(expectedUploadSize)) {
+        // This will conveniently track and keep the size of the buffers
+        this.#addLocalBufferCache(chunk);
+      }

-      const bytesToUpload = value!.chunk.byteLength;
+      // We hit either the expected upload size or the remainder of the upstream data
+      const bytesToUpload = this.localWriteCacheByteLength;

       // Important: we want to know if the upstream has ended and the queue is empty before
       // unshifting data back into the queue. This way we will know if this is the last request or not.
       const isLastChunkOfUpload = !(await this.waitForNextChunk());

-      // Important: put the data back in the queue for the actual upload iterator
-      this.unshiftChunkBuffer(value!.chunk);
+      // Important: put the data back in the queue for the actual upload
+      this.prependLocalBufferToUpstream();

       let totalObjectSize = this.contentLength;

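The drained byte count (`bytesToUpload`) and `isLastChunkOfUpload` feed straight into the `Content-Range` header. A sketch of that arithmetic under the GCS chunked-upload rules (the helper is hypothetical; see https://cloud.google.com/storage/docs/performing-resumable-uploads):

```ts
function contentRange(offset: number, bytesToUpload: number, total?: number): string {
  const end = offset + bytesToUpload - 1;
  // '*' while the total object size is still unknown (upstream has not ended)
  return `bytes ${offset}-${end}/${total ?? '*'}`;
}

contentRange(0, 8_388_608); // 'bytes 0-8388607/*' (more chunks to come)
contentRange(8_388_608, 1_611_392, 10_000_000); // 'bytes 8388608-9999999/10000000' (final chunk)
```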
@@ -808,15 +862,14 @@ export class Upload extends Writable {
       // - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
       const missingBytes = this.numBytesWritten - this.offset;
       if (missingBytes) {
-        const dataToPrependForResending = this.lastChunkSent.slice(
-          -missingBytes
-        );
         // As multi-chunk uploads send one chunk per request and pulls one
         // chunk into the pipeline, prepending the missing bytes back should
         // be fine for the next request.
-        this.unshiftChunkBuffer(dataToPrependForResending);
+        this.prependLocalBufferToUpstream(missingBytes);
         this.numBytesWritten -= missingBytes;
-        this.lastChunkSent = Buffer.alloc(0);
+      } else {
+        // No bytes missing - no need to keep the local cache
+        this.#resetLocalBuffersCache();
       }

       // continue uploading next chunk
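To make the rewind concrete, a small worked example with assumed numbers:

```ts
// Assumed for illustration: the client wrote 16 MiB, but the server's
// persisted offset shows only 12 MiB arrived.
const numBytesWritten = 16 * 1024 * 1024;
const serverOffset = 12 * 1024 * 1024;

const missingBytes = numBytesWritten - serverOffset; // 4 MiB to resend

// `prependLocalBufferToUpstream(missingBytes)` re-queues only the last 4 MiB
// of the cached request body (the part the server never persisted), and the
// counter rewinds to match: numBytesWritten - missingBytes = 12 MiB.
```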
@@ -831,8 +884,8 @@ export class Upload extends Writable {

       this.destroy(err);
     } else {
-      // remove the last chunk sent to free memory
-      this.lastChunkSent = Buffer.alloc(0);
+      // no need to keep the cache
+      this.#resetLocalBuffersCache();

       if (resp && resp.data) {
         resp.data.size = Number(resp.data.size);
@@ -983,11 +1036,9 @@ export class Upload extends Writable {
       return;
     }

-    // Unshift the most recent chunk back in case it's needed for the next
-    // request.
-    this.numBytesWritten -= this.lastChunkSent.byteLength;
-    this.unshiftChunkBuffer(this.lastChunkSent);
-    this.lastChunkSent = Buffer.alloc(0);
+    // Unshift the local cache back in case it's needed for the next request.
+    this.numBytesWritten -= this.localWriteCacheByteLength;
+    this.prependLocalBufferToUpstream();

     // We don't know how much data has been received by the server.
     // `continueUploading` will recheck the offset via `getAndSetOffset`.