Skip to content

Commit

Permalink
add PublisherOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Jul 17, 2024
1 parent 8db4187 commit c446721
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
12 changes: 6 additions & 6 deletions src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Blumchen.Publications;
public static class MessageAppender
{
public static async Task AppendAsync<T>(T @input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver
, PublisherOptions resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct
Expand All @@ -18,10 +18,10 @@ public static async Task AppendAsync<T>(T @input
case null:
throw new ArgumentNullException(nameof(@input));
case IEnumerable inputs:
await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
default:
await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
}
}
Expand All @@ -46,20 +46,20 @@ private static async Task AppendAsyncOfT<T>(T input
}

public static async Task AppendAsync<T>(T input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options
, PublisherOptions options
, string connectionString
, CancellationToken ct)
where T: class
{
var type = typeof(T);
var (typeName, jsonTypeInfo) = options.resolver.Resolve(type);
var (typeName, jsonTypeInfo) = options.JsonTypeResolver.Resolve(type);
var data = JsonSerialization.ToJson(input, jsonTypeInfo);

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(ct).ConfigureAwait(false);
await using var command = connection.CreateCommand();
command.CommandText =
$"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
$"INSERT INTO {options.TableDescriptor.Name}({options.TableDescriptor.MessageType.Name}, {options.TableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
await command.PrepareAsync(ct).ConfigureAwait(false);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}
Expand Down
5 changes: 5 additions & 0 deletions src/Blumchen/Publications/PublisherOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using Blumchen.Serialization;

namespace Blumchen.Publications;

public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver);
4 changes: 2 additions & 2 deletions src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, Table
return this;
}

public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build()
public PublisherOptions Build()
{
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
ArgumentNullException.ThrowIfNull(_namingPolicy);
Expand All @@ -48,6 +48,6 @@ public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, Table
while (typeEnum.MoveNext())
jsonTypeResolver.WhiteList(typeEnum.Current);

return (_tableDescriptor,jsonTypeResolver);
return new(_tableDescriptor,jsonTypeResolver);
}
}

0 comments on commit c446721

Please sign in to comment.