Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit 80c2952

Browse files
committed
Merge pull request #2724 from stephentoub/readwrite_perf
Several Stream.Read/WriteAsync improvements
2 parents 4c1ed41 + 22c5cb0 commit 80c2952

File tree

6 files changed

+192
-76
lines changed

6 files changed

+192
-76
lines changed

src/mscorlib/src/System/IO/Stream.cs

Lines changed: 99 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,13 @@ protected virtual WaitHandle CreateWaitHandle()
289289
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
290290
{
291291
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
292-
return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
292+
return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
293293
}
294294

295295
[HostProtection(ExternalThreading = true)]
296-
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
296+
internal IAsyncResult BeginReadInternal(
297+
byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
298+
bool serializeAsynchronously, bool apm)
297299
{
298300
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
299301
if (!CanRead) __Error.ReadNotSupported();
@@ -326,18 +328,31 @@ internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, As
326328

327329
// Create the task to asynchronously do a Read. This task serves both
328330
// as the asynchronous work item and as the IAsyncResult returned to the user.
329-
var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
331+
var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
330332
{
331333
// The ReadWriteTask stores all of the parameters to pass to Read.
332334
// As we're currently inside of it, we can get the current task
333335
// and grab the parameters from it.
334336
var thisTask = Task.InternalCurrent as ReadWriteTask;
335337
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
336338

337-
// Do the Read and return the number of bytes read
338-
var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
339-
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
340-
return bytesRead;
339+
try
340+
{
341+
// Do the Read and return the number of bytes read
342+
return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
343+
}
344+
finally
345+
{
346+
// If this implementation is part of Begin/EndXx, then the EndXx method will handle
347+
// finishing the async operation. However, if this is part of XxAsync, then there won't
348+
// be an end method, and this task is responsible for cleaning up.
349+
if (!thisTask._apm)
350+
{
351+
thisTask._stream.FinishTrackingAsyncOperation();
352+
}
353+
354+
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
355+
}
341356
}, state, this, buffer, offset, count, callback);
342357

343358
// Schedule it
@@ -388,9 +403,7 @@ public virtual int EndRead(IAsyncResult asyncResult)
388403
}
389404
finally
390405
{
391-
_activeReadWriteTask = null;
392-
Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
393-
_asyncActiveSemaphore.Release();
406+
FinishTrackingAsyncOperation();
394407
}
395408
#endif
396409
}
@@ -413,8 +426,20 @@ public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, Cancell
413426
: BeginEndReadAsync(buffer, offset, count);
414427
}
415428

429+
[System.Security.SecuritySafeCritical]
430+
[MethodImplAttribute(MethodImplOptions.InternalCall)]
431+
private extern bool HasOverriddenBeginEndRead();
432+
416433
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
417-
{
434+
{
435+
if (!HasOverriddenBeginEndRead())
436+
{
437+
// If the Stream does not override Begin/EndRead, then we can take an optimized path
438+
// that skips an extra layer of tasks / IAsyncResults.
439+
return (Task<Int32>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
440+
}
441+
442+
// Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
418443
return TaskFactory<Int32>.FromAsyncTrim(
419444
this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
420445
(stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
@@ -434,11 +459,13 @@ private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
434459
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
435460
{
436461
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
437-
return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
462+
return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
438463
}
439464

440465
[HostProtection(ExternalThreading = true)]
441-
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
466+
internal IAsyncResult BeginWriteInternal(
467+
byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
468+
bool serializeAsynchronously, bool apm)
442469
{
443470
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
444471
if (!CanWrite) __Error.WriteNotSupported();
@@ -470,18 +497,32 @@ internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, A
470497

471498
// Create the task to asynchronously do a Write. This task serves both
472499
// as the asynchronous work item and as the IAsyncResult returned to the user.
473-
var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
500+
var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
474501
{
475502
// The ReadWriteTask stores all of the parameters to pass to Write.
476503
// As we're currently inside of it, we can get the current task
477504
// and grab the parameters from it.
478505
var thisTask = Task.InternalCurrent as ReadWriteTask;
479506
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
480507

481-
// Do the Write
482-
thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
483-
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
484-
return 0; // not used, but signature requires a value be returned
508+
try
509+
{
510+
// Do the Write
511+
thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
512+
return 0; // not used, but signature requires a value be returned
513+
}
514+
finally
515+
{
516+
// If this implementation is part of Begin/EndXx, then the EndXx method will handle
517+
// finishing the async operation. However, if this is part of XxAsync, then there won't
518+
// be an end method, and this task is responsible for cleaning up.
519+
if (!thisTask._apm)
520+
{
521+
thisTask._stream.FinishTrackingAsyncOperation();
522+
}
523+
524+
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
525+
}
485526
}, state, this, buffer, offset, count, callback);
486527

487528
// Schedule it
@@ -501,23 +542,19 @@ private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWrite
501542
// preconditions in async methods that await.
502543
Contract.Assert(asyncWaiter != null); // Ditto
503544

504-
// If the wait has already complete, run the task.
545+
// If the wait has already completed, run the task.
505546
if (asyncWaiter.IsCompleted)
506547
{
507548
Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
508549
RunReadWriteTask(readWriteTask);
509550
}
510551
else // Otherwise, wait for our turn, and then run the task.
511552
{
512-
asyncWaiter.ContinueWith((t, state) =>
513-
{
514-
Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
515-
var tuple = (Tuple<Stream,ReadWriteTask>)state;
516-
tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
517-
}, Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
518-
default(CancellationToken),
519-
TaskContinuationOptions.ExecuteSynchronously,
520-
TaskScheduler.Default);
553+
asyncWaiter.ContinueWith((t, state) => {
554+
Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
555+
var rwt = (ReadWriteTask)state;
556+
rwt._stream.RunReadWriteTask(rwt); // RunReadWriteTask(readWriteTask);
557+
}, readWriteTask, default(CancellationToken), TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
521558
}
522559
}
523560

@@ -534,6 +571,13 @@ private void RunReadWriteTask(ReadWriteTask readWriteTask)
534571
readWriteTask.m_taskScheduler = TaskScheduler.Default;
535572
readWriteTask.ScheduleAndStart(needsProtection: false);
536573
}
574+
575+
private void FinishTrackingAsyncOperation()
576+
{
577+
_activeReadWriteTask = null;
578+
Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
579+
_asyncActiveSemaphore.Release();
580+
}
537581
#endif
538582

539583
public virtual void EndWrite(IAsyncResult asyncResult)
@@ -574,9 +618,7 @@ public virtual void EndWrite(IAsyncResult asyncResult)
574618
}
575619
finally
576620
{
577-
_activeReadWriteTask = null;
578-
Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
579-
_asyncActiveSemaphore.Release();
621+
FinishTrackingAsyncOperation();
580622
}
581623
#endif
582624
}
@@ -600,11 +642,12 @@ public virtual void EndWrite(IAsyncResult asyncResult)
600642
// with a single allocation.
601643
private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
602644
{
603-
internal readonly bool _isRead;
645+
internal readonly bool _isRead;
646+
internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
604647
internal Stream _stream;
605648
internal byte [] _buffer;
606-
internal int _offset;
607-
internal int _count;
649+
internal readonly int _offset;
650+
internal readonly int _count;
608651
private AsyncCallback _callback;
609652
private ExecutionContext _context;
610653

@@ -618,6 +661,7 @@ private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
618661
[MethodImpl(MethodImplOptions.NoInlining)]
619662
public ReadWriteTask(
620663
bool isRead,
664+
bool apm,
621665
Func<object,int> function, object state,
622666
Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
623667
base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
@@ -631,6 +675,7 @@ public ReadWriteTask(
631675

632676
// Store the arguments
633677
_isRead = isRead;
678+
_apm = apm;
634679
_stream = stream;
635680
_buffer = buffer;
636681
_offset = offset;
@@ -697,6 +742,8 @@ public Task WriteAsync(Byte[] buffer, int offset, int count)
697742
return WriteAsync(buffer, offset, count, CancellationToken.None);
698743
}
699744

745+
746+
700747
[HostProtection(ExternalThreading = true)]
701748
[ComVisible(false)]
702749
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -708,9 +755,20 @@ public virtual Task WriteAsync(Byte[] buffer, int offset, int count, Cancellatio
708755
: BeginEndWriteAsync(buffer, offset, count);
709756
}
710757

758+
[System.Security.SecuritySafeCritical]
759+
[MethodImplAttribute(MethodImplOptions.InternalCall)]
760+
private extern bool HasOverriddenBeginEndWrite();
711761

712762
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
713-
{
763+
{
764+
if (!HasOverriddenBeginEndWrite())
765+
{
766+
// If the Stream does not override Begin/EndWrite, then we can take an optimized path
767+
// that skips an extra layer of tasks / IAsyncResults.
768+
return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
769+
}
770+
771+
// Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
714772
return TaskFactory<VoidTaskResult>.FromAsyncTrim(
715773
this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
716774
(stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
@@ -1057,10 +1115,6 @@ internal static void EndWrite(IAsyncResult asyncResult) {
10571115
internal sealed class SyncStream : Stream, IDisposable
10581116
{
10591117
private Stream _stream;
1060-
[NonSerialized]
1061-
private bool? _overridesBeginRead;
1062-
[NonSerialized]
1063-
private bool? _overridesBeginWrite;
10641118

10651119
internal SyncStream(Stream stream)
10661120
{
@@ -1179,38 +1233,11 @@ public override int ReadByte()
11791233
lock(_stream)
11801234
return _stream.ReadByte();
11811235
}
1182-
1183-
private static bool OverridesBeginMethod(Stream stream, string methodName)
1184-
{
1185-
Contract.Requires(stream != null, "Expected a non-null stream.");
1186-
Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
1187-
"Expected BeginRead or BeginWrite as the method name to check.");
1188-
1189-
// Get all of the methods on the underlying stream
1190-
var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);
1191-
1192-
// If any of the methods have the desired name and are defined on the base Stream
1193-
// Type, then the method was not overridden. If none of them were defined on the
1194-
// base Stream, then it must have been overridden.
1195-
foreach (var method in methods)
1196-
{
1197-
if (method.DeclaringType == typeof(Stream) &&
1198-
method.Name == methodName)
1199-
{
1200-
return false;
1201-
}
1202-
}
1203-
return true;
1204-
}
12051236

12061237
[HostProtection(ExternalThreading=true)]
12071238
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
12081239
{
1209-
// Lazily-initialize whether the wrapped stream overrides BeginRead
1210-
if (_overridesBeginRead == null)
1211-
{
1212-
_overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
1213-
}
1240+
bool overridesBeginRead = _stream.HasOverriddenBeginEndRead();
12141241

12151242
lock (_stream)
12161243
{
@@ -1220,9 +1247,9 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy
12201247
// than a synchronous wait. A synchronous wait will result in a deadlock condition, because
12211248
// the EndXx method for the outstanding async operation won't be able to acquire the lock on
12221249
// _stream due to this call blocked while holding the lock.
1223-
return _overridesBeginRead.Value ?
1250+
return overridesBeginRead ?
12241251
_stream.BeginRead(buffer, offset, count, callback, state) :
1225-
_stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1252+
_stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
12261253
}
12271254
}
12281255

@@ -1264,11 +1291,7 @@ public override void WriteByte(byte b)
12641291
[HostProtection(ExternalThreading=true)]
12651292
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
12661293
{
1267-
// Lazily-initialize whether the wrapped stream overrides BeginWrite
1268-
if (_overridesBeginWrite == null)
1269-
{
1270-
_overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
1271-
}
1294+
bool overridesBeginWrite = _stream.HasOverriddenBeginEndWrite();
12721295

12731296
lock (_stream)
12741297
{
@@ -1278,9 +1301,9 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As
12781301
// than a synchronous wait. A synchronous wait will result in a deadlock condition, because
12791302
// the EndXx method for the outstanding async operation won't be able to acquire the lock on
12801303
// _stream due to this call blocked while holding the lock.
1281-
return _overridesBeginWrite.Value ?
1304+
return overridesBeginWrite ?
12821305
_stream.BeginWrite(buffer, offset, count, callback, state) :
1283-
_stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1306+
_stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
12841307
}
12851308
}
12861309

0 commit comments

Comments
 (0)