
WIP
bartelink committed Nov 8, 2018
1 parent 67ae40d commit a976fc6
Showing 1 changed file with 60 additions and 18 deletions.
src/Equinox.Cosmos/Cosmos.fs: 78 changes (60 additions & 18 deletions)
@@ -82,11 +82,17 @@ module Store =
{ p: string // "{streamName}"
id: string // "{-1}"

w: int64 // 100: window size
//w: int64 // 100: window size
/// last index/i value
m: int64 // {index}

(* "x": [
/// Compacted projections based on version identified by `m`
c: IndexProjection[]

(*// Potential schema to manage Pending Events together with compaction events based on each one
// This scheme is more complete than the simple `c` encoding, which relies on every writer being able to write all salient snapshots
// For instance, in the case of blue/green deploys, older versions need to be able to coexist without destroying each other's perf
"x": [
{ "i":0,
"c":"ISO 8601"
"e":[
@@ -95,8 +101,22 @@ module Store =
]
}
] *)
x: JObject[][] }
//x: JObject[][]
}
static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent =
{ p = pos.streamName; id = "-1"; m = pos.IndexRel eventCount
c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] }
and IndexProjection =
{ /// The Event Type, used to drive deserialization
t: string // required

/// Event body, as UTF-8 encoded JSON ready to be injected into the JSON being rendered for DocDb
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata (null, or encoded the same way as `d`; not written if missing)
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
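
// Illustrative sketch (not part of this commit): the pending-events scheme outlined in the
// comment block above could be modelled roughly as follows; `PendingBatch`, and the idea of
// reinstating the field as `x: PendingBatch[]`, are hypothetical names mirroring the commented JSON shape
//   type PendingBatch =
//       { i: int64               // index of the first event in the batch ("i")
//         c: string              // ISO 8601 timestamp of the write ("c")
//         e: IndexProjection[] } // the events written in that batch ("e")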
(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
@@ -155,19 +175,23 @@ type EqxSyncResult = Written of Store.Position * requestCharge: float | Conflict

module private Write =
let [<Literal>] sprocName = "AtomicMultiDocInsert"
let append (client: IDocumentClient) (pos: Store.Position) (eventsData: Store.EventData seq): Async<Store.Position * float> = async {
let append (client: IDocumentClient) (pos: Store.Position) (eventsData: Store.EventData seq,maybeIndexEvents): Async<Store.Position * float> = async {
let sprocUri = sprintf "%O/sprocs/%s" pos.collectionUri sprocName
let opts = Client.RequestOptions(PartitionKey=PartitionKey(pos.streamName))
let! ct = Async.CancellationToken
let events = eventsData |> Seq.mapi (fun i ed -> Store.Event.Create pos (i+1) ed |> JsonConvert.SerializeObject) |> Seq.toArray
if events.Length = 0 then invalidArg "eventsData" "must be non-empty"
let! res = client.ExecuteStoredProcedureAsync<bool>(sprocUri, opts, ct, box events) |> Async.AwaitTaskCorrect
let index : Store.IndexEvent =
match maybeIndexEvents with
| None | Some [||] -> Unchecked.defaultof<_>
| Some eds -> Store.IndexEvent.Create pos (events.Length) eds
let! res = client.ExecuteStoredProcedureAsync<bool>(sprocUri, opts, ct, box events, box index) |> Async.AwaitTaskCorrect
return { pos with index = Some (pos.IndexRel events.Length) }, res.RequestCharge }

/// Yields `EqxSyncResult.Written`, or `EqxSyncResult.Conflict` to signify WrongExpectedVersion
let private writeEventsAsync (log : ILogger) client pk (events : Store.EventData[]): Async<EqxSyncResult> = async {
let private writeEventsAsync (log : ILogger) client pk (events : Store.EventData[],maybeIndexEvents): Async<EqxSyncResult> = async {
try
let! wr = append client pk events
let! wr = append client pk (events,maybeIndexEvents)
return EqxSyncResult.Written wr
with :? DocumentClientException as ex when ex.Message.Contains "already" -> // TODO this does not work for the SP
log.Information(ex, "Eqx TrySync WrongExpectedVersionException writing {EventTypes}", [| for x in events -> x.eventType |])
@@ -177,12 +201,12 @@ module private Write =
let eventDataLen ({ data = Log.BlobLen bytes; metadata = Log.BlobLen metaBytes } : Store.EventData) = bytes + metaBytes
events |> Array.sumBy eventDataLen

let private writeEventsLogged client (pos : Store.Position) (events : Store.EventData[]) (log : ILogger): Async<EqxSyncResult> = async {
let private writeEventsLogged client (pos : Store.Position) (events : Store.EventData[], maybeIndexEvents) (log : ILogger): Async<EqxSyncResult> = async {
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEventData "Json" events
let bytes, count = bytes events, events.Length
let log = log |> Log.prop "bytes" bytes
let writeLog = log |> Log.prop "stream" pos.streamName |> Log.prop "expectedVersion" pos.Index |> Log.prop "count" count
let! t, result = writeEventsAsync writeLog client pos events |> Stopwatch.Time
let! t, result = writeEventsAsync writeLog client pos (events,maybeIndexEvents) |> Stopwatch.Time
let (ru: float), resultLog =
let mkMetric ru : Log.Measurement = { stream = pos.streamName; interval = t; bytes = bytes; count = count; ru = ru }
match result with
@@ -191,8 +215,8 @@ module private Write =
resultLog.Information("Eqx {action:l} {count} {ms}ms rc={ru}", "Write", events.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return result }

let writeEvents (log : ILogger) retryPolicy client pk (events : Store.EventData[]): Async<EqxSyncResult> =
let call = writeEventsLogged client pk events
let writeEvents (log : ILogger) retryPolicy client pk (events : Store.EventData[],maybeIndexEvents): Async<EqxSyncResult> =
let call = writeEventsLogged client pk (events,maybeIndexEvents)
Log.withLoggedRetries retryPolicy "writeAttempt" call log

module private Read =
@@ -397,8 +421,8 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
match events |> Array.tryFindBack isCompactionEvent with
| None -> return Token.ofPreviousTokenAndEventsLength token events.Length batching.BatchSize pos, events
| Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize pos, events }
member __.TrySync log (Pos pos as token) (encodedEvents: Store.EventData[]) isCompactionEventType: Async<GatewaySyncResult> = async {
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.Client pos encodedEvents
member __.TrySync log (Pos pos as token) (encodedEvents: Store.EventData[],maybeIndexEvents) isCompactionEventType: Async<GatewaySyncResult> = async {
let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.Client pos (encodedEvents,maybeIndexEvents)
match wr with
| EqxSyncResult.Conflict _ -> return GatewaySyncResult.Conflict
| EqxSyncResult.Written (wr, _) ->
@@ -419,10 +443,16 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) =
member __.Gateway = gateway
member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type SearchStrategy<'event> =
| EventType of string
| Predicate of ('event -> bool)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
| EventsAreState
| RollingSnapshots of eventType: string * compact: ('state -> 'event)
| IndexedSearch of predicate: ('event -> bool) * index: ('state -> 'event seq)
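// Illustrative sketch (not part of this commit): a consumer might select an access strategy
// along these lines; `Events.Snapshotted`, `Folds.compact` and `Folds.index` are hypothetical names
//   let rolling = AccessStrategy.RollingSnapshots ("Compacted", Folds.compact)
//   let indexed = AccessStrategy.IndexedSearch ((function Events.Snapshotted _ -> true | _ -> false), Folds.index)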

type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) =
/// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes)
@@ -432,13 +462,20 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni
let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None }
let compactionPredicate =
match access with
| Some (AccessStrategy.IndexedSearch _)
| None -> None
| Some AccessStrategy.EventsAreState -> Some (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et)
let searchPredicate =
match access with
| None -> None
| Some AccessStrategy.EventsAreState -> Some (SearchStrategy.Predicate (fun _ -> true))
| Some (AccessStrategy.IndexedSearch (ep,_)) -> Some (SearchStrategy.Predicate ep)
let loadAlgorithm load (Pos pos) initial log =
let batched = load initial (coll.Gateway.LoadBatched log None pos)
let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos)
match access with
| Some (AccessStrategy.IndexedSearch _)
| None -> batched
| Some AccessStrategy.EventsAreState -> compacted (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et)
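// Note: as wired above, IndexedSearch currently falls back to the plain batched (forward) load;
// loading state via the index document is not yet hooked up at this point in the WIP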
@@ -452,14 +489,18 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger)
(token : Storage.StreamToken, state : 'state)
(events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let events =
let events, index =
match access with
| None | Some AccessStrategy.EventsAreState -> events
| None | Some AccessStrategy.EventsAreState ->
events, None
| Some (AccessStrategy.RollingSnapshots (_,f)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [f state'] else events
(if cc.IsCompactionDue then events @ [f state'] else events), None
| Some (AccessStrategy.IndexedSearch (_,index)) ->
events, Some (index state')
let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events)
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate
let maybeIndexEvents : Store.EventData[] option = index |> Option.map (UnionEncoderAdapters.encodeEvents codec)
let! syncRes = coll.Gateway.TrySync log token (encodedEvents,maybeIndexEvents) compactionPredicate
match syncRes with
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true))
| GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) }
Expand Down Expand Up @@ -590,7 +631,8 @@ module Initialization =
return coll.Resource.Id }

let createProc (client: IDocumentClient) (collectionUri: Uri) = async {
let f ="""function multidocInsert(docs) {
let f ="""function multidocInsert(docs,index) {
// TODO insert or update the index, verifying expectedVersion
var response = getContext().getResponse();
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
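// Illustrative sketch only (not what this commit implements): the index upsert flagged in the
// TODO above might verify the stored index's `m` against the expected version and then upsert
// the supplied `index` document, e.g.:
//   if (index) {
//       var accepted = collection.upsertDocument(collectionLink, index, function (err) {
//           if (err) throw err;
//       });
//       if (!accepted) throw new Error("upsertDocument not accepted");
//   }
// (upsertDocument is part of the DocumentDB server-side JS collection API; the expectedVersion
//  check itself is omitted from this sketch)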
