Skip to content

Commit

Permalink
storage: use hash-stream-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Aug 26, 2015
1 parent 2b80071 commit 55872da
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 165 deletions.
193 changes: 63 additions & 130 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

var bufferEqual = require('buffer-equal');
var ConfigStore = require('configstore');
var crc = require('sse4_crc32');
var crypto = require('crypto');
var duplexify = require('duplexify');
var format = require('string-format-obj');
var fs = require('fs');
var hashStreamValidation = require('hash-stream-validation');
var is = require('is');
var once = require('once');
var request = require('request').defaults({
Expand Down Expand Up @@ -430,35 +430,20 @@ File.prototype.createReadStream = function(options) {
var tailRequest = options.end < 0;
var throughStream = streamEvents(through());

var validations = ['crc32c', 'md5'];
var validation;

// For data integrity, hash the contents of the stream as we receive it
// from the server.
var localCrcHash;
var localMd5Hash = crypto.createHash('md5');
var crc32c = false;
var md5 = false;

if (is.string(options.validation)) {
options.validation = options.validation.toLowerCase();

if (validations.indexOf(options.validation) > -1) {
validation = options.validation;
} else {
validation = 'all';
}
crc32c = options.validation === 'crc32c';
md5 = options.validation === 'md5';
}

if (is.undef(options.validation)) {
validation = 'all';
crc32c = true;
md5 = true;
}

if (rangeRequest) {
validation = false;
}

var crc32c = validation === 'crc32c' || validation === 'all';
var md5 = validation === 'md5' || validation === 'all';

var remoteFilePath = format('https://storage.googleapis.com/{b}/{o}', {
b: this.bucket.name,
o: encodeURIComponent(this.name)
Expand All @@ -476,36 +461,6 @@ File.prototype.createReadStream = function(options) {

makeAuthorizedReq(remoteFilePath);

// Calculate the hashes from the http.IncomingMessage response stream, which
// will return the bytes from the source without decompressing gzip'd content.
// The request stream will do the decompression so the user receives the
// expected content.
//
// incomingMessage's end event will always occur before request's complete
// event.
throughStream.on('response', function(incomingMessage) {
incomingMessage
.on('data', function(chunk) {
if (crc32c) {
localCrcHash = crc.calculate(chunk, localCrcHash);
}

if (md5) {
localMd5Hash.update(chunk);
}
})

.on('end', function() {
if (crc32c) {
localCrcHash = new Buffer([localCrcHash]).toString('base64');
}

if (md5) {
localMd5Hash = localMd5Hash.digest('base64');
}
});
});

return throughStream;

// Authenticate the request, then pipe the remote API request to the stream
Expand Down Expand Up @@ -537,19 +492,36 @@ File.prototype.createReadStream = function(options) {
}

var requestStream = that.bucket.storage.makeAuthorizedRequest_(reqOpts);
var validateStream;

function onHttpResponse(incomingMessage) {
throughStream.emit('response', incomingMessage);

util.handleResp(null, incomingMessage, null, function(err) {
if (err) {
requestStream.unpipe(throughStream);
return;
}

if (!rangeRequest && (crc32c || md5)) {
validateStream = hashStreamValidation({
crc32c: crc32c,
md5: md5
});

// Calculate the hashes from the http.IncomingMessage response stream,
// which will return the bytes from the source without decompressing
// gzip'd content. The request stream will do the decompression so the
// user receives the expected content.
incomingMessage.pipe(validateStream).on('data', util.noop);
}
});
}

requestStream
.on('error', endThroughStream)

.on('response', function(incomingMessage) {
throughStream.emit('response', incomingMessage);

util.handleResp(null, incomingMessage, null, function(err) {
if (err) {
requestStream.unpipe(throughStream);
}
});
})
.on('response', onHttpResponse)

.on('complete', function(res) {
util.handleResp(null, res, null, function(err) {
Expand All @@ -564,31 +536,23 @@ File.prototype.createReadStream = function(options) {
return;
}

var failed = false;
var crcFail = true;
var md5Fail = true;

var hashes = {};
res.headers['x-goog-hash'].split(',').forEach(function(hash) {
var hashType = hash.split('=')[0].trim();
hashes[hashType] = hash.substr(hash.indexOf('=') + 1);
});

var remoteMd5 = hashes.md5;
var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4);

if (crc32c) {
crcFail = localCrcHash !== remoteCrc;
failed = crcFail;
}
var failed = false;

if (md5) {
md5Fail = localMd5Hash !== remoteMd5;
failed = md5Fail;
if (crc32c && hashes.crc32c) {
// We must remove the first four bytes from the returned checksum.
// http://stackoverflow.com/questions/25096737/
// base64-encoding-of-crc32c-long-value
failed = !validateStream.test('crc32c', hashes.crc32c.substr(4));
}

if (validation === 'all') {
failed = remoteMd5 ? md5Fail : crcFail;
if (md5 && hashes.md5) {
failed = !validateStream.test('md5', hashes.md5);
}

if (failed) {
Expand All @@ -600,9 +564,10 @@ File.prototype.createReadStream = function(options) {
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

endThroughStream(mismatchError, res);
} else {
endThroughStream(null, res);
return;
}

endThroughStream(null, res);
});
})

Expand Down Expand Up @@ -717,63 +682,36 @@ File.prototype.createWriteStream = function(options) {
metadata.contentEncoding = 'gzip';
}

var validations = ['crc32c', 'md5'];
var validation;
var crc32c = false;
var md5 = false;

if (is.string(options.validation)) {
options.validation = options.validation.toLowerCase();

if (validations.indexOf(options.validation) > -1) {
validation = options.validation;
} else {
validation = 'all';
}
crc32c = options.validation === 'crc32c';
md5 = options.validation === 'md5';
}

if (is.undef(options.validation)) {
validation = 'all';
crc32c = true;
md5 = true;
}

var crc32c = validation === 'crc32c' || validation === 'all';
var md5 = validation === 'md5' || validation === 'all';

// Collect data as it comes in to store in a hash. This is compared to the
// checksum value on the returned metadata from the API.
var localCrc32cHash;
var localMd5Hash = crypto.createHash('md5');
var validateStream = hashStreamValidation({
crc32c: crc32c,
md5: md5
});

var writableStream = streamEvents(duplexify());

var throughStream = through();

var validationStream = through(function(chunk, enc, next) {
if (crc32c) {
localCrc32cHash = crc.calculate(chunk, localCrc32cHash);
}

if (md5) {
localMd5Hash.update(chunk);
}

this.push(chunk);
next();
});

validationStream.on('end', function() {
if (crc32c) {
localCrc32cHash = new Buffer([localCrc32cHash]).toString('base64');
}

if (md5) {
localMd5Hash = localMd5Hash.digest('base64');
}
});

throughStream

.pipe(gzip ? zlib.createGzip() : through())

.pipe(validationStream)
.pipe(validateStream)

.pipe(writableStream)

Expand All @@ -795,20 +733,15 @@ File.prototype.createWriteStream = function(options) {
.on('complete', function(metadata) {
var failed = false;

// We must remove the first four bytes from the returned checksum.
// http://stackoverflow.com/questions/25096737/
// base64-encoding-of-crc32c-long-value
if (crc32c && metadata.crc32c) {
// We must remove the first four bytes from the returned checksum.
// http://stackoverflow.com/questions/25096737/
// base64-encoding-of-crc32c-long-value
failed = !validateStream.test('crc32c', metadata.crc32c.substr(4));
}

if (validation === 'all') {
if (metadata.md5Hash) {
failed = localMd5Hash !== metadata.md5Hash;
} else if (metadata.crc32c) {
failed = localCrc32cHash !== metadata.crc32c.substr(4);
}
} else if (md5) {
failed = localMd5Hash !== metadata.md5Hash;
} else if (crc32c) {
failed = localCrc32cHash !== metadata.crc32c.substr(4);
if (md5 && metadata.md5Hash) {
failed = !validateStream.test('md5', metadata.md5Hash);
}

if (failed) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"extend": "^2.0.0",
"gce-images": "^0.1.0",
"google-auto-auth": "^0.2.0",
"hash-stream-validation": "^0.1.0",
"is": "^3.0.1",
"methmeth": "^1.0.0",
"mime-types": "^2.0.8",
Expand All @@ -69,7 +70,6 @@
"request": "^2.53.0",
"retry-request": "^1.2.1",
"split-array-stream": "^1.0.0",
"sse4_crc32": "^3.1.0",
"stream-events": "^1.0.1",
"stream-forward": "^3.0.0",
"string-format-obj": "^1.0.0",
Expand Down
10 changes: 5 additions & 5 deletions test/bigquery/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

'use strict';

// If we don't stub see4_crc32 and use mockery, we get "Module did not self-
// register".
var crc = require('sse4_crc32');

var arrify = require('arrify');
var assert = require('assert');
var util = require('../../lib/common/util');
Expand Down Expand Up @@ -49,7 +45,11 @@ describe('BigQuery/Dataset', function() {
var ds;

before(function() {
mockery.registerMock('sse4_crc32', crc);
// If we don't stub see4_crc32 and use mockery, we get "Module did not self-
// register".
var crc32c = require('hash-stream-validation/node_modules/sse4_crc32');
mockery.registerMock('sse4_crc32', crc32c);

mockery.registerMock('../common/stream-router.js', fakeStreamRouter);
mockery.enable({
useCleanCache: true,
Expand Down
Loading

0 comments on commit 55872da

Please sign in to comment.