Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient RandomAccess async I/O on the thread pool. #55123

Merged
merged 10 commits into from
Jul 4, 2021
Prev Previous commit
Next Next commit
Reduce ThreadPoolValueTaskSource's field count.
  • Loading branch information
teo-tsirpanis committed Jul 3, 2021
commit 218b566397748ff6133679f479d42c486d04fde5
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Internal.Runtime.CompilerServices;
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -49,10 +50,10 @@ private enum Operation : byte
// The first two are common for all kinds of operations.
private long _fileOffset;
private CancellationToken _cancellationToken;
private Memory<byte> _readMemory;
private ReadOnlyMemory<byte> _writeMemory;
private IReadOnlyList<Memory<byte>>? _readScatterMemories;
private IReadOnlyList<ReadOnlyMemory<byte>>? _writeGatherMemories;
// Used by simple reads and writes. Will be unsafely cast to a memory when performing a read.
private ReadOnlyMemory<byte> _singleSegment;
// Used by vectored reads and writes. Is an IReadOnlyList of either Memory or ReadOnlyMemory of bytes.
private object? _multiSegmentCollection;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

internal ThreadPoolValueTaskSource(SafeFileHandle fileHandle)
{
Expand Down Expand Up @@ -98,18 +99,19 @@ void IThreadPoolWorkItem.Execute()
switch (_operation)
{
case Operation.Read:
result = RandomAccess.ReadAtOffset(_fileHandle, _readMemory.Span, _fileOffset);
Memory<byte> writableSingleSegment = Unsafe.As<ReadOnlyMemory<byte>, Memory<byte>>(ref _singleSegment);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.ReadAtOffset(_fileHandle, writableSingleSegment.Span, _fileOffset);
break;
case Operation.Write:
result = RandomAccess.WriteAtOffset(_fileHandle, _writeMemory.Span, _fileOffset);
result = RandomAccess.WriteAtOffset(_fileHandle, _singleSegment.Span, _fileOffset);
break;
case Operation.ReadScatter:
Debug.Assert(_readScatterMemories != null);
result = RandomAccess.ReadScatterAtOffset(_fileHandle, _readScatterMemories!, _fileOffset);
Debug.Assert(_multiSegmentCollection != null && _multiSegmentCollection is IReadOnlyList<Memory<byte>>);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.ReadScatterAtOffset(_fileHandle, (IReadOnlyList<Memory<byte>>) _multiSegmentCollection, _fileOffset);
break;
case Operation.WriteGather:
Debug.Assert(_writeGatherMemories != null);
result = RandomAccess.WriteGatherAtOffset(_fileHandle, _writeGatherMemories!, _fileOffset);
Debug.Assert(_multiSegmentCollection != null && _multiSegmentCollection is IReadOnlyList<ReadOnlyMemory<byte>>);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
result = RandomAccess.WriteGatherAtOffset(_fileHandle, (IReadOnlyList<ReadOnlyMemory<byte>>)_multiSegmentCollection, _fileOffset);
break;
}
} finally
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -119,10 +121,8 @@ void IThreadPoolWorkItem.Execute()
_operation = Operation.None;
_fileOffset = 0;
_cancellationToken = default;
_readMemory = default;
_writeMemory = default;
_readScatterMemories = null;
_writeGatherMemories = null;
_singleSegment = default;
_multiSegmentCollection = null;
}
} catch (Exception e)
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
{
Expand All @@ -136,7 +136,7 @@ public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, Cancellati
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

_operation = Operation.Read;
_readMemory = buffer;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -149,7 +149,7 @@ public ValueTask<int> QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, C
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.Write;
_writeMemory = buffer;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);
Expand All @@ -162,7 +162,7 @@ public ValueTask<long> QueueReadScatter(IReadOnlyList<Memory<byte>> buffers, lon
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.ReadScatter;
_readScatterMemories = buffers;
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);
Expand All @@ -175,7 +175,7 @@ public ValueTask<long> QueueWriteGather(IReadOnlyList<ReadOnlyMemory<byte>> buff
Debug.Assert(_operation == Operation.None, "An operation was queued before the previous one's completion.");

_operation = Operation.WriteGather;
_writeGatherMemories = buffers;
_multiSegmentCollection = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
ThreadPool.UnsafeQueueUserWorkItem(this, false);
Expand Down