Skip to content

Commit

Permalink
MessagePack codec (#8546)
Browse files Browse the repository at this point in the history
* MsgPack codec

---------

Co-authored-by: Nikolai Sidorov <nsidorov@microsoft.com>
Co-authored-by: ReubenBond <rebond@microsoft.com>
Co-authored-by: Reuben Bond <reuben.bond@gmail.com>
  • Loading branch information
4 people authored Jul 10, 2024
1 parent a85e337 commit e033fbd
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
<PackageVersion Include="Microsoft.Extensions.Configuration.AzureKeyVault" Version="3.1.24" />
<PackageVersion Include="System.CommandLine" Version="2.0.0-beta1.21308.1" />
<PackageVersion Include="Microsoft.Crank.EventSources" Version="0.2.0-alpha.23422.5" />
<PackageVersion Include="MessagePack" Version="2.5.124" />
<PackageVersion Include="MessagePack" Version="2.5.168" />
<PackageVersion Include="ZeroFormatter" Version="1.6.4" />
<PackageVersion Include="Utf8Json" Version="1.3.7" />
<PackageVersion Include="SpanJson" Version="4.0.1" />
Expand Down
7 changes: 7 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DashboardToy.AppHost", "pla
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Orleans.Serialization.FSharp.Tests", "test\Orleans.Serialization.FSharp.Tests\Orleans.Serialization.FSharp.Tests.fsproj", "{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.MessagePack", "src\Orleans.Serialization.MessagePack\Orleans.Serialization.MessagePack.csproj", "{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -622,6 +624,10 @@ Global
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62}.Release|Any CPU.Build.0 = Release|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -737,6 +743,7 @@ Global
{C4DD4F96-3EC6-47C6-97AA-9B14F0F2099B} = {316CDCC7-323F-4264-9FC9-667662BB1F80}
{84B44F1D-B7FE-40E3-82F0-730A55AC8613} = {316CDCC7-323F-4264-9FC9-667662BB1F80}
{B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
256 changes: 256 additions & 0 deletions src/Orleans.Serialization.MessagePack/MessagePackCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using MessagePack;
using Microsoft.Extensions.Options;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Buffers.Adaptors;
using Orleans.Serialization.Cloning;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Serializers;
using Orleans.Serialization.WireProtocol;

namespace Orleans.Serialization;

/// <summary>
/// A serialization codec which uses <see cref="MessagePackSerializer"/>.
/// </summary>
/// <remarks>
/// MessagePack codec performs slightly worse than default Orleans serializer, if performance is critical for your application, consider using default serialization.
/// </remarks>
[Alias(WellKnownAlias)]
public class MessagePackCodec : IGeneralizedCodec, IGeneralizedCopier, ITypeFilter
{
private static readonly ConcurrentDictionary<Type, bool> SupportedTypes = new();

private static readonly Type SelfType = typeof(MessagePackCodec);

private readonly ICodecSelector[] _serializableTypeSelectors;
private readonly ICopierSelector[] _copyableTypeSelectors;
private readonly MessagePackCodecOptions _options;

/// <summary>
/// The well-known type alias for this codec.
/// </summary>
public const string WellKnownAlias = "msgpack";

/// <summary>
/// Initializes a new instance of the <see cref="MessagePackCodec"/> class.
/// </summary>
/// /// <param name="serializableTypeSelectors">Filters used to indicate which types should be serialized by this codec.</param>
/// <param name="copyableTypeSelectors">Filters used to indicate which types should be copied by this codec.</param>
/// <param name="options">The MessagePack codec options.</param>
public MessagePackCodec(
IEnumerable<ICodecSelector> serializableTypeSelectors,
IEnumerable<ICopierSelector> copyableTypeSelectors,
IOptions<MessagePackCodecOptions> options)
{
_serializableTypeSelectors = serializableTypeSelectors.Where(t => string.Equals(t.CodecName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_copyableTypeSelectors = copyableTypeSelectors.Where(t => string.Equals(t.CopierName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_options = options.Value;
}

/// <inheritdoc/>
void IFieldCodec.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, object value)
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

// The schema type when serializing the field is the type of the codec.
writer.WriteFieldHeader(fieldIdDelta, expectedType, SelfType, WireType.TagDelimited);

// Write the type name
ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(0, WireType.LengthPrefixed);
writer.Session.TypeCodec.WriteLengthPrefixed(ref writer, value.GetType());

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{

var msgPackWriter = new MessagePackWriter(bufferWriter);
MessagePackSerializer.Serialize(expectedType ?? value.GetType(), ref msgPackWriter, value, _options.SerializerOptions);
msgPackWriter.Flush();

ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(1, WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)bufferWriter.Value.Length);
bufferWriter.Value.CopyTo(ref writer);
}
finally
{
bufferWriter.Value.Dispose();
}

writer.WriteEndObject();
}

/// <inheritdoc/>
object IFieldCodec.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.IsReference)
{
return ReferenceCodec.ReadReference(ref reader, field.FieldType);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
object result = null;
Type type = null;
uint fieldId = 0;
while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
ReferenceCodec.MarkValueField(reader.Session);
type = reader.Session.TypeCodec.ReadLengthPrefixed(ref reader);
break;
case 1:
if (type is null)
{
ThrowTypeFieldMissing();
}

ReferenceCodec.MarkValueField(reader.Session);
var length = reader.ReadVarUInt32();

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{
reader.ReadBytes(ref bufferWriter, (int)length);
result = MessagePackSerializer.Deserialize(type, bufferWriter.Value.AsReadOnlySequence(), _options.SerializerOptions);
}
finally
{
bufferWriter.Value.Dispose();
}

break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
return result;
}

/// <inheritdoc/>
bool IGeneralizedCodec.IsSupportedType(Type type)
{
if (type == SelfType)
{
return true;
}

if (CommonCodecTypeFilter.IsAbstractOrFrameworkType(type))
{
return false;
}

foreach (var selector in _serializableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return true;
}
}

if (_options.IsSerializableType?.Invoke(type) is bool value)
{
return value;
}

return IsMessagePackContract(type, _options.AllowDataContractAttributes);
}

/// <inheritdoc/>
object IDeepCopier.DeepCopy(object input, CopyContext context)
{
if (context.TryGetCopy(input, out object result))
{
return result;
}

var bufferWriter = new BufferWriterBox<PooledBuffer>(new());
try
{
var msgPackWriter = new MessagePackWriter(bufferWriter);
MessagePackSerializer.Serialize(input.GetType(), ref msgPackWriter, input, _options.SerializerOptions);
msgPackWriter.Flush();

var sequence = bufferWriter.Value.AsReadOnlySequence();
result = MessagePackSerializer.Deserialize(input.GetType(), sequence, _options.SerializerOptions);
}
catch
{
bufferWriter.Value.Dispose();
}

context.RecordCopy(input, result);
return result;
}

/// <inheritdoc/>
bool IGeneralizedCopier.IsSupportedType(Type type)
{
if (CommonCodecTypeFilter.IsAbstractOrFrameworkType(type))
{
return false;
}

foreach (var selector in _copyableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return true;
}
}

if (_options.IsCopyableType?.Invoke(type) is bool value)
{
return value;
}

return IsMessagePackContract(type, _options.AllowDataContractAttributes);
}

/// <inheritdoc/>
bool? ITypeFilter.IsTypeAllowed(Type type) => (((IGeneralizedCopier)this).IsSupportedType(type) || ((IGeneralizedCodec)this).IsSupportedType(type)) ? true : null;

private static bool IsMessagePackContract(Type type, bool allowDataContractAttribute)
{
if (SupportedTypes.TryGetValue(type, out bool isMsgPackContract))
{
return isMsgPackContract;
}

isMsgPackContract = type.GetCustomAttribute<MessagePackObjectAttribute>() is not null;

if (!isMsgPackContract && allowDataContractAttribute)
{
isMsgPackContract = type.GetCustomAttribute<DataContractAttribute>() is DataContractAttribute;
}

SupportedTypes.TryAdd(type, isMsgPackContract);
return isMsgPackContract;
}

private static void ThrowTypeFieldMissing() => throw new RequiredFieldMissingException("Serialized value is missing its type field.");
}
31 changes: 31 additions & 0 deletions src/Orleans.Serialization.MessagePack/MessagePackCodecOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Runtime.Serialization;
using MessagePack;

namespace Orleans.Serialization;

/// <summary>
/// Options for <see cref="MessagePackCodec"/>.
/// </summary>
public class MessagePackCodecOptions
{
/// <summary>
/// Gets or sets the <see cref="MessagePackSerializerOptions"/>.
/// </summary>
public MessagePackSerializerOptions SerializerOptions { get; set; } = MessagePackSerializerOptions.Standard;

/// <summary>
/// Get or sets flag that allows the use of <see cref="DataContractAttribute"/> marked contracts for MessagePackSerializer.
/// </summary>
public bool AllowDataContractAttributes { get; set; }

/// <summary>
/// Gets or sets a delegate used to determine if a type is supported by the MessagePack serializer for serialization and deserialization.
/// </summary>
public Func<Type, bool?> IsSerializableType { get; set; }

/// <summary>
/// Gets or sets a delegate used to determine if a type is supported by the MessagePack serializer for copying.
/// </summary>
public Func<Type, bool?> IsCopyableType { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Microsoft.Orleans.Serialization.MessagePack</PackageId>
<TargetFrameworks>$(DefaultTargetFrameworks);netstandard2.1</TargetFrameworks>
<PackageDescription>MessagePack integration for Orleans.Serialization</PackageDescription>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MessagePack" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Orleans.Serialization\Orleans.Serialization.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit e033fbd

Please sign in to comment.