Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standardize store naming: Connection/Context/Category #276

Merged
merged 4 commits into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- Rename `type Stream` to `Decider` [#272](https://github.com/jet/equinox/pull/272) :pray: [@thinkbeforecoding](https://github.com/thinkbeforecoding)
- Standardise naming of top level structure: `<StoreName>Connection` (wraps the relevant `*Client` for that store), `Context`, `Category` [#276](https://github.com/jet/equinox/pull/276)
- Rename `Resolver` -> `<StoreName>Category`
- Rename `Context` -> `<StoreName>Context`
- Fork `Equinox.Cosmos` to `Equinox.CosmosStore`:
- target `Microsoft.Azure.Cosmos` v `3.9.0` (instead of `Microsoft.Azure.DocumentDB`[`.Core`] v 2.x) [#144](https://github.com/jet/equinox/pull/144)
- Removed [warmup call](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/1436)
Expand Down
10 changes: 5 additions & 5 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ module EventStore =
let create (context, cache) =
let cacheStrategy =
Equinox.EventStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver =
Equinox.EventStore.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve
let cat =
Equinox.EventStore.EventStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create cat.Resolve

module Cosmos =
let accessStrategy =
Expand All @@ -428,8 +428,8 @@ can use the `MemoryStore` in the context of your tests:
```fsharp
module MemoryStore =
let create (store : Equinox.MemoryStore.VolatileStore) =
let resolver = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial)
create resolver.Resolve
let cat = Equinox.MemoryStore.MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial)
create cat.Resolve
```

Typically that binding module can live with your test helpers rather than
Expand Down
14 changes: 7 additions & 7 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@ type StreamResolver(storage) =
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.EventStore.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial).Resolve
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve

type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)
let cat = StreamResolver(storageConfig)

member __.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
Favorites.create handlerLog (resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot))
Favorites.create handlerLog (cat.Resolve(Favorites.Events.codec,fold,initial,snapshot))

member __.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
SavedForLater.create 50 handlerLog (resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))
SavedForLater.create 50 handlerLog (cat.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))

member __.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
TodoBackend.create handlerLog (resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
TodoBackend.create handlerLog (cat.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand Down
50 changes: 25 additions & 25 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ exception MissingArg of string
type StorageConfig =
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<byte[]>
| Es of Equinox.EventStore.Context * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Es of Equinox.EventStore.EventStoreContext * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool
| Sql of Equinox.SqlStreamStore.Context * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool
| Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool

module MemoryStore =
type [<NoEquality; NoComparison>] Arguments =
Expand Down Expand Up @@ -93,30 +93,30 @@ module Cosmos =

let logContainer (log: ILogger) name (mode, endpoint, db, container) =
log.Information("CosmosDB {name:l} {mode} {connection} Database {database} Container {container}", name, mode, endpoint, db, container)
let connect (a : Info) connectionString =
let createClient (a : Info) connectionString =
CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(Discovery.ConnectionString connectionString)
let conn (log : ILogger) (a : Info) =
let (primaryClient, primaryDatabase, primaryContainer) as primary = connect a a.Connection, a.Database, a.Container
let connect (log : ILogger) (a : Info) =
let (primaryClient, primaryDatabase, primaryContainer) as primary = createClient a a.Connection, a.Database, a.Container
logContainer log "Primary" (a.Mode, primaryClient.Endpoint, primaryDatabase, primaryContainer)
let secondary =
match a.Secondary with
| Some (Some c2, db, container) -> Some (connect a c2, db, container)
| Some (Some c2, db, container) -> Some (createClient a c2, db, container)
| Some (None, db, container) -> Some (primaryClient, db, container)
| None -> None
secondary |> Option.iter (fun (client, db, container) -> logContainer log "Secondary" (a.Mode, client.Endpoint, db, container))
primary, secondary
let config (log : ILogger) (cache, unfolds) (a : Info) =
let conn =
match conn log a with
let connection =
match connect log a with
| (client, databaseId, containerId), None ->
CosmosStoreConnection(client, databaseId, containerId)
| (client, databaseId, containerId), Some (client2, db2, cont2) ->
CosmosStoreConnection(client, databaseId, containerId, client2 = client2, databaseId2 = db2, containerId2 = cont2)
log.Information("CosmosStore Max Events in Tip: {maxTipEvents}e {maxTipJsonLength}b Items in Query: {queryMaxItems}",
a.TipMaxEvents, a.TipMaxJsonLength, a.QueryMaxItems)
let ctx = CosmosStoreContext.Create(conn, queryMaxItems = a.QueryMaxItems, tipMaxEvents = a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength)
let context = CosmosStoreContext.Create(connection, queryMaxItems = a.QueryMaxItems, tipMaxEvents = a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Cosmos (ctx, cacheStrategy, unfolds)
StorageConfig.Cosmos (context, cacheStrategy, unfolds)

/// To establish a local node to run the tests against:
/// 1. cinst eventstore-oss -y # where cinst is an invocation of the Chocolatey Package Installer on Windows
Expand Down Expand Up @@ -163,17 +163,17 @@ module EventStore =
log=(if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log),
tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string])
.Establish(appName, Discovery.GossipDns dnsQuery, ConnectionStrategy.ClusterTwinPreferSlaveReads)
let private createGateway connection batchSize = Context(connection, BatchingPolicy(maxBatchSize = batchSize))
let private createContext connection batchSize = EventStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize))
let config (log: ILogger, storeLog) (cache, unfolds) (args : ParseResults<Arguments>) =
let a = Info(args)
let (timeout, retries) as operationThrottling = a.Timeout, a.Retries
let heartbeatTimeout = a.HeartbeatTimeout
let concurrentOperationsLimit = a.ConcurrentOperationsLimit
log.Information("EventStoreDB {host} heartbeat: {heartbeat}s timeout: {timeout}s concurrent reqs: {concurrency} retries {retries}",
a.Host, heartbeatTimeout.TotalSeconds, timeout.TotalSeconds, concurrentOperationsLimit, retries)
let conn = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously
let connection = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously
let cacheStrategy = cache |> Option.map (fun c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
StorageConfig.Es ((createGateway conn a.MaxEvents), cacheStrategy, unfolds)
StorageConfig.Es ((createContext connection a.MaxEvents), cacheStrategy, unfolds)

module Sql =
open Equinox.SqlStreamStore
Expand Down Expand Up @@ -202,12 +202,12 @@ module Sql =
let connect (log : ILogger) (connectionString,schema,credentials,autoCreate) =
let sssConnectionString = String.Join(";", connectionString, credentials)
log.Information("SqlStreamStore MsSql Connection {connectionString} Schema {schema} AutoCreate {autoCreate}", connectionString, schema, autoCreate)
Equinox.SqlStreamStore.MsSql.Connector(sssConnectionString,schema,autoCreate=autoCreate).Establish(appName)
let private createGateway connection batchSize = Context(connection, BatchingPolicy(maxBatchSize = batchSize))
Equinox.SqlStreamStore.MsSql.Connector(sssConnectionString,schema,autoCreate=autoCreate).Establish()
let private createContext connection batchSize = SqlStreamStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize))
let config (log: ILogger) (cache, unfolds) (args : ParseResults<Arguments>) =
let a = Info(args)
let conn = connect log (a.ConnectionString, a.Schema, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createGateway conn a.MaxEvents), cacheStrategy cache, unfolds)
let connection = connect log (a.ConnectionString, a.Schema, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createContext connection a.MaxEvents), cacheStrategy cache, unfolds)
module My =
type [<NoEquality; NoComparison>] Arguments =
| [<AltCommandLine "-c"; Mandatory>] ConnectionString of string
Expand All @@ -228,12 +228,12 @@ module Sql =
let connect (log : ILogger) (connectionString,credentials,autoCreate) =
let sssConnectionString = String.Join(";", connectionString, credentials)
log.Information("SqlStreamStore MySql Connection {connectionString} AutoCreate {autoCreate}", connectionString, autoCreate)
Equinox.SqlStreamStore.MySql.Connector(sssConnectionString,autoCreate=autoCreate).Establish(appName)
let private createGateway connection batchSize = Context(connection, BatchingPolicy(maxBatchSize = batchSize))
Equinox.SqlStreamStore.MySql.Connector(sssConnectionString,autoCreate=autoCreate).Establish()
let private createContext connection batchSize = SqlStreamStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize))
let config (log: ILogger) (cache, unfolds) (args : ParseResults<Arguments>) =
let a = Info(args)
let conn = connect log (a.ConnectionString, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createGateway conn a.MaxEvents), cacheStrategy cache, unfolds)
let connection = connect log (a.ConnectionString, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createContext connection a.MaxEvents), cacheStrategy cache, unfolds)
module Pg =
type [<NoEquality; NoComparison>] Arguments =
| [<AltCommandLine "-c"; Mandatory>] ConnectionString of string
Expand All @@ -257,9 +257,9 @@ module Sql =
let connect (log : ILogger) (connectionString,schema,credentials,autoCreate) =
let sssConnectionString = String.Join(";", connectionString, credentials)
log.Information("SqlStreamStore Postgres Connection {connectionString} Schema {schema} AutoCreate {autoCreate}", connectionString, schema, autoCreate)
Equinox.SqlStreamStore.Postgres.Connector(sssConnectionString,schema,autoCreate=autoCreate).Establish(appName)
let private createGateway connection batchSize = Context(connection, BatchingPolicy(maxBatchSize = batchSize))
Equinox.SqlStreamStore.Postgres.Connector(sssConnectionString,schema,autoCreate=autoCreate).Establish()
let private createContext connection batchSize = SqlStreamStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize))
let config (log: ILogger) (cache, unfolds) (args : ParseResults<Arguments>) =
let a = Info(args)
let conn = connect log (a.ConnectionString, a.Schema, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createGateway conn a.MaxEvents), cacheStrategy cache, unfolds)
let connection = connect log (a.ConnectionString, a.Schema, a.Credentials, a.AutoCreate) |> Async.RunSynchronously
StorageConfig.Sql((createContext connection a.MaxEvents), cacheStrategy cache, unfolds)
24 changes: 12 additions & 12 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<byte[]>()
let createServiceMemory log store =
Cart.create log (fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))
Cart.create log (fun (id,opt) -> MemoryStore.MemoryStoreCategory(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))

let codec = Domain.Cart.Events.codec

let resolveGesStreamWithRollingSnapshots gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
let resolveGesStreamWithoutCustomAccessStrategy gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt)
let resolveGesStreamWithRollingSnapshots context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
let resolveGesStreamWithoutCustomAccessStrategy context =
fun (id,opt) -> EventStore.EventStoreCategory(context, codec, fold, initial).Resolve(id,?option=opt)

let resolveCosmosStreamWithSnapshotStrategy context =
fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, codec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
Expand Down Expand Up @@ -53,26 +53,26 @@ type Tests(testOutputHelper) =

let arrangeEs connect choose resolve = async {
let log = createLog ()
let! conn = connect log
let gateway = choose conn defaultBatchSize
return Cart.create log (resolve gateway) }
let! client = connect log
let context = choose client defaultBatchSize
return Cart.create log (resolve context) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
let! service = arrangeEs connectToLocalEventStoreNode createContext resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
let! service = arrangeEs connectToLocalEventStoreNode createContext resolveGesStreamWithRollingSnapshots
do! act service args
}

let arrangeCosmos connect resolve =
let log = createLog ()
let ctx : CosmosStore.CosmosStoreContext = connect log defaultQueryMaxItems
Cart.create log (resolve ctx)
let context : CosmosStore.CosmosStoreContext = connect log defaultQueryMaxItems
Cart.create log (resolve context)

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
Expand Down
Loading