Skip to content

Encapsulate the logic of caching the last synchronously completed task. #61781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\SynchronizationContext.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\SynchronizationLockException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\AsyncCausalityTracerConstants.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\CachedCompletedInt32Task.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ConcurrentExclusiveSchedulerPair.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Future.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\FutureFactory.cs" />
Expand Down Expand Up @@ -2338,4 +2339,4 @@
<Compile Include="$(MSBuildThisFileDirectory)System\IUnaryNegationOperators.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IUnaryPlusOperators.cs" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,14 @@ public sealed class BufferedStream : Stream
private const int MaxShadowBufferSize = 81920; // Make sure not to get to the Large Object Heap.
private const int DefaultBufferSize = 4096;

private Stream? _stream; // Underlying stream. Close sets _stream to null.
private byte[]? _buffer; // Shared read/write buffer. Alloc on first use.
private readonly int _bufferSize; // Length of internal buffer (not counting the shadow buffer).
private int _readPos; // Read pointer within shared buffer.
private int _readLen; // Number of bytes read in buffer from _stream.
private int _writePos; // Write pointer within shared buffer.
private Task<int>? _lastSyncCompletedReadTask; // The last successful Task returned from ReadAsync
// (perf optimization for successive reads of the same size)
// Removing a private default constructor is a breaking change for the DataDebugSerializer.
// Because this ctor was here previously we need to keep it around.
private Stream? _stream; // Underlying stream. Close sets _stream to null.
private byte[]? _buffer; // Shared read/write buffer. Alloc on first use.
private readonly int _bufferSize; // Length of internal buffer (not counting the shadow buffer).
private int _readPos; // Read pointer within shared buffer.
private int _readLen; // Number of bytes read in buffer from _stream.
private int _writePos; // Write pointer within shared buffer.
private CachedCompletedInt32Task _lastSyncCompletedReadTask; // The last successful Task returned from ReadAsync
// (perf optimization for successive reads of the same size)

public BufferedStream(Stream stream)
: this(stream, DefaultBufferSize)
Expand Down Expand Up @@ -571,19 +569,6 @@ public override int Read(Span<byte> destination)
}
}

private Task<int> LastSyncCompletedReadTask(int val)
{
Task<int>? t = _lastSyncCompletedReadTask;
Debug.Assert(t == null || t.IsCompletedSuccessfully);

if (t != null && t.Result == val)
return t;

t = Task.FromResult<int>(val);
_lastSyncCompletedReadTask = t;
return t;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateBufferArguments(buffer, offset, count);
Expand Down Expand Up @@ -622,7 +607,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
{

return (error == null)
? LastSyncCompletedReadTask(bytesFromBuffer)
? _lastSyncCompletedReadTask.GetTask(bytesFromBuffer)
: Task.FromException<int>(error);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MemoryStream : Stream
private readonly bool _exposable; // Whether the array can be returned to the user.
private bool _isOpen; // Is this stream open or closed?

private Task<int>? _lastReadTask; // The last successful task returned from ReadAsync
private CachedCompletedInt32Task _lastReadTask; // The last successful task returned from ReadAsync

private const int MemStreamMaxLength = int.MaxValue;

Expand Down Expand Up @@ -127,7 +127,7 @@ protected override void Dispose(bool disposing)
_writable = false;
_expandable = false;
// Don't set buffer to null - allow TryGetBuffer, GetBuffer & ToArray to work.
_lastReadTask = null;
_lastReadTask = default;
}
}
finally
Expand Down Expand Up @@ -389,10 +389,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
try
{
int n = Read(buffer, offset, count);
Task<int>? t = _lastReadTask;
Debug.Assert(t == null || t.Status == TaskStatus.RanToCompletion,
"Expected that a stored last task completed successfully");
return (t != null && t.Result == n) ? t : (_lastReadTask = Task.FromResult<int>(n));
return _lastReadTask.GetTask(n);
}
catch (OperationCanceledException oce)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class BufferedFileStreamStrategy : FileStreamStrategy
private int _readPos;
private int _readLen;
// The last successful Task returned from ReadAsync (perf optimization for successive reads of the same size)
private Task<int>? _lastSyncCompletedReadTask;
private CachedCompletedInt32Task _lastSyncCompletedReadTask;

internal BufferedFileStreamStrategy(FileStreamStrategy strategy, int bufferSize)
{
Expand Down Expand Up @@ -310,21 +310,8 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
ValueTask<int> readResult = ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken);

return readResult.IsCompletedSuccessfully
? LastSyncCompletedReadTask(readResult.Result)
? _lastSyncCompletedReadTask.GetTask(readResult.Result)
: readResult.AsTask();

Task<int> LastSyncCompletedReadTask(int val)
{
Task<int>? t = _lastSyncCompletedReadTask;
Debug.Assert(t == null || t.IsCompletedSuccessfully);

if (t != null && t.Result == val)
return t;

t = Task.FromResult<int>(val);
_lastSyncCompletedReadTask = t;
return t;
}
}

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class UnmanagedMemoryStream : Stream
private long _offset;
private FileAccess _access;
private bool _isOpen;
private Task<int>? _lastReadTask; // The last successful task returned from ReadAsync
private CachedCompletedInt32Task _lastReadTask; // The last successful task returned from ReadAsync

/// <summary>
/// Creates a closed stream.
Expand Down Expand Up @@ -437,12 +437,11 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
try
{
int n = Read(buffer, offset, count);
Task<int>? t = _lastReadTask;
return (t != null && t.Result == n) ? t : (_lastReadTask = Task.FromResult<int>(n));
return _lastReadTask.GetTask(n);
}
catch (Exception ex)
{
Debug.Assert(!(ex is OperationCanceledException));
Debug.Assert(ex is not OperationCanceledException);
return Task.FromException<int>(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;

namespace System.Threading.Tasks
{
/// <summary>
/// Encapsulates the logic of caching the last synchronously completed task of integer.
/// Used in classes like <see cref="MemoryStream"/> to reduce allocations.
/// </summary>
internal struct CachedCompletedInt32Task
{
private Task<int>? _task;

/// <summary>
/// Gets a completed <see cref="Task{Int32}"/> whose result is <paramref name="result"/>.
/// </summary>
/// <remarks>
/// This method will try to return an already cached task if available.
/// </remarks>
/// <param name="result">The task's result.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Task<int> GetTask(int result)
{
Task<int>? task;
#pragma warning disable CA1849 // Call async methods when in an async method
if ((task = _task) is not null && task.Result == result)
Comment on lines +28 to +30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could have been:

if (_task is Task<int> task && task.Result == result)

#pragma warning restore CA1849 // Call async methods when in an async method
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have CA1849 enabled. Why is this suppression needed? There are many other places in the code where it would trigger if it were to be enabled, which is why we haven't enabled it... it still needs work.

{
Debug.Assert(task.IsCompletedSuccessfully,
"Expected that a stored last task completed successfully");
Copy link
Member

@stephentoub stephentoub Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is kind of meaningless: if the task didn't complete successfully, the task.Result in the if condition would have thrown. It would make more sense if it were between the check for null and accessing Result.

return task;
}
else
{
return _task = Task.FromResult(result);
}
}
}
}