Skip to content

Use PipeWriter to write files #24851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Hosting/TestHost/src/ResponseFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void DisableBuffering()

public Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
{
return SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellation);
return SendFileFallback.SendFileAsync(Writer, path, offset, count, cancellation);
}

public Task CompleteAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>ASP.NET Core common extension methods for HTTP abstractions, HTTP headers, HTTP request/response, and session state.</Description>
Expand Down
7 changes: 2 additions & 5 deletions src/Http/Http.Extensions/src/SendFileResponseExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Extensions;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.FileProviders;

Expand All @@ -16,8 +15,6 @@ namespace Microsoft.AspNetCore.Http
/// </summary>
public static class SendFileResponseExtensions
{
private const int StreamCopyBufferSize = 64 * 1024;

/// <summary>
/// Sends the given file using the SendFile extension.
/// </summary>
Expand Down Expand Up @@ -124,13 +121,13 @@ private static async Task SendFileAsyncCore(HttpResponse response, IFileInfo fil
{
fileContent.Seek(offset, SeekOrigin.Begin);
}
await StreamCopyOperation.CopyToAsync(fileContent, response.Body, count, StreamCopyBufferSize, localCancel);
await StreamCopyOperationInternal.CopyToAsync(fileContent, response.BodyWriter, count, localCancel);
}
catch (OperationCanceledException) when (useRequestAborted) { }
}
else
{
await response.SendFileAsync(file.PhysicalPath, offset, count, cancellationToken);
await SendFileAsyncCore(response, file.PhysicalPath, offset, count, cancellationToken);
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/Http/Http.Extensions/src/StreamCopyOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -28,5 +29,14 @@ public static Task CopyToAsync(Stream source, Stream destination, long? count, C
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
public static Task CopyToAsync(Stream source, Stream destination, long? count, int bufferSize, CancellationToken cancel)
=> StreamCopyOperationInternal.CopyToAsync(source, destination, count, bufferSize, cancel);

/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them using pipe writer.</summary>
/// <returns>A task that represents the asynchronous copy operation.</returns>
/// <param name="source">The stream from which the contents will be copied.</param>
/// <param name="writer">The PipeWriter to which the contents of the current stream will be copied.</param>
/// <param name="count">The count of bytes to be copied.</param>
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
public static Task CopyToAsync(Stream source, PipeWriter writer, long? count, CancellationToken cancel)
=> StreamCopyOperationInternal.CopyToAsync(source, writer, count, cancel);
}
}
44 changes: 36 additions & 8 deletions src/Http/Http/src/SendFileFallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -20,6 +21,39 @@ public static class SendFileFallback
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to abort the transmission.</param>
/// <returns></returns>
public static async Task SendFileAsync(Stream destination, string filePath, long offset, long? count, CancellationToken cancellationToken)
{
await using FileStream fileStream = GetFileStream(filePath, offset, count, cancellationToken);

if (offset > 0)
{
fileStream.Seek(offset, SeekOrigin.Begin);
}

await StreamCopyOperationInternal.CopyToAsync(fileStream, destination, count, cancellationToken);
}

/// <summary>
/// Write the segment of the file using pipe writer.
/// </summary>
/// <param name="writer"></param>
/// <param name="filePath">The full disk path to the file.</param>
/// <param name="offset">The offset in the file to start at.</param>
/// <param name="count">The number of bytes to send, or null to send the remainder of the file.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to abort the transmission.</param>
/// <returns></returns>
public static async Task SendFileAsync(PipeWriter writer, string filePath, long offset, long? count, CancellationToken cancellationToken)
{
await using FileStream fileStream = GetFileStream(filePath, offset, count, cancellationToken);

if (offset > 0)
{
fileStream.Seek(offset, SeekOrigin.Begin);
}

await StreamCopyOperationInternal.CopyToAsync(fileStream, writer, count, cancellationToken);
}

private static FileStream GetFileStream(string filePath, long offset, long? count, CancellationToken cancellationToken)
{
var fileInfo = new FileInfo(filePath);
if (offset < 0 || offset > fileInfo.Length)
Expand All @@ -34,21 +68,15 @@ public static async Task SendFileAsync(Stream destination, string filePath, long

cancellationToken.ThrowIfCancellationRequested();

int bufferSize = 1024 * 16;
int bufferSize = 1;

var fileStream = new FileStream(
return new FileStream(
filePath,
FileMode.Open,
FileAccess.Read,
FileShare.ReadWrite,
bufferSize: bufferSize,
options: FileOptions.Asynchronous | FileOptions.SequentialScan);

using (fileStream)
{
fileStream.Seek(offset, SeekOrigin.Begin);
await StreamCopyOperationInternal.CopyToAsync(fileStream, destination, count, bufferSize, cancellationToken);
}
}
}
}
70 changes: 66 additions & 4 deletions src/Http/Shared/StreamCopyOperationInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -13,7 +14,7 @@ namespace Microsoft.AspNetCore.Http
// FYI: In most cases the source will be a FileStream and the destination will be to the network.
internal static class StreamCopyOperationInternal
{
private const int DefaultBufferSize = 4096;
private const int DefaultBufferSize = 4 * 1024;

/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them to another stream.</summary>
/// <returns>A task that represents the asynchronous copy operation.</returns>
Expand Down Expand Up @@ -42,13 +43,13 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
{
Debug.Assert(source != null);
Debug.Assert(destination != null);
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.GetValueOrDefault() >= 0);
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.Value >= 0);
Debug.Assert(buffer != null);

while (true)
{
// The natural end of the range.
if (bytesRemaining.HasValue && bytesRemaining.GetValueOrDefault() <= 0)
if (bytesRemaining.HasValue && bytesRemaining.Value <= 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change these to Value? Value checks whether the value is null and throws an error if it is. GetValueOrDefault returns whatever the value is, making it faster.

{
return;
}
Expand All @@ -58,7 +59,7 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
int readLength = buffer.Length;
if (bytesRemaining.HasValue)
{
readLength = (int)Math.Min(bytesRemaining.GetValueOrDefault(), (long)readLength);
readLength = (int)Math.Min(bytesRemaining.Value, (long)readLength);
}
int read = await source.ReadAsync(buffer, 0, readLength, cancel);

Expand All @@ -83,5 +84,66 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
ArrayPool<byte>.Shared.Return(buffer);
}
}

/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them using pipe writer.</summary>
/// <returns>A task that represents the asynchronous copy operation.</returns>
/// <param name="source">The stream from which the contents will be copied.</param>
/// <param name="writer">The PipeWriter to which the contents of the current stream will be copied.</param>
/// <param name="count">The count of bytes to be copied.</param>
/// <param name="cancel">The token to monitor for cancellation requests.</param>
public static Task CopyToAsync(Stream source, PipeWriter writer, long? count, CancellationToken cancel)
{
if (count == null)
{
// No length, do a copy with the default buffer size (based on whatever the pipe settings are, default is 4K)
return source.CopyToAsync(writer, cancel);
}

static async Task CopyToAsync(Stream source, PipeWriter writer, long bytesRemaining, CancellationToken cancel)
{
// The array pool likes powers of 2
const int maxBufferSize = 64 * 1024;
const int minBufferSize = 1024;

// We know exactly how much we're going to copy
while (bytesRemaining > 0)
{
var bufferSize = (int)Math.Clamp(bytesRemaining, minBufferSize, maxBufferSize);

// The natural end of the range.
var memory = writer.GetMemory(bufferSize);

if (memory.Length > bytesRemaining)
{
memory = memory.Slice(0, (int)bytesRemaining);
}

var read = await source.ReadAsync(memory, cancel);

bytesRemaining -= read;

// End of the source stream.
if (read == 0)
{
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you advance here to get the writer in a working state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that only matters if you write something. At least in the Pipe implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought if you call GetMemory, you can't call GetMemory afterwards without advancing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope.

}

writer.Advance(read);

var result = await writer.FlushAsync(cancel);

if (result.IsCompleted)
{
break;
}
}
}

Debug.Assert(source != null);
Debug.Assert(writer != null);
Debug.Assert(count >= 0);

return CopyToAsync(source, writer, count.Value, cancel);
}
}
}
3 changes: 3 additions & 0 deletions src/Middleware/StaticFiles/startvs.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
@ECHO OFF

%~dp0..\..\..\startvs.cmd %~dp0StaticFiles.slnf
8 changes: 4 additions & 4 deletions src/Mvc/Mvc.Core/src/Infrastructure/FileResultExecutorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,19 @@ protected static ILogger CreateLogger<T>(ILoggerFactory factory)

protected static async Task WriteFileAsync(HttpContext context, Stream fileStream, RangeItemHeaderValue range, long rangeLength)
{
var outputStream = context.Response.Body;
using (fileStream)
var outputPipeWriter = context.Response.BodyWriter;
await using (fileStream)
{
try
{
if (range == null)
{
await StreamCopyOperation.CopyToAsync(fileStream, outputStream, count: null, bufferSize: BufferSize, cancel: context.RequestAborted);
await StreamCopyOperation.CopyToAsync(fileStream, outputPipeWriter, count: null, cancel: context.RequestAborted);
}
else
{
fileStream.Seek(range.From.Value, SeekOrigin.Begin);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does here also need a check for range.From.Value > 0 before seek?
Also does it need a null check for range.From?

await StreamCopyOperation.CopyToAsync(fileStream, outputStream, rangeLength, BufferSize, context.RequestAborted);
await StreamCopyOperation.CopyToAsync(fileStream, outputPipeWriter, rangeLength, context.RequestAborted);
}
}
catch (OperationCanceledException)
Expand Down
34 changes: 23 additions & 11 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IDisposable
// These should be cleared by the end of the request
private List<CompletedBuffer> _completedSegments;
private Memory<byte> _currentSegment;
private IMemoryOwner<byte> _currentSegmentOwner;
private object _currentSegmentOwner;
private int _position;
private bool _startCalled;

Expand Down Expand Up @@ -436,7 +436,15 @@ public void Dispose()

private void DisposeCurrentSegment()
{
_currentSegmentOwner?.Dispose();
if (_currentSegmentOwner is IMemoryOwner<byte> owner)
{
owner.Dispose();
}
else if (_currentSegmentOwner is byte[] array)
{
ArrayPool<byte>.Shared.Return(array);
}

_currentSegmentOwner = null;
_currentSegment = default;
}
Expand Down Expand Up @@ -600,13 +608,13 @@ private Memory<byte> GetChunkedMemory(int sizeHint)
// Calculating ChunkWriter.GetBeginChunkByteCount isn't free, so instead, we can add
// the max length for the prefix and suffix and add it to the sizeHint.
// This still guarantees that the memory passed in will be larger than the sizeHint.
sizeHint += MaxBeginChunkLength + EndChunkLength;
sizeHint += MaxBeginChunkLength + EndChunkLength;
UpdateCurrentChunkMemory(sizeHint);
}
// Check if we need to allocate a new memory.
else if (_advancedBytesForChunk >= _currentChunkMemory.Length - _currentMemoryPrefixBytes - EndChunkLength - sizeHint && _advancedBytesForChunk > 0)
{
sizeHint += MaxBeginChunkLength + EndChunkLength;
sizeHint += MaxBeginChunkLength + EndChunkLength;
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteCurrentChunkMemoryToPipeWriter(ref writer);
writer.Commit();
Expand All @@ -632,7 +640,7 @@ private void UpdateCurrentChunkMemory(int sizeHint)
}
_currentChunkMemoryUpdated = true;
}

private void WriteCurrentChunkMemoryToPipeWriter(ref BufferWriter<PipeWriter> writer)
{
Debug.Assert(_advancedBytesForChunk <= _currentChunkMemory.Length);
Expand Down Expand Up @@ -714,8 +722,8 @@ private void AddSegment(int sizeHint = 0)
}
else
{
_currentSegment = new byte[sizeHint];
_currentSegmentOwner = null;
_currentSegment = ArrayPool<byte>.Shared.Rent(sizeHint);
_currentSegmentOwner = _currentSegment;
}

_position = 0;
Expand All @@ -741,14 +749,14 @@ private static void ThrowSuffixSent()
/// </summary>
private readonly struct CompletedBuffer
{
private readonly IMemoryOwner<byte> _memoryOwner;
private readonly object _memoryOwner;

public Memory<byte> Buffer { get; }
public int Length { get; }

public ReadOnlySpan<byte> Span => Buffer.Span.Slice(0, Length);

public CompletedBuffer(IMemoryOwner<byte> owner, Memory<byte> buffer, int length)
public CompletedBuffer(object owner, Memory<byte> buffer, int length)
{
_memoryOwner = owner;

Expand All @@ -758,9 +766,13 @@ public CompletedBuffer(IMemoryOwner<byte> owner, Memory<byte> buffer, int length

public void Return()
{
if (_memoryOwner != null)
if (_memoryOwner is IMemoryOwner<byte> owner)
{
owner.Dispose();
}
else if (_memoryOwner is byte[] array)
{
_memoryOwner.Dispose();
ArrayPool<byte>.Shared.Return(array);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void IHttpResponseBodyFeature.DisableBuffering()

Task IHttpResponseBodyFeature.SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
{
return SendFileFallback.SendFileAsync(ResponseBody, path, offset, count, cancellation);
return SendFileAsync(path, offset, count, cancellation);
}

Task IHttpResponseBodyFeature.CompleteAsync()
Expand Down
Loading