Skip to content

Commit 026e3c3

Browse files
authored
API v3 (#13)
1 parent 673aeee commit 026e3c3

23 files changed

+1049
-920
lines changed

SqlStreamStore.FSharp.sln

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ VisualStudioVersion = 15.0.26124.0
55
MinimumVisualStudioVersion = 15.0.26124.0
66
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "SqlStreamStore.FSharp", "src\SqlStreamStore.FSharp.fsproj", "{22955019-D275-45AB-AD7C-B192BDB3F60F}"
77
EndProject
8-
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "SqlStreamStore.FSharp.Tests", "tests\SqlStreamStore.FSharp.Tests.fsproj", "{377DE082-49F3-4925-8CFF-5A24329B5512}"
9-
EndProject
108
Global
119
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1210
Debug|Any CPU = Debug|Any CPU
@@ -32,17 +30,5 @@ Global
3230
{22955019-D275-45AB-AD7C-B192BDB3F60F}.Release|x64.Build.0 = Release|Any CPU
3331
{22955019-D275-45AB-AD7C-B192BDB3F60F}.Release|x86.ActiveCfg = Release|Any CPU
3432
{22955019-D275-45AB-AD7C-B192BDB3F60F}.Release|x86.Build.0 = Release|Any CPU
35-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
36-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|Any CPU.Build.0 = Debug|Any CPU
37-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|x64.ActiveCfg = Debug|Any CPU
38-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|x64.Build.0 = Debug|Any CPU
39-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|x86.ActiveCfg = Debug|Any CPU
40-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Debug|x86.Build.0 = Debug|Any CPU
41-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|Any CPU.ActiveCfg = Release|Any CPU
42-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|Any CPU.Build.0 = Release|Any CPU
43-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|x64.ActiveCfg = Release|Any CPU
44-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|x64.Build.0 = Release|Any CPU
45-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|x86.ActiveCfg = Release|Any CPU
46-
{377DE082-49F3-4925-8CFF-5A24329B5512}.Release|x86.Build.0 = Release|Any CPU
4733
EndGlobalSection
4834
EndGlobal

src/Append.fs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,51 @@
11
namespace SqlStreamStore.FSharp
22

3+
open FSharp.Prelude
4+
open System.Threading
5+
open SqlStreamStore.Streams
6+
7+
[<RequireQualifiedAccess>]
8+
type AppendOption =
9+
| CancellationToken of CancellationToken
10+
| ExpectedVersion of int
11+
12+
module Append =
13+
let streamMessages'
14+
(messages: NewStreamMessage list)
15+
(appendOptions: AppendOption list)
16+
: Stream -> AsyncResult<AppendResult, exn> =
17+
18+
let mutable expectedVersion = ExpectedVersion.Any
19+
let mutable cancellationToken = Unchecked.defaultof<CancellationToken>
20+
21+
appendOptions
22+
|> List.iter
23+
(function
24+
| AppendOption.CancellationToken token -> cancellationToken <- token
25+
| AppendOption.ExpectedVersion version -> expectedVersion <- version)
26+
27+
fun (Stream stream) ->
28+
stream.store.AppendToStream(stream.streamId, expectedVersion, List.toArray messages, cancellationToken)
29+
30+
let streamMessages (messages: NewStreamMessage list) : Stream -> AsyncResult<AppendResult, exn> =
31+
streamMessages' messages []
32+
33+
namespace SqlStreamStore.FSharp.EventSourcing
34+
35+
open FSharp.Prelude
36+
open SqlStreamStore.FSharp
337
open SqlStreamStore.Streams
438

539
module Append =
6-
/// Appends a new message to a specific stream.
7-
let appendNewMessage (store: SqlStreamStore.IStreamStore)
8-
(streamName: string)
9-
(appendVersion: AppendVersion)
10-
(messageDetails: MessageDetails)
11-
: Async<Result<AppendResult, exn>> =
12-
AppendRaw.appendNewMessage store streamName appendVersion messageDetails
13-
|> ExceptionsHandler.asyncExceptionHandler
14-
15-
/// Appends a list of messages to a specific stream.
16-
let appendNewMessages (store: SqlStreamStore.IStreamStore)
17-
(streamName: string)
18-
(appendVersion: AppendVersion)
19-
(messages: MessageDetails list)
20-
: Async<Result<AppendResult, exn>> =
21-
AppendRaw.appendNewMessages store streamName appendVersion messages
22-
|> ExceptionsHandler.asyncExceptionHandler
40+
let streamEvents'
41+
(events: NewStreamEvent<'event> list)
42+
(appendOptions: AppendOption list)
43+
: Stream -> AsyncResult<AppendResult, exn> =
44+
45+
fun stream ->
46+
List.traverseResultM NewStreamEvent.toNewStreamMessage events
47+
|> Async.singleton
48+
|> AsyncResult.bind (fun messages -> Append.streamMessages' messages appendOptions stream)
49+
50+
let streamEvents (events: NewStreamEvent<'a> list) : Stream -> AsyncResult<AppendResult, exn> =
51+
streamEvents' events []

src/AppendRaw.fs

Lines changed: 0 additions & 54 deletions
This file was deleted.

src/Async.fs

Lines changed: 0 additions & 35 deletions
This file was deleted.

src/Connect.fs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
open SqlStreamStore
4+
5+
type private StreamInternal =
6+
{
7+
store: IStreamStore
8+
streamId: string
9+
}
10+
11+
type Stream = private Stream of StreamInternal
12+
13+
module Connect =
14+
let toStream streamId store =
15+
Stream { streamId = streamId; store = store }

src/Create.fs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
open FSharp.Prelude
4+
open SqlStreamStore
5+
6+
type PostgresConfig =
7+
{
8+
host: string
9+
port: string
10+
username: string
11+
password: string
12+
database: string
13+
}
14+
member this.ToConnectionString(?maxPoolSize) : string =
15+
let maxPoolSize' = defaultArg maxPoolSize "10"
16+
17+
sprintf
18+
"Host=%s;Port=%s;Username=%s;Password=%s;Database=%s;Pooling=true;Minimum Pool Size=0;Maximum Pool Size=%s"
19+
this.host
20+
this.port
21+
this.username
22+
this.password
23+
this.database
24+
maxPoolSize'
25+
26+
module Create =
27+
28+
/// Represents an in-memory implementation of a stream store. Use for testing or high/speed + volatile scenarios.
29+
let inMemoryStore : unit -> InMemoryStreamStore = fun _ -> new InMemoryStreamStore()
30+
31+
let postgresStore (config: PostgresConfig) (schema: string option) : PostgresStreamStore =
32+
33+
let storeSettings =
34+
let settings =
35+
PostgresStreamStoreSettings(config.ToConnectionString())
36+
37+
match schema with
38+
| None -> settings
39+
| Some schema' ->
40+
settings.Schema <- schema'
41+
settings
42+
43+
new PostgresStreamStore(storeSettings)
44+
45+
let schemaIfNotExists (store: PostgresStreamStore) : AsyncResult<IStreamStore, exn> =
46+
asyncResult {
47+
do! store.CreateSchemaIfNotExists()
48+
return store :> IStreamStore
49+
}

src/ExceptionsHandler.fs

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/Get.fs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
open FSharp.Prelude
4+
open SqlStreamStore.Streams
5+
6+
module Get =
7+
8+
let private curriedMap : (ReadStreamPage -> 'a) -> AsyncResult<ReadStreamPage, exn> -> AsyncResult<'a, exn> =
9+
AsyncResult.map
10+
11+
let messages =
12+
curriedMap (fun page -> page.Messages |> Array.toList)
13+
14+
let messagesData =
15+
messages
16+
>> AsyncResult.bind (List.traverseAsyncResultM (fun msg -> msg.GetJsonData()))
17+
18+
let messagesDataAs<'data> =
19+
messages
20+
>> AsyncResult.bind (List.traverseAsyncResultM (fun msg -> msg.GetJsonDataAs<'data>()))
21+
22+
let status = curriedMap (fun page -> page.Status)
23+
24+
let isEnd = curriedMap (fun page -> page.IsEnd)
25+
26+
let readDirection =
27+
curriedMap (fun page -> page.ReadDirection)
28+
29+
let streamId = curriedMap (fun page -> page.StreamId)
30+
31+
let fromStreamVersion =
32+
curriedMap (fun page -> page.FromStreamVersion)
33+
34+
let lastStreamPosition =
35+
curriedMap (fun page -> page.LastStreamPosition)
36+
37+
let nextStreamVersion =
38+
curriedMap (fun page -> page.NextStreamVersion)
39+
40+
41+
namespace SqlStreamStore.FSharp.EventSourcing
42+
43+
open FSharp.Prelude
44+
open SqlStreamStore.FSharp
45+
open SqlStreamStore.Streams
46+
47+
module Get =
48+
49+
let events<'event> =
50+
Get.messages
51+
>> Async.map (
52+
Result.bind (
53+
List.filter (fun msg -> msg.Type.Contains eventPrefix)
54+
>> List.traverseResultM StreamEvent.ofStreamMessage<'event>
55+
)
56+
)
57+
58+
let eventsData<'event> =
59+
events<'event>
60+
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.data))
61+
62+
let eventsAndEventsData<'event> =
63+
fun (page: AsyncResult<ReadStreamPage, exn>) ->
64+
asyncResult {
65+
let! events' = events<'event> page
66+
let! data = List.traverseAsyncResultM (fun event -> event.data) events'
67+
return List.zip events' data
68+
}
69+
70+
let eventDataAsString<'event> =
71+
events<'event>
72+
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.dataAsString))

0 commit comments

Comments
 (0)