Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge transferjobinternal #46540

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
erase old impls
  • Loading branch information
jaschrep-msft committed Oct 9, 2024
commit 6687940d240ce980061b5c552b65d56a868973cd
Original file line number Diff line number Diff line change
Expand Up @@ -60,168 +60,5 @@ internal ServiceToServiceTransferJob(
clientDiagnostics)
{
}

/// <summary>
/// Processes the job to job parts
/// </summary>
/// <returns>An IEnumerable that contains the job parts</returns>
public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync()
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
int partNumber = 0;

if (_jobParts.Count == 0)
{
// Starting brand new job
if (_isSingleResource)
{
JobPartInternal part = default;
try
{
// Single resource transfer, we can skip to chunking the job.
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
}
else
{
await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return part;
}
}
}
else
{
// Resuming old job with existing job parts
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;
}
}

if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return jobPartInternal;
}
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

// Obtain enumerator and check for any point of failure before we attempt to list
// and fail gracefully.
try
{
enumerator = _sourceResourceContainer.GetStorageResourcesAsync(
destinationContainer: _destinationResourceContainer,
cancellationToken: _cancellationToken).GetAsyncEnumerator();
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

// List the container in this specific way because MoveNext needs to be separately wrapped
// in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
while (!enumerationCompleted)
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

StorageResource current = enumerator.Current;

if (current.IsContainer)
{
// Create sub-container
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string subContainerPath = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);
StorageResourceContainer subContainer =
_destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath);

try
{
await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
}
else
{
if (!existingSources.Contains(current.Uri))
{
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

JobPartInternal part;
try
{
StorageResourceItem sourceItem = (StorageResourceItem)current;
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: sourceItem,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName, sourceItem.ResourceId))
.ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
}
}
}
}
}
161 changes: 0 additions & 161 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,166 +60,5 @@ internal StreamToUriTransferJob(
clientDiagnostics)
{
}

/// <summary>
/// Processes the job to job parts
/// </summary>
/// <returns>An IEnumerable that contains the job parts</returns>
public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync()
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
int partNumber = 0;

if (_jobParts.Count == 0)
{
// Starting brand new job
if (_isSingleResource)
{
JobPartInternal part = default;
try
{
// Single resource transfer, we can skip to chunking the job.
part = await StreamToUriJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
}
else
{
await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return part;
}
}
}
else
{
// Resuming old job with existing job parts
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;
}
}

if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return jobPartInternal;
}
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

// Obtain enumerator and check for any point of failure before we attempt to list
// and fail gracefully.
try
{
enumerator = _sourceResourceContainer.GetStorageResourcesAsync(
cancellationToken: _cancellationToken).GetAsyncEnumerator();
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

// List the container in this specific way because MoveNext needs to be separately wrapped
// in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
while (!enumerationCompleted)
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

StorageResource current = enumerator.Current;

if (current.IsContainer)
{
// Create sub-container
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string subContainerPath = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);
StorageResourceContainer subContainer =
_destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath);

try
{
await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
}
else
{
if (!existingSources.Contains(current.Uri))
{
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

JobPartInternal part;
try
{
part = await StreamToUriJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: (StorageResourceItem)current,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName, default))
.ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
}
}
}
}
}
Loading