Skip to content

Commit

Permalink
Remove unused methods on operations helper (#839)
Browse files Browse the repository at this point in the history
* Remove unused methods on OperationsHelper

* Remove unused methods on ObjectOperations

* Fix formatting
  • Loading branch information
tvdias authored Aug 21, 2023
1 parent faad373 commit e1ef9f5
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 328 deletions.
294 changes: 1 addition & 293 deletions Minio/ApiEndpoints/ObjectOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Reactive.Linq;
using System.Text;
using System.Xml.Linq;
using CommunityToolkit.HighPerformance;
using Minio.ApiEndpoints;
using Minio.DataModel;
using Minio.DataModel.Args;
Expand All @@ -38,15 +35,6 @@ namespace Minio;
[SuppressMessage("Design", "MA0048:File name must match type name", Justification = "Split up in partial classes")]
public partial class MinioClient : IObjectOperations
{
private readonly List<string> supportedHeaders = new()
{
"cache-control",
"content-encoding",
"content-type",
"x-amz-acl",
"content-disposition"
};

/// <summary>
/// Get an object. The object will be streamed to the callback given by the user.
/// </summary>
Expand Down Expand Up @@ -233,7 +221,7 @@ public async Task<string> PresignedGetObjectAsync(PresignedGetObjectArgs args)
var policyBase64 = args.Policy.Base64();

var t = DateTime.UtcNow;
var signV4Algorithm = "AWS4-HMAC-SHA256";
const string signV4Algorithm = "AWS4-HMAC-SHA256";
var credential = authenticator.GetCredentialString(t, region);
var signature = authenticator.PresignPostSignature(region, t, policyBase64);
args = args.WithDate(t)
Expand Down Expand Up @@ -1110,138 +1098,6 @@ await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken
args.ObjectName);
}

/// <summary>
/// Internal method to complete multi part upload of object to server.
/// </summary>
/// <param name="bucketName">Bucket Name</param>
/// <param name="objectName">Object to be uploaded</param>
/// <param name="uploadId">Upload Id</param>
/// <param name="etags">Etags</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private async Task CompleteMultipartUploadAsync(string bucketName, string objectName, string uploadId,
IDictionary<int, string> etags, CancellationToken cancellationToken)
{
var requestMessageBuilder = await CreateRequest(HttpMethod.Post, bucketName,
objectName)
.ConfigureAwait(false);
requestMessageBuilder.AddQueryParameter("uploadId", $"{uploadId}");

var parts = new List<XElement>();

for (var i = 1; i <= etags.Count; i++)
parts.Add(new XElement("Part",
new XElement("PartNumber", i),
new XElement("ETag", etags[i])));

var completeMultipartUploadXml = new XElement("CompleteMultipartUpload", parts);
var bodyString = completeMultipartUploadXml.ToString();

requestMessageBuilder.AddOrUpdateHeaderParameter("Content-Type", "application/xml");

requestMessageBuilder.AddXmlBody(bodyString);
using var response =
await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// Returns an async observable of parts corresponding to a uploadId for a specific bucket and objectName
/// </summary>
/// <param name="bucketName">Bucket Name</param>
/// <param name="objectName">Object Name</param>
/// <param name="uploadId"></param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private IObservable<Part> ListParts(string bucketName, string objectName, string uploadId,
CancellationToken cancellationToken)
{
return Observable.Create<Part>(
async obs =>
{
var nextPartNumberMarker = 0;
var isRunning = true;
while (isRunning)
{
var uploads = await GetListPartsAsync(bucketName, objectName, uploadId, nextPartNumberMarker,
cancellationToken).ConfigureAwait(false);
foreach (var part in uploads.Item2) obs.OnNext(part);
nextPartNumberMarker = uploads.Item1.NextPartNumberMarker;
isRunning = uploads.Item1.IsTruncated;
}
});
}

/// <summary>
/// Gets the list of parts corresponding to a uploadId for given bucket and object
/// </summary>
/// <param name="bucketName">Bucket Name</param>
/// <param name="objectName">Object Name</param>
/// <param name="uploadId"></param>
/// <param name="partNumberMarker"></param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private async Task<Tuple<ListPartsResult, List<Part>>> GetListPartsAsync(string bucketName, string objectName,
string uploadId, int partNumberMarker, CancellationToken cancellationToken)
{
var requestMessageBuilder = await CreateRequest(HttpMethod.Get, bucketName,
objectName)
.ConfigureAwait(false);
requestMessageBuilder.AddQueryParameter("uploadId", $"{uploadId}");
if (partNumberMarker > 0) requestMessageBuilder.AddQueryParameter("part-number-marker", $"{partNumberMarker}");
requestMessageBuilder.AddQueryParameter("max-parts", "1000");

using var response =
await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken: cancellationToken)
.ConfigureAwait(false);

using var stream = Encoding.UTF8.GetBytes(response.Content).AsMemory().AsStream();
var listPartsResult = Utils.DeserializeXml<ListPartsResult>(stream);

var root = XDocument.Parse(response.Content);
XNamespace ns = Utils.DetermineNamespace(root);

var uploads = from c in root.Root.Descendants(ns + "Part")
select new Part
{
PartNumber = int.Parse(c.Element(ns + "PartNumber").Value,
CultureInfo.CurrentCulture),
ETag = c.Element(ns + "ETag").Value.Replace("\"", string.Empty, StringComparison.OrdinalIgnoreCase),
Size = long.Parse(c.Element(ns + "Size").Value,
CultureInfo.CurrentCulture)
};

return Tuple.Create(listPartsResult, uploads.ToList());
}

/// <summary>
/// Start a new multi-part upload request
/// </summary>
/// <param name="bucketName">Bucket Name</param>
/// <param name="objectName">Object Name</param>
/// <param name="metaData"></param>
/// <param name="sseHeaders"> Server-side encryption options</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private async Task<string> NewMultipartUploadAsync(string bucketName, string objectName,
IDictionary<string, string> metaData, Dictionary<string, string> sseHeaders,
CancellationToken cancellationToken = default)
{
foreach (var kv in sseHeaders) metaData.Add(kv.Key, kv.Value);

var requestMessageBuilder = await CreateRequest(HttpMethod.Post, bucketName,
objectName, metaData).ConfigureAwait(false);
requestMessageBuilder.AddQueryParameter("uploads", "");

using var response = await ExecuteTaskAsync(NoErrorHandlers,
requestMessageBuilder, cancellationToken: cancellationToken).ConfigureAwait(false);

using var stream = response.ContentBytes.AsStream();
var newUpload = Utils.DeserializeXml<InitiateMultipartUploadResult>(stream);
return newUpload.UploadId;
}

/// <summary>
/// Advances in the stream upto currentPartSize or End of Stream
/// </summary>
Expand Down Expand Up @@ -1271,152 +1127,4 @@ internal async Task<ReadOnlyMemory<byte>> ReadFullAsync(Stream data, int current
result.Slice(i, 1).CopyTo(truncatedResult[i..]);
return truncatedResult;
}

/// <summary>
/// Create the copy request, execute it and
/// </summary>
/// <param name="bucketName">Bucket name where the object to be copied exists.</param>
/// <param name="objectName">Object name source to be copied.</param>
/// <param name="destBucketName">Bucket name where the object will be copied to.</param>
/// <param name="destObjectName">
/// Object name to be created, if not provided uses source object name as destination object
/// name.
/// </param>
/// <param name="copyConditions">
/// optionally can take a key value CopyConditions as well for conditionally attempting
/// copyObject.
/// </param>
/// <param name="customHeaders">optional custom header to specify byte range</param>
/// <param name="queryMap">optional query parameters like upload id, part number etc for copy operations</param>
/// <param name="type">Type of XML serialization to be applied on the server response</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private async Task<object> CopyObjectRequestAsync(string bucketName, string objectName, string destBucketName,
string destObjectName, CopyConditions copyConditions, IDictionary<string, string> customHeaders,
IDictionary<string, string> queryMap, Type type, CancellationToken cancellationToken)
{
// Escape source object path.
var sourceObjectPath = bucketName + "/" + Utils.UrlEncode(objectName);

// Destination object name is optional, if empty default to source object name.
destObjectName ??= objectName;

var requestMessageBuilder = await CreateRequest(HttpMethod.Put, destBucketName,
destObjectName,
customHeaders)
.ConfigureAwait(false);
if (queryMap is not null)
foreach (var query in queryMap)
requestMessageBuilder.AddQueryParameter(query.Key, query.Value);
// Set the object source
requestMessageBuilder.AddOrUpdateHeaderParameter("x-amz-copy-source", sourceObjectPath);

// If no conditions available, skip addition else add the conditions to the header
if (copyConditions is not null)
foreach (var item in copyConditions.Conditions)
requestMessageBuilder.AddOrUpdateHeaderParameter(item.Key, item.Value);

using var response =
await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken: cancellationToken)
.ConfigureAwait(false);

object copyResult = null;

if (type == typeof(CopyObjectResult))
{
using var stream = response.ContentBytes.AsStream();
copyResult = Utils.DeserializeXml<CopyObjectResult>(stream);
}

if (type == typeof(CopyPartResult))
{
using var stream = response.ContentBytes.AsStream();
copyResult = Utils.DeserializeXml<CopyPartResult>(stream);
}

return copyResult;
}

/// <summary>
/// Make a multi part copy upload for objects larger than 5GB or if CopyCondition specifies a byte range.
/// </summary>
/// <param name="bucketName">source bucket name</param>
/// <param name="objectName">source object name</param>
/// <param name="destBucketName">destination bucket name</param>
/// <param name="destObjectName">destination object name</param>
/// <param name="copyConditions">copyconditions </param>
/// <param name="copySize">size of copy upload</param>
/// <param name="metadata">optional metadata on the destination side</param>
/// <param name="sseSrc">optional Server-side encryption options</param>
/// <param name="sseDest">optional Server-side encryption options</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns></returns>
private async Task MultipartCopyUploadAsync(string bucketName, string objectName, string destBucketName,
string destObjectName, CopyConditions copyConditions, long copySize,
IDictionary<string, string> metadata = null, IServerSideEncryption sseSrc = null,
IServerSideEncryption sseDest = null, CancellationToken cancellationToken = default)
{
// For all sizes greater than 5GB or if Copy byte range specified in conditions and byte range larger
// than minimum part size (5 MB) do multipart.
var multiPartInfo = Utils.CalculateMultiPartSize(copySize, true);
var partSize = multiPartInfo.PartSize;
var partCount = multiPartInfo.PartCount;
var lastPartSize = multiPartInfo.LastPartSize;
var totalParts = new Part[(int)partCount];

var sseHeaders = new Dictionary<string, string>(StringComparer.Ordinal);
sseDest?.Marshal(sseHeaders);

// No need to resume upload since this is a Server-side copy. Just initiate a new upload.
var uploadId = await NewMultipartUploadAsync(destBucketName,
destObjectName, metadata, sseHeaders, cancellationToken)
.ConfigureAwait(false);

// Upload each part
var expectedReadSize = partSize;
int partNumber;
for (partNumber = 1; partNumber <= partCount; partNumber++)
{
var partCondition = copyConditions.Clone();
partCondition.byteRangeStart = ((long)partSize * (partNumber - 1)) + partCondition.byteRangeStart;
if (partNumber < partCount)
partCondition.byteRangeEnd = partCondition.byteRangeStart + (long)partSize - 1;
else
partCondition.byteRangeEnd = partCondition.byteRangeStart + (long)lastPartSize - 1;

var queryMap = new Dictionary<string, string>(StringComparer.Ordinal);
if (!string.IsNullOrEmpty(uploadId) && partNumber > 0)
{
queryMap.Add("uploadId", uploadId);
queryMap.Add("partNumber", partNumber.ToString(CultureInfo.InvariantCulture));
}

var customHeader = new Dictionary<string, string>
(StringComparer.Ordinal)
{
{
"x-amz-copy-source-range",
"bytes=" + partCondition.byteRangeStart + "-" + partCondition.byteRangeEnd
}
};

if (sseSrc is not null and SSECopy) sseSrc.Marshal(customHeader);
sseDest?.Marshal(customHeader);

var cpPartResult = (CopyPartResult)await CopyObjectRequestAsync(bucketName, objectName,
destBucketName, destObjectName, copyConditions, customHeader, queryMap, typeof(CopyPartResult),
cancellationToken).ConfigureAwait(false);

totalParts[partNumber - 1] = new Part
{
PartNumber = partNumber, ETag = cpPartResult.ETag, Size = (long)expectedReadSize
};
}

var etags = new Dictionary<int, string>();
for (partNumber = 1; partNumber <= partCount; partNumber++) etags[partNumber] = totalParts[partNumber - 1].ETag;
// Complete multi part upload
await CompleteMultipartUploadAsync(destBucketName, destObjectName,
uploadId, etags, cancellationToken).ConfigureAwait(false);
}
}
38 changes: 3 additions & 35 deletions Minio/Helper/OperationsHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ private async Task<ObjectStat> GetObjectHelper(GetObjectArgs args, CancellationT
args?.Validate();
if (args.FileName is not null)
await GetObjectFileAsync(args, objStat, cancellationToken).ConfigureAwait(false);
else if (args.CallBack is not null)
await GetObjectStreamAsync(args, objStat, args.CallBack, cancellationToken).ConfigureAwait(false);
else await GetObjectStreamAsync(args, objStat, args.FuncCallBack, cancellationToken).ConfigureAwait(false);
else await GetObjectStreamAsync(args, cancellationToken).ConfigureAwait(false);
return objStat;
}

Expand Down Expand Up @@ -95,45 +93,15 @@ async Task callbackAsync(Stream stream, CancellationToken cancellationToken)
await callbackAsync(stream, cts.Token).ConfigureAwait(false);
Utils.MoveWithReplace(tempFileName, args.FileName);
});
return GetObjectStreamAsync(args, objectStat, null, cancellationToken);
return GetObjectStreamAsync(args, cancellationToken);
}

/// <summary>
/// private helper method. It returns the specified portion or full object from the bucket
/// </summary>
/// <param name="args">GetObjectArgs Arguments Object encapsulates information like - bucket name, object name etc </param>
/// <param name="objectStat">
/// ObjectStat object encapsulates information like - object name, size, etag etc, represents
/// Object Information
/// </param>
/// <param name="cb"> Action object of type Stream, callback to send Object contents, if assigned </param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
private async Task GetObjectStreamAsync(GetObjectArgs args, ObjectStat objectStat, Action<Stream> cb,
CancellationToken cancellationToken = default)
{
var requestMessageBuilder = await CreateRequest(args).ConfigureAwait(false);
using var response =
await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// private helper method. It returns the specified portion or full object from the bucket
/// </summary>
/// <param name="args">GetObjectArgs Arguments Object encapsulates information like - bucket name, object name etc </param>
/// <param name="objectStat">
/// ObjectStat object encapsulates information like - object name, size, etag etc, represents
/// Object Information
/// </param>
/// <param name="cb">
/// Callback function to send/process Object contents using
/// async Func object which takes Stream and CancellationToken as input
/// and Task as output, if assigned
/// </param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
private async Task GetObjectStreamAsync(GetObjectArgs args, ObjectStat objectStat,
Func<Stream, CancellationToken, Task> cb,
CancellationToken cancellationToken = default)
private async Task GetObjectStreamAsync(GetObjectArgs args, CancellationToken cancellationToken = default)
{
var requestMessageBuilder = await CreateRequest(args).ConfigureAwait(false);
using var response =
Expand Down

0 comments on commit e1ef9f5

Please sign in to comment.