Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor listIncompleteUploads to TypeScript #1228

Merged
merged 13 commits into from
Nov 18, 2023
156 changes: 152 additions & 4 deletions src/internal/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as http from 'node:http'
import * as https from 'node:https'
import type * as stream from 'node:stream'
import * as stream from 'node:stream'

import * as async from 'async'
import { isBrowser } from 'browser-or-node'
import _ from 'lodash'
import * as qs from 'query-string'
Expand All @@ -27,6 +28,7 @@ import {
isValidEndpoint,
isValidObjectName,
isValidPort,
isValidPrefix,
isVirtualHostStyle,
makeDateLong,
sanitizeETag,
Expand All @@ -43,7 +45,9 @@ import type {
Binary,
BucketItemFromList,
BucketItemStat,
BucketStream,
GetObjectLegalHoldOptions,
IncompleteUploadedBucketItem,
IRequest,
ObjectLockConfigParam,
ObjectLockInfo,
Expand All @@ -59,7 +63,7 @@ import type {
Transport,
VersionIdentificator,
} from './type.ts'
import type { UploadedPart } from './xml-parser.ts'
import type { ListMultipartResult, UploadedPart } from './xml-parser.ts'
import * as xmlParsers from './xml-parser.ts'
import { parseInitiateMultipart, parseObjectLegalHoldConfig } from './xml-parser.ts'

Expand Down Expand Up @@ -125,6 +129,11 @@ export interface RemoveOptions {
forceDelete?: boolean
}

type Part = {
part: number
etag: string
}

export class TypedClient {
protected transport: Transport
protected host: string
Expand Down Expand Up @@ -329,8 +338,13 @@ export class TypedClient {
* Takes care of constructing virtual-host-style or path-style hostname
*/
protected getRequestOptions(
opts: RequestOption & { region: string },
): IRequest & { host: string; headers: Record<string, string> } {
opts: RequestOption & {
region: string
},
): IRequest & {
host: string
headers: Record<string, string>
} {
const method = opts.method
const region = opts.region
const bucketName = opts.bucketName
Expand Down Expand Up @@ -955,6 +969,140 @@ export class TypedClient {

// Calls implemented below are related to multipart.

listIncompleteUploads(
bucket: string,
prefix: string,
recursive: boolean,
): BucketStream<IncompleteUploadedBucketItem> {
if (prefix === undefined) {
prefix = ''
}
if (recursive === undefined) {
recursive = false
}
if (!isValidBucketName(bucket)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucket)
}
if (!isValidPrefix(prefix)) {
throw new errors.InvalidPrefixError(`Invalid prefix : ${prefix}`)
}
if (!isBoolean(recursive)) {
throw new TypeError('recursive should be of type "boolean"')
}
const delimiter = recursive ? '' : '/'
let keyMarker = ''
let uploadIdMarker = ''
const uploads: unknown[] = []
let ended = false

// TODO: refactor this with async/await and `stream.Readable.from`
const readStream = new stream.Readable({ objectMode: true })
readStream._read = () => {
// push one upload info per _read()
if (uploads.length) {
return readStream.push(uploads.shift())
}
if (ended) {
return readStream.push(null)
}
this.listIncompleteUploadsQuery(bucket, prefix, keyMarker, uploadIdMarker, delimiter).then(
(result) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
result.prefixes.forEach((prefix) => uploads.push(prefix))
async.eachSeries(
result.uploads,
(upload, cb) => {
// for each incomplete upload add the sizes of its uploaded parts
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
this.listParts(bucket, upload.key, upload.uploadId).then(
(parts: Part[]) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
upload.size = parts.reduce((acc, item) => acc + item.size, 0)
uploads.push(upload)
cb()
},
(err: Error) => cb(err),
)
},
(err) => {
if (err) {
readStream.emit('error', err)
return
}
if (result.isTruncated) {
keyMarker = result.nextKeyMarker
uploadIdMarker = result.nextUploadIdMarker
} else {
ended = true
}

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
readStream._read()
},
)
},
(e) => {
readStream.emit('error', e)
},
)
}
return readStream
}

/**
* Called by listIncompleteUploads to fetch a batch of incomplete uploads.
*/
async listIncompleteUploadsQuery(
bucketName: string,
prefix: string,
keyMarker: string,
uploadIdMarker: string,
delimiter: string,
): Promise<ListMultipartResult> {
if (!isValidBucketName(bucketName)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
}
if (!isString(prefix)) {
throw new TypeError('prefix should be of type "string"')
}
if (!isString(keyMarker)) {
throw new TypeError('keyMarker should be of type "string"')
}
if (!isString(uploadIdMarker)) {
throw new TypeError('uploadIdMarker should be of type "string"')
}
if (!isString(delimiter)) {
throw new TypeError('delimiter should be of type "string"')
}
const queries = []
queries.push(`prefix=${uriEscape(prefix)}`)
queries.push(`delimiter=${uriEscape(delimiter)}`)

if (keyMarker) {
queries.push(`key-marker=${uriEscape(keyMarker)}`)
}
if (uploadIdMarker) {
queries.push(`upload-id-marker=${uploadIdMarker}`)
}

const maxUploads = 1000
queries.push(`max-uploads=${maxUploads}`)
queries.sort()
queries.unshift('uploads')
let query = ''
if (queries.length > 0) {
query = `${queries.join('&')}`
}
const method = 'GET'
const res = await this.makeRequestAsync({ method, bucketName, query })
const body = await readAsString(res)
return xmlParsers.parseListMultipart(body)
}

/**
* Initiate a new multipart upload.
* @internal
Expand Down
76 changes: 74 additions & 2 deletions src/internal/xml-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ export type Multipart = {
storageClass: unknown
initiated: unknown
}>
prefixes: { prefix: string }[]
prefixes: {
prefix: string
}[]
isTruncated: boolean
nextKeyMarker: undefined
nextUploadIdMarker: undefined
Expand All @@ -167,7 +169,11 @@ export function parseListParts(xml: string): {
parts: UploadedPart[]
} {
let xmlobj = parseXml(xml)
const result: { isTruncated: boolean; marker: number; parts: UploadedPart[] } = {
const result: {
isTruncated: boolean
marker: number
parts: UploadedPart[]
} = {
isTruncated: false,
parts: [],
marker: 0,
Expand Down Expand Up @@ -263,6 +269,72 @@ export function parseTagging(xml: string) {
return result
}

type UploadID = unknown

export type ListMultipartResult = {
uploads: {
key: string
uploadId: UploadID
initiator: unknown
owner: unknown
storageClass: unknown
initiated: unknown
}[]
prefixes: {
prefix: string
}[]
isTruncated: boolean
nextKeyMarker: string
nextUploadIdMarker: string
}

// parse XML response for listing in-progress multipart uploads
export function parseListMultipart(xml: string): ListMultipartResult {
const result: ListMultipartResult = {
prefixes: [],
uploads: [],
isTruncated: false,
nextKeyMarker: '',
nextUploadIdMarker: '',
}

let xmlobj = parseXml(xml)

if (!xmlobj.ListMultipartUploadsResult) {
throw new errors.InvalidXMLError('Missing tag: "ListMultipartUploadsResult"')
}
xmlobj = xmlobj.ListMultipartUploadsResult
if (xmlobj.IsTruncated) {
result.isTruncated = xmlobj.IsTruncated
}
if (xmlobj.NextKeyMarker) {
result.nextKeyMarker = xmlobj.NextKeyMarker
}
if (xmlobj.NextUploadIdMarker) {
result.nextUploadIdMarker = xmlobj.nextUploadIdMarker || ''
}

if (xmlobj.CommonPrefixes) {
toArray(xmlobj.CommonPrefixes).forEach((prefix) => {
// @ts-expect-error index check
result.prefixes.push({ prefix: sanitizeObjectKey(toArray<string>(prefix.Prefix)[0]) })
})
}

if (xmlobj.Upload) {
toArray(xmlobj.Upload).forEach((upload) => {
const key = upload.Key
const uploadId = upload.UploadId
const initiator = { id: upload.Initiator.ID, displayName: upload.Initiator.DisplayName }
const owner = { id: upload.Owner.ID, displayName: upload.Owner.DisplayName }
const storageClass = upload.StorageClass
const initiated = new Date(upload.Initiated)
result.uploads.push({ key, uploadId, initiator, owner, storageClass, initiated })
})
}
return result
}

export function parseObjectLockConfig(xml: string): ObjectLockInfo {
const xmlObj = parseXml(xml)
let lockConfigResult = {} as ObjectLockInfo
Expand Down
6 changes: 0 additions & 6 deletions src/minio.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,6 @@ export class Client extends TypedClient {

listObjectsV2(bucketName: string, prefix?: string, recursive?: boolean, startAfter?: string): BucketStream<BucketItem>

listIncompleteUploads(
bucketName: string,
prefix?: string,
recursive?: boolean,
): BucketStream<IncompleteUploadedBucketItem>

getBucketVersioning(bucketName: string, callback: ResultCallback<VersioningConfig>): void
getBucketVersioning(bucketName: string): Promise<VersioningConfig>

Expand Down
Loading
Loading