Skip to content

Commit

Permalink
Two Stream.Read/WriteAsync improvements
Browse files Browse the repository at this point in the history
A logical port of two of the three fixes in dotnet/coreclr#2724.  Makes it so that Read/WriteAsync calls are serialized asynchronously rather than synchronously, and so that Wait()'ing on a Read/WriteAsync task may be able to inline the execution onto the current thread.  This also reduces allocations when calling Read/WriteAsync (though there are still more optimizations that could be done).


Commit migrated from dotnet/corefx@556f7e2
  • Loading branch information
stephentoub committed Jan 19, 2016
1 parent 262a721 commit 3667b86
Showing 1 changed file with 28 additions and 40 deletions.
68 changes: 28 additions & 40 deletions src/libraries/System.IO/src/System/IO/Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,29 +270,23 @@ public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
return new Task<int>(() => 0, cancellationToken);
}

return ReadAsyncTask(buffer, offset, count, cancellationToken);
}

private async Task<int> ReadAsyncTask(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// To avoid a race with a stream's position pointer & generating race
// conditions with internal buffer indexes in our own streams that
// don't natively support async IO operations when there are multiple
// async requests outstanding, we will block the application's main
// thread if it does a second IO request until the first one completes.
EnsureAsyncActiveSemaphoreInitialized().Wait();

try
{
return await Task.Factory.StartNew(() => Read(buffer, offset, count),
cancellationToken,
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
}
finally
{
_asyncActiveSemaphore.Release();
}
// async requests outstanding, we will serialize the requests.
return EnsureAsyncActiveSemaphoreInitialized().WaitAsync().ContinueWith((completedWait, s) =>
{
Debug.Assert(completedWait.Status == TaskStatus.RanToCompletion);
var state = (Tuple<Stream, byte[], int, int>)s;
try
{
return state.Item1.Read(state.Item2, state.Item3, state.Item4); // this.Read(buffer, offset, count);
}
finally
{
state.Item1._asyncActiveSemaphore.Release();
}
}, Tuple.Create(this, buffer, offset, count), CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

public Task WriteAsync(Byte[] buffer, int offset, int count)
Expand All @@ -312,29 +306,23 @@ public virtual Task WriteAsync(Byte[] buffer, int offset, int count, Cancellatio
return new Task(() => { }, cancellationToken);
}

return WriteAsyncTask(buffer, offset, count, cancellationToken);
}

private async Task WriteAsyncTask(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// To avoid a race with a stream's position pointer & generating race
// conditions with internal buffer indexes in our own streams that
// don't natively support async IO operations when there are multiple
// async requests outstanding, we will block the application's main
// thread if it does a second IO request until the first one completes.
EnsureAsyncActiveSemaphoreInitialized().Wait();

try
{
await Task.Factory.StartNew(() => Write(buffer, offset, count),
cancellationToken,
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
}
finally
{
_asyncActiveSemaphore.Release();
}
// async requests outstanding, we will serialize the requests.
return EnsureAsyncActiveSemaphoreInitialized().WaitAsync().ContinueWith((completedWait, s) =>
{
Debug.Assert(completedWait.Status == TaskStatus.RanToCompletion);
var state = (Tuple<Stream, byte[], int, int>)s;
try
{
state.Item1.Write(state.Item2, state.Item3, state.Item4); // this.Write(buffer, offset, count);
}
finally
{
state.Item1._asyncActiveSemaphore.Release();
}
}, Tuple.Create(this, buffer, offset, count), CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

public abstract long Seek(long offset, SeekOrigin origin);
Expand Down

0 comments on commit 3667b86

Please sign in to comment.