Skip to content

Commit 8603699

Browse files
committed
Add runtime test for event adapters with multiple persistence configurations
This test reproduces the scenario from issue #552 where: 1. WithSqlPersistence is called with event adapters (for tagging) 2. WithSqlPersistence is called again with separate journal/snapshot options (for sharding) The test uses Akka.Hosting.TestKit for cleaner lifecycle management and follows the same pattern as other end-to-end tests in the project. Test verifies: - Creates a persistent actor - Persists events that should be tagged by the event adapter - Queries by tag using EventsByTag - Asserts that all 3 events are found with the correct tag On v1.5.51: TEST PASSES ✓ Event adapters work correctly even with multiple WithSqlPersistence calls. Next step: Test on v1.5.51.1 to see if regression exists.
1 parent 2756708 commit 8603699

File tree

1 file changed

+194
-0
lines changed

1 file changed

+194
-0
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Akka.Actor;
5+
using Akka.Hosting;
6+
using Akka.Persistence;
7+
using Akka.Persistence.Journal;
8+
using Akka.Persistence.Query;
9+
using Akka.Persistence.Sql.Hosting;
10+
using Akka.Persistence.Sql.Query;
11+
using Akka.Persistence.Sql.Tests.Common.Containers;
12+
using Akka.Streams;
13+
using Akka.Streams.Dsl;
14+
using FluentAssertions;
15+
using Microsoft.Extensions.Logging;
16+
using Xunit;
17+
using Xunit.Abstractions;
18+
19+
namespace Akka.Persistence.Sql.Hosting.Tests;
20+
21+
/// <summary>
22+
/// Tests runtime behavior of event adapters when combined with multiple journal configurations
23+
/// (simulating the scenario where WithSqlPersistence is called with adapters, then
24+
/// WithSqlPersistence is called again for sharding with separate journal/snapshot options)
25+
/// </summary>
26+
public class RuntimeEventAdapterSpec : Akka.Hosting.TestKit.TestKit, IClassFixture<SqliteContainer>
27+
{
28+
private const string TestTag = "test-tag";
29+
private const string PersistenceId = "test-1";
30+
31+
private readonly SqliteContainer _fixture;
32+
33+
public RuntimeEventAdapterSpec(ITestOutputHelper output, SqliteContainer fixture)
34+
: base(nameof(RuntimeEventAdapterSpec), output, logLevel: LogLevel.Debug)
35+
{
36+
_fixture = fixture;
37+
}
38+
39+
protected override async Task BeforeTestStart()
40+
{
41+
await base.BeforeTestStart();
42+
await _fixture.InitializeAsync();
43+
}
44+
45+
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
46+
{
47+
// Mimic the user's scenario from issue #552:
48+
// 1. First call: Set up global persistence with event adapters (for tagging)
49+
builder.WithSqlPersistence(
50+
connectionString: _fixture.ConnectionString,
51+
providerName: _fixture.ProviderName,
52+
journalBuilder: journal => journal
53+
.AddWriteEventAdapter<TestEventTagger>("test-tagger",
54+
new[] { typeof(TestEvent) }));
55+
56+
// 2. Second call: Set up separate journal/snapshot options (like sharding does)
57+
// This is the key issue - does this overwrite the event adapters?
58+
var shardingJournalOptions = new SqlJournalOptions(isDefaultPlugin: false, identifier: "sharding")
59+
{
60+
ConnectionString = _fixture.ConnectionString,
61+
ProviderName = _fixture.ProviderName,
62+
AutoInitialize = true
63+
};
64+
65+
var shardingSnapshotOptions = new SqlSnapshotOptions(isDefaultPlugin: false, identifier: "sharding")
66+
{
67+
ConnectionString = _fixture.ConnectionString,
68+
ProviderName = _fixture.ProviderName,
69+
AutoInitialize = true
70+
};
71+
72+
builder.WithSqlPersistence(
73+
journalOptions: shardingJournalOptions,
74+
snapshotOptions: shardingSnapshotOptions);
75+
}
76+
77+
// Test event and adapter - mimics the user's MessageTagger
78+
public sealed class TestEvent
79+
{
80+
public TestEvent(string data)
81+
{
82+
Data = data;
83+
}
84+
85+
public string Data { get; }
86+
}
87+
88+
public sealed class TestEventTagger : IWriteEventAdapter
89+
{
90+
public string Manifest(object evt) => string.Empty;
91+
92+
public object ToJournal(object evt)
93+
{
94+
return evt switch
95+
{
96+
TestEvent => new Tagged(evt, new[] { TestTag }),
97+
_ => evt
98+
};
99+
}
100+
}
101+
102+
// Test persistent actor
103+
public sealed class TestPersistentActor : ReceivePersistentActor
104+
{
105+
public sealed class SaveEvent
106+
{
107+
public SaveEvent(string data)
108+
{
109+
Data = data;
110+
}
111+
112+
public string Data { get; }
113+
}
114+
115+
public sealed class GetState
116+
{
117+
public static readonly GetState Instance = new();
118+
private GetState() { }
119+
}
120+
121+
public sealed class State
122+
{
123+
public State(string[] events)
124+
{
125+
Events = events;
126+
}
127+
128+
public string[] Events { get; }
129+
}
130+
131+
private readonly System.Collections.Generic.List<string> _events = new();
132+
133+
public TestPersistentActor(string persistenceId)
134+
{
135+
PersistenceId = persistenceId;
136+
137+
Command<SaveEvent>(cmd =>
138+
{
139+
var evt = new TestEvent(cmd.Data);
140+
Persist(evt, _ =>
141+
{
142+
_events.Add(cmd.Data);
143+
Sender.Tell("OK");
144+
});
145+
});
146+
147+
Command<GetState>(_ =>
148+
{
149+
Sender.Tell(new State(_events.ToArray()));
150+
});
151+
152+
Recover<TestEvent>(evt =>
153+
{
154+
_events.Add(evt.Data);
155+
});
156+
}
157+
158+
public override string PersistenceId { get; }
159+
}
160+
161+
[Fact]
162+
public async Task EventAdapter_ShouldWork_WhenFollowedByWithSqlPersistence()
163+
{
164+
// Arrange
165+
var persistentActor = Sys.ActorOf(Props.Create(() => new TestPersistentActor(PersistenceId)));
166+
167+
// Act - persist some events
168+
await persistentActor.Ask<string>(new TestPersistentActor.SaveEvent("event-1"), TimeSpan.FromSeconds(5));
169+
await persistentActor.Ask<string>(new TestPersistentActor.SaveEvent("event-2"), TimeSpan.FromSeconds(5));
170+
await persistentActor.Ask<string>(new TestPersistentActor.SaveEvent("event-3"), TimeSpan.FromSeconds(5));
171+
172+
// Query by tag - this should work if event adapters are configured correctly
173+
var readJournal = PersistenceQuery.Get(Sys)
174+
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
175+
176+
var source = readJournal.EventsByTag(TestTag, Offset.NoOffset());
177+
var materializer = Sys.Materializer();
178+
179+
var eventsTask = source
180+
.Take(3)
181+
.RunWith(Sink.Seq<EventEnvelope>(), materializer)
182+
.ContinueWith(t => t.Result.ToList());
183+
184+
var events = await eventsTask.WaitAsync(TimeSpan.FromSeconds(10));
185+
186+
// Assert - verify that events were tagged (meaning event adapter worked)
187+
events.Should().HaveCount(3, "all 3 events should be tagged");
188+
189+
var eventData = events.Select(e => ((TestEvent)e.Event).Data).ToList();
190+
eventData.Should().Contain("event-1");
191+
eventData.Should().Contain("event-2");
192+
eventData.Should().Contain("event-3");
193+
}
194+
}

0 commit comments

Comments
 (0)