fix(NODE-3833): return early on end if gridfs upload stream is already ended (#3223)
dariakp authored Apr 29, 2022
1 parent b0085a2 commit c27e844
Showing 2 changed files with 76 additions and 1 deletion.
src/gridfs/upload.ts (2 changes: 1 addition & 1 deletion)
@@ -196,7 +196,7 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
        ? encodingOrCallback
        : callback;

-    if (checkAborted(this, callback)) return this;
+    if (this.state.streamEnd || checkAborted(this, callback)) return this;

    this.state.streamEnd = true;

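Note (not part of the commit): a minimal caller-side sketch of the scenario the added `streamEnd` guard addresses, assuming a connected MongoClient and a database handle `db`; the function and file names are illustrative. With the early return, a repeated `end()` call is a no-op: its callback is never invoked and no extra chunk is written.

  const { GridFSBucket } = require('mongodb');

  async function uploadOnce(db) {
    const bucket = new GridFSBucket(db, { bucketName: 'example', chunkSizeBytes: 1 });
    const uploadStream = bucket.openUploadStream('example.dat');

    // First end() flushes the final chunk and creates the files collection document.
    uploadStream.end('1', () => console.log('upload finished'));

    // With the streamEnd check, a second end() returns early,
    // so this callback should never fire and no second chunk is written.
    uploadStream.end('2', () => console.log('should not be called'));
  }
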
test/integration/gridfs/gridfs_stream.test.js (75 changes: 75 additions & 0 deletions)
@@ -6,6 +6,8 @@ const fs = require('fs');
const { setupDatabase, withClient } = require('./../shared');
const { expect } = require('chai');
const { GridFSBucket, ObjectId } = require('../../../src');
const sinon = require('sinon');
const { sleep } = require('../../tools/utils');

describe('GridFS Stream', function () {
before(function () {
@@ -947,6 +949,79 @@ describe('GridFS Stream', function () {
}
});

  describe('upload stream end()', () => {
    let client, db;

    afterEach(async () => {
      sinon.restore();
      await client.close();
    });

    it('should not call the callback on repeat calls to end', {
      metadata: { requires: { topology: ['single'] } },

      async test() {
        const configuration = this.configuration;
        client = configuration.newClient(configuration.writeConcernMax(), {
          maxPoolSize: 1
        });
        await client.connect();
        db = client.db(configuration.db);
        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
        const uploadStream = bucket.openUploadStream('test.dat');

        const endPromise = new Promise(resolve => {
          uploadStream.end('1', resolve);
        });

        const endPromise2 = new Promise((resolve, reject) => {
          uploadStream.end('2', () => {
            reject(new Error('Expected callback to not be called on duplicate end'));
          });
        });

        await endPromise;
        // in the fail case, the callback would be called when the actual write is finished,
        // so we need to give it a moment
        await Promise.race([endPromise2, sleep(100)]);
      }
    });

    it('should not write a chunk on repeat calls to end', {
      metadata: { requires: { topology: ['single'] } },

      async test() {
        const configuration = this.configuration;
        client = configuration.newClient(configuration.writeConcernMax(), {
          maxPoolSize: 1
        });
        await client.connect();
        db = client.db(this.configuration.db);
        const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
        const uploadStream = bucket.openUploadStream('test.dat');
        const spy = sinon.spy(uploadStream, 'write');

        const endPromise = new Promise(resolve => {
          uploadStream.end('1', resolve);
        });

        await endPromise;
        expect(spy).to.have.been.calledWith('1');

        uploadStream.end('2');

        // wait for potential async calls to happen before we close the client
        // so that we don't get a client not connected failure in the afterEach
        // in the failure case since it would be confusing and unnecessary
        // given the assertions we already have for this case
        await sleep(100);

        expect(spy).not.to.have.been.calledWith('2');
        expect(spy.calledOnce).to.be.true;
      }
    });
  });

/**
* Provide start and end parameters for file download to skip ahead x bytes and limit the total amount of bytes read to n
*
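
Note (not part of the commit): the first test above asserts that a callback is never invoked by racing the would-fail promise against a short sleep; the second simply waits before checking the sinon spy. A minimal sketch of that race pattern, assuming `sleep` is a promisified setTimeout like the helper imported from the test utils:

  const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

  // Resolves quietly if the registered callback never fires within `ms`;
  // rejects (failing the test) if it does.
  async function expectNeverCalled(register, ms = 100) {
    const failure = new Promise((resolve, reject) => {
      register(() => reject(new Error('callback was unexpectedly called')));
    });
    await Promise.race([failure, sleep(ms)]);
  }

  // e.g. await expectNeverCalled(cb => uploadStream.end('2', cb));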
