-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change PersistenceMessageSerializer base class to SerializerWithStringManifest #5002
base: dev
Are you sure you want to change the base?
Changes from 6 commits
d6826d4
a177bec
c60dc22
590c1d3
6d5269f
d20b2a4
705c854
186e539
9755866
b568622
4ea1bc2
1d11fce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,22 +19,43 @@ | |
|
||
namespace Akka.Persistence.Serialization | ||
{ | ||
public sealed class PersistenceMessageSerializer : Serializer | ||
public sealed class PersistenceMessageSerializer : SerializerWithStringManifest | ||
{ | ||
private const string IPersistentRepresentationManifest = "PR"; | ||
private const string AtomicWriteManifest = "AW"; | ||
private const string AtLeastOnceDeliverySnapshotManifest = "ALODS"; | ||
private const string StateChangeEventManifest = "FSM.SCE"; | ||
private const string PersistentFSMSnapshotManifest = "FSM.PS"; | ||
|
||
// Backward compatibility constants | ||
private const string IPersistentRepresentationManifestNetCoreManifest = "Akka.Persistence.Persistent, Akka.Persistence"; | ||
private const string AtomicWriteNetCoreManifest = "Akka.Persistence.AtomicWrite, Akka.Persistence"; | ||
private const string AtLeastOnceDeliverySnapshotNetCoreManifest = "Akka.Persistence.AtLeastOnceDeliverySnapshot, Akka.Persistence"; | ||
private const string StateChangeEventNetCoreManifest = "Akka.Persistence.Fsm.PersistentFSM+StateChangeEvent, Akka.Persistence"; | ||
private const string PersistentFSMSnapshotNetCoreManifest = "Akka.Persistence.Fsm.PersistentFSM+PersistentFSMSnapshot"; | ||
|
||
|
||
public PersistenceMessageSerializer(ExtendedActorSystem system) : base(system) | ||
{ | ||
} | ||
|
||
public override bool IncludeManifest { get; } = true; | ||
|
||
public override byte[] ToBinary(object obj) | ||
{ | ||
if (obj is IPersistentRepresentation repr) return GetPersistentMessage(repr).ToByteArray(); | ||
if (obj is AtomicWrite aw) return GetAtomicWrite(aw).ToByteArray(); | ||
if (obj is AtLeastOnceDeliverySnapshot snap) return GetAtLeastOnceDeliverySnapshot(snap).ToByteArray(); | ||
if (obj is PersistentFSM.StateChangeEvent stateEvent) return GetStateChangeEvent(stateEvent).ToByteArray(); | ||
switch (obj) | ||
{ | ||
case IPersistentRepresentation repr: | ||
return GetPersistentMessage(repr).ToByteArray(); | ||
case AtomicWrite aw: | ||
return GetAtomicWrite(aw).ToByteArray(); | ||
case AtLeastOnceDeliverySnapshot snap: | ||
return GetAtLeastOnceDeliverySnapshot(snap).ToByteArray(); | ||
case PersistentFSM.StateChangeEvent stateEvent: | ||
return GetStateChangeEvent(stateEvent).ToByteArray(); | ||
} | ||
|
||
if (obj.GetType().GetTypeInfo().IsGenericType | ||
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) return GetPersistentFSMSnapshot(obj).ToByteArray(); | ||
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) | ||
return GetPersistentFSMSnapshot(obj).ToByteArray(); | ||
|
||
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]"); | ||
} | ||
|
@@ -148,18 +169,54 @@ private PersistentFSMSnapshot GetPersistentFSMSnapshot(object obj) | |
return message; | ||
} | ||
|
||
public override object FromBinary(byte[] bytes, Type type) | ||
public override object FromBinary(byte[] bytes, string manifest) | ||
{ | ||
if(type == null) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes)); | ||
if (type == typeof(Persistent)) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes)); | ||
if (type == typeof(IPersistentRepresentation)) return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes)); | ||
if (type == typeof(AtomicWrite)) return GetAtomicWrite(bytes); | ||
if (type == typeof(AtLeastOnceDeliverySnapshot)) return GetAtLeastOnceDeliverySnapshot(bytes); | ||
if (type == typeof(PersistentFSM.StateChangeEvent)) return GetStateChangeEvent(bytes); | ||
if (type.GetTypeInfo().IsGenericType | ||
&& type.GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) return GetPersistentFSMSnapshot(type, bytes); | ||
switch (manifest) | ||
{ | ||
case null: | ||
case IPersistentRepresentationManifest: | ||
case IPersistentRepresentationManifestNetCoreManifest: | ||
return GetPersistentRepresentation(PersistentMessage.Parser.ParseFrom(bytes)); | ||
|
||
case AtomicWriteManifest: | ||
case AtomicWriteNetCoreManifest: | ||
return GetAtomicWrite(bytes); | ||
|
||
case AtLeastOnceDeliverySnapshotManifest: | ||
case AtLeastOnceDeliverySnapshotNetCoreManifest: | ||
return GetAtLeastOnceDeliverySnapshot(bytes); | ||
|
||
case StateChangeEventManifest: | ||
case StateChangeEventNetCoreManifest: | ||
return GetStateChangeEvent(bytes); | ||
|
||
case PersistentFSMSnapshotManifest: | ||
case var m when m.StartsWith(PersistentFSMSnapshotNetCoreManifest): | ||
return GetPersistentFSMSnapshot(bytes); | ||
} | ||
|
||
throw new SerializationException($"Unimplemented deserialization of message with type [{type}] in [{GetType()}]"); | ||
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]"); | ||
} | ||
|
||
public override string Manifest(object obj) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the time being, we should return the legacy types - I believe there are some persistence plugins (i.e. SQL ones) that are still relying on manifest data using reflection and while I would like to eventually clean that up, probably best to change as little as possible here first. |
||
{ | ||
switch (obj) | ||
{ | ||
case IPersistentRepresentation _: | ||
return IPersistentRepresentationManifest; | ||
case AtomicWrite _: | ||
return AtomicWriteManifest; | ||
case AtLeastOnceDeliverySnapshot _: | ||
return AtLeastOnceDeliverySnapshotManifest; | ||
case PersistentFSM.StateChangeEvent _: | ||
return StateChangeEventManifest; | ||
} | ||
|
||
if (obj.GetType().GetTypeInfo().IsGenericType | ||
&& obj.GetType().GetGenericTypeDefinition() == typeof(PersistentFSM.PersistentFSMSnapshot<>)) | ||
return PersistentFSMSnapshotManifest; | ||
|
||
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]. No manifest for said type is defined."); | ||
} | ||
|
||
private IPersistentRepresentation GetPersistentRepresentation(PersistentMessage message) | ||
|
@@ -226,7 +283,7 @@ private PersistentFSM.StateChangeEvent GetStateChangeEvent(byte[] bytes) | |
return new PersistentFSM.StateChangeEvent(message.StateIdentifier, timeout); | ||
} | ||
|
||
private object GetPersistentFSMSnapshot(Type type, byte[] bytes) | ||
private object GetPersistentFSMSnapshot(byte[] bytes) | ||
{ | ||
var message = PersistentFSMSnapshot.Parser.ParseFrom(bytes); | ||
|
||
|
@@ -236,10 +293,13 @@ private object GetPersistentFSMSnapshot(Type type, byte[] bytes) | |
timeout = TimeSpan.FromMilliseconds(message.TimeoutMillis); | ||
} | ||
|
||
var payload = GetPayload(message.Data); | ||
|
||
// use reflection to create the generic type of PersistentFSM.PersistentFSMSnapshot | ||
Type[] types = { typeof(string), type.GenericTypeArguments[0], typeof(TimeSpan?) }; | ||
object[] arguments = { message.StateIdentifier, GetPayload(message.Data), timeout }; | ||
Type[] types = { typeof(string), payload.GetType(), typeof(TimeSpan?) }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get the type of the payload from the payload type instead of relying on a generic passed into the method |
||
object[] arguments = { message.StateIdentifier, payload, timeout }; | ||
|
||
var type = typeof(PersistentFSM.PersistentFSMSnapshot<>).MakeGenericType(payload.GetType()); | ||
return type.GetConstructor(types).Invoke(arguments); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
using System; | ||
using System.Collections.Concurrent; | ||
using System.Reflection; | ||
using System.Runtime.Serialization; | ||
using System.Text.RegularExpressions; | ||
using Akka.Annotations; | ||
|
||
|
@@ -58,13 +59,18 @@ public static bool Implements(this Type type, Type moreGeneralType) | |
[InternalApi] | ||
public static string TypeQualifiedName(this Type type) | ||
{ | ||
string shortened; | ||
if (ShortenedTypeNames.TryGetValue(type, out shortened)) | ||
if (ShortenedTypeNames.TryGetValue(type, out var shortened)) | ||
{ | ||
return shortened; | ||
} | ||
|
||
shortened = cleanAssemblyVersionRegex.Replace(type.AssemblyQualifiedName, string.Empty); | ||
// Defensive coding. `type.AssemblyQualifiedName` can return null if type is of generic type parameter type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
shortened = type.AssemblyQualifiedName; | ||
if (shortened == null) | ||
throw new SerializationException( | ||
$"Could not get a type qualified name for type [{type}], most likely because it is not a concrete type, but a generic type parameter type."); | ||
|
||
shortened = cleanAssemblyVersionRegex.Replace(shortened, string.Empty); | ||
ShortenedTypeNames.TryAdd(type, shortened); | ||
|
||
return shortened; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch on the
null