Skip to content

Add load test for SQL Server subscription #284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Runtime.CompilerServices;

namespace Eventuous.Subscriptions.Filters;

using Consumers;
using Context;

public class ConsumerFilter(IMessageConsumer<IMessageConsumeContext> consumer) : ConsumeFilter<IMessageConsumeContext> {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected override ValueTask Send(IMessageConsumeContext context, LinkedListNode<IConsumeFilter>? next)
=> consumer.Consume(context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
<ItemGroup>
<EmbeddedResource Include="Scripts\AppendEvents.sql"/>
<EmbeddedResource Include="Scripts\CheckStream.sql"/>
<EmbeddedResource Include="Scripts\ReadAllBackwards.sql"/>
<EmbeddedResource Include="Scripts\ReadAllForwards.sql"/>
<EmbeddedResource Include="Scripts\ReadStreamBackwards.sql"/>
<EmbeddedResource Include="Scripts\ReadStreamForwards.sql"/>
<EmbeddedResource Include="Scripts\ReadStreamSub.sql"/>
<EmbeddedResource Include="Scripts\_Schema.sql"/>
Expand All @@ -26,4 +28,8 @@
</Compile>
<Using Include="Eventuous.Tools"/>
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Eventuous.Tests.SqlServer"/>
</ItemGroup>
</Project>
18 changes: 10 additions & 8 deletions src/SqlServer/src/Eventuous.SqlServer/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ namespace Eventuous.SqlServer;
public class Schema(string schema = Schema.DefaultSchema) {
public const string DefaultSchema = "eventuous";

public string AppendEvents => $"{schema}.append_events";
public string ReadStreamForwards => $"{schema}.read_stream_forwards";
public string ReadStreamSub => $"{schema}.read_stream_sub";
public string ReadAllForwards => $"{schema}.read_all_forwards";
public string CheckStream => $"{schema}.check_stream";
public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)";
public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)";
public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))";
public string AppendEvents => $"{schema}.append_events";
public string ReadStreamForwards => $"{schema}.read_stream_forwards";
public string ReadStreamBackwards => $"{schema}.read_stream_backwards";
public string ReadStreamSub => $"{schema}.read_stream_sub";
public string ReadAllForwards => $"{schema}.read_all_forwards";
public string ReadAllBackwards => $"{schema}.read_all_backwards";
public string CheckStream => $"{schema}.check_stream";
public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)";
public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)";
public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))";
public string UpdateCheckpointSql
=> $"UPDATE {schema}.Checkpoints set Position=(@position) where Id=(@checkpointId)";

Expand Down
15 changes: 15 additions & 0 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE OR ALTER PROCEDURE __schema__.read_all_backwards
@from_position bigint,
@count int
AS
BEGIN

SELECT TOP (@count)
MessageId, MessageType, StreamPosition, GlobalPosition,
JsonData, JsonMetadata, Created, StreamName
FROM __schema__.Messages
INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId
WHERE Messages.GlobalPosition <= @from_position
ORDER BY Messages.GlobalPosition DESC

END
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE OR ALTER PROCEDURE __schema__.read_stream_backwards
@stream_name NVARCHAR(850),
@count INT
AS
BEGIN

DECLARE @current_version int, @stream_id int

SELECT @current_version = Version, @stream_id = StreamId
FROM __schema__.Streams
WHERE StreamName = @stream_name

IF @stream_id IS NULL
THROW 50001, 'StreamNotFound', 1;

SELECT TOP (@count)
MessageId, MessageType, StreamPosition, GlobalPosition,
JsonData, JsonMetadata, Created
FROM __schema__.Messages
WHERE StreamId = @stream_id AND StreamPosition <= @current_version
ORDER BY Messages.GlobalPosition DESC

END
25 changes: 18 additions & 7 deletions src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,23 @@ CancellationToken cancellationToken
}
}

public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public async Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) {
await using var connection = await OpenConnection(cancellationToken).NoContext();

await using var cmd = connection.GetStoredProcCommand(_schema.ReadStreamBackwards)
.Add("@stream_name", SqlDbType.NVarChar, stream.ToString())
.Add("@count", SqlDbType.Int, count);

try {
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext();

var result = reader.ReadEvents(cancellationToken);

return await result.Select(x => ToStreamEvent(x)).ToArrayAsync(cancellationToken).NoContext();
} catch (SqlException e) when (e.Message.StartsWith("StreamNotFound")) {
throw new StreamNotFound(stream);
}
}

public async Task<AppendEventsResult> AppendEvents(
StreamName stream,
Expand Down Expand Up @@ -130,11 +145,7 @@ CancellationToken cancellationToken
)
=> throw new NotImplementedException();

public Task DeleteStream(
StreamName stream,
ExpectedStreamVersion expectedVersion,
CancellationToken cancellationToken
)
public Task DeleteStream(StreamName stream, ExpectedStreamVersion expectedVersion, CancellationToken cancellationToken)
=> throw new NotImplementedException();

StreamEvent ToStreamEvent(PersistedEvent evt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public sealed class IntegrationFixture : IAsyncLifetime {
public IFixture Auto { get; } = new Fixture().Customize(new NodaTimeCustomization());
public GetSqlServerConnection GetConnection { get; private set; } = null!;
public Faker Faker { get; } = new();
public Schema Schema { get; set; }

public string SchemaName { get; }

Expand All @@ -35,15 +36,17 @@ public sealed class IntegrationFixture : IAsyncLifetime {
public async Task InitializeAsync() {
_sqlServer = new SqlEdgeBuilder()
.WithImage("mcr.microsoft.com/azure-sql-edge:latest")
// .WithAutoRemove(false)
// .WithCleanUp(false)
.Build();
await _sqlServer.StartAsync();

var schema = new Schema(SchemaName);
Schema = new Schema(SchemaName);
var connString = _sqlServer.GetConnectionString();
GetConnection = () => GetConn(connString);
await schema.CreateSchema(GetConnection);
await Schema.CreateSchema(GetConnection);
DefaultEventSerializer.SetDefaultSerializer(Serializer);
EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer);
EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer);
ActivitySource.AddActivityListener(_listener);

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class SubscriptionFixture<T> : IClassFixture<IntegrationFixture>

protected SubscriptionFixture(
IntegrationFixture fixture,
ITestOutputHelper outputHelper,
ITestOutputHelper output,
T handler,
bool subscribeToAll,
bool autoStart = true,
Expand All @@ -31,14 +31,15 @@ protected SubscriptionFixture(
_subscribeToAll = subscribeToAll;
Stream = new StreamName(fixture.Auto.Create<string>());
SchemaName = fixture.GetSchemaName();
_loggerFactory = TestHelpers.Logging.GetLoggerFactory(outputHelper, logLevel);
_loggerFactory = TestHelpers.Logging.GetLoggerFactory(output, logLevel);
_listener = new LoggingEventListener(_loggerFactory);
SubscriptionId = $"test-{Guid.NewGuid():N}";
Handler = handler;
Log = _loggerFactory.CreateLogger(GetType());
}

protected string SubscriptionId { get; }
protected Schema Schema { get; set; }

protected ValueTask Start() => Subscription.SubscribeWithLog(Log);

Expand All @@ -51,8 +52,8 @@ protected SubscriptionFixture(
readonly ILoggerFactory _loggerFactory;

public virtual async Task InitializeAsync() {
var schema = new Schema(SchemaName);
await schema.CreateSchema(_fixture.GetConnection);
Schema = new Schema(SchemaName);
await Schema.CreateSchema(_fixture.GetConnection);

CheckpointStoreOptions = new SqlServerCheckpointStoreOptions { Schema = SchemaName };
CheckpointStore = new SqlServerCheckpointStore(_fixture.GetConnection, CheckpointStoreOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.Data;
using Eventuous.SqlServer;
using Eventuous.SqlServer.Extensions;
using Eventuous.Sut.App;
using Eventuous.Sut.Domain;
using Eventuous.Sut.Subs;
using Eventuous.Tests.SqlServer.Fixtures;
using Hypothesist;

namespace Eventuous.Tests.SqlServer.Subscriptions;

public class LoadTest : SubscriptionFixture<TestEventHandler> {
readonly IntegrationFixture _fixture;
readonly BookingService _service;

public LoadTest(IntegrationFixture fixture, ITestOutputHelper output)
: base(fixture, output, new TestEventHandler(), true, autoStart: false, logLevel: LogLevel.Debug) {
_fixture = fixture;
var eventStore = new SqlServerStore(fixture.GetConnection, new SqlServerStoreOptions(SchemaName));
var store = new AggregateStore(eventStore);
_service = new BookingService(store);
}

[Fact]
public async Task ProduceAndConsumeManyEvents() {
const int count = 55000;
Handler.AssertThat().Any(_ => true);

var generateTask = Task.Run(() => GenerateAndHandleCommands(count));

await Start();
await Task.Delay(TimeSpan.FromMinutes(7));
await Stop();
Handler.Count.Should().Be(count);

var checkpoint = await CheckpointStore.GetLastCheckpoint(SubscriptionId, default);
checkpoint.Position.Value.Should().Be(count - 1);

await using var connection = _fixture.GetConnection();
await connection.OpenAsync();

await using var cmd = connection.GetStoredProcCommand(Schema.ReadAllBackwards)
.Add("@from_position", SqlDbType.BigInt, long.MaxValue)
.Add("@count", SqlDbType.Int, 1);
await using var reader = await cmd.ExecuteReaderAsync(CancellationToken.None);

var result = reader.ReadEvents(CancellationToken.None);

var lastEvent = await result.LastAsync();
lastEvent.GlobalPosition.Should().Be(count - 1);
}

async Task<List<Commands.ImportBooking>> GenerateAndHandleCommands(int count) {
var commands = Enumerable
.Range(0, count)
.Select(_ => DomainFixture.CreateImportBooking())
.ToList();

foreach (var cmd in commands) {
var result = await _service.Handle(cmd, default);

if (result is ErrorResult<BookingState> error) {
throw error.Exception ?? new Exception(error.Message);
}
}

return commands;
}

static BookingEvents.BookingImported ToEvent(Commands.ImportBooking cmd)
=> new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task ShouldConsumeProducedEvents() {

await Start();
await Handler.Validate(2.Seconds());
Handler.Count.Should().Be(10);
Handler.Count.Should().Be(testEvents.Count);
await Stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ async Task<List<BookingImported>> GenerateAndProduceEvents(int count) {

var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new Metadata(), "", 0));

await _eventStore.AppendEvents(
Stream,
ExpectedStreamVersion.Any,
streamEvents.ToArray(),
default
);
await _eventStore.AppendEvents(Stream, ExpectedStreamVersion.Any, streamEvents.ToArray(), default);

return events;
}
Expand Down
21 changes: 5 additions & 16 deletions test/Eventuous.Sut.Domain/BookingEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,13 @@ namespace Eventuous.Sut.Domain;

public static class BookingEvents {
[EventType("RoomBooked")]
public record RoomBooked(
string RoomId,
LocalDate CheckIn,
LocalDate CheckOut,
float Price,
string? GuestId = null
);
public record RoomBooked(string RoomId, LocalDate CheckIn, LocalDate CheckOut, float Price, string? GuestId = null);

[EventType("PaymentRegistered")]
public record BookingPaymentRegistered(
string PaymentId,
float AmountPaid
);
string PaymentId,
float AmountPaid
);

[EventType("OutstandingAmountChanged")]
public record BookingOutstandingAmountChanged(float OutstandingAmount);
Expand All @@ -33,12 +27,7 @@ public record BookingOverpaid(float OverpaidAmount);
public record BookingCancelled;

[EventType("V1.BookingImported")]
public record BookingImported(
string RoomId,
float Price,
LocalDate CheckIn,
LocalDate CheckOut
);
public record BookingImported(string RoomId, float Price, LocalDate CheckIn, LocalDate CheckOut);

// These constants are for test purpose, use inline names in real apps
public static class TypeNames {
Expand Down
3 changes: 2 additions & 1 deletion test/Eventuous.Sut.Subs/TestEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public record TestEvent(string Data, int Number) {

public class TestEventHandler(TimeSpan? delay = null, ITestOutputHelper? output = null) : BaseEventHandler {
readonly TimeSpan _delay = delay ?? TimeSpan.Zero;
readonly string _id = Guid.NewGuid().ToString("N");

public int Count { get; private set; }

Expand All @@ -27,9 +28,9 @@ public IHypothesis<object> AssertThat() {
public Task Validate(TimeSpan timeout) => EnsureHypothesis.Validate(timeout);

public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
output?.WriteLine(context.Message!.ToString());
await Task.Delay(_delay);
await EnsureHypothesis.Test(context.Message!, context.CancellationToken);
output?.WriteLine($"[{_id}] Handled event {context.GlobalPosition}, count is {Count}");
Count++;

return EventHandlingStatus.Success;
Expand Down