Skip to content

Commit 445c85e

Browse files
Kahbazidavidfowl
authored andcommitted
- Use PipeWriter to write files
- Use Async Disposable for FileStream - Use PipeWriter in FileResult - Call StartAsync before using the PipeWriter - Add SendFileAsync to HttpProtocol and call StartAsync in there to make sure we're not buffering more than we need to. - Use the array pool as a fallback for large buffers - Max buffer size of 64K
1 parent 9408ed8 commit 445c85e

File tree

12 files changed

+165
-36
lines changed

12 files changed

+165
-36
lines changed

src/Hosting/TestHost/src/ResponseFeature.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void DisableBuffering()
146146

147147
public Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
148148
{
149-
return SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellation);
149+
return SendFileFallback.SendFileAsync(Writer, path, offset, count, cancellation);
150150
}
151151

152152
public Task CompleteAsync()

src/Http/Http.Extensions/src/Microsoft.AspNetCore.Http.Extensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<Description>ASP.NET Core common extension methods for HTTP abstractions, HTTP headers, HTTP request/response, and session state.</Description>

src/Http/Http.Extensions/src/SendFileResponseExtensions.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.IO;
66
using System.Threading;
77
using System.Threading.Tasks;
8-
using Microsoft.AspNetCore.Http.Extensions;
98
using Microsoft.AspNetCore.Http.Features;
109
using Microsoft.Extensions.FileProviders;
1110

@@ -16,8 +15,6 @@ namespace Microsoft.AspNetCore.Http
1615
/// </summary>
1716
public static class SendFileResponseExtensions
1817
{
19-
private const int StreamCopyBufferSize = 64 * 1024;
20-
2118
/// <summary>
2219
/// Sends the given file using the SendFile extension.
2320
/// </summary>
@@ -124,13 +121,13 @@ private static async Task SendFileAsyncCore(HttpResponse response, IFileInfo fil
124121
{
125122
fileContent.Seek(offset, SeekOrigin.Begin);
126123
}
127-
await StreamCopyOperation.CopyToAsync(fileContent, response.Body, count, StreamCopyBufferSize, localCancel);
124+
await StreamCopyOperationInternal.CopyToAsync(fileContent, response.BodyWriter, count, localCancel);
128125
}
129126
catch (OperationCanceledException) when (useRequestAborted) { }
130127
}
131128
else
132129
{
133-
await response.SendFileAsync(file.PhysicalPath, offset, count, cancellationToken);
130+
await SendFileAsyncCore(response, file.PhysicalPath, offset, count, cancellationToken);
134131
}
135132
}
136133

src/Http/Http.Extensions/src/StreamCopyOperation.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System.IO;
5+
using System.IO.Pipelines;
56
using System.Threading;
67
using System.Threading.Tasks;
78

@@ -28,5 +29,14 @@ public static Task CopyToAsync(Stream source, Stream destination, long? count, C
2829
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
2930
public static Task CopyToAsync(Stream source, Stream destination, long? count, int bufferSize, CancellationToken cancel)
3031
=> StreamCopyOperationInternal.CopyToAsync(source, destination, count, bufferSize, cancel);
32+
33+
/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them using pipe writer.</summary>
34+
/// <returns>A task that represents the asynchronous copy operation.</returns>
35+
/// <param name="source">The stream from which the contents will be copied.</param>
36+
/// <param name="writer">The PipeWriter to which the contents of the current stream will be copied.</param>
37+
/// <param name="count">The count of bytes to be copied.</param>
38+
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
39+
public static Task CopyToAsync(Stream source, PipeWriter writer, long? count, CancellationToken cancel)
40+
=> StreamCopyOperationInternal.CopyToAsync(source, writer, count, cancel);
3141
}
3242
}

src/Http/Http/src/SendFileFallback.cs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.IO;
6+
using System.IO.Pipelines;
67
using System.Threading;
78
using System.Threading.Tasks;
89

@@ -20,6 +21,31 @@ public static class SendFileFallback
2021
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to abort the transmission.</param>
2122
/// <returns></returns>
2223
public static async Task SendFileAsync(Stream destination, string filePath, long offset, long? count, CancellationToken cancellationToken)
24+
{
25+
await using FileStream fileStream = GetFileStream(filePath, offset, count, cancellationToken);
26+
27+
fileStream.Seek(offset, SeekOrigin.Begin);
28+
await StreamCopyOperationInternal.CopyToAsync(fileStream, destination, count, cancellationToken);
29+
}
30+
31+
/// <summary>
32+
/// Write the segment of the file using pipe writer.
33+
/// </summary>
34+
/// <param name="writer"></param>
35+
/// <param name="filePath">The full disk path to the file.</param>
36+
/// <param name="offset">The offset in the file to start at.</param>
37+
/// <param name="count">The number of bytes to send, or null to send the remainder of the file.</param>
38+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to abort the transmission.</param>
39+
/// <returns></returns>
40+
public static async Task SendFileAsync(PipeWriter writer, string filePath, long offset, long? count, CancellationToken cancellationToken)
41+
{
42+
await using FileStream fileStream = GetFileStream(filePath, offset, count, cancellationToken);
43+
44+
fileStream.Seek(offset, SeekOrigin.Begin);
45+
await StreamCopyOperationInternal.CopyToAsync(fileStream, writer, count, cancellationToken);
46+
}
47+
48+
private static FileStream GetFileStream(string filePath, long offset, long? count, CancellationToken cancellationToken)
2349
{
2450
var fileInfo = new FileInfo(filePath);
2551
if (offset < 0 || offset > fileInfo.Length)
@@ -34,21 +60,15 @@ public static async Task SendFileAsync(Stream destination, string filePath, long
3460

3561
cancellationToken.ThrowIfCancellationRequested();
3662

37-
int bufferSize = 1024 * 16;
63+
int bufferSize = 1;
3864

39-
var fileStream = new FileStream(
65+
return new FileStream(
4066
filePath,
4167
FileMode.Open,
4268
FileAccess.Read,
4369
FileShare.ReadWrite,
4470
bufferSize: bufferSize,
4571
options: FileOptions.Asynchronous | FileOptions.SequentialScan);
46-
47-
using (fileStream)
48-
{
49-
fileStream.Seek(offset, SeekOrigin.Begin);
50-
await StreamCopyOperationInternal.CopyToAsync(fileStream, destination, count, bufferSize, cancellationToken);
51-
}
5272
}
5373
}
5474
}

src/Http/Shared/StreamCopyOperationInternal.cs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Buffers;
66
using System.Diagnostics;
77
using System.IO;
8+
using System.IO.Pipelines;
89
using System.Threading;
910
using System.Threading.Tasks;
1011

@@ -13,7 +14,7 @@ namespace Microsoft.AspNetCore.Http
1314
// FYI: In most cases the source will be a FileStream and the destination will be to the network.
1415
internal static class StreamCopyOperationInternal
1516
{
16-
private const int DefaultBufferSize = 4096;
17+
private const int DefaultBufferSize = 4 * 1024;
1718

1819
/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them to another stream.</summary>
1920
/// <returns>A task that represents the asynchronous copy operation.</returns>
@@ -42,13 +43,13 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
4243
{
4344
Debug.Assert(source != null);
4445
Debug.Assert(destination != null);
45-
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.GetValueOrDefault() >= 0);
46+
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.Value >= 0);
4647
Debug.Assert(buffer != null);
4748

4849
while (true)
4950
{
5051
// The natural end of the range.
51-
if (bytesRemaining.HasValue && bytesRemaining.GetValueOrDefault() <= 0)
52+
if (bytesRemaining.HasValue && bytesRemaining.Value <= 0)
5253
{
5354
return;
5455
}
@@ -58,7 +59,7 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
5859
int readLength = buffer.Length;
5960
if (bytesRemaining.HasValue)
6061
{
61-
readLength = (int)Math.Min(bytesRemaining.GetValueOrDefault(), (long)readLength);
62+
readLength = (int)Math.Min(bytesRemaining.Value, (long)readLength);
6263
}
6364
int read = await source.ReadAsync(buffer, 0, readLength, cancel);
6465

@@ -83,5 +84,66 @@ public static async Task CopyToAsync(Stream source, Stream destination, long? co
8384
ArrayPool<byte>.Shared.Return(buffer);
8485
}
8586
}
87+
88+
/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them using pipe writer.</summary>
89+
/// <returns>A task that represents the asynchronous copy operation.</returns>
90+
/// <param name="source">The stream from which the contents will be copied.</param>
91+
/// <param name="writer">The PipeWriter to which the contents of the current stream will be copied.</param>
92+
/// <param name="count">The count of bytes to be copied.</param>
93+
/// <param name="cancel">The token to monitor for cancellation requests.</param>
94+
public static Task CopyToAsync(Stream source, PipeWriter writer, long? count, CancellationToken cancel)
95+
{
96+
if (count == null)
97+
{
98+
// No length, do a copy with the default buffer size (based on whatever the pipe settings are, default is 4K)
99+
return source.CopyToAsync(writer, cancel);
100+
}
101+
102+
static async Task CopyToAsync(Stream source, PipeWriter writer, long bytesRemaining, CancellationToken cancel)
103+
{
104+
// The array pool likes powers of 2
105+
const int maxBufferSize = 64 * 1024;
106+
const int minBufferSize = 1024;
107+
108+
// We know exactly how much we're going to copy
109+
while (bytesRemaining > 0)
110+
{
111+
var bufferSize = (int)Math.Clamp(bytesRemaining, minBufferSize, maxBufferSize);
112+
113+
// The natural end of the range.
114+
var memory = writer.GetMemory(bufferSize);
115+
116+
if (memory.Length > bytesRemaining)
117+
{
118+
memory = memory.Slice(0, (int)bytesRemaining);
119+
}
120+
121+
var read = await source.ReadAsync(memory, cancel);
122+
123+
bytesRemaining -= read;
124+
125+
// End of the source stream.
126+
if (read == 0)
127+
{
128+
break;
129+
}
130+
131+
writer.Advance(read);
132+
133+
var result = await writer.FlushAsync(cancel);
134+
135+
if (result.IsCompleted)
136+
{
137+
break;
138+
}
139+
}
140+
}
141+
142+
Debug.Assert(source != null);
143+
Debug.Assert(writer != null);
144+
Debug.Assert(count >= 0);
145+
146+
return CopyToAsync(source, writer, count.Value, cancel);
147+
}
86148
}
87149
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
@ECHO OFF
2+
3+
%~dp0..\..\..\startvs.cmd %~dp0StaticFiles.slnf

src/Mvc/Mvc.Core/src/Infrastructure/FileResultExecutorBase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -360,19 +360,19 @@ protected static ILogger CreateLogger<T>(ILoggerFactory factory)
360360

361361
protected static async Task WriteFileAsync(HttpContext context, Stream fileStream, RangeItemHeaderValue range, long rangeLength)
362362
{
363-
var outputStream = context.Response.Body;
364-
using (fileStream)
363+
var outputPipeWriter = context.Response.BodyWriter;
364+
await using (fileStream)
365365
{
366366
try
367367
{
368368
if (range == null)
369369
{
370-
await StreamCopyOperation.CopyToAsync(fileStream, outputStream, count: null, bufferSize: BufferSize, cancel: context.RequestAborted);
370+
await StreamCopyOperation.CopyToAsync(fileStream, outputPipeWriter, count: null, cancel: context.RequestAborted);
371371
}
372372
else
373373
{
374374
fileStream.Seek(range.From.Value, SeekOrigin.Begin);
375-
await StreamCopyOperation.CopyToAsync(fileStream, outputStream, rangeLength, BufferSize, context.RequestAborted);
375+
await StreamCopyOperation.CopyToAsync(fileStream, outputPipeWriter, rangeLength, context.RequestAborted);
376376
}
377377
}
378378
catch (OperationCanceledException)

src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IDisposable
7070
// These should be cleared by the end of the request
7171
private List<CompletedBuffer> _completedSegments;
7272
private Memory<byte> _currentSegment;
73-
private IMemoryOwner<byte> _currentSegmentOwner;
73+
private object _currentSegmentOwner;
7474
private int _position;
7575
private bool _startCalled;
7676

@@ -436,7 +436,15 @@ public void Dispose()
436436

437437
private void DisposeCurrentSegment()
438438
{
439-
_currentSegmentOwner?.Dispose();
439+
if (_currentSegmentOwner is IMemoryOwner<byte> owner)
440+
{
441+
owner.Dispose();
442+
}
443+
else if (_currentSegmentOwner is byte[] array)
444+
{
445+
ArrayPool<byte>.Shared.Return(array);
446+
}
447+
440448
_currentSegmentOwner = null;
441449
_currentSegment = default;
442450
}
@@ -600,13 +608,13 @@ private Memory<byte> GetChunkedMemory(int sizeHint)
600608
// Calculating ChunkWriter.GetBeginChunkByteCount isn't free, so instead, we can add
601609
// the max length for the prefix and suffix and add it to the sizeHint.
602610
// This still guarantees that the memory passed in will be larger than the sizeHint.
603-
sizeHint += MaxBeginChunkLength + EndChunkLength;
611+
sizeHint += MaxBeginChunkLength + EndChunkLength;
604612
UpdateCurrentChunkMemory(sizeHint);
605613
}
606614
// Check if we need to allocate a new memory.
607615
else if (_advancedBytesForChunk >= _currentChunkMemory.Length - _currentMemoryPrefixBytes - EndChunkLength - sizeHint && _advancedBytesForChunk > 0)
608616
{
609-
sizeHint += MaxBeginChunkLength + EndChunkLength;
617+
sizeHint += MaxBeginChunkLength + EndChunkLength;
610618
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
611619
WriteCurrentChunkMemoryToPipeWriter(ref writer);
612620
writer.Commit();
@@ -632,7 +640,7 @@ private void UpdateCurrentChunkMemory(int sizeHint)
632640
}
633641
_currentChunkMemoryUpdated = true;
634642
}
635-
643+
636644
private void WriteCurrentChunkMemoryToPipeWriter(ref BufferWriter<PipeWriter> writer)
637645
{
638646
Debug.Assert(_advancedBytesForChunk <= _currentChunkMemory.Length);
@@ -714,8 +722,8 @@ private void AddSegment(int sizeHint = 0)
714722
}
715723
else
716724
{
717-
_currentSegment = new byte[sizeHint];
718-
_currentSegmentOwner = null;
725+
_currentSegment = ArrayPool<byte>.Shared.Rent(sizeHint);
726+
_currentSegmentOwner = _currentSegment;
719727
}
720728

721729
_position = 0;
@@ -741,14 +749,14 @@ private static void ThrowSuffixSent()
741749
/// </summary>
742750
private readonly struct CompletedBuffer
743751
{
744-
private readonly IMemoryOwner<byte> _memoryOwner;
752+
private readonly object _memoryOwner;
745753

746754
public Memory<byte> Buffer { get; }
747755
public int Length { get; }
748756

749757
public ReadOnlySpan<byte> Span => Buffer.Span.Slice(0, Length);
750758

751-
public CompletedBuffer(IMemoryOwner<byte> owner, Memory<byte> buffer, int length)
759+
public CompletedBuffer(object owner, Memory<byte> buffer, int length)
752760
{
753761
_memoryOwner = owner;
754762

@@ -758,9 +766,13 @@ public CompletedBuffer(IMemoryOwner<byte> owner, Memory<byte> buffer, int length
758766

759767
public void Return()
760768
{
761-
if (_memoryOwner != null)
769+
if (_memoryOwner is IMemoryOwner<byte> owner)
770+
{
771+
owner.Dispose();
772+
}
773+
else if (_memoryOwner is byte[] array)
762774
{
763-
_memoryOwner.Dispose();
775+
ArrayPool<byte>.Shared.Return(array);
764776
}
765777
}
766778
}

src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.FeatureCollection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ void IHttpResponseBodyFeature.DisableBuffering()
329329

330330
Task IHttpResponseBodyFeature.SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
331331
{
332-
return SendFileFallback.SendFileAsync(ResponseBody, path, offset, count, cancellation);
332+
return SendFileAsync(path, offset, count, cancellation);
333333
}
334334

335335
Task IHttpResponseBodyFeature.CompleteAsync()

0 commit comments

Comments
 (0)