Skip to content

Commit a38d0c2

Browse files
authored
FileStream rewrite: Use IValueTaskSource instead of TaskCompletionSource (#50802)
* Remove IFileStreamCompletionStrategy. This makes FileStreamCompletionSource only needed by Net5CompatFileStreamStrategy. * Use AwaitableProvider for AsyncWindowsFileStreamStrategy. * No need for ManualResetValueTaskSource class. Remove it to get rid of one allocation and get the actual profiling improvement. Rename FileStreamAwaitableProvider. * Bring back ReadAsync/WriteAsync code to the strategy for simpler code review. Shorter filename for awaitable provider. * FileStreamCompletionSource can now be nested in Net5CompatFileStreamStrategy, and its _fileHandle can now be private. * Remove duplicate error definitions, use centralized Interop.Errors. * Move shared mask values to FileStreamHelpers to avoid duplication. * Use Interop.Errors also in SyncWindowsFileStreamStrategy. * Move misplaced comment, move method location. * Slightly better comment. * Rename FileStreamAwaitableProvider to FileStreamValueTaskSource to keep in sync with the name of FileStreamCompletionSource. * Bring back the raw pointer intOverlapped. * Rename MemoryAwaitableProvider to MemoryFileStreamValueTaskSource. * Remove numBufferedBytes and avoid unnecessary allocation in Read(byte[],int,int). * Rename files of nested types. * Nested async result codes in static class. * Address feedback. * Address suggestions * Bring back RunContinuationAsynchronously=true Co-authored-by: carlossanlop <carlossanlop@users.noreply.github.com>
1 parent 79ad10c commit a38d0c2

9 files changed

+579
-355
lines changed

src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,10 +1104,10 @@
11041104
<Compile Include="$(CommonPath)Interop\Interop.ResultCode.cs">
11051105
<Link>Common\Interop\Interop.ResultCode.cs</Link>
11061106
</Compile>
1107-
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneDisplayNameType.cs" Condition="'$(TargetsBrowser)' != 'true'" >
1107+
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneDisplayNameType.cs" Condition="'$(TargetsBrowser)' != 'true'">
11081108
<Link>Common\Interop\Interop.TimeZoneDisplayNameType.cs</Link>
11091109
</Compile>
1110-
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneInfo.cs" Condition="'$(TargetsBrowser)' != 'true'" >
1110+
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneInfo.cs" Condition="'$(TargetsBrowser)' != 'true'">
11111111
<Link>Common\Interop\Interop.TimeZoneInfo.cs</Link>
11121112
</Compile>
11131113
<Compile Include="$(CommonPath)Interop\Interop.Utils.cs">
@@ -1656,8 +1656,9 @@
16561656
<Compile Include="$(MSBuildThisFileDirectory)System\IO\PathHelper.Windows.cs" />
16571657
<Compile Include="$(MSBuildThisFileDirectory)System\IO\PathInternal.Windows.cs" />
16581658
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\AsyncWindowsFileStreamStrategy.cs" />
1659+
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\AsyncWindowsFileStreamStrategy.ValueTaskSource.cs" />
16591660
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamHelpers.Windows.cs" />
1660-
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamCompletionSource.Win32.cs" />
1661+
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.CompletionSource.Windows.cs" />
16611662
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Windows.cs" />
16621663
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\SyncWindowsFileStreamStrategy.cs" />
16631664
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\WindowsFileStreamStrategy.cs" />
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using System.Diagnostics;
6+
using System.Runtime.InteropServices;
7+
using System.Threading;
8+
using System.Threading.Tasks.Sources;
9+
using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes;
10+
11+
namespace System.IO.Strategies
12+
{
13+
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
14+
{
15+
/// <summary>
16+
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
17+
/// </summary>
18+
private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
19+
{
20+
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;
21+
22+
private readonly AsyncWindowsFileStreamStrategy _strategy;
23+
24+
private ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
25+
private NativeOverlapped* _overlapped;
26+
private CancellationTokenRegistration _cancellationRegistration;
27+
private long _result; // Using long since this needs to be used in Interlocked APIs
28+
#if DEBUG
29+
private bool _cancellationHasBeenRegistered;
30+
#endif
31+
32+
public static ValueTaskSource Create(
33+
AsyncWindowsFileStreamStrategy strategy,
34+
PreAllocatedOverlapped? preallocatedOverlapped,
35+
ReadOnlyMemory<byte> memory)
36+
{
37+
// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider,
38+
// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
39+
// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case
40+
// where the underlying memory is backed by pre-pinned buffers.
41+
return preallocatedOverlapped != null &&
42+
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
43+
preallocatedOverlapped.IsUserObject(buffer.Array) ?
44+
new ValueTaskSource(strategy, preallocatedOverlapped, buffer.Array) :
45+
new MemoryValueTaskSource(strategy, memory);
46+
}
47+
48+
protected ValueTaskSource(
49+
AsyncWindowsFileStreamStrategy strategy,
50+
PreAllocatedOverlapped? preallocatedOverlapped,
51+
byte[]? bytes)
52+
{
53+
_strategy = strategy;
54+
_result = TaskSourceCodes.NoResult;
55+
56+
_source = default;
57+
_source.RunContinuationsAsynchronously = true;
58+
59+
_overlapped = bytes != null &&
60+
_strategy.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
61+
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!) : // allocated when buffer was created, and buffer is non-null
62+
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);
63+
64+
Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
65+
}
66+
67+
internal NativeOverlapped* Overlapped => _overlapped;
68+
public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
69+
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
70+
void IValueTaskSource.GetResult(short token) => _source.GetResult(token);
71+
int IValueTaskSource<int>.GetResult(short token) => _source.GetResult(token);
72+
internal short Version => _source.Version;
73+
74+
internal void RegisterForCancellation(CancellationToken cancellationToken)
75+
{
76+
#if DEBUG
77+
Debug.Assert(cancellationToken.CanBeCanceled);
78+
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
79+
_cancellationHasBeenRegistered = true;
80+
#endif
81+
82+
// Quick check to make sure the IO hasn't completed
83+
if (_overlapped != null)
84+
{
85+
// Register the cancellation only if the IO hasn't completed
86+
long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult);
87+
if (packedResult == TaskSourceCodes.NoResult)
88+
{
89+
_cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this);
90+
91+
// Switch the result, just in case IO completed while we were setting the registration
92+
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
93+
}
94+
else if (packedResult != TaskSourceCodes.CompletedCallback)
95+
{
96+
// Failed to set the result, IO is in the process of completing
97+
// Attempt to take the packed result
98+
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
99+
}
100+
101+
// If we have a callback that needs to be completed
102+
if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation))
103+
{
104+
CompleteCallback((ulong)packedResult);
105+
}
106+
}
107+
}
108+
109+
internal virtual void ReleaseNativeResource()
110+
{
111+
// Ensure that cancellation has been completed and cleaned up.
112+
_cancellationRegistration.Dispose();
113+
114+
// Free the overlapped.
115+
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
116+
// (this is why we disposed the registration above).
117+
if (_overlapped != null)
118+
{
119+
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
120+
_overlapped = null;
121+
}
122+
123+
// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with).
124+
// Only one operation at a time is eligible to use the preallocated overlapped
125+
_strategy.CompareExchangeCurrentOverlappedOwner(null, this);
126+
}
127+
128+
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
129+
{
130+
// Extract the AwaitableProvider from the overlapped. The state in the overlapped
131+
// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used),
132+
// in which case the operation being completed is its _currentOverlappedOwner, or it'll
133+
// be directly the AwaitableProvider that's completing (in the case where the preallocated
134+
// overlapped was already in use by another operation).
135+
object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
136+
Debug.Assert(state is AsyncWindowsFileStreamStrategy or ValueTaskSource);
137+
ValueTaskSource valueTaskSource = state switch
138+
{
139+
AsyncWindowsFileStreamStrategy strategy => strategy._currentOverlappedOwner!, // must be owned
140+
_ => (ValueTaskSource)state
141+
};
142+
Debug.Assert(valueTaskSource != null);
143+
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");
144+
145+
// Handle reading from & writing to closed pipes. While I'm not sure
146+
// this is entirely necessary anymore, maybe it's possible for
147+
// an async read on a pipe to be issued and then the pipe is closed,
148+
// returning this error. This may very well be necessary.
149+
ulong packedResult;
150+
if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA)
151+
{
152+
packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode);
153+
}
154+
else
155+
{
156+
packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes);
157+
}
158+
159+
// Stow the result so that other threads can observe it
160+
// And, if no other thread is registering cancellation, continue
161+
if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult)
162+
{
163+
// Successfully set the state, attempt to take back the callback
164+
if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult)
165+
{
166+
// Successfully got the callback, finish the callback
167+
valueTaskSource.CompleteCallback(packedResult);
168+
}
169+
// else: Some other thread stole the result, so now it is responsible to finish the callback
170+
}
171+
// else: Some other thread is registering a cancellation, so it *must* finish the callback
172+
}
173+
174+
private void CompleteCallback(ulong packedResult)
175+
{
176+
CancellationToken cancellationToken = _cancellationRegistration.Token;
177+
178+
ReleaseNativeResource();
179+
180+
// Unpack the result and send it to the user
181+
long result = (long)(packedResult & TaskSourceCodes.ResultMask);
182+
if (result == TaskSourceCodes.ResultError)
183+
{
184+
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
185+
Exception e;
186+
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
187+
{
188+
CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true);
189+
e = new OperationCanceledException(ct);
190+
}
191+
else
192+
{
193+
e = Win32Marshal.GetExceptionForWin32Error(errorCode);
194+
}
195+
e.SetCurrentStackTrace();
196+
_source.SetException(e);
197+
}
198+
else
199+
{
200+
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
201+
_source.SetResult((int)(packedResult & uint.MaxValue));
202+
}
203+
}
204+
205+
private void Cancel(CancellationToken token)
206+
{
207+
// WARNING: This may potentially be called under a lock (during cancellation registration)
208+
Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet");
209+
210+
// If the handle is still valid, attempt to cancel the IO
211+
if (!_strategy._fileHandle.IsInvalid &&
212+
!Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped))
213+
{
214+
int errorCode = Marshal.GetLastWin32Error();
215+
216+
// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
217+
// This probably means that the IO operation has completed.
218+
if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
219+
{
220+
Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token);
221+
e.SetCurrentStackTrace();
222+
_source.SetException(e);
223+
}
224+
}
225+
}
226+
}
227+
228+
/// <summary>
229+
/// Extends <see cref="ValueTaskSource"/> with to support disposing of a
230+
/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
231+
/// when memory doesn't wrap a byte[].
232+
/// </summary>
233+
private sealed class MemoryValueTaskSource : ValueTaskSource
234+
{
235+
private MemoryHandle _handle; // mutable struct; do not make this readonly
236+
237+
// this type handles the pinning, so bytes are null
238+
internal unsafe MemoryValueTaskSource(AsyncWindowsFileStreamStrategy strategy, ReadOnlyMemory<byte> memory)
239+
: base(strategy, null, null) // this type handles the pinning, so null is passed for bytes to the base
240+
{
241+
_handle = memory.Pin();
242+
}
243+
244+
internal override void ReleaseNativeResource()
245+
{
246+
_handle.Dispose();
247+
base.ReleaseNativeResource();
248+
}
249+
}
250+
}
251+
}

0 commit comments

Comments
 (0)