Skip to content

Commit

Permalink
Rebased cosmos support for web host and CLI (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 6, 2018
1 parent e8bfab3 commit d976da3
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 11 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ For EventStore, the tests assume a running local instance configured as follows
# run as a single-node cluster to allow connection logic to use cluster mode as for a commercial cluster
& $env:ProgramData\chocolatey\bin\EventStore.ClusterNode.exe --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778

## CosmosDb (when not using -sc)

dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- init -ru 10000 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION

# DEPROVISIONING

## Deprovisioning (aka nuking) EventStore data resulting from tests to reset baseline
Expand Down
6 changes: 3 additions & 3 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs
if ($skipEs) { warn "Skipping EventStore tests" }

function cliCosmos($arghs) {
Write-Host "dotnet run cli/Equinox.Cli cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
Write-Host "dotnet run cli/Equinox.Cli -- $arghs cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 -- @arghs cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection
}

if ($skipCosmos) {
Expand All @@ -30,7 +30,7 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
cliCosmos @("provision", "-ru", "1000")
cliCosmos @("init", "-ru", "1000")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
54 changes: 53 additions & 1 deletion cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open Argu
open Domain.Infrastructure
open Equinox.Cli.Infrastructure
open Microsoft.Extensions.DependencyInjection
open Samples.Infrastructure.Log
open Samples.Infrastructure.Storage
open Serilog
open Serilog.Events
Expand All @@ -18,13 +19,22 @@ type Arguments =
| [<AltCommandLine("-S")>] LocalSeq
| [<AltCommandLine("-l")>] LogFile of string
| [<CliPrefix(CliPrefix.None); Last; Unique; AltCommandLine>] Run of ParseResults<TestArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique; AltCommandLine("init")>] Initialize of ParseResults<InitArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
| VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
| LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| LogFile _ -> "specify a log file to write the result breakdown into (default: Equinox.Cli.log)."
| Run _ -> "Run a load test"
| Initialize _ -> "Initialize a store"
and [<NoComparison>]InitArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<CosmosArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Application Collection."
| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]WebArguments =
| [<AltCommandLine("-u")>] Endpoint of string
interface IArgParserTemplate with
Expand All @@ -41,6 +51,7 @@ and [<NoComparison>]
| [<AltCommandLine("-i")>] ReportIntervalS of int
| [<CliPrefix(CliPrefix.None); Last; Unique>] Memory of ParseResults<Samples.Infrastructure.Storage.MemArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Es of ParseResults<Samples.Infrastructure.Storage.EsArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Cosmos of ParseResults<Samples.Infrastructure.Storage.CosmosArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Web of ParseResults<WebArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
Expand All @@ -53,11 +64,13 @@ and [<NoComparison>]
| ReportIntervalS _ -> "specify reporting intervals in seconds (default: 10)."
| Memory _ -> "target in-process Transient Memory Store (Default if not other target specified)."
| Es _ -> "Run transactions in-process against EventStore."
| Cosmos _ -> "Run transactions in-process against CosmosDb."
| Web _ -> "Run transactions against a Web endpoint."

let createStoreLog verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration().Destructure.FSharpTypes()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = c.WriteTo.Sink(RuCounterSink())
let c = c.WriteTo.Console((if verbose && verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
c.CreateLogger() :> ILogger
Expand Down Expand Up @@ -95,6 +108,10 @@ module LoadTest =
let storeLog = createStoreLog <| sargs.Contains EsArguments.VerboseStore
log.Information("Running transactions in-process against EventStore with storage options: {options:l}", options)
storeLog, EventStore.config (log,storeLog) (cache, unfolds) sargs |> Some, None
| Some (Cosmos sargs) ->
let storeLog = createStoreLog <| sargs.Contains CosmosArguments.VerboseStore
log.Information("Running transactions in-process against CosmosDb with storage options: {options:l}", options)
storeLog, Cosmos.config (log,storeLog) (cache, unfolds) sargs |> Some, None
| _ | Some (Memory _) ->
log.Warning("Running transactions in-process against Volatile Store with storage options: {options:l}", options)
createStoreLog false, MemoryStore.config () |> Some, None
Expand Down Expand Up @@ -130,11 +147,37 @@ module LoadTest =
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)

match storeConfig with
| Some (StorageConfig.Cosmos _) ->
let stats =
[ "Read", RuCounterSink.Read
"Write", RuCounterSink.Write
"Resync", RuCounterSink.Resync ]
let mutable totalCount, totalRc, totalMs = 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count))
for name, stat in stats do
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalRc <- totalRc + ru
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
logActivity "TOTAL" totalCount totalRc totalMs
let measures : (string * (TimeSpan -> float)) list =
[ "s", fun x -> x.TotalSeconds
"m", fun x -> x.TotalMinutes
"h", fun x -> x.TotalHours ]
let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru)
let duration = args.GetResult(DurationM,1.) |> TimeSpan.FromMinutes
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)
| _ -> ()

let createDomainLog verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = c.WriteTo.Sink(RuCounterSink())
let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
c.CreateLogger()

Expand All @@ -149,6 +192,15 @@ let main argv =
let verbose = args.Contains Verbose
let log = createDomainLog verbose verboseConsole maybeSeq
match args.GetSubCommand() with
| Initialize iargs ->
let rus = iargs.GetResult(Rus)
match iargs.TryGetSubCommand() with
| Some (InitArguments.Cosmos sargs) ->
let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq
let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
Equinox.Cosmos.Store.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
| _ -> failwith "please specify a `cosmos` endpoint"
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
Expand Down
2 changes: 2 additions & 0 deletions samples/Store/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
<ItemGroup>
<Compile Include="Storage.fs" />
<Compile Include="Services.fs" />
<Compile Include="Log.fs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.Cosmos\Equinox.Cosmos.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\Backend\Backend.fsproj" />
Expand Down
43 changes: 43 additions & 0 deletions samples/Store/Infrastructure/Log.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
module Samples.Infrastructure.Log

open Serilog.Events

[<AutoOpen>]
module SerilogHelpers =
open Equinox.Cosmos.Store
let inline (|Stats|) ({ interval = i; ru = ru }: Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds

let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function
| Log.Tip (Stats s)
| Log.TipNotFound (Stats s)
| Log.TipNotModified (Stats s)
| Log.Query (_,_, (Stats s)) -> CosmosReadRc s
// slices are rolled up into batches so be sure not to double-count
| Log.Response (_,(Stats s)) -> CosmosResponseRc s
| Log.SyncSuccess (Stats s)
| Log.SyncConflict (Stats s) -> CosmosWriteRc s
| Log.SyncResync (Stats s) -> CosmosResyncRc s
let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
let (|CosmosMetric|_|) (logEvent : LogEvent) : Log.Event option =
match logEvent.Properties.TryGetValue("cosmosEvt") with
| true, SerilogScalar (:? Log.Event as e) -> Some e
| _ -> None
type RuCounter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member __.Ingest (ru, ms) =
System.Threading.Interlocked.Increment(&__.count) |> ignore
System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
System.Threading.Interlocked.Add(&__.ms, ms) |> ignore
type RuCounterSink() =
static member val Read = RuCounter.Create()
static member val Write = RuCounter.Create()
static member val Resync = RuCounter.Create()
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| CosmosMetric (CosmosReadRc stats) -> RuCounterSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRc stats) -> RuCounterSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRc stats) -> RuCounterSink.Resync.Ingest stats
| _ -> ()
4 changes: 4 additions & 0 deletions samples/Store/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Es (gateway, cache, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve
| Storage.StorageConfig.Cosmos (gateway, cache, unfolds, databaseId, connectionId) ->
let store = Equinox.Cosmos.EqxStore(gateway, Equinox.Cosmos.EqxCollections(databaseId, connectionId))
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None
Equinox.Cosmos.EqxResolver<'event,'state>(store, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve

type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)
Expand Down
64 changes: 62 additions & 2 deletions samples/Store/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type [<NoEquality; NoComparison>] MemArguments =
interface IArgParserTemplate with
member a.Usage = a |> function
| VerboseStore -> "Include low level Store logging."
and [<NoEquality; NoComparison>] EsArguments =
type [<NoEquality; NoComparison>] EsArguments =
| [<AltCommandLine("-vs")>] VerboseStore
| [<AltCommandLine("-o")>] Timeout of float
| [<AltCommandLine("-r")>] Retries of int
Expand All @@ -28,12 +28,35 @@ and [<NoEquality; NoComparison>] EsArguments =
| Password _ -> "specify a Password (default: changeit)."
| ConcurrentOperationsLimit _ -> "max concurrent operations in flight (default: 5000)."
| HeartbeatTimeout _ -> "specify heartbeat timeout in seconds (default: 1.5)."
type [<NoEquality; NoComparison>] CosmosArguments =
| [<AltCommandLine("-vs")>] VerboseStore
| [<AltCommandLine("-m")>] ConnectionMode of Equinox.Cosmos.ConnectionMode
| [<AltCommandLine("-o")>] Timeout of float
| [<AltCommandLine("-r")>] Retries of int
| [<AltCommandLine("-s")>] Connection of string
| [<AltCommandLine("-d")>] Database of string
| [<AltCommandLine("-c")>] Collection of string
| [<AltCommandLine("-rt")>] RetriesWaitTime of int
| [<AltCommandLine("-a")>] PageSize of int
interface IArgParserTemplate with
member a.Usage = a |> function
| VerboseStore -> "Include low level Store logging."
| ConnectionMode _ -> "Override the connection mode (default: DirectTcp)."
| Timeout _ -> "specify operation timeout in seconds (default: 5)."
| Retries _ -> "specify operation retries (default: 1)."
| Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)."
| Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)."
| Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)"
| PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)"

let defaultBatchSize = 500

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
| Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.Cosmos.EqxGateway * Equinox.Cosmos.CachingStrategy option * unfolds: bool * databaseId: string * collectionId: string

module MemoryStore =
let config () =
Expand Down Expand Up @@ -69,4 +92,41 @@ module EventStore =
let c = Caching.Cache("Cli", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Es ((createGateway conn defaultBatchSize), cacheStrategy, unfolds)
StorageConfig.Es ((createGateway conn defaultBatchSize), cacheStrategy, unfolds)

module Cosmos =
open Equinox.Cosmos

/// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either:
/// 1) replace connection below with a connection string or Uri+Key for an initialized Equinox instance with a database and collection named "equinox-test"
/// 2) Set the 3x environment variables and create a local Equinox using cli/Equinox.cli/bin/Release/net461/Equinox.Cli `
/// cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION provision -ru 1000
let private connect (log: ILogger) mode discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, mode=mode, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let private createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents))
let conn (log: ILogger, storeLog) (sargs : ParseResults<CosmosArguments>) =
let read key = Environment.GetEnvironmentVariable key |> Option.ofObj

let (Discovery.UriAndKey (connUri,_)) as discovery =
sargs.GetResult(Connection, defaultArg (read "EQUINOX_COSMOS_CONNECTION") "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;")
|> Discovery.FromConnectionString

let dbName = sargs.GetResult(Database, defaultArg (read "EQUINOX_COSMOS_DATABASE") "equinox-test")
let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test")
let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds
let mode = sargs.GetResult(ConnectionMode,ConnectionMode.DirectTcp)
let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5)
let pageSize = sargs.GetResult(PageSize,1)
log.Information("Using CosmosDb {mode} Connection {connection} Database: {database} Collection: {collection} maxEventsPerSlice: {pageSize}. " +
"Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}",
mode, connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime)
dbName, collName, pageSize, connect storeLog mode discovery timeout operationThrottling |> Async.RunSynchronously
let config (log: ILogger, storeLog) (cache, unfolds) (sargs : ParseResults<CosmosArguments>) =
let dbName, collName, pageSize, conn = conn (log, storeLog) sargs
let cacheStrategy =
if cache then
let c = Caching.Cache("Cli", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Cosmos (createGateway conn (defaultBatchSize,pageSize), cacheStrategy, unfolds, dbName, collName)
3 changes: 3 additions & 0 deletions samples/Store/Web/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open Argu
open Microsoft.AspNetCore
open Microsoft.AspNetCore.Hosting
open Microsoft.Extensions.DependencyInjection
open Samples.Infrastructure.Log
open Serilog

module Program =
Expand All @@ -27,6 +28,8 @@ module Program =
.MinimumLevel.Override("Microsoft", Serilog.Events.LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Console()
// TOCONSIDER log and reset every minute or something ?
.WriteTo.Sink(RuCounterSink())
let c =
let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None
match maybeSeq with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
Expand Down
Loading

0 comments on commit d976da3

Please sign in to comment.