Skip to content

Commit

Permalink
Add stored proc auto provisioning per collection #59
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 13, 2018
1 parent 1b39b7e commit ecdf2f7
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 49 deletions.
3 changes: 2 additions & 1 deletion build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
cliCosmos @("init", "-ru", "1000")
# -P: inhibit creation of stored proc (everything in the repo should work without it due to auto-provisioning)
cliCosmos @("init", "-ru", "400", "-P")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
9 changes: 8 additions & 1 deletion cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type Arguments =
| Initialize _ -> "Initialize a store"
and [<NoComparison>]InitArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<AltCommandLine("-P")>] SkipStoredProc
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<CosmosArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Application Collection."
| SkipStoredProc -> "Inhibit creation of stored procedure in cited Collection."
| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]WebArguments =
| [<AltCommandLine("-u")>] Endpoint of string
Expand Down Expand Up @@ -206,7 +208,12 @@ let main argv =
let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq
let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
Equinox.Cosmos.Store.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
Async.RunSynchronously <| async {
do! Equinox.Cosmos.Store.Sync.Initialization.createDatabaseIfNotExists conn.Client dbName
do! Equinox.Cosmos.Store.Sync.Initialization.createCollectionIfNotExists conn.Client (dbName,collName) rus
let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(dbName,collName)
if not (iargs.Contains SkipStoredProc) then
do! Equinox.Cosmos.Store.Sync.Initialization.createSyncStoredProcIfNotExists (Some (upcast log)) conn.Client collectionUri }
| _ -> failwith "please specify a `cosmos` endpoint"
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
Expand Down
103 changes: 65 additions & 38 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ module internal Position =
type Direction = Forward | Backward override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"

/// Reference to Collection and name that will be used as the location for the stream
type [<NoComparison>]
CollectionStream = { collectionUri: System.Uri; name: string } //with
type [<NoComparison>] CollectionStream = { collectionUri: Uri; name: string }

type internal Enum() =
static member internal Events(b: Tip) =
Expand Down Expand Up @@ -293,8 +292,8 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; n: int64; conflicts: Event[] }
let [<Literal>] sprocName = "EquinoxNoTipEvents" // NB need to renumber for any breaking change
let [<Literal>] sprocBody = """
let [<Literal>] private sprocName = "EquinoxNoTipEvents" // NB need to renumber for any breaking change
let [<Literal>] private sprocBody = """
// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 perform expectedVersion verification (can request inhibiting of check by supplying -1)
Expand Down Expand Up @@ -413,12 +412,10 @@ function sync(req, expectedVersion, maxEvents) {

module Initialization =
open System.Collections.ObjectModel
let createDatabase (client:IDocumentClient) dbName = async {
let createDatabaseIfNotExists (client:IDocumentClient) dbName =
let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session)
let! db = client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect
return db.Resource.Id }

let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async {
client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect |> Async.Ignore
let createCollectionIfNotExists (client: IDocumentClient) (dbName,collName) ru = async {
let pkd = PartitionKeyDefinition()
pkd.Paths.Add(sprintf "/%s" Batch.PartitionKeyField)
let colld = DocumentCollection(Id = collName, PartitionKey = pkd)
Expand All @@ -430,22 +427,17 @@ function sync(req, expectedVersion, maxEvents) {
colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
// NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect
return coll.Resource.Id }

let createProc (log: ILogger) (client: IDocumentClient) (collectionUri: Uri) = async {
let def = new StoredProcedure(Id = sprocName, Body = sprocBody)
log.Information("Creating stored procedure {sprocId}", def.Id)
// TODO ifnotexist semantics
return! client.CreateStoredProcedureAsync(collectionUri, def) |> Async.AwaitTaskCorrect |> Async.Ignore }

let initialize log (client : IDocumentClient) dbName collName ru = async {
let! dbId = createDatabase client dbName
let dbUri = Client.UriFactory.CreateDatabaseUri dbId
let! collId = createCollection client dbUri collName ru
let collUri = Client.UriFactory.CreateDocumentCollectionUri (dbName, collId)
//let! _aux = createAux client dbUri collName auxRu
return! createProc log client collUri }
let dbUri = Client.UriFactory.CreateDatabaseUri dbName
return! client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect |> Async.Ignore }
let private createStoredProcIfNotExists (client: IDocumentClient) (collectionUri: Uri) (name, body): Async<float> = async {
try let! r = client.CreateStoredProcedureAsync(collectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with DocDbException ((DocDbStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
let createSyncStoredProcIfNotExists (log: ILogger option) client collUri = async {
let! t, ru = createStoredProcIfNotExists client collUri (sprocName,sprocBody) |> Stopwatch.Time
match log with
| None -> ()
| Some log -> log.Information("Created stored procedure {sprocId} rc={ru} t={ms}", sprocName, ru, (let e = t.Elapsed in e.TotalMilliseconds)) }

module internal Tip =
let private get (client: IDocumentClient) (stream: CollectionStream, maybePos: Position option) =
Expand Down Expand Up @@ -658,6 +650,7 @@ open Equinox.Store.Infrastructure
open FSharp.Control
open Serilog
open System
open System.Collections.Concurrent

/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy) =
Expand Down Expand Up @@ -719,6 +712,8 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
| Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return LoadFromTokenResult.Found (Token.create stream pos, span)
| _ -> let! res = __.Read log stream Direction.Forward (Some pos) (tryDecode,isOrigin)
return LoadFromTokenResult.Found res }
member __.CreateSyncStoredProcIfNotExists log =
Sync.Initialization.createSyncStoredProcIfNotExists log conn.Client
member __.Sync log stream (expectedVersion, batch: Tip): Async<InternalSyncResult> = async {
let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch,batching.MaxItems)
match wr with
Expand Down Expand Up @@ -834,20 +829,37 @@ type private Folder<'event, 'state>
| Store.SyncResult.Conflict resync -> return Store.SyncResult.Conflict resync
| Store.SyncResult.Written (token',state') -> return Store.SyncResult.Written (token',state') }

/// Holds Database/Collection pair, coordinating initialization activities
type private EqxCollection(databaseId, collectionId, ?initCollection : Uri -> Async<unit>) =
let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId)
let initGuard = initCollection |> Option.map (fun init -> AsyncCacheCell<unit>(init collectionUri))

member __.CollectionUri = collectionUri
member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None

/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data
type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string) =
type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, ?disableInitialization) =
// Index of database*collection -> Initialization Context
let collections = ConcurrentDictionary<string*string, EqxCollection>()
new (databaseId, collectionId) =
// TOCONSIDER - this works to support the Core.Events APIs
let genStreamName categoryName streamId = if categoryName = null then streamId else sprintf "%s-%s" categoryName streamId
EqxCollections(fun categoryName streamId -> databaseId, collectionId, genStreamName categoryName streamId)
member __.CollectionForStream (categoryName,id) : CollectionStream =

member internal __.Resolve(categoryName, id, init) : CollectionStream * (unit -> Async<unit>) option =
let databaseId, collectionId, streamName = categoryAndIdToDatabaseCollectionAndStream categoryName id
{ collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId); name = streamName }
let init = match disableInitialization with Some true -> None | _ -> Some init

let coll = collections.GetOrAdd((databaseId,collectionId), fun (db,coll) -> EqxCollection(db, coll, ?initCollection = init))
{ collectionUri = coll.CollectionUri; name = streamName },coll.InitializationGate

/// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections to
type EqxStore(gateway: EqxGateway, collections: EqxCollections) =
/// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections defining mappings from (category,id) to (database,collection,streamName)
type EqxStore(gateway: EqxGateway, collections: EqxCollections, ?resolverLog) =
let init = gateway.CreateSyncStoredProcIfNotExists resolverLog
member __.Gateway = gateway
member __.Collections = collections
member internal __.ResolveCollStream(categoryName, id) : CollectionStream * (unit -> Async<unit>) option =
collections.Resolve(categoryName, id, init)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
Expand Down Expand Up @@ -886,19 +898,27 @@ type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, ?access
| Some (CachingStrategy.SlidingWindow(cache, window)) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let mkStreamName = store.Collections.CollectionForStream
let resolve = Equinox.Store.Stream.create category
let resolveStream (streamId, maybeCollectionInitializationGate) =
{ new Store.IStream<'event, 'state> with
member __.Load log = category.Load streamId log
member __.TrySync (log: ILogger) (token: Store.StreamToken, originState: 'state) (events: 'event list) =
match maybeCollectionInitializationGate with
| None -> category.TrySync log (token, originState) events
| Some init -> async {
do! init ()
return! category.TrySync log (token, originState) events } }

member __.Resolve = function
| Target.CatId (categoryName,streamId) ->
resolve <| mkStreamName (categoryName, streamId)
store.ResolveCollStream(categoryName, streamId) |> resolveStream
| Target.CatIdEmpty (categoryName,streamId) ->
let stream = mkStreamName (categoryName, streamId)
Store.Stream.ofMemento (Token.create stream Position.fromKnownEmpty,initial) (resolve stream)
let collStream, maybeInit = store.ResolveCollStream(categoryName, streamId)
Store.Stream.ofMemento (Token.create collStream Position.fromKnownEmpty,initial) (resolveStream (collStream, maybeInit))
| Target.DeprecatedRawName _ as x -> failwithf "Stream name not supported: %A" x

member __.FromMemento(Token.Unpack (stream,_pos) as streamToken,state) =
Store.Stream.ofMemento (streamToken,state) (resolve stream)
let skipInitialization = None
Store.Stream.ofMemento (streamToken,state) (resolveStream (stream,skipInitialization))

[<RequireQualifiedAccess; NoComparison>]
type Discovery =
Expand Down Expand Up @@ -1023,7 +1043,8 @@ type EqxContext
let! (Token.Unpack (_,pos')), data = res
return pos', data }

member __.CreateStream(streamName) = collections.CollectionForStream(null, streamName)
member __.ResolveStream(streamName) = collections.Resolve(null, streamName, gateway.CreateSyncStoredProcIfNotExists (Some log))
member __.CreateStream(streamName) = __.ResolveStream streamName |> fst

member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq<IIndexedEvent[]> =
let direction = defaultArg direction Direction.Forward
Expand Down Expand Up @@ -1061,7 +1082,13 @@ type EqxContext

/// Appends the supplied batch of events, subject to a consistency check based on the `position`
/// Callers should implement appropriate idempotent handling, or use Equinox.Handler for that purpose
member __.Sync(stream, position, events: IEvent[]) : Async<AppendResult<Position>> = async {
member __.Sync(stream : CollectionStream, position, events: IEvent[]) : Async<AppendResult<Position>> = async {
// Writes go through the stored proc, which we need to provision per-collection
// Having to do this here in this way is far from ideal, but work on caching, external snapshots and caching is likely
// to move this about before we reach a final destination in any case
match __.ResolveStream stream.name |> snd with
| None -> ()
| Some init -> do! init ()
let batch = Sync.mkBatch stream events Seq.empty
let! res = gateway.Sync log stream (Some position.index,batch)
match res with
Expand Down
32 changes: 23 additions & 9 deletions src/Equinox.EventStore/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,44 @@ type AsyncLazy<'T>(workflow : Async<'T>) =

/// Generic async lazy caching implementation that admits expiration/recomputation semantics
/// If `workflow` fails, all readers entering while the load/refresh is in progress will share the failure
type AsyncCacheCell<'T>(workflow : Async<'T>, isExpired : 'T -> bool) =
type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) =
let mutable currentCell = AsyncLazy workflow

let initializationFailed (value: System.Threading.Tasks.Task<_>) =
// for TMI on this, see https://stackoverflow.com/a/33946166/11635
value.IsCompleted && value.Status = System.Threading.Tasks.TaskStatus.RanToCompletion

let update cell = async {
// avoid unneccessary recomputation in cases where competing threads detect expiry;
// the first write attempt wins, and everybody else reads off that value.
// the first write attempt wins, and everybody else reads off that value
let _ = System.Threading.Interlocked.CompareExchange(&currentCell, AsyncLazy workflow, cell)
return! currentCell.AwaitValue()
}

/// Enables callers to short-circuit the gate by checking whether a value has been computed
member __.PeekIsValid() =
let cell = currentCell
let currentState = cell.PeekInternalTask
if not currentState.IsValueCreated then false else

let value = currentState.Value
not (initializationFailed value)
&& (match isExpired with Some f -> not (f value.Result) | _ -> false)

/// Gets or asynchronously recomputes a cached value depending on expiry and availability
member __.AwaitValue() = async {
let cell = currentCell
let currentState = cell.PeekInternalTask
// If the last attempt completed, but failed, we need to treat it as expired; for TMI on this, see https://stackoverflow.com/a/33946166/11635
if currentState.IsValueCreated && currentState.Value.IsCompleted && currentState.Value.Status <> System.Threading.Tasks.TaskStatus.RanToCompletion then
// If the last attempt completed, but failed, we need to treat it as expired
if currentState.IsValueCreated && initializationFailed currentState.Value then
return! update cell
else
let! current = cell.AwaitValue()
if isExpired current then
return! update cell
else
return current
match isExpired with
| Some f when f current -> return! update cell
| _ -> return current
}

[<RequireQualifiedAccess>]
module Regex =
open System.Text.RegularExpressions
Expand Down

0 comments on commit ecdf2f7

Please sign in to comment.