Skip to content

Commit

Permalink
New IBufferWriter<byte>.AsStream() extension (#3522)
Browse files Browse the repository at this point in the history
## PR Type
What kind of change does this PR introduce?
<!-- Please uncomment one or more that apply to this PR. -->

<!-- - Bugfix -->
- Feature
<!-- - Code style update (formatting) -->
<!-- - Refactoring (no functional changes, no api changes) -->
<!-- - Build or CI related changes -->
<!-- - Documentation content changes -->
<!-- - Sample app changes -->
<!-- - Other... Please describe: -->


## What is the current behavior?
<!-- Please describe the current behavior that you are modifying, or link to a relevant issue. -->
There is currently no way to interoperate between the `IBufferWriter<T>` interface and the `Stream` class. Many APIs in the BCL and in 3rd party libraries use `Stream` as the standard way to accept an instance that can be written to or read from, and there is no built-in way to have a memory stream that is also using memory pooling, because none of the types in the BCL and in the `HighPerformance` package currently support both features at the same time. This PR fixes that 😄🚀

Consider this example that I saw from a user in the C# Discord server:
```csharp
public byte[] Compress(byte[] source)
{
    MemoryStream output = new MemoryStream();
    using (DeflateStream dstream = new DeflateStream(output, CompressionLevel.Optimal))
    {
        dstream.Write(source, 0, source.Length);
    }
    return output.ToArray();
}

public byte[] Decompress(byte[] source)
{
    MemoryStream input = new MemoryStream(source);
    MemoryStream output = new MemoryStream();
    using (DeflateStream dstream = new DeflateStream(input, CompressionMode.Decompress))
    {
        dstream.CopyTo(output);
    }
    return output.ToArray();
}
```

You can see how the code is very memory inefficient: the `MemoryStream` type will just `new`-up arrays as it goes, and at the end `ToArray()` is used too, which will duplicate the arrays too. Even by removing that, the main issue within `MemoryStream` remains. With the new extension introduced in this PR, these two APIs can be rewritten much more efficiently, like this:

```csharp
public IMemoryOwner<byte> Compress(ReadOnlySpan<byte> span)
{
    ArrayPoolBufferWriter<byte> bufferWriter = new ArrayPoolBufferWriter<byte>();

    using DeflateStream deflateStream = new DeflateStream(bufferWriter.AsStream(), CompressionLevel.Optimal);

    deflateStream.Write(span);

    return bufferWriter;
}

public IMemoryOwner<byte> Decompress(ReadOnlyMemory<byte> memory)
{
    ArrayPoolBufferWriter<byte> bufferWriter = new ArrayPoolBufferWriter<byte>(memory.Length);

    using DeflateStream deflateStream = new DeflateStream(memory.AsStream(), CompressionMode.Decompress);

    deflateStream.CopyTo(bufferWriter.AsStream());

    return bufferWriter;
}
```

Which heavily leverages all the various APIs and helpers in the `HighPerformance` package, and gives us the following results:

| Method | Categories |        Mean |     Error |    StdDev | Ratio |    Gen 0 |    Gen 1 |    Gen 2 | Allocated |
|------- |----------- |------------:|----------:|----------:|------:|---------:|---------:|---------:|----------:|
|  new[] |   COMPRESS | 29,923.5 us | 174.19 us | 162.94 us |  1.00 | 312.5000 | 312.5000 | 312.5000 | 3089853 B |
|   **pool** |   COMPRESS | **29,116.0 us** | 120.55 us | 106.87 us |  **0.97** |        - |        - |        - |     **297 B** |
|        |            |             |           |           |       |          |          |          |           |
|  new[] | DECOMPRESS |    832.9 us |   9.96 us |   8.83 us |  1.00 | 337.8906 | 336.9141 | 336.9141 | 2966680 B |
|   **pool** | DECOMPRESS |    **119.6 us** |   0.70 us |   0.62 us |  **0.14** |        - |        - |        - |     **392 B** |

This benchmark compresses and decompresses a 1MB buffer, using the two methods detailed above.
You can see the vastly reduced memory allocations using the pooled writer backed stream 🚀

## What is the new behavior?
<!-- Describe how was this issue resolved or changed? -->
This PR introduces this new extension:

```csharp
namespace Microsoft.Toolkit.HighPerformance.Extensions
{
    public static class ArrayPoolBufferWriterExtensions
    {
        public static Stream AsStream(this ArrayPoolBufferWriter<byte> writer);
    }

    public static class IBufferWriterExtensions
    {
        public static Stream AsStream(this IBufferWriter<byte> writer);
    }
}
```

Which helps to interoperate between the `IBufferWriter<T>` interface and the `Stream` class. In particular, since the `HighPerformance` package includes the `ArrayPoolBufferWriter<T>` type, this extension allows users to use that as a `Stream`, and then keep working with the resulting `ReadOnlyMemory<T>` produced by that type, as shown above.

## PR Checklist

Please check if your PR fulfills the following requirements:

- [X] Tested code with current [supported SDKs](../readme.md#supported)
- [ ] ~~Pull Request has been submitted to the documentation repository [instructions](..\contributing.md#docs). Link: <!-- docs PR link -->~~
- [ ] ~~Sample in sample app has been added / updated (for bug fixes / features)~~
    - [ ] ~~Icon has been created (if new sample) following the [Thumbnail Style Guide and templates](https://github.com/windows-toolkit/WindowsCommunityToolkit-design-assets)~~
- [X] Tests for the changes have been added (for bug fixes / features) (if applicable)
- [X] Header has been added to all new source files (run *build/UpdateHeaders.bat*)
- [X] Contains **NO** breaking changes
  • Loading branch information
msftbot[bot] authored Nov 14, 2020
2 parents 15edbed + 4600e42 commit a778ffc
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.CompilerServices;
using Microsoft.Toolkit.HighPerformance.Buffers;
using Microsoft.Toolkit.HighPerformance.Streams;
using Microsoft.Toolkit.HighPerformance.Streams.Sources;

namespace Microsoft.Toolkit.HighPerformance.Extensions
{
/// <summary>
/// Helpers for working with the <see cref="ArrayPoolBufferWriter{T}"/> type.
/// </summary>
public static class ArrayPoolBufferWriterExtensions
{
/// <summary>
/// Returns a <see cref="Stream"/> that can be used to write to a target an <see cref="ArrayPoolBufferWriter{T}"/> of <see cref="byte"/> instance.
/// </summary>
/// <param name="writer">The target <see cref="ArrayPoolBufferWriter{T}"/> instance.</param>
/// <returns>A <see cref="Stream"/> wrapping <paramref name="writer"/> and writing data to its underlying buffer.</returns>
/// <remarks>The returned <see cref="Stream"/> can only be written to and does not support seeking.</remarks>
[Pure]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static Stream AsStream(this ArrayPoolBufferWriter<byte> writer)
{
return new IBufferWriterStream<ArrayBufferWriterOwner>(new ArrayBufferWriterOwner(writer));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

using System;
using System.Buffers;
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Microsoft.Toolkit.HighPerformance.Buffers;
using Microsoft.Toolkit.HighPerformance.Streams;
using Microsoft.Toolkit.HighPerformance.Streams.Sources;

namespace Microsoft.Toolkit.HighPerformance.Extensions
{
Expand All @@ -14,6 +19,28 @@ namespace Microsoft.Toolkit.HighPerformance.Extensions
/// </summary>
public static class IBufferWriterExtensions
{
/// <summary>
/// Returns a <see cref="Stream"/> that can be used to write to a target an <see cref="IBufferWriter{T}"/> of <see cref="byte"/> instance.
/// </summary>
/// <param name="writer">The target <see cref="IBufferWriter{T}"/> instance.</param>
/// <returns>A <see cref="Stream"/> wrapping <paramref name="writer"/> and writing data to its underlying buffer.</returns>
/// <remarks>The returned <see cref="Stream"/> can only be written to and does not support seeking.</remarks>
[Pure]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static Stream AsStream(this IBufferWriter<byte> writer)
{
if (writer.GetType() == typeof(ArrayPoolBufferWriter<byte>))
{
// If the input writer is of type ArrayPoolBufferWriter<byte>, we can use the type
// specific buffer writer owner to let the JIT elide callvirts when accessing it.
var internalWriter = Unsafe.As<ArrayPoolBufferWriter<byte>>(writer)!;

return new IBufferWriterStream<ArrayBufferWriterOwner>(new ArrayBufferWriterOwner(internalWriter));
}

return new IBufferWriterStream<IBufferWriterOwner>(new IBufferWriterOwner(writer));
}

/// <summary>
/// Writes a value of a specified type into a target <see cref="IBufferWriter{T}"/> instance.
/// </summary>
Expand Down
18 changes: 9 additions & 9 deletions Microsoft.Toolkit.HighPerformance/Extensions/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,24 @@ public static void Write(this Stream stream, ReadOnlySpan<byte> buffer)
/// <returns>The <typeparamref name="T"/> value read from <paramref name="stream"/>.</returns>
/// <exception cref="InvalidOperationException">Thrown if <paramref name="stream"/> reaches the end.</exception>
#if SPAN_RUNTIME_SUPPORT
// Avoid inlining as we're renting a stack buffer, which
// cause issues if this method was called inside a loop
[MethodImpl(MethodImplOptions.NoInlining)]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public static T Read<T>(this Stream stream)
where T : unmanaged
{
#if SPAN_RUNTIME_SUPPORT
Span<byte> span = stackalloc byte[Unsafe.SizeOf<T>()];
T result = default;
int length = Unsafe.SizeOf<T>();

if (stream.Read(span) != span.Length)
unsafe
{
ThrowInvalidOperationExceptionForEndOfStream();
if (stream.Read(new Span<byte>(&result, length)) != length)
{
ThrowInvalidOperationExceptionForEndOfStream();
}
}

ref byte r0 = ref MemoryMarshal.GetReference(span);

return Unsafe.ReadUnaligned<T>(ref r0);
return result;
#else
int length = Unsafe.SizeOf<T>();
byte[] buffer = ArrayPool<byte>.Shared.Rent(length);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

#if SPAN_RUNTIME_SUPPORT

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Toolkit.HighPerformance.Streams
{
/// <inheritdoc cref="IBufferWriterStream{TWriter}"/>
internal sealed partial class IBufferWriterStream<TWriter>
{
/// <inheritdoc/>
public override void CopyTo(Stream destination, int bufferSize)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
{
Write(buffer.Span);

return default;
}
catch (OperationCanceledException e)
{
return new ValueTask(Task.FromCanceled(e.CancellationToken));
}
catch (Exception e)
{
return new ValueTask(Task.FromException(e));
}
}

/// <inheritdoc/>
public override int Read(Span<byte> buffer)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override void Write(ReadOnlySpan<byte> buffer)
{
MemoryStream.ValidateDisposed(this.disposed);

Span<byte> destination = this.bufferWriter.GetSpan(buffer.Length);

if (!buffer.TryCopyTo(destination))
{
MemoryStream.ThrowArgumentExceptionForEndOfStreamOnWrite();
}

this.bufferWriter.Advance(buffer.Length);
}
}
}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Toolkit.HighPerformance.Streams
{
/// <summary>
/// A <see cref="Stream"/> implementation wrapping an <see cref="IBufferWriter{T}"/> instance.
/// </summary>
/// <typeparam name="TWriter">The type of buffer writer to use.</typeparam>
internal sealed partial class IBufferWriterStream<TWriter> : Stream
where TWriter : struct, IBufferWriter<byte>
{
/// <summary>
/// The target <typeparamref name="TWriter"/> instance to use.
/// </summary>
private readonly TWriter bufferWriter;

/// <summary>
/// Indicates whether or not the current instance has been disposed
/// </summary>
private bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="IBufferWriterStream{TWriter}"/> class.
/// </summary>
/// <param name="bufferWriter">The target <see cref="IBufferWriter{T}"/> instance to use.</param>
public IBufferWriterStream(TWriter bufferWriter)
{
this.bufferWriter = bufferWriter;
}

/// <inheritdoc/>
public override bool CanRead => false;

/// <inheritdoc/>
public override bool CanSeek => false;

/// <inheritdoc/>
public override bool CanWrite
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => !this.disposed;
}

/// <inheritdoc/>
public override long Length => throw MemoryStream.GetNotSupportedException();

/// <inheritdoc/>
public override long Position
{
get => throw MemoryStream.GetNotSupportedException();
set => throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override void Flush()
{
}

/// <inheritdoc/>
public override Task FlushAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}

return Task.CompletedTask;
}

/// <inheritdoc/>
public override Task<int> ReadAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}

try
{
Write(buffer, offset, count);

return Task.CompletedTask;
}
catch (OperationCanceledException e)
{
return Task.FromCanceled(e.CancellationToken);
}
catch (Exception e)
{
return Task.FromException(e);
}
}

/// <inheritdoc/>
public override long Seek(long offset, SeekOrigin origin)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override void SetLength(long value)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override int Read(byte[]? buffer, int offset, int count)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override int ReadByte()
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public override void Write(byte[]? buffer, int offset, int count)
{
MemoryStream.ValidateDisposed(this.disposed);
MemoryStream.ValidateBuffer(buffer, offset, count);

Span<byte>
source = buffer.AsSpan(offset, count),
destination = this.bufferWriter.GetSpan(count);

if (!source.TryCopyTo(destination))
{
MemoryStream.ThrowArgumentExceptionForEndOfStreamOnWrite();
}

this.bufferWriter.Advance(count);
}

/// <inheritdoc/>
public override void WriteByte(byte value)
{
MemoryStream.ValidateDisposed(this.disposed);

this.bufferWriter.GetSpan(1)[0] = value;

this.bufferWriter.Advance(1);
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
this.disposed = true;
}
}
}
Loading

0 comments on commit a778ffc

Please sign in to comment.