From d976da302937886c0d5fd2042caec5cca0b9f6ce Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 6 Dec 2018 16:00:10 +0000 Subject: [PATCH] Rebased cosmos support for web host and CLI (#55) --- README.md | 5 ++ build.ps1 | 6 +- cli/Equinox.Cli/Program.fs | 54 +++++++++++++++- .../Infrastructure/Infrastructure.fsproj | 2 + samples/Store/Infrastructure/Log.fs | 43 +++++++++++++ samples/Store/Infrastructure/Services.fs | 4 ++ samples/Store/Infrastructure/Storage.fs | 64 ++++++++++++++++++- samples/Store/Web/Program.fs | 3 + samples/Store/Web/Startup.fs | 6 ++ src/Equinox.Cosmos/Cosmos.fs | 12 ++-- 10 files changed, 188 insertions(+), 11 deletions(-) create mode 100644 samples/Store/Infrastructure/Log.fs diff --git a/README.md b/README.md index 9c829c96d..8e17a51d8 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,11 @@ For EventStore, the tests assume a running local instance configured as follows # run as a single-node cluster to allow connection logic to use cluster mode as for a commercial cluster & $env:ProgramData\chocolatey\bin\EventStore.ClusterNode.exe --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778 +## CosmosDb (when not using -sc) + + dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- init -ru 10000 ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION + # DEPROVISIONING ## Deprovisioning (aka nuking) EventStore data resulting from tests to reset baseline diff --git a/build.ps1 b/build.ps1 index 3b27d0148..9762133db 100644 --- a/build.ps1 +++ b/build.ps1 @@ -20,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs if ($skipEs) { warn "Skipping EventStore tests" } function cliCosmos($arghs) { - Write-Host "dotnet run cli/Equinox.Cli cosmos -s -d $cosmosDatabase -c $cosmosCollection $arghs" - dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs + Write-Host "dotnet run cli/Equinox.Cli -- $arghs cosmos -s -d $cosmosDatabase -c $cosmosCollection" + dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 -- @arghs cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection } if ($skipCosmos) { @@ -30,7 +30,7 @@ if ($skipCosmos) { warn "Skipping Provisioning Cosmos" } else { warn "Provisioning cosmos..." - cliCosmos @("provision", "-ru", "1000") + cliCosmos @("init", "-ru", "1000") $deprovisionCosmos=$true } $env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index b34e5f22b..6c3e0a819 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -4,6 +4,7 @@ open Argu open Domain.Infrastructure open Equinox.Cli.Infrastructure open Microsoft.Extensions.DependencyInjection +open Samples.Infrastructure.Log open Samples.Infrastructure.Storage open Serilog open Serilog.Events @@ -18,6 +19,7 @@ type Arguments = | [] LocalSeq | [] LogFile of string | [] Run of ParseResults + | [] Initialize of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -25,6 +27,14 @@ type Arguments = | LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" | LogFile _ -> "specify a log file to write the result breakdown into (default: Equinox.Cli.log)." | Run _ -> "Run a load test" + | Initialize _ -> "Initialize a store" +and []InitArguments = + | [] Rus of int + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | Rus _ -> "Specify RU/s level to provision for the Application Collection." + | Cosmos _ -> "Cosmos Connection parameters." and []WebArguments = | [] Endpoint of string interface IArgParserTemplate with @@ -41,6 +51,7 @@ and [] | [] ReportIntervalS of int | [] Memory of ParseResults | [] Es of ParseResults + | [] Cosmos of ParseResults | [] Web of ParseResults interface IArgParserTemplate with member a.Usage = a |> function @@ -53,11 +64,13 @@ and [] | ReportIntervalS _ -> "specify reporting intervals in seconds (default: 10)." | Memory _ -> "target in-process Transient Memory Store (Default if not other target specified)." | Es _ -> "Run transactions in-process against EventStore." + | Cosmos _ -> "Run transactions in-process against CosmosDb." | Web _ -> "Run transactions against a Web endpoint." let createStoreLog verbose verboseConsole maybeSeqEndpoint = let c = LoggerConfiguration().Destructure.FSharpTypes() let c = if verbose then c.MinimumLevel.Debug() else c + let c = c.WriteTo.Sink(RuCounterSink()) let c = c.WriteTo.Console((if verbose && verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) c.CreateLogger() :> ILogger @@ -95,6 +108,10 @@ module LoadTest = let storeLog = createStoreLog <| sargs.Contains EsArguments.VerboseStore log.Information("Running transactions in-process against EventStore with storage options: {options:l}", options) storeLog, EventStore.config (log,storeLog) (cache, unfolds) sargs |> Some, None + | Some (Cosmos sargs) -> + let storeLog = createStoreLog <| sargs.Contains CosmosArguments.VerboseStore + log.Information("Running transactions in-process against CosmosDb with storage options: {options:l}", options) + storeLog, Cosmos.config (log,storeLog) (cache, unfolds) sargs |> Some, None | _ | Some (Memory _) -> log.Warning("Running transactions in-process against Volatile Store with storage options: {options:l}", options) createStoreLog false, MemoryStore.config () |> Some, None @@ -130,11 +147,37 @@ module LoadTest = resultFile.Information("Aggregate: {aggregate}", r) log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.) + match storeConfig with + | Some (StorageConfig.Cosmos _) -> + let stats = + [ "Read", RuCounterSink.Read + "Write", RuCounterSink.Write + "Resync", RuCounterSink.Resync ] + let mutable totalCount, totalRc, totalMs = 0L, 0., 0L + let logActivity name count rc lat = + log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms", + name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count)) + for name, stat in stats do + let ru = float stat.rux100 / 100. + totalCount <- totalCount + stat.count + totalRc <- totalRc + ru + totalMs <- totalMs + stat.ms + logActivity name stat.count ru stat.ms + logActivity "TOTAL" totalCount totalRc totalMs + let measures : (string * (TimeSpan -> float)) list = + [ "s", fun x -> x.TotalSeconds + "m", fun x -> x.TotalMinutes + "h", fun x -> x.TotalHours ] + let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru) + let duration = args.GetResult(DurationM,1.) |> TimeSpan.FromMinutes + for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d) + | _ -> () + let createDomainLog verbose verboseConsole maybeSeqEndpoint = let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext() let c = if verbose then c.MinimumLevel.Debug() else c let c = c.WriteTo.Sink(RuCounterSink()) - let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) + let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) c.CreateLogger() @@ -149,6 +192,15 @@ let main argv = let verbose = args.Contains Verbose let log = createDomainLog verbose verboseConsole maybeSeq match args.GetSubCommand() with + | Initialize iargs -> + let rus = iargs.GetResult(Rus) + match iargs.TryGetSubCommand() with + | Some (InitArguments.Cosmos sargs) -> + let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq + let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs + log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus) + Equinox.Cosmos.Store.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously + | _ -> failwith "please specify a `cosmos` endpoint" | Run rargs -> let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs diff --git a/samples/Store/Infrastructure/Infrastructure.fsproj b/samples/Store/Infrastructure/Infrastructure.fsproj index d6b399cd1..a7882483b 100644 --- a/samples/Store/Infrastructure/Infrastructure.fsproj +++ b/samples/Store/Infrastructure/Infrastructure.fsproj @@ -11,10 +11,12 @@ + + diff --git a/samples/Store/Infrastructure/Log.fs b/samples/Store/Infrastructure/Log.fs new file mode 100644 index 000000000..834011cf9 --- /dev/null +++ b/samples/Store/Infrastructure/Log.fs @@ -0,0 +1,43 @@ +module Samples.Infrastructure.Log + +open Serilog.Events + +[] +module SerilogHelpers = + open Equinox.Cosmos.Store + let inline (|Stats|) ({ interval = i; ru = ru }: Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds + + let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function + | Log.Tip (Stats s) + | Log.TipNotFound (Stats s) + | Log.TipNotModified (Stats s) + | Log.Query (_,_, (Stats s)) -> CosmosReadRc s + // slices are rolled up into batches so be sure not to double-count + | Log.Response (_,(Stats s)) -> CosmosResponseRc s + | Log.SyncSuccess (Stats s) + | Log.SyncConflict (Stats s) -> CosmosWriteRc s + | Log.SyncResync (Stats s) -> CosmosResyncRc s + let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function + | (:? ScalarValue as x) -> Some x.Value + | _ -> None + let (|CosmosMetric|_|) (logEvent : LogEvent) : Log.Event option = + match logEvent.Properties.TryGetValue("cosmosEvt") with + | true, SerilogScalar (:? Log.Event as e) -> Some e + | _ -> None + type RuCounter = + { mutable rux100: int64; mutable count: int64; mutable ms: int64 } + static member Create() = { rux100 = 0L; count = 0L; ms = 0L } + member __.Ingest (ru, ms) = + System.Threading.Interlocked.Increment(&__.count) |> ignore + System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore + System.Threading.Interlocked.Add(&__.ms, ms) |> ignore + type RuCounterSink() = + static member val Read = RuCounter.Create() + static member val Write = RuCounter.Create() + static member val Resync = RuCounter.Create() + interface Serilog.Core.ILogEventSink with + member __.Emit logEvent = logEvent |> function + | CosmosMetric (CosmosReadRc stats) -> RuCounterSink.Read.Ingest stats + | CosmosMetric (CosmosWriteRc stats) -> RuCounterSink.Write.Ingest stats + | CosmosMetric (CosmosResyncRc stats) -> RuCounterSink.Resync.Ingest stats + | _ -> () \ No newline at end of file diff --git a/samples/Store/Infrastructure/Services.fs b/samples/Store/Infrastructure/Services.fs index 13ebae2af..403915346 100644 --- a/samples/Store/Infrastructure/Services.fs +++ b/samples/Store/Infrastructure/Services.fs @@ -18,6 +18,10 @@ type StreamResolver(storage) = | Storage.StorageConfig.Es (gateway, cache, unfolds) -> let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve + | Storage.StorageConfig.Cosmos (gateway, cache, unfolds, databaseId, connectionId) -> + let store = Equinox.Cosmos.EqxStore(gateway, Equinox.Cosmos.EqxCollections(databaseId, connectionId)) + let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None + Equinox.Cosmos.EqxResolver<'event,'state>(store, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve type ServiceBuilder(storageConfig, handlerLog) = let resolver = StreamResolver(storageConfig) diff --git a/samples/Store/Infrastructure/Storage.fs b/samples/Store/Infrastructure/Storage.fs index b26b4a2be..879015c4a 100644 --- a/samples/Store/Infrastructure/Storage.fs +++ b/samples/Store/Infrastructure/Storage.fs @@ -9,7 +9,7 @@ type [] MemArguments = interface IArgParserTemplate with member a.Usage = a |> function | VerboseStore -> "Include low level Store logging." -and [] EsArguments = +type [] EsArguments = | [] VerboseStore | [] Timeout of float | [] Retries of int @@ -28,12 +28,35 @@ and [] EsArguments = | Password _ -> "specify a Password (default: changeit)." | ConcurrentOperationsLimit _ -> "max concurrent operations in flight (default: 5000)." | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds (default: 1.5)." +type [] CosmosArguments = + | [] VerboseStore + | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] Timeout of float + | [] Retries of int + | [] Connection of string + | [] Database of string + | [] Collection of string + | [] RetriesWaitTime of int + | [] PageSize of int + interface IArgParserTemplate with + member a.Usage = a |> function + | VerboseStore -> "Include low level Store logging." + | ConnectionMode _ -> "Override the connection mode (default: DirectTcp)." + | Timeout _ -> "specify operation timeout in seconds (default: 5)." + | Retries _ -> "specify operation retries (default: 1)." + | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)." + | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." + | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + | PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)" + let defaultBatchSize = 500 [] type StorageConfig = | Memory of Equinox.MemoryStore.VolatileStore | Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool + | Cosmos of Equinox.Cosmos.EqxGateway * Equinox.Cosmos.CachingStrategy option * unfolds: bool * databaseId: string * collectionId: string module MemoryStore = let config () = @@ -69,4 +92,41 @@ module EventStore = let c = Caching.Cache("Cli", sizeMb = 50) CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None - StorageConfig.Es ((createGateway conn defaultBatchSize), cacheStrategy, unfolds) \ No newline at end of file + StorageConfig.Es ((createGateway conn defaultBatchSize), cacheStrategy, unfolds) + +module Cosmos = + open Equinox.Cosmos + + /// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either: + /// 1) replace connection below with a connection string or Uri+Key for an initialized Equinox instance with a database and collection named "equinox-test" + /// 2) Set the 3x environment variables and create a local Equinox using cli/Equinox.cli/bin/Release/net461/Equinox.Cli ` + /// cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION provision -ru 1000 + let private connect (log: ILogger) mode discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = + EqxConnector(log=log, mode=mode, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) + .Connect("equinox-cli", discovery) + let private createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents)) + let conn (log: ILogger, storeLog) (sargs : ParseResults) = + let read key = Environment.GetEnvironmentVariable key |> Option.ofObj + + let (Discovery.UriAndKey (connUri,_)) as discovery = + sargs.GetResult(Connection, defaultArg (read "EQUINOX_COSMOS_CONNECTION") "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;") + |> Discovery.FromConnectionString + + let dbName = sargs.GetResult(Database, defaultArg (read "EQUINOX_COSMOS_DATABASE") "equinox-test") + let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test") + let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds + let mode = sargs.GetResult(ConnectionMode,ConnectionMode.DirectTcp) + let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5) + let pageSize = sargs.GetResult(PageSize,1) + log.Information("Using CosmosDb {mode} Connection {connection} Database: {database} Collection: {collection} maxEventsPerSlice: {pageSize}. " + + "Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}", + mode, connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime) + dbName, collName, pageSize, connect storeLog mode discovery timeout operationThrottling |> Async.RunSynchronously + let config (log: ILogger, storeLog) (cache, unfolds) (sargs : ParseResults) = + let dbName, collName, pageSize, conn = conn (log, storeLog) sargs + let cacheStrategy = + if cache then + let c = Caching.Cache("Cli", sizeMb = 50) + CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some + else None + StorageConfig.Cosmos (createGateway conn (defaultBatchSize,pageSize), cacheStrategy, unfolds, dbName, collName) \ No newline at end of file diff --git a/samples/Store/Web/Program.fs b/samples/Store/Web/Program.fs index 8b749f709..72cc1a9b6 100644 --- a/samples/Store/Web/Program.fs +++ b/samples/Store/Web/Program.fs @@ -4,6 +4,7 @@ open Argu open Microsoft.AspNetCore open Microsoft.AspNetCore.Hosting open Microsoft.Extensions.DependencyInjection +open Samples.Infrastructure.Log open Serilog module Program = @@ -27,6 +28,8 @@ module Program = .MinimumLevel.Override("Microsoft", Serilog.Events.LogEventLevel.Warning) .Enrich.FromLogContext() .WriteTo.Console() + // TOCONSIDER log and reset every minute or something ? + .WriteTo.Sink(RuCounterSink()) let c = let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None match maybeSeq with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) diff --git a/samples/Store/Web/Startup.fs b/samples/Store/Web/Startup.fs index 2800aad01..fac5a6bca 100644 --- a/samples/Store/Web/Startup.fs +++ b/samples/Store/Web/Startup.fs @@ -18,6 +18,7 @@ type Arguments = | [] Unfolds | [] Memory of ParseResults | [] Es of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | VerboseConsole -> "Include low level Domain and Store logging in screen output." @@ -26,6 +27,7 @@ type Arguments = | Unfolds -> "employ a store-appropriate Rolling Snapshots and/or Unfolding strategy." | Memory _ -> "specify In-Memory Volatile Store (Default store)." | Es _ -> "specify storage in EventStore (--help for options)." + | Cosmos _ -> "specify storage in CosmosDb (--help for options)." type App = class end @@ -54,6 +56,10 @@ type Startup() = let storeLog = createStoreLog <| sargs.Contains EsArguments.VerboseStore log.Information("EventStore Storage options: {options:l}", options) EventStore.config (log,storeLog) (cache, unfolds) sargs, storeLog + | Some (Cosmos sargs) -> + let storeLog = createStoreLog <| sargs.Contains CosmosArguments.VerboseStore + log.Information("CosmosDb Storage options: {options:l}", options) + Cosmos.config (log,storeLog) (cache, unfolds) sargs, storeLog | _ | Some (Memory _) -> log.Fatal("Web App is using Volatile Store; Storage options: {options:l}", options) MemoryStore.config (), log diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 7819f2b3e..b88073de4 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -953,6 +953,7 @@ type EqxConnector /// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster /// NB as this will enter server and client logs, it should not contain sensitive information ?tags : (string*string) seq) = + do if log = null then nullArg "log" let connPolicy = let cp = Client.ConnectionPolicy.Default @@ -1010,7 +1011,7 @@ type EqxContext /// Database + Collection selector collections: EqxCollections, /// Logger to write to - see https://github.com/serilog/serilog/wiki/Provided-Sinks for how to wire to your logger - logger : Serilog.ILogger, + log : Serilog.ILogger, /// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice). /// Defaults to 10 ?defaultMaxItems, @@ -1019,6 +1020,7 @@ type EqxContext /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. /// Defaults to 1 ?maxEventsPerSlice) = + do if log = null then nullArg "log" let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10 let maxEventsPerSlice = defaultArg maxEventsPerSlice 1 let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems, maxEventsPerSlice=maxEventsPerSlice) @@ -1041,7 +1043,7 @@ type EqxContext let direction = defaultArg direction Direction.Forward let batchSize = defaultArg batchSize batching.MaxItems * maxEventsPerSlice let batching = EqxBatchingPolicy(if batchSize < maxEventsPerSlice then 1 else batchSize/maxEventsPerSlice) - gateway.ReadLazy batching logger stream direction startPos (Some,fun _ -> false) + gateway.ReadLazy batching log stream direction startPos (Some,fun _ -> false) member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async { let direction = defaultArg direction Direction.Forward @@ -1053,13 +1055,13 @@ type EqxContext match maxCount with | Some limit -> maxCountPredicate limit | None -> fun _ -> false - return! gateway.Read logger stream direction startPos (Some,isOrigin) } + return! gateway.Read log stream direction startPos (Some,isOrigin) } /// Establishes the current position of the stream in as effficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) member __.Sync(stream, ?position: Position) : Async = async { //let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) - let! (Token.Unpack (_,pos')) = gateway.GetPosition(logger, stream, ?pos=position) + let! (Token.Unpack (_,pos')) = gateway.GetPosition(log, stream, ?pos=position) return pos' } /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query @@ -1075,7 +1077,7 @@ type EqxContext /// Callers should implement appropriate idempotent handling, or use Equinox.Handler for that purpose member __.Sync(stream, position, events: IEvent[]) : Async> = async { let batch = Sync.mkBatch stream events Seq.empty - let! res = gateway.Sync logger stream (Some position.index,batch) + let! res = gateway.Sync log stream (Some position.index,batch) match res with | InternalSyncResult.Written (Token.Unpack (_,pos)) -> return AppendResult.Ok pos | InternalSyncResult.Conflict (Token.Unpack (_,pos),events) -> return AppendResult.Conflict (pos, events)