Skip to content

Commit 95711c3

Browse files
committed
Rough example of how to use Projac with SqlStreamStore.
1 parent 36c95b3 commit 95711c3

File tree

3 files changed

+181
-0
lines changed

3 files changed

+181
-0
lines changed

src/Recipes/Recipes.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@
8989
<HintPath>..\packages\RavenDB.Database.3.0.3800\lib\net45\Raven.Database.dll</HintPath>
9090
<Private>True</Private>
9191
</Reference>
92+
<Reference Include="SqlStreamStore, Version=0.5.1.0, Culture=neutral, processorArchitecture=MSIL">
93+
<HintPath>..\packages\SqlStreamStore.0.5.1\lib\Portable-Net45+Win8+WP8+WPA81\SqlStreamStore.dll</HintPath>
94+
</Reference>
9295
<Reference Include="StackExchange.Redis, Version=1.0.316.0, Culture=neutral, processorArchitecture=MSIL">
9396
<HintPath>..\packages\StackExchange.Redis.1.0.481\lib\net45\StackExchange.Redis.dll</HintPath>
9497
<Private>True</Private>
@@ -124,6 +127,7 @@
124127
<Compile Include="RavenDBIntegration\TestingUsage.cs" />
125128
<Compile Include="RedisIntegration\Usage.cs" />
126129
<Compile Include="SQLiteIntegration\Usage.cs" />
130+
<Compile Include="SqlStreamStoreIntegration\Usage.cs" />
127131
<Compile Include="Syntax\Usage.cs" />
128132
<Compile Include="WindowsAzureStorageIntegration\PortfolioModel.cs" />
129133
<Compile Include="WindowsAzureStorageIntegration\RebuildProjection.cs" />
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
using System;
2+
using System.Configuration;
3+
using System.Data;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using Newtonsoft.Json;
7+
using NUnit.Framework;
8+
using Paramol.Executors;
9+
using Paramol.SqlClient;
10+
using Projac;
11+
using Recipes.DataDefinition;
12+
using Recipes.Shared;
13+
using SqlStreamStore;
14+
using SqlStreamStore.Streams;
15+
16+
namespace Recipes.SqlStreamStoreIntegration
17+
{
18+
[TestFixture, Ignore("Because 'Explicit' is not respected by R#")]
19+
public class Usage
20+
{
21+
[Test]
22+
public async Task ShowWithStream()
23+
{
24+
//setup a projection schema (one of many ways)
25+
var projector = new SqlProjector(
26+
Resolve.WhenEqualToHandlerMessageType(new PortfolioProjection()),
27+
new TransactionalSqlCommandExecutor(
28+
new ConnectionStringSettings(
29+
"projac",
30+
@"Data Source=(localdb)\ProjectsV12;Initial Catalog=ProjacUsage;Integrated Security=SSPI;",
31+
"System.Data.SqlClient"),
32+
IsolationLevel.ReadCommitted));
33+
projector.Project(new object[] { new DropSchema(), new CreateSchema() });
34+
35+
//setup a memory eventstore
36+
var store = new InMemoryStreamStore();
37+
38+
//setup a sample stream (using some sample events)
39+
var portfolioId = Guid.NewGuid();
40+
var events = new object[]
41+
{
42+
new PortfolioAdded {Id = portfolioId, Name = "My Portfolio"},
43+
new PortfolioRenamed {Id = portfolioId, Name = "Your Portfolio"},
44+
new PortfolioRemoved {Id = portfolioId}
45+
};
46+
var stream = string.Format("portfolio-{0}", portfolioId.ToString("N"));
47+
await store.AppendToStream(
48+
stream,
49+
ExpectedVersion.Any,
50+
events
51+
.Select(@event => new NewStreamMessage(
52+
Guid.NewGuid(),
53+
@event.GetType().FullName,
54+
JsonConvert.SerializeObject(@event)))
55+
.ToArray());
56+
57+
//project the sample stream (until end of stream)
58+
var result =
59+
await store.ReadStreamForwards(stream, StreamVersion.Start, 1, true);
60+
foreach (var rawMessage in result.Messages)
61+
{
62+
var @event = JsonConvert.DeserializeObject(
63+
await rawMessage.GetJsonData(),
64+
Type.GetType(rawMessage.Type, true));
65+
66+
projector.Project(@event);
67+
}
68+
69+
while (!result.IsEnd)
70+
{
71+
result =
72+
await store.ReadStreamForwards(stream, result.NextStreamVersion, 1, true);
73+
foreach (var rawMessage in result.Messages)
74+
{
75+
var @event = JsonConvert.DeserializeObject(
76+
await rawMessage.GetJsonData(),
77+
Type.GetType(rawMessage.Type, true));
78+
79+
projector.Project(@event);
80+
}
81+
}
82+
}
83+
84+
[Test]
85+
public async Task ShowWithCatchupSubscription()
86+
{
87+
//setup a projection schema (one of many ways)
88+
var projector = new SqlProjector(
89+
Resolve.WhenEqualToHandlerMessageType(new PortfolioProjection()),
90+
new TransactionalSqlCommandExecutor(
91+
new ConnectionStringSettings(
92+
"projac",
93+
@"Data Source=(localdb)\ProjectsV12;Initial Catalog=ProjacUsage;Integrated Security=SSPI;",
94+
"System.Data.SqlClient"),
95+
IsolationLevel.ReadCommitted));
96+
projector.Project(new object[] { new DropSchema(), new CreateSchema() });
97+
98+
//setup a memory eventstore
99+
var store = new InMemoryStreamStore();
100+
101+
//setup a sample stream (using some sample events)
102+
var portfolioId = Guid.NewGuid();
103+
var events = new object[]
104+
{
105+
new PortfolioAdded {Id = portfolioId, Name = "My Portfolio"},
106+
new PortfolioRenamed {Id = portfolioId, Name = "Your Portfolio"},
107+
new PortfolioRemoved {Id = portfolioId}
108+
};
109+
var stream = string.Format("portfolio-{0}", portfolioId.ToString("N"));
110+
await store.AppendToStream(
111+
stream,
112+
ExpectedVersion.Any,
113+
events
114+
.Select(@event => new NewStreamMessage(
115+
Guid.NewGuid(),
116+
@event.GetType().FullName,
117+
JsonConvert.SerializeObject(@event)))
118+
.ToArray());
119+
120+
//project the sample stream (until end of stream)
121+
var subscription = store.SubscribeToStream(stream, null, async (_, rawMessage) =>
122+
{
123+
var @event = JsonConvert.DeserializeObject(
124+
await rawMessage.GetJsonData(),
125+
Type.GetType(rawMessage.Type, true));
126+
127+
projector.Project(@event);
128+
});
129+
//should complete within 5 seconds.
130+
await Task.Delay(TimeSpan.FromSeconds(5));
131+
subscription.Dispose();
132+
}
133+
134+
public class PortfolioProjection : SqlProjection
135+
{
136+
private static readonly SqlClientSyntax Sql = new SqlClientSyntax();
137+
138+
public PortfolioProjection()
139+
{
140+
When<PortfolioAdded>(@event =>
141+
Sql.NonQueryStatement(
142+
"INSERT INTO [Portfolio] ([Id], [Name], [PhotoCount]) VALUES (@P1, @P2, 0)",
143+
new {P1 = Sql.UniqueIdentifier(@event.Id), P2 = Sql.NVarChar(@event.Name, 40)}
144+
));
145+
When<PortfolioRemoved>(@event =>
146+
Sql.NonQueryStatement(
147+
"DELETE FROM [Portfolio] WHERE [Id] = @P1",
148+
new {P1 = Sql.UniqueIdentifier(@event.Id)}
149+
));
150+
When<PortfolioRenamed>(@event =>
151+
Sql.NonQueryStatement(
152+
"UPDATE [Portfolio] SET [Name] = @P2 WHERE [Id] = @P1",
153+
new {P1 = Sql.UniqueIdentifier(@event.Id), P2 = Sql.NVarChar(@event.Name, 40)}
154+
));
155+
156+
When<CreateSchema>(_ =>
157+
Sql.NonQueryStatement(
158+
@"IF NOT EXISTS (SELECT * FROM SYSOBJECTS WHERE NAME='Portfolio' AND XTYPE='U')
159+
BEGIN
160+
CREATE TABLE [Portfolio] (
161+
[Id] UNIQUEIDENTIFIER NOT NULL CONSTRAINT PK_Portfolio PRIMARY KEY,
162+
[Name] NVARCHAR(MAX) NOT NULL,
163+
[PhotoCount] INT NOT NULL)
164+
END"));
165+
When<DropSchema>(_ =>
166+
Sql.NonQueryStatement(
167+
@"IF EXISTS (SELECT * FROM SYSOBJECTS WHERE NAME='Portfolio' AND XTYPE='U')
168+
DROP TABLE [Portfolio]"));
169+
When<DeleteData>(_ =>
170+
Sql.NonQueryStatement(
171+
@"IF EXISTS (SELECT * FROM SYSOBJECTS WHERE NAME='Portfolio' AND XTYPE='U')
172+
DELETE FROM [Portfolio]"));
173+
}
174+
}
175+
}
176+
}

src/Recipes/packages.config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<package id="RavenDB.Client" version="3.0.3800" targetFramework="net45" />
1515
<package id="RavenDB.Database" version="3.0.3800" targetFramework="net45" />
1616
<package id="RavenDB.Embedded" version="3.0.3800" targetFramework="net45" />
17+
<package id="SqlStreamStore" version="0.5.1" targetFramework="net45" />
1718
<package id="StackExchange.Redis" version="1.0.481" targetFramework="net45" />
1819
<package id="System.Data.SQLite.Core" version="1.0.99.0" targetFramework="net45" />
1920
<package id="System.Spatial" version="5.6.4" targetFramework="net451" />

0 commit comments

Comments
 (0)