From c27e844850be69bc2ce8ad3a5a93b3f62b4190ae Mon Sep 17 00:00:00 2001
From: Daria Pardue
Date: Fri, 29 Apr 2022 11:00:28 -0400
Subject: [PATCH] fix(NODE-3833): return early on end if gridfs upload stream
 is already ended (#3223)

---
 src/gridfs/upload.ts                          |  2 +-
 test/integration/gridfs/gridfs_stream.test.js | 75 +++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts
index 021f60008a..ae0d678998 100644
--- a/src/gridfs/upload.ts
+++ b/src/gridfs/upload.ts
@@ -196,7 +196,7 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
         ? encodingOrCallback
         : callback;
 
-    if (checkAborted(this, callback)) return this;
+    if (this.state.streamEnd || checkAborted(this, callback)) return this;
 
     this.state.streamEnd = true;
 
diff --git a/test/integration/gridfs/gridfs_stream.test.js b/test/integration/gridfs/gridfs_stream.test.js
index 1c75a4d27a..c57042fd56 100644
--- a/test/integration/gridfs/gridfs_stream.test.js
+++ b/test/integration/gridfs/gridfs_stream.test.js
@@ -6,6 +6,8 @@ const fs = require('fs');
 const { setupDatabase, withClient } = require('./../shared');
 const { expect } = require('chai');
 const { GridFSBucket, ObjectId } = require('../../../src');
+const sinon = require('sinon');
+const { sleep } = require('../../tools/utils');
 
 describe('GridFS Stream', function () {
   before(function () {
@@ -947,6 +949,79 @@ describe('GridFS Stream', function () {
     }
   });
 
+  describe('upload stream end()', () => {
+    let client, db;
+
+    afterEach(async () => {
+      sinon.restore();
+      await client.close();
+    });
+
+    it('should not call the callback on repeat calls to end', {
+      metadata: { requires: { topology: ['single'] } },
+
+      async test() {
+        const configuration = this.configuration;
+        client = configuration.newClient(configuration.writeConcernMax(), {
+          maxPoolSize: 1
+        });
+        await client.connect();
+        db = client.db(configuration.db);
+        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
+        const uploadStream = bucket.openUploadStream('test.dat');
+
+        const endPromise = new Promise(resolve => {
+          uploadStream.end('1', resolve);
+        });
+
+        const endPromise2 = new Promise((resolve, reject) => {
+          uploadStream.end('2', () => {
+            reject(new Error('Expected callback to not be called on duplicate end'));
+          });
+        });
+
+        await endPromise;
+        // in the fail case, the callback would be called when the actual write is finished,
+        // so we need to give it a moment
+        await Promise.race([endPromise2, sleep(100)]);
+      }
+    });
+
+    it('should not write a chunk on repeat calls to end', {
+      metadata: { requires: { topology: ['single'] } },
+
+      async test() {
+        const configuration = this.configuration;
+        client = configuration.newClient(configuration.writeConcernMax(), {
+          maxPoolSize: 1
+        });
+        await client.connect();
+        db = client.db(this.configuration.db);
+        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
+        const uploadStream = bucket.openUploadStream('test.dat');
+        const spy = sinon.spy(uploadStream, 'write');
+
+        const endPromise = new Promise(resolve => {
+          uploadStream.end('1', resolve);
+        });
+
+        await endPromise;
+        expect(spy).to.have.been.calledWith('1');
+
+        uploadStream.end('2');
+
+        // wait for potential async calls to happen before we close the client,
+        // so that we don't get a "client not connected" failure in the afterEach
+        // in the failure case; that would be confusing and unnecessary
+        // given the assertions we already have for this case
+        await sleep(100);
+
+        expect(spy).not.to.have.been.calledWith('2');
+        expect(spy.calledOnce).to.be.true;
+      }
+    });
+  });
+
   /**
    * Provide start and end parameters for file download to skip ahead x bytes and limit the total amount of bytes read to n
    *
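
For illustration, a minimal standalone sketch of the guard this one-line fix applies: once a stream has been ended, a repeat end() should neither write its chunk nor invoke its callback. This uses a plain Node.js Writable; the class and field names are hypothetical stand-ins, not the driver's internals.

    'use strict';

    const { Writable } = require('stream');

    // Hypothetical demo class; the real guard lives in GridFSBucketWriteStream.end().
    class OnceEndedWritable extends Writable {
      constructor() {
        super();
        this.streamEnd = false; // analogous to this.state.streamEnd in the patch
      }

      end(chunk, encoding, callback) {
        // The guard from the patch: a repeat end() returns immediately, so its
        // chunk is never written and its callback is never invoked.
        if (this.streamEnd) return this;
        this.streamEnd = true;
        // Node's Writable.end() handles a callback passed in the encoding slot.
        return super.end(chunk, encoding, callback);
      }

      _write(chunk, _encoding, done) {
        console.log('wrote chunk:', chunk.toString());
        done();
      }
    }

    const stream = new OnceEndedWritable();
    stream.end('1', () => console.log('first end() callback fired'));
    stream.end('2', () => console.log('never printed')); // ignored by the guard

As the test comment above notes, without the guard the second callback would only fire once the underlying write finished, which is exactly the timing the Promise.race/sleep checks in the new tests are built around.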