Skip to content

Commit

Permalink
inject error handler to trace poisoining messages
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera authored and oskardudycz committed Jul 3, 2024
1 parent a6bc154 commit bcfe721
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion src/Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,17 @@ CancellationToken ct
private static string Randomise(string prefix) =>
$"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}";

protected static (TestConsumer<T> consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor<T>(
protected static async Task InsertPoisoningMessage(string connectionString, string eventsTable, CancellationToken ct)
{
var connection = new NpgsqlConnection(connectionString);
await using var connection1 = connection.ConfigureAwait(false);
await connection.OpenAsync(ct);
var command = connection.CreateCommand();
command.CommandText = $"INSERT INTO {eventsTable}(message_type, data) values ('urn:message:user-created:v1', '{{\"prop\":\"some faking text\"}}')";
await command.ExecuteNonQueryAsync(ct);
}

protected (TestConsumer<T> consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor<T>(
string connectionString,
string eventsTable,
JsonSerializerContext info,
Expand All @@ -69,6 +79,7 @@ protected static (TestConsumer<T> consumer, SubscriptionOptionsBuilder subscript
ArgumentNullException.ThrowIfNull(jsonTypeInfo);
var consumer = new TestConsumer<T>(log, jsonTypeInfo);
var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder()
.WithErrorProcessor(new TestOutErrorProcessor(Output))
.ConnectionString(connectionString)
.JsonContext(info)
.NamingPolicy(namingPolicy)
Expand All @@ -82,4 +93,12 @@ protected static (TestConsumer<T> consumer, SubscriptionOptionsBuilder subscript
return (consumer, subscriptionOptionsBuilder);
}

private sealed record TestOutErrorProcessor(ITestOutputHelper Output): IErrorProcessor
{
public Func<Exception, Task> Process => exception =>
{
Output.WriteLine($"record id:{0} resulted in error:{exception.Message}");
return Task.CompletedTask;
};
}
}

0 comments on commit bcfe721

Please sign in to comment.