Skip to content

Commit

Permalink
More diff reduction
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 16, 2019
1 parent b09d938 commit fe1f800
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
33 changes: 17 additions & 16 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ type IRetryPolicy = abstract member Execute: (int -> Async<'T>) -> Async<'T>

module Log =
[<NoEquality; NoComparison>]
type Measurement = { stream : string; interval: StopwatchInterval; bytes: int; count: int; ru: float }
type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int; ru: float }
[<NoEquality; NoComparison>]
type Event =
/// Individual read request for the Tip
Expand Down Expand Up @@ -472,30 +472,30 @@ function sync(req, expectedVersion, maxEvents) {
let adjustOffer (c:Client.DocumentClient) resourceLink rus = async {
let offer = c.CreateOfferQuery().Where(fun r -> r.ResourceLink = resourceLink).AsEnumerable().Single()
let! _ = c.ReplaceOfferAsync(OfferV2(offer,rus)) |> Async.AwaitTaskCorrect in () }
let private createDatabaseIfNotExists (client:Client.DocumentClient) dbName maybeRus =
let private createDatabaseIfNotExists (client:Client.DocumentClient) dName maybeRus =
let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session)
maybeRus |> Option.iter (fun rus -> opts.OfferThroughput <- Nullable rus)
client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect
let private createOrProvisionDatabase (client:Client.DocumentClient) dbName mode = async {
client.CreateDatabaseIfNotExistsAsync(Database(Id=dName), options = opts) |> Async.AwaitTaskCorrect
let private createOrProvisionDatabase (client:Client.DocumentClient) dName mode = async {
match mode with
| Provisioning.Database rus ->
let! db = createDatabaseIfNotExists client dbName (Some rus)
let! db = createDatabaseIfNotExists client dName (Some rus)
return! adjustOffer client db.Resource.SelfLink rus
| Provisioning.Container _ ->
let! _ = createDatabaseIfNotExists client dbName None in () }
let private createContainerIfNotExists (client:Client.DocumentClient) dbName (def: DocumentCollection) maybeRus =
let dbUri = Client.UriFactory.CreateDatabaseUri dbName
let! _ = createDatabaseIfNotExists client dName None in () }
let private createContainerIfNotExists (client:Client.DocumentClient) dName (def: DocumentCollection) maybeRus =
let dbUri = Client.UriFactory.CreateDatabaseUri dName
let opts = match maybeRus with None -> Client.RequestOptions() | Some rus -> Client.RequestOptions(OfferThroughput=Nullable rus)
client.CreateDocumentCollectionIfNotExistsAsync(dbUri, def, opts) |> Async.AwaitTaskCorrect
let private createOrProvisionContainer (client: Client.DocumentClient) (dbName, def: DocumentCollection) mode = async {
let private createOrProvisionContainer (client: Client.DocumentClient) (dName, def: DocumentCollection) mode = async {
match mode with
| Provisioning.Database _ ->
let! _ = createContainerIfNotExists client dbName def None in ()
let! _ = createContainerIfNotExists client dName def None in ()
| Provisioning.Container rus ->
let! container = createContainerIfNotExists client dbName def (Some rus) in ()
let! container = createContainerIfNotExists client dName def (Some rus) in ()
return! adjustOffer client container.Resource.SelfLink rus }
let private createStoredProcIfNotExists (container:Container) (name, body): Async<float> = async {
try let! r = container.Client.CreateStoredProcedureAsync(container.CollectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
let private createStoredProcIfNotExists (c:Container) (name, body): Async<float> = async {
try let! r = c.Client.CreateStoredProcedureAsync(c.CollectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with CosmosException ((CosmosStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
let private mkContainerProperties idFieldName partionKeyFieldName =
Expand Down Expand Up @@ -525,7 +525,8 @@ function sync(req, expectedVersion, maxEvents) {
def.IndexingPolicy.Automatic <- false
def.IndexingPolicy.IndexingMode <- IndexingMode.None
createOrProvisionContainer client (dName,def) mode
let init log (container : Container) (dName,cName) mode skipStoredProc = async {
let init log (client : Client.DocumentClient) (dName,cName) mode skipStoredProc = async {
let container = Container(client,dName,cName)
do! createOrProvisionDatabase container.Client dName mode
do! createBatchAndTipContainerIfNotExists container.Client (dName,cName) mode
if not skipStoredProc then
Expand Down Expand Up @@ -925,7 +926,7 @@ type private ContainerWrapper(container : Container, ?initContainer : Container
member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None

/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data
type Containers(categoryAndIdToDatabaseContainerAndStream : string -> string -> string*string*string, [<O; D(null)>]?disableInitialization) =
type Containers(categoryAndIdToDatabaseContainerStream : string -> string -> string*string*string, [<O; D(null)>]?disableInitialization) =
// Index of database*collection -> Initialization Context
let wrappers = ConcurrentDictionary<string*string, ContainerWrapper>()
new (databaseId, containerId) =
Expand All @@ -934,7 +935,7 @@ type Containers(categoryAndIdToDatabaseContainerAndStream : string -> string ->
Containers(fun categoryName streamId -> databaseId, containerId, genStreamName categoryName streamId)

member internal __.Resolve(client, categoryName, id, init) : (Container*string) * (unit -> Async<unit>) option =
let databaseId, containerName, streamName = categoryAndIdToDatabaseContainerAndStream categoryName id
let databaseId, containerName, streamName = categoryAndIdToDatabaseContainerStream categoryName id
let init = match disableInitialization with Some true -> None | _ -> Some init
let mkWrapped (db,containerName) = ContainerWrapper(Container(client,db,containerName), ?initContainer = init)
let wrapped = wrappers.GetOrAdd((databaseId,containerName), mkWrapped)
Expand Down
3 changes: 1 addition & 2 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ module CosmosInit =
let modeStr, rus = match mode with Provisioning.Container rus -> "Container",rus | Provisioning.Database rus -> "Database",rus
log.Information("Provisioning `Equinox.Cosmos` Store collection at {mode:l} level for {rus:n0} RU/s", modeStr, rus)
let! conn = connector.Connect("equinox-tool", discovery)
let container = Equinox.Cosmos.Store.Container(conn.Client,dName,cName)
return! init log container (dName,cName) mode skipStoredProc
return! init log conn.Client (dName,cName) mode skipStoredProc
| _ -> failwith "please specify a `cosmos` endpoint" }

[<EntryPoint>]
Expand Down

0 comments on commit fe1f800

Please sign in to comment.