Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change PersistenceMessageSerializer base class to SerializerWithStringManifest #5002

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1053,11 +1053,11 @@ namespace Akka.Persistence.Journal
namespace Akka.Persistence.Serialization
{
public interface IMessage { }
public sealed class PersistenceMessageSerializer : Akka.Serialization.Serializer
public sealed class PersistenceMessageSerializer : Akka.Serialization.SerializerWithStringManifest
{
public PersistenceMessageSerializer(Akka.Actor.ExtendedActorSystem system) { }
public override bool IncludeManifest { get; }
public override object FromBinary(byte[] bytes, System.Type type) { }
public override object FromBinary(byte[] bytes, string manifest) { }
public override string Manifest(object obj) { }
public override byte[] ToBinary(object obj) { }
}
public class PersistenceSnapshotSerializer : Akka.Serialization.Serializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Persistence.Fsm;
using Akka.Persistence.Serialization;
using Akka.TestKit;
using Akka.Util;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -117,7 +118,7 @@ public void MessageSerializer_ToBinary_should_throw_an_exception_on_wrong_type()
};

deserializeAction.Should().Throw<SerializationException>()
.WithMessage($"Unimplemented deserialization of message with type [{typeof(string)}] in [{typeof(PersistenceMessageSerializer)}]");
.WithMessage($"Unimplemented deserialization of message with manifest [{typeof(string).TypeQualifiedName()}] in [{typeof(PersistenceMessageSerializer)}]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,43 @@

namespace Akka.Persistence.Serialization
{
public sealed class PersistenceMessageSerializer : Serializer
public sealed class PersistenceMessageSerializer : SerializerWithStringManifest
{
private const string IPersistentRepresentationManifest = "PR";
private const string AtomicWriteManifest = "AW";
private const string AtLeastOnceDeliverySnapshotManifest = "ALODS";
private const string StateChangeEventManifest = "FSM.SCE";
private const string PersistentFSMSnapshotManifest = "FSM.PS";

// Backward compatibility constants
private const string IPersistentRepresentationManifestNetCoreManifest = "Akka.Persistence.Persistent, Akka.Persistence";
private const string AtomicWriteNetCoreManifest = "Akka.Persistence.AtomicWrite, Akka.Persistence";
private const string AtLeastOnceDeliverySnapshotNetCoreManifest = "Akka.Persistence.AtLeastOnceDeliverySnapshot, Akka.Persistence";
private const string StateChangeEventNetCoreManifest = "Akka.Persistence.Fsm.PersistentFSM+StateChangeEvent, Akka.Persistence";
private const string PersistentFSMSnapshotNetCoreManifest = "Akka.Persistence.Fsm.PersistentFSM+PersistentFSMSnapshot";


public PersistenceMessageSerializer(ExtendedActorSystem system) : base(system)
{
}

public override bool IncludeManifest { get; } = true;

public override byte[] ToBinary(object obj)
{
if (obj is IPersistentRepresentation repr) return GetPersistentMessage(repr).ToByteArray();
if (obj is AtomicWrite aw) return GetAtomicWrite(aw).ToByteArray();
if (obj is AtLeastOnceDeliverySnapshot snap) return GetAtLeastOnceDeliverySnapshot(snap).ToByteArray();
if (obj is PersistentFSM.StateChangeEvent stateEvent) return GetStateChangeEvent(stateEvent).ToByteArray();
switch (obj)
{
case IPersistentRepresentation repr:
return GetPersistentMessage(repr).ToByteArray();
case AtomicWrite aw:
return GetAtomicWrite(aw).ToByteArray();
case AtLeastOnceDeliverySnapshot snap:
return GetAtLeastOnceDeliverySnapshot(snap).ToByteArray();
case PersistentFSM.StateChangeEvent stateEvent:
return GetStateChangeEvent(stateEvent).ToByteArray();
}

if (obj.GetType().GetTypeInfo().IsGenericType
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) return GetPersistentFSMSnapshot(obj).ToByteArray();
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>))
return GetPersistentFSMSnapshot(obj).ToByteArray();

throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]");
}
Expand Down Expand Up @@ -148,18 +169,54 @@ private PersistentFSMSnapshot GetPersistentFSMSnapshot(object obj)
return message;
}

public override object FromBinary(byte[] bytes, Type type)
public override object FromBinary(byte[] bytes, string manifest)
{
if(type == null) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes));
if (type == typeof(Persistent)) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes));
if (type == typeof(IPersistentRepresentation)) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes));
if (type == typeof(AtomicWrite)) return GetAtomicWrite(bytes);
if (type == typeof(AtLeastOnceDeliverySnapshot)) return GetAtLeastOnceDeliverySnapshot(bytes);
if (type == typeof(PersistentFSM.StateChangeEvent)) return GetStateChangeEvent(bytes);
if (type.GetTypeInfo().IsGenericType
&& type.GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) return GetPersistentFSMSnapshot(type, bytes);
switch (manifest)
{
case null:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch on the null

case IPersistentRepresentationManifest:
case IPersistentRepresentationManifestNetCoreManifest:
return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes));

case AtomicWriteManifest:
case AtomicWriteNetCoreManifest:
return GetAtomicWrite(bytes);

case AtLeastOnceDeliverySnapshotManifest:
case AtLeastOnceDeliverySnapshotNetCoreManifest:
return GetAtLeastOnceDeliverySnapshot(bytes);

case StateChangeEventManifest:
case StateChangeEventNetCoreManifest:
return GetStateChangeEvent(bytes);

case PersistentFSMSnapshotManifest:
case var m when m.StartsWith(PersistentFSMSnapshotNetCoreManifest):
return GetPersistentFSMSnapshot(bytes);
}

throw new SerializationException($"Unimplemented deserialization of message with type [{type}] in [{GetType()}]");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
}

public override string Manifest(object obj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the time being, we should return the legacy types - I believe there are some persistence plugins (i.e. SQL ones) that are still relying on manifest data using reflection and while I would like to eventually clean that up, probably best to change as little as possible here first.

{
switch (obj)
{
case IPersistentRepresentation _:
return IPersistentRepresentationManifest;
case AtomicWrite _:
return AtomicWriteManifest;
case AtLeastOnceDeliverySnapshot _:
return AtLeastOnceDeliverySnapshotManifest;
case PersistentFSM.StateChangeEvent _:
return StateChangeEventManifest;
}

if (obj.GetType().GetTypeInfo().IsGenericType
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>))
return PersistentFSMSnapshotManifest;

throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]. No manifest for said type is defined.");
}

private IPersistentRepresentation GetPersistentRepresentation(PersistentMessage message)
Expand Down Expand Up @@ -226,7 +283,7 @@ private PersistentFSM.StateChangeEvent GetStateChangeEvent(byte[] bytes)
return new PersistentFSM.StateChangeEvent(message.StateIdentifier, timeout);
}

private object GetPersistentFSMSnapshot(Type type, byte[] bytes)
private object GetPersistentFSMSnapshot(byte[] bytes)
{
var message = PersistentFSMSnapshot.Parser.ParseFrom(bytes);

Expand All @@ -236,10 +293,13 @@ private object GetPersistentFSMSnapshot(Type type, byte[] bytes)
timeout = TimeSpan.FromMilliseconds(message.TimeoutMillis);
}

var payload = GetPayload(message.Data);

// use reflection to create the generic type of PersistentFSM.PersistentFSMSnapshot
Type[] types = { typeof(string), type.GenericTypeArguments[0], typeof(TimeSpan?) };
object[] arguments = { message.StateIdentifier, GetPayload(message.Data), timeout };
Type[] types = { typeof(string), payload.GetType(), typeof(TimeSpan?) };
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the type of the payload from the payload type instead of relying on a generic passed into the method

object[] arguments = { message.StateIdentifier, payload, timeout };

var type = typeof(PersistentFSM.PersistentFSMSnapshot<>).MakeGenericType(payload.GetType());
return type.GetConstructor(types).Invoke(arguments);
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/core/Akka/Util/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Runtime.Serialization;
using System.Text.RegularExpressions;
using Akka.Annotations;

Expand Down Expand Up @@ -58,13 +59,18 @@ public static bool Implements(this Type type, Type moreGeneralType)
[InternalApi]
public static string TypeQualifiedName(this Type type)
{
string shortened;
if (ShortenedTypeNames.TryGetValue(type, out shortened))
if (ShortenedTypeNames.TryGetValue(type, out var shortened))
{
return shortened;
}

shortened = cleanAssemblyVersionRegex.Replace(type.AssemblyQualifiedName, string.Empty);
// Defensive coding. `type.AssemblyQualifiedName` can return null if type is of generic type parameter type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

shortened = type.AssemblyQualifiedName;
if (shortened == null)
throw new SerializationException(
$"Could not get a type qualified name for type [{type}], most likely because it is not a concrete type, but a generic type parameter type.");

shortened = cleanAssemblyVersionRegex.Replace(shortened, string.Empty);
ShortenedTypeNames.TryAdd(type, shortened);

return shortened;
Expand Down