refactor: remove fPutObject impl, use putObject internally #1159

Merged
merged 1 commit on Jun 2, 2023
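For context, a minimal sketch of how a caller uses fPutObject; this refactor keeps the public signature intact and only changes the internals. The endpoint, credentials, bucket, object, and file names below are hypothetical.

import * as Minio from 'minio'

const client = new Minio.Client({
  endPoint: 'play.min.io', // hypothetical endpoint for illustration
  port: 9000,
  useSSL: true,
  accessKey: 'ACCESS_KEY',
  secretKey: 'SECRET_KEY',
})

// fPutObject(bucketName, objectName, filePath, metaData, callback)
client.fPutObject('my-bucket', 'my-object', '/tmp/file.dat', { 'Content-Type': 'application/octet-stream' }, (err, objInfo) => {
  if (err) {
    return console.log(err)
  }
  // objInfo carries etag and versionId, as returned by the upload path in the diff below.
  console.log('uploaded', objInfo.etag, objInfo.versionId)
})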
247 changes: 6 additions & 241 deletions src/minio.js
@@ -598,153 +598,12 @@ export class Client extends TypedClient {
// Inserts correct `content-type` attribute based on metaData and filePath
metaData = insertContentType(metaData, filePath)

// Updates metaData to have the correct prefix if needed
metaData = prependXAMZMeta(metaData)
var size
var partSize

async.waterfall(
[
(cb) => fs.stat(filePath, cb),
(stats, cb) => {
size = stats.size
var stream
var cbTriggered = false
var origCb = cb
cb = function () {
if (cbTriggered) {
return
}
cbTriggered = true
if (stream) {
stream.destroy()
}
return origCb.apply(this, arguments)
}
if (size > this.maxObjectSize) {
return cb(new Error(`${filePath} size : ${stats.size}, max allowed size : 5TB`))
}
if (size <= this.partSize) {
// simple PUT request, no multipart
var multipart = false
var uploader = this.getUploader(bucketName, objectName, metaData, multipart)
var hash = transformers.getHashSummer(this.enableSHA256)
var start = 0
var end = size - 1
var autoClose = true
if (size === 0) {
end = 0
}
var options = { start, end, autoClose }
pipesetup(fs.createReadStream(filePath, options), hash)
.on('data', (data) => {
var md5sum = data.md5sum
var sha256sum = data.sha256sum
stream = fs.createReadStream(filePath, options)
uploader(stream, size, sha256sum, md5sum, (err, objInfo) => {
callback(err, objInfo)
cb(true)
})
})
.on('error', (e) => cb(e))
return
}
this.findUploadId(bucketName, objectName, cb)
},
(uploadId, cb) => {
// if there was a previous incomplete upload, fetch all its uploaded parts info
if (uploadId) {
return this.listParts(bucketName, objectName, uploadId, (e, etags) => cb(e, uploadId, etags))
}
// there was no previous upload, initiate a new one
this.initiateNewMultipartUpload(bucketName, objectName, metaData, (e, uploadId) => cb(e, uploadId, []))
},
(uploadId, etags, cb) => {
partSize = this.calculatePartSize(size)
var multipart = true
var uploader = this.getUploader(bucketName, objectName, metaData, multipart)

// convert array to object to make things easy
var parts = etags.reduce(function (acc, item) {
if (!acc[item.part]) {
acc[item.part] = item
}
return acc
}, {})
var partsDone = []
var partNumber = 1
var uploadedSize = 0
async.whilst(
(cb) => {
cb(null, uploadedSize < size)
},
(cb) => {
var stream
var cbTriggered = false
var origCb = cb
cb = function () {
if (cbTriggered) {
return
}
cbTriggered = true
if (stream) {
stream.destroy()
}
return origCb.apply(this, arguments)
}
var part = parts[partNumber]
var hash = transformers.getHashSummer(this.enableSHA256)
var length = partSize
if (length > size - uploadedSize) {
length = size - uploadedSize
}
var start = uploadedSize
var end = uploadedSize + length - 1
var autoClose = true
var options = { autoClose, start, end }
// verify md5sum of each part
pipesetup(fs.createReadStream(filePath, options), hash)
.on('data', (data) => {
var md5sumHex = Buffer.from(data.md5sum, 'base64').toString('hex')
if (part && md5sumHex === part.etag) {
// md5 matches, chunk already uploaded
partsDone.push({ part: partNumber, etag: part.etag })
partNumber++
uploadedSize += length
return cb()
}
// part is not uploaded yet, or md5 mismatch
stream = fs.createReadStream(filePath, options)
uploader(uploadId, partNumber, stream, length, data.sha256sum, data.md5sum, (e, objInfo) => {
if (e) {
return cb(e)
}
partsDone.push({ part: partNumber, etag: objInfo.etag })
partNumber++
uploadedSize += length
return cb()
})
})
.on('error', (e) => cb(e))
},
(e) => {
if (e) {
return cb(e)
}
cb(null, partsDone, uploadId)
},
)
},
// all parts uploaded, complete the multipart upload
(etags, uploadId, cb) => this.completeMultipartUpload(bucketName, objectName, uploadId, etags, cb),
],
(err, ...rest) => {
if (err === true) {
return
}
callback(err, ...rest)
},
)
fs.lstat(filePath, (err, stat) => {
if (err) {
return callback(err)
}
return this.putObject(bucketName, objectName, fs.createReadStream(filePath), stat.size, metaData, callback)
})
}
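
// A sketch, not part of this diff: the size-based decision ladder that the
// deleted waterfall above implemented, and which putObject now performs
// internally. partSize/maxObjectSize mirror the client fields used above;
// the returned strings stand in for the real upload paths.
function chooseUploadStrategy(size, partSize, maxObjectSize) {
  if (size > maxObjectSize) {
    // mirrors the removed check: max allowed size is 5TB
    throw new Error(`size ${size} exceeds max allowed size ${maxObjectSize}`)
  }
  if (size <= partSize) {
    return 'simple-put' // one PUT request, no multipart bookkeeping
  }
  // resume a previous upload via findUploadId/listParts if one exists,
  // otherwise initiateNewMultipartUpload, then completeMultipartUpload
  return 'multipart'
}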

// Uploads the object.
@@ -1926,100 +1785,6 @@ export class Client extends TypedClient {
listNext('', '')
}

// Returns a function that can be used for uploading objects.
// If multipart === true, it returns a function that uploads
// a single part of a multipart upload.
getUploader(bucketName, objectName, metaData, multipart) {
if (!isValidBucketName(bucketName)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
}
if (!isValidObjectName(objectName)) {
throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`)
}
if (!isBoolean(multipart)) {
throw new TypeError('multipart should be of type "boolean"')
}
if (!isObject(metaData)) {
throw new TypeError('metadata should be of type "object"')
}

var validate = (stream, length, sha256sum, md5sum, cb) => {
if (!isReadableStream(stream)) {
throw new TypeError('stream should be of type "Stream"')
}
if (!isNumber(length)) {
throw new TypeError('length should be of type "number"')
}
if (!isString(sha256sum)) {
throw new TypeError('sha256sum should be of type "string"')
}
if (!isString(md5sum)) {
throw new TypeError('md5sum should be of type "string"')
}
if (!isFunction(cb)) {
throw new TypeError('callback should be of type "function"')
}
}
var simpleUploader = (...args) => {
validate(...args)
var query = ''
upload(query, ...args)
}
var multipartUploader = (uploadId, partNumber, ...rest) => {
if (!isString(uploadId)) {
throw new TypeError('uploadId should be of type "string"')
}
if (!isNumber(partNumber)) {
throw new TypeError('partNumber should be of type "number"')
}
if (!uploadId) {
throw new errors.InvalidArgumentError('Empty uploadId')
}
if (!partNumber) {
throw new errors.InvalidArgumentError('partNumber cannot be 0')
}
validate(...rest)
var query = `partNumber=${partNumber}&uploadId=${uriEscape(uploadId)}`
upload(query, ...rest)
}
var upload = (query, stream, length, sha256sum, md5sum, cb) => {
var method = 'PUT'
let headers = { 'Content-Length': length }

if (!multipart) {
headers = Object.assign({}, metaData, headers)
}

if (!this.enableSHA256) {
headers['Content-MD5'] = md5sum
}
this.makeRequestStream(
{ method, bucketName, objectName, query, headers },
stream,
sha256sum,
[200],
'',
true,
(e, response) => {
if (e) {
return cb(e)
}
const result = {
etag: sanitizeETag(response.headers.etag),
versionId: getVersionId(response.headers),
}
// Ignore the 'data' event so that the stream closes. (nodejs stream requirement)
response.on('data', () => {})
cb(null, result)
},
)
}
if (multipart) {
return multipartUploader
}
return simpleUploader
}
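
// A sketch, not part of this diff: how the removed fPutObject invoked the two
// uploader shapes returned by getUploader (argument names taken from the
// deleted code above).
//
//   const uploader = this.getUploader(bucketName, objectName, metaData, false)
//   uploader(stream, size, sha256sum, md5sum, (err, objInfo) => { /* ... */ })
//
//   const partUploader = this.getUploader(bucketName, objectName, metaData, true)
//   partUploader(uploadId, partNumber, stream, length, sha256sum, md5sum, (err, objInfo) => { /* ... */ })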

// Sets the bucket notification configuration in the S3 provider
setBucketNotification(bucketName, config, cb) {
if (!isValidBucketName(bucketName)) {