Skip to content

Commit

Permalink
Allowed message table customization:
Browse files Browse the repository at this point in the history
table name, columns names and dimension of varchar column
  • Loading branch information
lsfera authored and oskardudycz committed Jul 3, 2024
1 parent 4941d3b commit 1f4aa7a
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 96 deletions.
21 changes: 7 additions & 14 deletions src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,8 @@ private static async Task Execute(
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, string tableName, CancellationToken ct)
{
var sql = @$"
CREATE TABLE IF NOT EXISTS {tableName} (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
message_type VARCHAR(250) NOT NULL,
data JSONB NOT NULL
);
";
await dataSource.Execute(sql, ct).ConfigureAwait(false);
}
public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, TableDescriptorBuilder.MessageTable tableDescriptor, CancellationToken ct)
=> await dataSource.Execute(tableDescriptor.ToString(), ct).ConfigureAwait(false);

public static async Task<bool> Exists(
this NpgsqlDataSource dataSource,
Expand All @@ -49,7 +40,7 @@ public static async Task<bool> Exists(

internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(this NpgsqlConnection connection,
string snapshotName,
string tableName,
TableDescriptorBuilder.MessageTable tableDescriptor,
ISet<string> registeredTypesKeys,
IReplicationDataMapper dataMapper,
[EnumeratorCancellation] CancellationToken ct)
Expand All @@ -62,9 +53,9 @@ internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(this
await using var command1 = command.ConfigureAwait(false);
await command.ExecuteScalarAsync(ct).ConfigureAwait(false);
var whereClause = registeredTypesKeys.Count > 0
? $" WHERE message_type IN({PublicationFilter(registeredTypesKeys)})"
? $" WHERE {tableDescriptor.MessageType.Name} IN({PublicationFilter(registeredTypesKeys)})"
: null;
var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}{whereClause}", connection, transaction);
var cmd = new NpgsqlCommand($"SELECT * FROM {tableDescriptor.Name}{whereClause}", connection, transaction);
await using var cmd1 = cmd.ConfigureAwait(false);
var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
await using var reader1 = reader.ConfigureAwait(false);
Expand All @@ -76,3 +67,5 @@ internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(this
static string PublicationFilter(ICollection<string> input) => string.Join(", ", input.Select(s => $"'{s}'"));
}
}


76 changes: 76 additions & 0 deletions src/Blumchen/MessageTableOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using Blumchen.Subscriptions;
using NpgsqlTypes;

namespace Blumchen;

#pragma warning disable CS1591
public record TableDescriptorBuilder
{
private MessageTable TableDescriptor { get; set; } = new();

public MessageTable Build() => TableDescriptor.Build();

public TableDescriptorBuilder Name(string eventsTable)
{
TableDescriptor = new MessageTable(eventsTable);
return this;
}

public TableDescriptorBuilder Id(string name)
{
TableDescriptor = TableDescriptor with { Id = new Column.Id(name) };
return this;
}

public TableDescriptorBuilder MessageData(string name, MimeType mime)
{
TableDescriptor = TableDescriptor with { Data = new Column.Data(name), MimeType = mime };
return this;
}

public TableDescriptorBuilder MessageType(string name, int dimension = 250)
{
TableDescriptor = TableDescriptor with { MessageType = new Column.MessageType(name, dimension) };
return this;
}

public record MessageTable(string Name = MessageTable.DefaultName)
{
internal const string DefaultName = "outbox";
public Column.Id Id { get; internal init; } = Column.Id.Default();
public Column.MessageType MessageType { get; internal init; } = Column.MessageType.Default();
public Column.Data Data { get; internal init; } = Column.Data.Default();
public MimeType MimeType { get; internal init; } = new MimeType.Json();
public MessageTable Build() => this;

public override string ToString() => @$"
CREATE TABLE IF NOT EXISTS {Name} (
{Id} PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
{MessageType} NOT NULL,
{Data} NOT NULL
);";
}

public record Column(string Name, NpgsqlDbType Type)
{
public override string ToString() => $"{Name} {Type}";

public record Id(string Name): Column(Name, NpgsqlDbType.Bigint)
{
public override string ToString() => base.ToString();
internal static readonly Func<Id> Default = () => new("id");
}

public record MessageType(string Name, int Dimension): Column(Name, NpgsqlDbType.Varchar)
{
internal static readonly Func<MessageType> Default = () => new("message_type", 250);
public override string ToString() => $"{base.ToString()}({Dimension})";
}

public record Data(string Name): Column(Name, NpgsqlDbType.Jsonb)
{
internal static readonly Func<Data> Default = () => new("data");
public override string ToString() => base.ToString();
}
}
}
90 changes: 47 additions & 43 deletions src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,64 @@ namespace Blumchen.Publications;

public static class MessageAppender
{

public static async Task AppendAsync<T>(string tableName, T @event, IJsonTypeResolver resolver, string connectionString, CancellationToken ct)
where T: class
{
var type = typeof(T);
var (typeName, jsonTypeInfo) = resolver.Resolve(type);
var data = JsonSerialization.ToJson(@event, jsonTypeInfo);

var connection = new NpgsqlConnection(connectionString);
await using var connection1 = connection.ConfigureAwait(false);
await connection.OpenAsync(ct).ConfigureAwait(false);
var command = connection.CreateCommand();
command.CommandText = $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')";
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task AppendAsync<T>(string tableName, T @input, IJsonTypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct)
where T : class
public static async Task AppendAsync<T>(T @input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct
) where T : class
{
switch (@input)
{
case null:
throw new ArgumentNullException(nameof(@input));
case IEnumerable inputs:
await AppendBatchAsyncOfT(tableName, inputs, resolver, connection, transaction, ct).ConfigureAwait(false);
await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
default:
await AppendAsyncOfT(tableName, input, resolver, connection, transaction, ct).ConfigureAwait(false);
await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
}
}

private static async Task AppendBatchAsyncOfT<T>(
string tableName
, T inputs
private static async Task AppendAsyncOfT<T>(T input
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver typeResolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class
{
var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T));
var data = JsonSerialization.ToJson(@input, jsonTypeInfo);
var command = new NpgsqlCommand(
$"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')",
connection,
transaction
);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task AppendAsync<T>(T input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options
, string connectionString
, CancellationToken ct)
where T: class
{
var type = typeof(T);
var (typeName, jsonTypeInfo) = options.resolver.Resolve(type);
var data = JsonSerialization.ToJson(input, jsonTypeInfo);

var connection = new NpgsqlConnection(connectionString);
await using var connection1 = connection.ConfigureAwait(false);
await connection.OpenAsync(ct).ConfigureAwait(false);
var command = connection.CreateCommand();
command.CommandText =
$"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

private static async Task AppendBatchAsyncOfT<T>(T inputs
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
Expand All @@ -56,28 +79,9 @@ string tableName


batchCommand.CommandText =
$"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')";
$"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
batch.BatchCommands.Add(batchCommand);
}
await batch.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

private static async Task AppendAsyncOfT<T>(
string tableName
, T @input
, IJsonTypeResolver resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class
{

var (typeName, jsonTypeInfo) = resolver.Resolve(typeof(T));
var data = JsonSerialization.ToJson(@input, jsonTypeInfo);
var command = new NpgsqlCommand(
$"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')",
connection,
transaction
);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}
}
15 changes: 13 additions & 2 deletions src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Text.Json.Serialization;
using Blumchen.Serialization;
using JetBrains.Annotations;
using static Blumchen.TableDescriptorBuilder;

namespace Blumchen.Publications;

Expand All @@ -9,6 +10,8 @@ public class PublisherSetupOptionsBuilder
{
private INamingPolicy? _namingPolicy;
private JsonSerializerContext? _jsonSerializerContext;
private static readonly TableDescriptorBuilder TableDescriptorBuilder = new();
private MessageTable? _tableDescriptor;

[UsedImplicitly]
public PublisherSetupOptionsBuilder NamingPolicy(INamingPolicy namingPolicy)
Expand All @@ -24,11 +27,19 @@ public PublisherSetupOptionsBuilder JsonContext(JsonSerializerContext jsonSerial
return this;
}

public IJsonTypeResolver Build()
[UsedImplicitly]
public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, TableDescriptorBuilder> builder)
{
_tableDescriptor = builder(TableDescriptorBuilder).Build();
return this;
}

public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build()
{
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
ArgumentNullException.ThrowIfNull(_namingPolicy);

_tableDescriptor ??= TableDescriptorBuilder.Build();
var jsonTypeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
using var typeEnum = _jsonSerializerContext.GetType()
.GetCustomAttributesData()
Expand All @@ -38,6 +49,6 @@ public IJsonTypeResolver Build()
while (typeEnum.MoveNext())
jsonTypeResolver.WhiteList(typeEnum.Current);

return jsonTypeResolver;
return (_tableDescriptor,jsonTypeResolver);
}
}
19 changes: 10 additions & 9 deletions src/Blumchen/Subscriptions/Management/PublicationManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ public static async Task<SetupPublicationResult> SetupPublication(
CancellationToken ct
)
{
var (publicationName, tableName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver) = setupOptions;
var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver, tableDescription) = setupOptions;

return createStyle switch
{
Subscription.CreateStyle.Never => new None(),
Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false),
Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableDescription.Name, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false),
Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false),
_ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle))
};

Expand Down Expand Up @@ -143,27 +143,28 @@ public record Created: SetupPublicationResult;

public sealed record PublicationSetupOptions(
string PublicationName = PublicationSetupOptions.DefaultPublicationName,
string TableName = PublicationSetupOptions.DefaultTableName,
Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists,
bool ShouldReAddTablesIfWereRecreated = false
)
{
internal const string DefaultTableName = "outbox";
internal const string DefaultPublicationName = "pub";
internal JsonTypeResolver? TypeResolver { get; init; } = default;

internal TableDescriptorBuilder.MessageTable TableDescriptor { get; init; } = new TableDescriptorBuilder().Build();

internal void Deconstruct(
out string publicationName,
out string tableName,
out Subscription.CreateStyle createStyle,
out bool reAddTablesIfWereRecreated,
out JsonTypeResolver? typeResolver)
out JsonTypeResolver? typeResolver,
out TableDescriptorBuilder.MessageTable tableDescription)
{
publicationName = PublicationName;
tableName = TableName;
createStyle = Subscription.CreateStyle.WhenNotExists;
reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated;
typeResolver = TypeResolver;
tableDescription = TableDescriptor;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ CancellationToken ct
) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct);

public record ReplicationSlotSetupOptions(
string SlotName = $"{PublicationManagement.PublicationSetupOptions.DefaultTableName}_slot",
string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot",
Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists,
bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY
);
Expand Down
7 changes: 7 additions & 0 deletions src/Blumchen/Subscriptions/MimeType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Blumchen.Subscriptions;

#pragma warning disable CS1591
public abstract record MimeType(string mimeType)
{
public record Json(): MimeType("application/json");
}
4 changes: 2 additions & 2 deletions src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ public static class SnapshotReader
{
internal static async IAsyncEnumerable<IEnvelope> GetRowsFromSnapshot(this NpgsqlConnection connection,
string snapshotName,
string tableName,
TableDescriptorBuilder.MessageTable tableDescriptor,
IReplicationDataMapper dataMapper,
ISet<string> registeredTypes,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var @event in connection.QueryTransactionSnapshot(
snapshotName,
tableName,
tableDescriptor,
registeredTypes,
dataMapper,
ct).ConfigureAwait(false))
Expand Down
4 changes: 2 additions & 2 deletions src/Blumchen/Subscriptions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async IAsyncEnumerable<IEnvelope> Subscribe(
dataSourceBuilder.UseLoggerFactory(loggerFactory);

var dataSource = dataSourceBuilder.Build();
await dataSource.EnsureTableExists(publicationSetupOptions.TableName, ct).ConfigureAwait(false);
await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);

_connection = new LogicalReplicationConnection(connectionString);
await _connection.Open(ct).ConfigureAwait(false);
Expand Down Expand Up @@ -145,7 +145,7 @@ private static async IAsyncEnumerable<IEnvelope> ReadExistingRowsFromSnapshot(
await using var connection1 = connection.ConfigureAwait(false);
await foreach (var row in connection.GetRowsFromSnapshot(
snapshotName,
options.PublicationOptions.TableName,
options.PublicationOptions.TableDescriptor,
options.DataMapper,
options.PublicationOptions.TypeResolver.Keys().ToHashSet(),
ct).ConfigureAwait(false))
Expand Down
Loading

0 comments on commit 1f4aa7a

Please sign in to comment.