diff --git a/lib/storage/file.js b/lib/storage/file.js index 0b5bda529d0..29ab8ef406a 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -22,11 +22,11 @@ var bufferEqual = require('buffer-equal'); var ConfigStore = require('configstore'); -var crc = require('sse4_crc32'); var crypto = require('crypto'); var duplexify = require('duplexify'); var format = require('string-format-obj'); var fs = require('fs'); +var hashStreamValidation = require('hash-stream-validation'); var is = require('is'); var once = require('once'); var request = require('request').defaults({ @@ -430,35 +430,20 @@ File.prototype.createReadStream = function(options) { var tailRequest = options.end < 0; var throughStream = streamEvents(through()); - var validations = ['crc32c', 'md5']; - var validation; - - // For data integrity, hash the contents of the stream as we receive it - // from the server. - var localCrcHash; - var localMd5Hash = crypto.createHash('md5'); + var crc32c = false; + var md5 = false; if (is.string(options.validation)) { options.validation = options.validation.toLowerCase(); - - if (validations.indexOf(options.validation) > -1) { - validation = options.validation; - } else { - validation = 'all'; - } + crc32c = options.validation === 'crc32c'; + md5 = options.validation === 'md5'; } if (is.undef(options.validation)) { - validation = 'all'; + crc32c = true; + md5 = true; } - if (rangeRequest) { - validation = false; - } - - var crc32c = validation === 'crc32c' || validation === 'all'; - var md5 = validation === 'md5' || validation === 'all'; - var remoteFilePath = format('https://storage.googleapis.com/{b}/{o}', { b: this.bucket.name, o: encodeURIComponent(this.name) @@ -476,36 +461,6 @@ File.prototype.createReadStream = function(options) { makeAuthorizedReq(remoteFilePath); - // Calculate the hashes from the http.IncomingMessage response stream, which - // will return the bytes from the source without decompressing gzip'd content. - // The request stream will do the decompression so the user receives the - // expected content. - // - // incomingMessage's end event will always occur before request's complete - // event. - throughStream.on('response', function(incomingMessage) { - incomingMessage - .on('data', function(chunk) { - if (crc32c) { - localCrcHash = crc.calculate(chunk, localCrcHash); - } - - if (md5) { - localMd5Hash.update(chunk); - } - }) - - .on('end', function() { - if (crc32c) { - localCrcHash = new Buffer([localCrcHash]).toString('base64'); - } - - if (md5) { - localMd5Hash = localMd5Hash.digest('base64'); - } - }); - }); - return throughStream; // Authenticate the request, then pipe the remote API request to the stream @@ -537,19 +492,36 @@ File.prototype.createReadStream = function(options) { } var requestStream = that.bucket.storage.makeAuthorizedRequest_(reqOpts); + var validateStream; + + function onHttpResponse(incomingMessage) { + throughStream.emit('response', incomingMessage); + + util.handleResp(null, incomingMessage, null, function(err) { + if (err) { + requestStream.unpipe(throughStream); + return; + } + + if (!rangeRequest && (crc32c || md5)) { + validateStream = hashStreamValidation({ + crc32c: crc32c, + md5: md5 + }); + + // Calculate the hashes from the http.IncomingMessage response stream, + // which will return the bytes from the source without decompressing + // gzip'd content. The request stream will do the decompression so the + // user receives the expected content. + incomingMessage.pipe(validateStream).on('data', util.noop); + } + }); + } requestStream .on('error', endThroughStream) - .on('response', function(incomingMessage) { - throughStream.emit('response', incomingMessage); - - util.handleResp(null, incomingMessage, null, function(err) { - if (err) { - requestStream.unpipe(throughStream); - } - }); - }) + .on('response', onHttpResponse) .on('complete', function(res) { util.handleResp(null, res, null, function(err) { @@ -564,31 +536,23 @@ File.prototype.createReadStream = function(options) { return; } - var failed = false; - var crcFail = true; - var md5Fail = true; - var hashes = {}; res.headers['x-goog-hash'].split(',').forEach(function(hash) { var hashType = hash.split('=')[0].trim(); hashes[hashType] = hash.substr(hash.indexOf('=') + 1); }); - var remoteMd5 = hashes.md5; - var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4); - - if (crc32c) { - crcFail = localCrcHash !== remoteCrc; - failed = crcFail; - } + var failed = false; - if (md5) { - md5Fail = localMd5Hash !== remoteMd5; - failed = md5Fail; + if (crc32c && hashes.crc32c) { + // We must remove the first four bytes from the returned checksum. + // http://stackoverflow.com/questions/25096737/ + // base64-encoding-of-crc32c-long-value + failed = !validateStream.test('crc32c', hashes.crc32c.substr(4)); } - if (validation === 'all') { - failed = remoteMd5 ? md5Fail : crcFail; + if (md5 && hashes.md5) { + failed = !validateStream.test('md5', hashes.md5); } if (failed) { @@ -600,9 +564,10 @@ File.prototype.createReadStream = function(options) { mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; endThroughStream(mismatchError, res); - } else { - endThroughStream(null, res); + return; } + + endThroughStream(null, res); }); }) @@ -717,63 +682,36 @@ File.prototype.createWriteStream = function(options) { metadata.contentEncoding = 'gzip'; } - var validations = ['crc32c', 'md5']; - var validation; + var crc32c = false; + var md5 = false; if (is.string(options.validation)) { options.validation = options.validation.toLowerCase(); - - if (validations.indexOf(options.validation) > -1) { - validation = options.validation; - } else { - validation = 'all'; - } + crc32c = options.validation === 'crc32c'; + md5 = options.validation === 'md5'; } if (is.undef(options.validation)) { - validation = 'all'; + crc32c = true; + md5 = true; } - var crc32c = validation === 'crc32c' || validation === 'all'; - var md5 = validation === 'md5' || validation === 'all'; - // Collect data as it comes in to store in a hash. This is compared to the // checksum value on the returned metadata from the API. - var localCrc32cHash; - var localMd5Hash = crypto.createHash('md5'); + var validateStream = hashStreamValidation({ + crc32c: crc32c, + md5: md5 + }); var writableStream = streamEvents(duplexify()); var throughStream = through(); - var validationStream = through(function(chunk, enc, next) { - if (crc32c) { - localCrc32cHash = crc.calculate(chunk, localCrc32cHash); - } - - if (md5) { - localMd5Hash.update(chunk); - } - - this.push(chunk); - next(); - }); - - validationStream.on('end', function() { - if (crc32c) { - localCrc32cHash = new Buffer([localCrc32cHash]).toString('base64'); - } - - if (md5) { - localMd5Hash = localMd5Hash.digest('base64'); - } - }); - throughStream .pipe(gzip ? zlib.createGzip() : through()) - .pipe(validationStream) + .pipe(validateStream) .pipe(writableStream) @@ -795,20 +733,15 @@ File.prototype.createWriteStream = function(options) { .on('complete', function(metadata) { var failed = false; - // We must remove the first four bytes from the returned checksum. - // http://stackoverflow.com/questions/25096737/ - // base64-encoding-of-crc32c-long-value + if (crc32c && metadata.crc32c) { + // We must remove the first four bytes from the returned checksum. + // http://stackoverflow.com/questions/25096737/ + // base64-encoding-of-crc32c-long-value + failed = !validateStream.test('crc32c', metadata.crc32c.substr(4)); + } - if (validation === 'all') { - if (metadata.md5Hash) { - failed = localMd5Hash !== metadata.md5Hash; - } else if (metadata.crc32c) { - failed = localCrc32cHash !== metadata.crc32c.substr(4); - } - } else if (md5) { - failed = localMd5Hash !== metadata.md5Hash; - } else if (crc32c) { - failed = localCrc32cHash !== metadata.crc32c.substr(4); + if (md5 && metadata.md5Hash) { + failed = !validateStream.test('md5', metadata.md5Hash); } if (failed) { diff --git a/package.json b/package.json index e5b4c7cfc55..448c67f61f6 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,7 @@ "extend": "^2.0.0", "gce-images": "^0.1.0", "google-auto-auth": "^0.2.0", + "hash-stream-validation": "^0.1.0", "is": "^3.0.1", "methmeth": "^1.0.0", "mime-types": "^2.0.8", @@ -69,7 +70,6 @@ "request": "^2.53.0", "retry-request": "^1.2.1", "split-array-stream": "^1.0.0", - "sse4_crc32": "^3.1.0", "stream-events": "^1.0.1", "stream-forward": "^3.0.0", "string-format-obj": "^1.0.0", diff --git a/test/bigquery/dataset.js b/test/bigquery/dataset.js index d91c999ef98..067893e4b91 100644 --- a/test/bigquery/dataset.js +++ b/test/bigquery/dataset.js @@ -18,10 +18,6 @@ 'use strict'; -// If we don't stub see4_crc32 and use mockery, we get "Module did not self- -// register". -var crc = require('sse4_crc32'); - var arrify = require('arrify'); var assert = require('assert'); var util = require('../../lib/common/util'); @@ -49,7 +45,11 @@ describe('BigQuery/Dataset', function() { var ds; before(function() { - mockery.registerMock('sse4_crc32', crc); + // If we don't stub see4_crc32 and use mockery, we get "Module did not self- + // register". + var crc32c = require('hash-stream-validation/node_modules/sse4_crc32'); + mockery.registerMock('sse4_crc32', crc32c); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.enable({ useCleanCache: true, diff --git a/test/storage/file.js b/test/storage/file.js index 389983a0c99..b32018631bc 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -18,8 +18,6 @@ var assert = require('assert'); var Bucket = require('../../lib/storage/bucket.js'); -var crc = require('sse4_crc32'); -var crypto = require('crypto'); var duplexify = require('duplexify'); var extend = require('extend'); var format = require('string-format-obj'); @@ -107,7 +105,11 @@ describe('File', function() { var bucket; before(function() { - mockery.registerMock('sse4_crc32', crc); + // If we don't stub see4_crc32 and use mockery, we get "Module did not self- + // register". + var crc32c = require('hash-stream-validation/node_modules/sse4_crc32'); + mockery.registerMock('sse4_crc32', crc32c); + mockery.registerMock('configstore', FakeConfigStore); mockery.registerMock('duplexify', FakeDuplexify); mockery.registerMock('request', fakeRequest); @@ -438,10 +440,8 @@ describe('File', function() { var self = this; setImmediate(function() { - var responseStream = through(); - self.emit('response', responseStream); - responseStream.push(data); - responseStream.push(null); + var stream = new FakeRequest(); + self.emit('response', stream); setImmediate(function() { self.emit('complete', fakeResponse); @@ -657,18 +657,12 @@ describe('File', function() { describe('validation', function() { var data = 'test'; - var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64'); - - var md5HashBase64 = crypto.createHash('md5'); - md5HashBase64.update(data); - md5HashBase64 = md5HashBase64.digest('base64'); - var fakeResponse = { crc32c: { - headers: { 'x-goog-hash': 'crc32c=####' + crc32cBase64 } + headers: { 'x-goog-hash': 'crc32c=####wA==' } }, md5: { - headers: { 'x-goog-hash': 'md5=' + md5HashBase64 } + headers: { 'x-goog-hash': 'md5=CY9rzUYh03PK3k6DJie09g==' } } }; @@ -717,7 +711,7 @@ describe('File', function() { it('should emit an error if md5 validation fails', function(done) { requestOverride = getFakeSuccessfulRequest( - 'bad-data', fakeResponse.crc32c); + 'bad-data', fakeResponse.md5); file.createReadStream({ validation: 'md5' }) .on('error', function(err) { @@ -959,15 +953,9 @@ describe('File', function() { describe('validation', function() { var data = 'test'; - var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64'); - - var md5HashBase64 = crypto.createHash('md5'); - md5HashBase64.update(data); - md5HashBase64 = md5HashBase64.digest('base64'); - var fakeMetadata = { - crc32c: { crc32c: '####' + crc32cBase64 }, - md5: { md5Hash: md5HashBase64 } + crc32c: { crc32c: '####wA==' }, + md5: { md5Hash: 'CY9rzUYh03PK3k6DJie09g==' } }; it('should validate with crc32c', function(done) { diff --git a/test/storage/index.js b/test/storage/index.js index 9c3957e3734..b41c8753357 100644 --- a/test/storage/index.js +++ b/test/storage/index.js @@ -16,10 +16,6 @@ 'use strict'; -// If we don't stub see4_crc32 and use mockery, we get "Module did not self- -// register". -var crc = require('sse4_crc32'); - var arrify = require('arrify'); var assert = require('assert'); var extend = require('extend'); @@ -47,7 +43,11 @@ describe('Storage', function() { var Bucket; before(function() { - mockery.registerMock('sse4_crc32', crc); + // If we don't stub see4_crc32 and use mockery, we get "Module did not self- + // register". + var crc32c = require('hash-stream-validation/node_modules/sse4_crc32'); + mockery.registerMock('sse4_crc32', crc32c); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.enable({ useCleanCache: true,