Skip to content

Commit e44945f

Browse files
authored
[Storage] [DataMovement] Adding fix for awaiting completion on a DataTransfer to prevent delay (#33621)
* Adding fix for awaiting completion on a DataTransfer to prevent delay * Remove SpinWait, utilize TaskCompletionSource instead * PR Comments * WIP - Change to run tasks on factory * Changelog * Revert change to EnsureCompleted * Run export-api * Adding lock surrounding the job part status change * Moving up status setting above calling the completion status
1 parent 228dc09 commit e44945f

File tree

109 files changed

+31083
-104
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+31083
-104
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/CHANGELOG.md

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
# Release History
22

33
## 12.0.0-beta.2 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
4+
- This release contains bug fixes to improve quality.
125

136
## 12.0.0-beta.1 (2022-12-15)
147
- This preview is the first release of a ground-up rewrite of our client data movement

sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
# Release History
22

33
## 12.0.0-beta.2 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
4+
- Fix to prevent thread starvation on the DataTransfer.AwaitCompletion
125

136
## 12.0.0-beta.1 (2022-12-15)
147
- This preview is the first release of a ground-up rewrite of our client data movement

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
namespace Azure.Storage.DataMovement
22
{
3-
public partial class DataTransfer : System.IAsyncDisposable
3+
public partial class DataTransfer
44
{
55
internal DataTransfer() { }
66
public bool HasCompleted { get { throw null; } }
77
public string Id { get { throw null; } }
88
public Azure.Storage.DataMovement.StorageTransferStatus TransferStatus { get { throw null; } }
99
public System.Threading.Tasks.Task AwaitCompletion(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
10-
public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
1110
public void EnsureCompleted(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
1211
}
1312
[System.FlagsAttribute]

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ internal abstract class JobPartInternal
8585
/// The current status of each job part.
8686
/// </summary>
8787
public StorageTransferStatus JobPartStatus { get; set; }
88+
private object _statusLock = new object();
8889

8990
/// <summary>
9091
/// Optional. If the length is known, we log it instead of doing a GetProperties call on the
@@ -206,10 +207,18 @@ internal async Task TriggerCancellation(StorageTransferStatus status)
206207
/// <param name="transferStatus"></param>
207208
internal async Task OnTransferStatusChanged(StorageTransferStatus transferStatus)
208209
{
209-
if (transferStatus != StorageTransferStatus.None
210-
&& JobPartStatus != transferStatus)
210+
bool statusChanged = false;
211+
lock (_statusLock)
212+
{
213+
if (transferStatus != StorageTransferStatus.None
214+
&& JobPartStatus != transferStatus)
215+
{
216+
statusChanged = true;
217+
JobPartStatus = transferStatus;
218+
}
219+
}
220+
if (statusChanged)
211221
{
212-
JobPartStatus = transferStatus;
213222
if (JobPartStatus == StorageTransferStatus.Completed)
214223
{
215224
await InvokeSingleCompletedArg().ConfigureAwait(false);

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransfer.cs

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using Azure.Core;
11+
using Azure.Core.Pipeline;
1112

1213
namespace Azure.Storage.DataMovement
1314
{
1415
/// <summary>
1516
/// Holds transfer information
1617
/// </summary>
17-
public class DataTransfer : IAsyncDisposable
18+
public class DataTransfer
1819
{
1920
/// <summary>
2021
/// Defines whether the DataTransfer has completed.
@@ -62,56 +63,24 @@ internal DataTransfer(string id, long bytesTransferred)
6263
_state = new DataTransferState(id, bytesTransferred);
6364
}
6465

65-
/// <summary>
66-
/// Disposes the DataTransfer object.
67-
/// </summary>
68-
/// <returns></returns>
69-
public async ValueTask DisposeAsync()
70-
{
71-
await _state.DisposeAsync().ConfigureAwait(false);
72-
GC.SuppressFinalize(this);
73-
}
74-
7566
/// <summary>
7667
/// Ensures completion of the DataTransfer and attempts to get result
7768
/// </summary>
7869
public void EnsureCompleted(CancellationToken cancellationToken = default)
7970
{
80-
#if DEBUG
81-
VerifyTaskCompleted(HasCompleted);
82-
#endif
8371
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
84-
AwaitCompletion(cancellationToken);
72+
AwaitCompletion(cancellationToken).GetAwaiter().GetResult();
8573
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
8674
}
8775

88-
[Conditional("DEBUG")]
89-
private static void VerifyTaskCompleted(bool isCompleted)
90-
{
91-
if (!isCompleted)
92-
{
93-
if (Debugger.IsAttached)
94-
{
95-
Debugger.Break();
96-
}
97-
}
98-
}
99-
10076
/// <summary>
10177
/// Waits until the data transfer itself has completed
10278
/// </summary>
10379
/// <param name="cancellationToken"></param>
104-
public Task AwaitCompletion(CancellationToken cancellationToken = default)
80+
public async Task AwaitCompletion(CancellationToken cancellationToken = default)
10581
{
106-
while (!HasCompleted)
107-
{
108-
#if DEBUG
109-
VerifyTaskCompleted(HasCompleted);
110-
#endif
111-
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
112-
}
113-
114-
return Task.CompletedTask;
82+
cancellationToken.Register(() => _state._completionSource.TrySetCanceled(cancellationToken), useSynchronizationContext: false);
83+
await _state._completionSource.Task.ConfigureAwait(false);
11584
}
11685
}
11786
}

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferState.cs

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Runtime.InteropServices.ComTypes;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Azure.Core;
@@ -13,28 +14,25 @@ namespace Azure.Storage.DataMovement
1314
/// <summary>
1415
/// Defines the state of the transfer
1516
/// </summary>
16-
internal class DataTransferState : IAsyncDisposable
17+
internal class DataTransferState
1718
{
18-
// To detect redundant calls
19-
private bool _disposedValue;
20-
19+
private readonly object _statusLock = new object();
2120
private string _id;
2221
private StorageTransferStatus _status;
22+
2323
private long _currentTransferredBytes;
24-
private SemaphoreSlim _statusSemaphore;
24+
private object _lockCurrentBytes = new object();
25+
26+
public TaskCompletionSource<StorageTransferStatus> _completionSource;
2527

2628
public StorageTransferStatus Status => _status;
2729

2830
/// <summary>
2931
/// constructor
3032
/// </summary>
3133
public DataTransferState()
34+
: this(StorageTransferStatus.Queued)
3235
{
33-
_disposedValue = false;
34-
_statusSemaphore = new SemaphoreSlim(1, 1);
35-
_id = Guid.NewGuid().ToString();
36-
_status = StorageTransferStatus.Queued;
37-
_currentTransferredBytes = 0;
3836
}
3937

4038
/// <summary>
@@ -45,6 +43,15 @@ public DataTransferState(StorageTransferStatus status)
4543
_id = Guid.NewGuid().ToString();
4644
_status = status;
4745
_currentTransferredBytes = 0;
46+
_completionSource = new TaskCompletionSource<StorageTransferStatus>(
47+
_status,
48+
TaskCreationOptions.RunContinuationsAsynchronously);
49+
if (StorageTransferStatus.Completed == status ||
50+
StorageTransferStatus.CompletedWithSkippedTransfers == status ||
51+
StorageTransferStatus.CompletedWithFailedTransfers == status)
52+
{
53+
_completionSource.TrySetResult(status);
54+
}
4855
}
4956

5057
/// <summary>
@@ -55,20 +62,9 @@ public DataTransferState(string id, long bytesTransferred)
5562
_id = id;
5663
_status = StorageTransferStatus.Queued;
5764
_currentTransferredBytes = bytesTransferred;
58-
}
59-
60-
public async ValueTask DisposeAsync()
61-
{
62-
if (!_disposedValue)
63-
{
64-
_disposedValue = true;
65-
if (_statusSemaphore.CurrentCount == 0)
66-
{
67-
await _statusSemaphore.WaitAsync().ConfigureAwait(false);
68-
_statusSemaphore.Release();
69-
}
70-
_statusSemaphore.Dispose();
71-
}
65+
_completionSource = new TaskCompletionSource<StorageTransferStatus>(
66+
_status,
67+
TaskCreationOptions.RunContinuationsAsynchronously);
7268
}
7369

7470
/// <summary>
@@ -111,16 +107,23 @@ public void SetId(string id)
111107
/// Sets the completion status
112108
/// </summary>
113109
/// <param name="status"></param>
114-
public async Task SetTransferStatus(StorageTransferStatus status)
110+
public void SetTransferStatus(StorageTransferStatus status)
115111
{
116-
if (!_disposedValue)
112+
lock (_statusLock)
117113
{
118-
await _statusSemaphore.WaitAsync().ConfigureAwait(false);
119114
if (_status != status)
120115
{
121116
_status = status;
117+
if (StorageTransferStatus.Completed == status ||
118+
StorageTransferStatus.CompletedWithSkippedTransfers == status ||
119+
StorageTransferStatus.CompletedWithFailedTransfers == status)
120+
{
121+
// If the _completionSource has been cancelled or the exception
122+
// has been set, we don't need to check if TrySetResult returns false
123+
// because it's acceptable to cancel or have an error occur before then.
124+
_completionSource.TrySetResult(status);
125+
}
122126
}
123-
_statusSemaphore.Release();
124127
}
125128
}
126129

@@ -129,7 +132,10 @@ public async Task SetTransferStatus(StorageTransferStatus status)
129132
/// </summary>
130133
public void ResetTransferredBytes()
131134
{
132-
Volatile.Write(ref _currentTransferredBytes, 0);
135+
lock (_lockCurrentBytes)
136+
{
137+
Volatile.Write(ref _currentTransferredBytes, 0);
138+
}
133139
}
134140

135141
/// <summary>
@@ -138,7 +144,10 @@ public void ResetTransferredBytes()
138144
/// <param name="transferredBytes"></param>
139145
public void UpdateTransferBytes(long transferredBytes)
140146
{
141-
Interlocked.Add(ref _currentTransferredBytes, transferredBytes);
147+
lock (_lockCurrentBytes)
148+
{
149+
Interlocked.Add(ref _currentTransferredBytes, transferredBytes);
150+
}
142151
}
143152
}
144153
}

sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ internal abstract class TransferJobInternal : IDisposable
9999
/// Transfer Status for the job.
100100
/// </summary>
101101
internal StorageTransferStatus _transferStatus;
102+
private object _statusLock = new object();
102103

103104
/// <summary>
104105
/// To help set the job status when all job parts have completed.
@@ -314,10 +315,19 @@ public async Task JobPartEvent(TransferStatusEventArgs args)
314315

315316
public async Task OnJobStatusChangedAsync(StorageTransferStatus status)
316317
{
317-
//TODO: change to RaiseAsync after implementing ClientDiagnostics for TransferManager
318-
if (_transferStatus != status)
318+
bool statusChanged = false;
319+
lock (_statusLock)
320+
{
321+
//TODO: change to RaiseAsync after implementing ClientDiagnostics for TransferManager
322+
if (_transferStatus != status)
323+
{
324+
statusChanged = true;
325+
_transferStatus = status;
326+
}
327+
_dataTransfer._state.SetTransferStatus(status);
328+
}
329+
if (statusChanged)
319330
{
320-
_transferStatus = status;
321331
if (TransferStatusEventHandler != null)
322332
{
323333
await TransferStatusEventHandler.Invoke(
@@ -327,7 +337,6 @@ await TransferStatusEventHandler.Invoke(
327337
isRunningSynchronously: false,
328338
cancellationToken: _cancellationTokenSource.Token)).ConfigureAwait(false);
329339
}
330-
await _dataTransfer._state.SetTransferStatus(status).ConfigureAwait(false);
331340
}
332341
}
333342

sdk/storage/Azure.Storage.DataMovement/tests/DataTransferTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void EnsureCompleted_CancellationToken()
4141

4242
TestHelper.AssertExpectedException(
4343
() => transfer.EnsureCompleted(cancellationTokenSource.Token),
44-
new TaskCanceledException("The operation was canceled."));
44+
new TaskCanceledException("A task was canceled."));
4545
}
4646

4747
[Test]
@@ -63,7 +63,7 @@ public async Task AwaitCompletion_CancellationToken()
6363
}
6464
catch (TaskCanceledException exception)
6565
{
66-
Assert.AreEqual(exception.Message, "The operation was canceled.");
66+
Assert.AreEqual(exception.Message, "A task was canceled.");
6767
}
6868
}
6969
}

0 commit comments

Comments
 (0)