Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP proposal #26

Closed
wants to merge 67 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
ab10cc1
FS clean up
lsfera Jul 4, 2024
977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
lsfera Jul 4, 2024
ad74d3c
made UseTable() optional
lsfera Jul 4, 2024
b59d386
enforce AOT
lsfera Jul 4, 2024
71c1f14
rename IConsumes to IHandles
lsfera Jul 4, 2024
86656ec
Use double quote
lsfera Jul 4, 2024
438cf21
use ILKE. Replication slot name is forced to lcase
lsfera Jul 4, 2024
b24c01f
unused directive
lsfera Jul 4, 2024
b024c6f
remove static variables to enable multiple instances on same process
lsfera Jul 4, 2024
8cc1fed
Added DependencyIbjection project
lsfera Jul 4, 2024
380d06a
Added demo projrct for DI
lsfera Jul 4, 2024
acf5f4d
Enhanced logging
lsfera Jul 5, 2024
e918dfa
bumped version to 0.1.1
lsfera Jul 5, 2024
2055fed
allow tableDescriptor access
lsfera Jul 5, 2024
eadba73
renamed vars
lsfera Jul 5, 2024
c097047
expose NpgsqlDataSource along with connection string - close #16
lsfera Jul 15, 2024
ef9ac95
rename extension method
lsfera Jul 15, 2024
345cad6
move to files
lsfera Jul 15, 2024
645ec1a
simplified di registration
lsfera Jul 16, 2024
ca28054
collapse project
lsfera Jul 16, 2024
f4cfe0c
rename folder
lsfera Jul 16, 2024
f9bf48f
mark as implicit usage
lsfera Jul 17, 2024
7fff1b4
switch to prepared statement
lsfera Jul 17, 2024
f0c4160
dispose resources
lsfera Jul 17, 2024
8db4187
explicit defaults
lsfera Jul 17, 2024
c446721
add PublisherOptions
lsfera Jul 17, 2024
e71999b
Expose shortcut for table validation when bootstrapping publisher
lsfera Jul 17, 2024
2351af1
explain different available to publisher for validating table
lsfera Jul 17, 2024
9d3249a
file renamed
lsfera Jul 6, 2024
ea6f7fe
renamed IHandler to IMessageHandler
lsfera Jul 7, 2024
3887f14
first working version
lsfera Jul 17, 2024
0cf7595
removed unused
lsfera Jul 18, 2024
6809b02
files reorg
lsfera Jul 18, 2024
e20bfa9
marked classes as sealed
lsfera Jul 18, 2024
9656b3b
Reviewed Atrtibutes class allowed only, not inherit
lsfera Jul 18, 2024
06bd0a1
unused file
lsfera Jul 18, 2024
41a96de
unused
lsfera Jul 18, 2024
52b5567
expose singleton
lsfera Jul 18, 2024
253d4d4
reify memoization
lsfera Jul 19, 2024
678b2c8
formatting stuff
lsfera Jul 19, 2024
c60bc89
class renamed to PublisherOptions and SubscriberOptions. Move classes…
lsfera Jul 19, 2024
7729fcd
Ensure subscriber default options
lsfera Jul 20, 2024
5ae673e
rename `PublicationSetupOptions` to `PublicationOptions` and `Replica…
lsfera Jul 20, 2024
85f520b
move classes to files
lsfera Jul 20, 2024
20bc581
renamed files
lsfera Jul 21, 2024
a95cf77
table creation can be enforced when bootstrapping in both processes(p…
lsfera Jul 21, 2024
d3c5825
typo
lsfera Jul 21, 2024
f1d0961
Enforcing invariants on publisher/subscriber options builder
lsfera Jul 21, 2024
910902a
updated satellite packages to latest
lsfera Jul 22, 2024
5126bb9
simplify test
lsfera Jul 22, 2024
b236c91
additional examples
lsfera Jul 22, 2024
49408fa
consumer lookup logic must fallback on willdcard
lsfera Jul 22, 2024
376d748
use select pg_advisory_xact_lock to serialize access to message table…
lsfera Jul 23, 2024
8794c4f
mime type is internally exposed for future extension towards binary d…
lsfera Jul 23, 2024
fa05588
renamed nethod Name => Named for table name
lsfera Jul 23, 2024
7205446
tested table creation
lsfera Jul 23, 2024
ddbd7f6
simplified raw urn
lsfera Jul 24, 2024
964b883
Provide untyped append method
lsfera Jul 24, 2024
1de0a77
avoid usage checking on public method
lsfera Jul 24, 2024
575c2b5
provide additional usage patterns
lsfera Jul 24, 2024
44953bb
provide minimal dsl on typed consumer
lsfera Jul 27, 2024
2726be9
move methodInfo registration at configuration time
lsfera Jul 27, 2024
c777721
enable processed data trace only on trace enabled logging level
lsfera Jul 27, 2024
b3d4d17
corrected IErrorProcessor signature to accept KoEnvelope Id
lsfera Jul 30, 2024
8fd8041
added more publishing options
lsfera Jul 31, 2024
e56c3c1
embed ConsumeOptions with typed Consumes
lsfera Jul 31, 2024
28745e8
Added EnableSubscriptionAutoHeal - see https://github.com/event-drive…
lsfera Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
first working version
  • Loading branch information
lsfera committed Jul 17, 2024
commit 3887f148e0dad88c2b057cab9b6b822a633fca14
2 changes: 1 addition & 1 deletion src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(this
string snapshotName,
TableDescriptorBuilder.MessageTable tableDescriptor,
ISet<string> registeredTypesKeys,
IReplicationDataMapper dataMapper,
ReplicationDataMapper dataMapper,
[EnumeratorCancellation] CancellationToken ct)
{
var transaction = await connection.BeginTransactionAsync(IsolationLevel.RepeatableRead, ct).ConfigureAwait(false);
Expand Down
5 changes: 3 additions & 2 deletions src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections;
using System.Text.Json.Serialization.Metadata;
using Blumchen.Serialization;
using Npgsql;

Expand Down Expand Up @@ -28,7 +29,7 @@ public static async Task AppendAsync<T>(T @input

private static async Task AppendAsyncOfT<T>(T input
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver typeResolver
, ITypeResolver<JsonTypeInfo> typeResolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class
Expand Down Expand Up @@ -66,7 +67,7 @@ public static async Task AppendAsync<T>(T input

private static async Task AppendBatchAsyncOfT<T>(T inputs
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver resolver
, ITypeResolver<JsonTypeInfo> resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class, IEnumerable
Expand Down
3 changes: 2 additions & 1 deletion src/Blumchen/Publications/PublisherOptions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System.Text.Json.Serialization.Metadata;
using Blumchen.Database;
using Blumchen.Serialization;
using Npgsql;

namespace Blumchen.Publications;

public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver);
public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, ITypeResolver<JsonTypeInfo> JsonTypeResolver);

public static class PublisherOptionsExtensions
{
Expand Down
1 change: 1 addition & 0 deletions src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using Blumchen.Serialization;
using JetBrains.Annotations;
using static Blumchen.TableDescriptorBuilder;
Expand Down
10 changes: 5 additions & 5 deletions src/Blumchen/Serialization/ITypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ namespace Blumchen.Serialization;
public interface ITypeResolver<T>
{
(string, T) Resolve(Type type);
Type Resolve(string type);
IDictionary<string, Type> RegisteredTypes { get; }
}

public interface IJsonTypeResolver: ITypeResolver<JsonTypeInfo>;

internal sealed class JsonTypeResolver(
JsonSerializerContext serializationContext,
INamingPolicy? namingPolicy = default)
: IJsonTypeResolver
: ITypeResolver<JsonTypeInfo>
{
public JsonSerializerContext SerializationContext { get; } = serializationContext;
private readonly ConcurrentDictionary<string, Type> _typeDictionary = [];
Expand All @@ -31,7 +31,7 @@ internal void WhiteList(Type type)
public (string, JsonTypeInfo) Resolve(Type type) =>
(_typeDictionary.Single(kv => kv.Value == type).Key, _typeInfoDictionary[type]);

internal IDictionary<string,Type> RegisteredTypes { get => _typeDictionary; }
internal Type Resolve(string type) => _typeDictionary[type];
public IDictionary<string,Type> RegisteredTypes { get => _typeDictionary; }
public Type Resolve(string type) => _typeDictionary[type];
}

4 changes: 2 additions & 2 deletions src/Blumchen/Serialization/JsonTypeResolverExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ namespace Blumchen.Serialization;

public static class JsonTypeResolverExtensions
{
internal static IEnumerable<string> Keys(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Keys ?? Enumerable.Empty<string>();
internal static IEnumerable<Type> Values(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Values ?? Enumerable.Empty<Type>();
internal static IEnumerable<string> Keys<T>(this ITypeResolver<T>? resolver) => resolver?.RegisteredTypes.Keys ?? Enumerable.Empty<string>();
internal static IEnumerable<Type> Values<T>(this ITypeResolver<T>? resolver) => resolver?.RegisteredTypes.Values ?? Enumerable.Empty<Type>();
}
44 changes: 43 additions & 1 deletion src/Blumchen/Serialization/MessageUrnAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Blumchen.Serialization;

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)]
[AttributeUsage(AttributeTargets.Class)]
public class MessageUrnAttribute:
Attribute
{
Expand Down Expand Up @@ -33,6 +33,48 @@ private static Uri FormatUrn(string urn)
}


public enum RawData
{
String,
Object
}

[AttributeUsage(AttributeTargets.Interface, AllowMultiple = true)]
public class RawUrnAttribute:
Attribute
{
public RawData Data { get; }

/// <summary>
/// </summary>
/// <param name="urn">The urn value to use for this message type.</param>
/// <param name="data">The <see cref="RawData"/> value to bind to this <paramref name="urn"/></param>
public RawUrnAttribute(string urn, RawData data)
{
Data = data;
ArgumentException.ThrowIfNullOrEmpty(urn, nameof(urn));

if (urn.StartsWith(MessageUrn.Prefix))
throw new ArgumentException($"Value should not contain the default prefix '{MessageUrn.Prefix}'.", nameof(urn));

Urn = FormatUrn(urn);
}

public Uri Urn { get; }

private static Uri FormatUrn(string urn)
{
var fullValue = MessageUrn.Prefix + urn;

if (Uri.TryCreate(fullValue, UriKind.Absolute, out var uri))
return uri;

throw new UriFormatException($"Invalid URN: {fullValue}");
}
}



public static class MessageUrn
{
public const string Prefix = "urn:message:";
Expand Down
8 changes: 3 additions & 5 deletions src/Blumchen/Subscriptions/ISubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface ISubscriptionOptions
{
[UsedImplicitly] NpgsqlDataSource DataSource { get; }
[UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; }
IReplicationDataMapper DataMapper { get; }
IDictionary<string, Tuple<IReplicationJsonBMapper, IMessageHandler>> Registry { get; }
[UsedImplicitly] PublicationSetupOptions PublicationOptions { get; }
[UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; }
[UsedImplicitly] IErrorProcessor ErrorProcessor { get; }
Expand All @@ -21,8 +21,7 @@ void Deconstruct(
out PublicationSetupOptions publicationSetupOptions,
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
out IErrorProcessor errorProcessor,
out IReplicationDataMapper dataMapper,
out Dictionary<Type, IMessageHandler> registry);
out IDictionary<string, Tuple<IReplicationJsonBMapper, IMessageHandler>> registry);
}

internal record SubscriptionOptions(
Expand All @@ -31,5 +30,4 @@ internal record SubscriptionOptions(
PublicationSetupOptions PublicationOptions,
ReplicationSlotSetupOptions ReplicationOptions,
IErrorProcessor ErrorProcessor,
IReplicationDataMapper DataMapper,
Dictionary<Type, IMessageHandler> Registry): ISubscriptionOptions;
IDictionary<string, Tuple<IReplicationJsonBMapper, IMessageHandler>> Registry): ISubscriptionOptions;
29 changes: 14 additions & 15 deletions src/Blumchen/Subscriptions/Management/PublicationManagement.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json.Serialization.Metadata;
using Blumchen.Database;
using Blumchen.Serialization;
using Npgsql;
Expand All @@ -16,37 +17,35 @@ public static async Task<SetupPublicationResult> SetupPublication(
CancellationToken ct
)
{
var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver, tableDescription) = setupOptions;
var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, registeredTypes, tableDescription) = setupOptions;

return createStyle switch
{
Subscription.CreateStyle.Never => new None(),
Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false),
Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, registeredTypes, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableDescription.Name, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, registeredTypes, ct).ConfigureAwait(false),
_ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle))
};

static async Task<SetupPublicationResult> ReCreate(
NpgsqlDataSource dataSource,
string publicationName,
string tableName,
JsonTypeResolver? typeResolver,
ISet<string> registeredTypes,
CancellationToken ct
) {
await dataSource.DropPublication(publicationName, ct).ConfigureAwait(false);
return await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false);
return await Create(dataSource, publicationName, tableName, registeredTypes, ct).ConfigureAwait(false);
}

static async Task<SetupPublicationResult> Create(NpgsqlDataSource dataSource,
string publicationName,
string tableName,
JsonTypeResolver? typeResolver,
ISet<string> registeredTypes,
CancellationToken ct
) {
await dataSource.CreatePublication(publicationName, tableName,
typeResolver.Keys().ToHashSet(), ct).ConfigureAwait(false);

await dataSource.CreatePublication(publicationName, tableName, registeredTypes, ct).ConfigureAwait(false);
return new Created();
}

Expand All @@ -66,14 +65,14 @@ internal static Task CreatePublication(
this NpgsqlDataSource dataSource,
string publicationName,
string tableName,
ISet<string> eventTypes,
ISet<string> registeredTypes,
CancellationToken ct
) {
var sql = $"CREATE PUBLICATION \"{publicationName}\" FOR TABLE {tableName} {{0}} WITH (publish = 'insert');";
return eventTypes.Count switch
return registeredTypes.Count switch
{
0 => Execute(dataSource, string.Format(sql,string.Empty), ct),
_ => Execute(dataSource, string.Format(sql, $"WHERE ({PublicationFilter(eventTypes)})"), ct)
_ => Execute(dataSource, string.Format(sql, $"WHERE ({PublicationFilter(registeredTypes)})"), ct)
};
static string PublicationFilter(ICollection<string> input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'"));
}
Expand Down Expand Up @@ -143,21 +142,21 @@ public sealed record PublicationSetupOptions(
)
{
internal const string DefaultPublicationName = "pub";
internal JsonTypeResolver? TypeResolver { get; init; } = default;
internal ISet<string> RegisteredTypes { get; init; } = Enumerable.Empty<string>().ToHashSet();

internal TableDescriptorBuilder.MessageTable TableDescriptor { get; init; } = new TableDescriptorBuilder().Build();

internal void Deconstruct(
out string publicationName,
out Subscription.CreateStyle createStyle,
out bool reAddTablesIfWereRecreated,
out JsonTypeResolver? typeResolver,
out ISet<string> registeredTypes,
out TableDescriptorBuilder.MessageTable tableDescription)
{
publicationName = PublicationName;
createStyle = Subscription.CreateStyle.WhenNotExists;
reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated;
typeResolver = TypeResolver;
registeredTypes = RegisteredTypes;
tableDescription = TableDescriptor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface IReplicationDataMapper
Task<IEnvelope> ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct);

Task<IEnvelope> ReadFromReplication(InsertMessage insertMessage, CancellationToken ct);


}
Loading