Skip to content

Commit

Permalink
Use TimestampProvider in BatchingSqlJournal (#5192)
Browse files Browse the repository at this point in the history
* Use TimestampProvider in BatchingSqlJournal

* Added comments

* added API approval

* Return DefaultTimestampProvider if typename is null of empty

Co-authored-by: Martijn Schoemaker <m.schoemaker@agrifirm.com>
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
3 people authored Aug 16, 2021
1 parent 5646f96 commit 2a8278b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Akka.Event;
using Akka.Pattern;
using Akka.Persistence.Journal;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Serialization;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -233,7 +234,12 @@ public abstract class BatchingSqlJournalSetup
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
public string DefaultSerializer { get; }
public string DefaultSerializer { get; }

/// <summary>
/// The fully qualified name of the type that should be used as timestamp provider.
/// </summary>
public string TimestampProviderTypeName { get; }

/// <summary>
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
Expand Down Expand Up @@ -293,6 +299,7 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
ReplayFilterSettings = new ReplayFilterSettings(config.GetConfig("replay-filter"));
NamingConventions = namingConventions;
DefaultSerializer = config.GetString("serializer", null);
TimestampProviderTypeName = config.GetString("timestamp-provider", null);
}

/// <summary>
Expand Down Expand Up @@ -528,6 +535,11 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// </summary>
protected readonly bool CanPublish;

/// <summary>
/// The timestamp provider that will be used for the timestamp column when writing messages to the database.
/// </summary>
protected ITimestampProvider TimestampProvider { get; }

/// <summary>
/// Logging adapter for current journal actor .
/// </summary>
Expand Down Expand Up @@ -562,7 +574,8 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
{
Setup = setup;
CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands;
CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands;
TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context);

_persistenceIdSubscribers = new Dictionary<string, HashSet<IActorRef>>();
_tagSubscribers = new Dictionary<string, HashSet<IActorRef>>();
Expand Down Expand Up @@ -1286,7 +1299,7 @@ private async Task<WriteMessagesResult> HandleWriteMessages(WriteMessages req, T
persistent = persistent.WithPayload(tagged.Payload);
}

WriteEvent(command, persistent.WithTimestamp(DateTime.UtcNow.Ticks), tagBuilder.ToString());
WriteEvent(command, persistent.WithTimestamp(TimestampProvider.GenerateTimestamp(persistent)), tagBuilder.ToString());

await command.ExecuteNonQueryAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using Akka.Actor;

namespace Akka.Persistence.Sql.Common.Journal
{
Expand Down Expand Up @@ -34,4 +35,21 @@ public sealed class DefaultTimestampProvider : ITimestampProvider
/// <returns>TBD</returns>
public long GenerateTimestamp(IPersistentRepresentation message) => DateTime.UtcNow.Ticks;
}

public static class TimestampProviderProvider
{
public static ITimestampProvider GetTimestampProvider(string typeName, IActorContext context)
{
if (string.IsNullOrEmpty(typeName))
{
return new DefaultTimestampProvider();
}

var type = Type.GetType(typeName, true);
var withSystem = type.GetConstructor(new[] { context.System.GetType() }) != null;
return withSystem ?
(ITimestampProvider)Activator.CreateInstance(type, context.System) :
(ITimestampProvider)Activator.CreateInstance(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,21 +514,7 @@ protected virtual string GetConnectionString()
return connectionString;
}

#region obsoleted

/// <summary>
/// TBD
/// </summary>
/// <param name="typeName">TBD</param>
/// <returns>TBD</returns>
protected ITimestampProvider GetTimestampProvider(string typeName)
{
var type = Type.GetType(typeName, true);
var withSystem = type.GetConstructor(new[] { Context.System.GetType() }) != null;
return withSystem ?
(ITimestampProvider)Activator.CreateInstance(type, Context.System) :
(ITimestampProvider)Activator.CreateInstance(type);
}
#endregion
protected ITimestampProvider GetTimestampProvider(string typeName) =>
TimestampProviderProvider.GetTimestampProvider(typeName, Context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace Akka.Persistence.Sql.Common.Journal
public int MaxConcurrentOperations { get; }
public Akka.Persistence.Sql.Common.Journal.QueryConfiguration NamingConventions { get; }
public Akka.Persistence.Sql.Common.Journal.ReplayFilterSettings ReplayFilterSettings { get; }
public string TimestampProviderTypeName { get; }
}
public abstract class BatchingSqlJournal<TConnection, TCommand> : Akka.Persistence.Journal.WriteJournalBase
where TConnection : System.Data.Common.DbConnection
Expand Down Expand Up @@ -98,6 +99,7 @@ namespace Akka.Persistence.Sql.Common.Journal
protected abstract System.Collections.Immutable.ImmutableDictionary<string, string> Initializers { get; }
protected virtual string InsertEventSql { get; }
protected Akka.Persistence.Sql.Common.Journal.BatchingSqlJournalSetup Setup { get; }
protected Akka.Persistence.Sql.Common.Journal.ITimestampProvider TimestampProvider { get; }
protected virtual string UpdateSequenceNrSql { get; }
protected void AddParameter(TCommand command, string paramName, System.Data.DbType dbType, object value) { }
protected void BatchRequest(Akka.Persistence.IJournalRequest message) { }
Expand Down Expand Up @@ -322,6 +324,10 @@ namespace Akka.Persistence.Sql.Common.Journal
public readonly string Tag;
public TaggedEventAppended(string tag) { }
}
public class static TimestampProviderProvider
{
public static Akka.Persistence.Sql.Common.Journal.ITimestampProvider GetTimestampProvider(string typeName, Akka.Actor.IActorContext context) { }
}
public sealed class WriteJournalBatch
{
public readonly System.Collections.Generic.IDictionary<Akka.Persistence.IPersistentRepresentation, System.Collections.Immutable.IImmutableSet<string>> EntryTags;
Expand Down

0 comments on commit 2a8278b

Please sign in to comment.