Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient RandomAccess async I/O on the thread pool. #55123

Merged
merged 10 commits into from
Jul 4, 2021
Prev Previous commit
Next Next commit
Flow the ExecutionContext in ThreadPoolValueTaskSource.
  • Loading branch information
teo-tsirpanis committed Jul 4, 2021
commit 6702c0cbce2b93294793cdea74d36aa541989d75
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ private void TryToReuse(ThreadPoolValueTaskSource source)
/// </summary>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>
{
// Whether to put the operation on a local ThreadPool queue.
private const bool PreferLocal = true;

private readonly SafeFileHandle _fileHandle;
private ManualResetValueTaskSourceCore<long> _source;
private Operation _operation = Operation.None;
private ExecutionContext? _context;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

// These fields store the parameters for the operation.
// The first two are common for all kinds of operations.
Expand Down Expand Up @@ -80,7 +78,7 @@ public void OnCompleted(Action<object?> continuation, object? state, short token
int IValueTaskSource<int>.GetResult(short token) => (int) GetResultAndRelease(token);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
long IValueTaskSource<long>.GetResult(short token) => GetResultAndRelease(token);

void IThreadPoolWorkItem.Execute()
private void ExecuteInternal()
{
Debug.Assert(_operation >= Operation.Read && _operation <= Operation.WriteGather);

Expand Down Expand Up @@ -137,6 +135,24 @@ void IThreadPoolWorkItem.Execute()
}
}

void IThreadPoolWorkItem.Execute()
{
if (_context == null || _context.IsDefault)
{
ExecuteInternal();
}
else
{
ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.ExecuteInternal(), this);
}
}

private void QueueToThreadPool()
{
_context = ExecutionContext.Capture();
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
}

public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();
Expand All @@ -145,7 +161,7 @@ public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, Cancellati
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, PreferLocal);
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}
Expand All @@ -158,7 +174,7 @@ public ValueTask<int> QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, C
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, PreferLocal);
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}
Expand All @@ -171,7 +187,7 @@ public ValueTask<long> QueueReadScatter(IReadOnlyList<Memory<byte>> buffers, lon
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, PreferLocal);
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}
Expand All @@ -184,7 +200,7 @@ public ValueTask<long> QueueWriteGather(IReadOnlyList<ReadOnlyMemory<byte>> buff
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, PreferLocal);
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}
Expand Down