Skip to content

Commit

Permalink
Add Codec support for MemoryStore (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Oct 26, 2019
1 parent 8a7bdac commit 259ad25
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Cosmos`: Exposed a `Connector.CreateClient` for interop with V2 ChangeFeedProcessor and `Propulsion.Cosmos` [#171](https://github.com/jet/equinox/pull/171)
- `MemoryStore`: Supports custom Codec logic (can use `FsCodec.Box.Codec` as default) [#173](https://github.com/jet/equinox/pull/173)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type StreamResolver(storage) =
snapshot: (('event -> bool) * ('state -> 'event))) =
match storage with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ exception MissingArg of string

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<byte[]>
| Es of Equinox.EventStore.Context * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.Cosmos.Gateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * containerId: string

Expand Down
5 changes: 3 additions & 2 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ let fold, initial = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial
let snapshot = Domain.Cart.Folds.isOrigin, Domain.Cart.Folds.compact

let createMemoryStore () =
new VolatileStore ()
// we want to validate that the JSON UTF8 is working happily
new VolatileStore<byte[]>()
let createServiceMemory log store =
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, fold, initial).Resolve(id,?option=opt))
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))

let codec = Domain.Cart.Events.codec

Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ open Xunit
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial

let createMemoryStore () =
new MemoryStore.VolatileStore()
new MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, fold, initial).Resolve)
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let codec = Domain.ContactPreferences.Events.codec
let resolveStreamGesWithOptimizedStorageSemantics gateway =
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ let fold, initial = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial
let snapshot = Domain.Favorites.Folds.isOrigin, Domain.Favorites.Folds.compact

let createMemoryStore () =
new MemoryStore.VolatileStore()
new MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.Favorites.Service(log, MemoryStore.Resolver(store, fold, initial).Resolve)
Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let codec = Domain.Favorites.Events.codec
let createServiceGes gateway log =
Expand Down
13 changes: 7 additions & 6 deletions samples/Tutorial/Counter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
(* Events are things that have already happened,
they always exist in the past, and should always be past tense verbs*)

(* A counter going up might clear to 0, but a counter going down might clear to 100. *)
type Cleared = { value : int }
type Event =
| Incremented
| Decremented
| Cleared of int // NOTE int payload will need to be wrapped in a record if using .Cosmos and/or .EventSore
(* A counter going up might clear to 0,
but a counter going down might clear to 100. *)
| Cleared of Cleared
interface TypeShape.UnionContract.IUnionContract

type State = State of int
let initial : State = State 0
Expand All @@ -25,7 +26,7 @@ let evolve state event =
match event, state with
| Incremented, State s -> State (s + 1)
| Decremented, State s -> State (s - 1)
| Cleared x , _ -> State x
| Cleared { value = x }, _ -> State x

(*fold is just folding the evolve function over all events to get the current state
It's equivalent to Linq's Aggregate function *)
Expand All @@ -46,7 +47,7 @@ let decide command (State state) =
| Decrement ->
if state <= 0 then [] else [Decremented]
| Clear i ->
if state = i then [] else [Cleared i]
if state = i then [] else [Cleared {value = i}]

type Service(log, resolveStream, ?maxAttempts) =
let (|AggregateId|) (id : string) = Equinox.AggregateId("Counter", id)
Expand All @@ -66,7 +67,7 @@ type Service(log, resolveStream, ?maxAttempts) =
read instanceId

let store = Equinox.MemoryStore.VolatileStore()
let resolve = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
let resolve = Equinox.MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let service = Service(log, resolve, maxAttempts=3)
Expand Down
17 changes: 14 additions & 3 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
type Event =
| Added of string
| Removed of string
// No IUnionContract or Codec required as we're using MemoryStore in this example
// No IUnionContract or Codec required as we're using a custom encoder in this example
// interface TypeShape.UnionContract.IUnionContract

let initial : string list = []
let evolve state = function
Expand Down Expand Up @@ -90,9 +91,19 @@ let clientAFavoritesStreamId = Equinox.AggregateId(categoryName,"ClientA")
// For test purposes, we use the in-memory store
let store = Equinox.MemoryStore.VolatileStore()

let codec =
// For this example, we hand-code; normally one uses one of the FsCodec auto codecs, which codegen something similar
let encode = function
| Added x -> "Add",box x
| Removed x -> "Remove",box x
let tryDecode : string*obj -> Event option = function
| "Add", (:? string as x) -> Added x |> Some
| "Remove", (:? string as x) -> Removed x |> Some
| _ -> None
FsCodec.Codec.Create(encode,tryDecode)
// Each store has a Resolver which provides an IStream instance which binds to a specific stream in a specific store
// ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above
let stream streamName = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve(streamName)
let stream streamName = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve(streamName)

// We hand the streamId to the resolver
let clientAStream = stream clientAFavoritesStreamId
Expand Down Expand Up @@ -137,7 +148,7 @@ type Service(log, resolveStream) =
let stream = streamHandlerFor clientId
stream.Read

let resolveStream = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
let resolveStream = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve

let service = Service(log, resolveStream)

Expand Down
3 changes: 3 additions & 0 deletions src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<!-- only uses FsCodec.Box, which happens to be houses in the NewtonsotJson package-->
<PackageReference Include="FsCodec.NewtonsoftJson" Version="1.2.0" />
</ItemGroup>

</Project>
81 changes: 33 additions & 48 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Equinox.MemoryStore

open Equinox
open Equinox.Core
open Serilog
open System.Runtime.InteropServices

/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
Expand All @@ -19,87 +18,73 @@ type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict of int
[<NoEquality; NoComparison>]
type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't

// Maintains a dictionary of boxed typed arrays, raising exceptions if an attempt to extract a value encounters a mismatched type
type VolatileStore() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,obj>()
let mkBadValueException (log : ILogger) streamName (value : obj) =
let desc = match value with null -> "null" | v -> v.GetType().FullName
let ex : exn = invalidOp (sprintf "Could not read stream %s, value was a: %s" streamName desc)
log.Error<_,_>(ex, "Read Bad Value {StreamName} {Value}", streamName, value)
ex
let mkWrongVersionException (log : ILogger) streamName (expected : int) (value: obj) =
let ex : exn = WrongVersionException (streamName, expected, value)
log.Warning<_,_,_>(ex, "Unexpected Stored Value {StreamName} {Expected} {Value}", streamName, expected, value)
ex
member private __.Unpack<'event> log streamName (x : obj): 'event array =
match x with
| :? ('event array) as value -> value
| value -> raise (mkBadValueException log streamName value)
member private __.Pack (events : 'event seq) : obj =
Array.ofSeq events |> box
/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,FsCodec.ITimelineEvent<'Format>[]>()

/// Loads state from a given stream
member __.TryLoad streamName log =
match streams.TryGetValue streamName with
| false, _ -> None
| true, packed -> __.Unpack log streamName packed |> Some
member __.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed

/// Attempts a synchronization operation - yields conflicting value if sync function decides there is a conflict
member __.TrySync streamName (log : ILogger) (trySyncValue : 'events array -> ConcurrentDictionarySyncResult<'event seq>) (events: 'event seq)
: ConcurrentArraySyncResult<'event array> =
let seedStream _streamName = __.Pack events
let updatePackedValue streamName (packedCurrentValue : obj) =
let currentValue = __.Unpack log streamName packedCurrentValue
member __.TrySync
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult<FsCodec.ITimelineEvent<'Format>[]>,
events: FsCodec.ITimelineEvent<'Format>[])
: ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]> =
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise (mkWrongVersionException log streamName expectedVersion packedCurrentValue)
| ConcurrentDictionarySyncResult.Written value -> __.Pack value
try
let boxedSyncedValue = streams.AddOrUpdate(streamName, seedStream, updatePackedValue)
ConcurrentArraySyncResult.Written (unbox boxedSyncedValue)
with WrongVersionException(_, _, conflictingValue) ->
ConcurrentArraySyncResult.Conflict (unbox conflictingValue)
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict

type Token = { streamVersion: int; streamName: string }

/// Internal implementation detail of MemoryStreamStore
module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
{ value = box { streamName = streamName; streamVersion = streamVersion }
version = int64 streamVersion }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial
let tokenOfArray streamName (value: 'event array) = Array.length value - 1 |> streamTokenOfIndex streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value - 1 |> streamTokenOfIndex streamName
/// Represent a known array of events (without a known folded State)
let ofEventArray streamName fold initial (events: 'event array) = tokenOfArray streamName events, fold initial (Seq.ofArray events)
/// Represent a known array of Events together with the associated state
let ofEventArrayAndKnownState streamName fold (state: 'state) (events: 'event array) = tokenOfArray streamName events, fold state events
let ofEventArrayAndKnownState streamName fold (state: 'state) (events: 'event seq) = tokenOfSeq streamName events, fold state events

/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state, 'context>(store : VolatileStore, fold, initial) =
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IUnionEncoder<'event,'Format,'context>, fold, initial) =
let (|Decode|) = Array.choose codec.TryDecode
interface ICategory<'event, 'state, string, 'context> with
member __.Load(log, streamName, _opt) = async {
match store.TryLoad<'event> streamName log with
member __.Load(_log, streamName, _opt) = async {
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
| Some events -> return Token.ofEventArray streamName fold initial events }
member __.TrySync(log : ILogger, Token.Unpack token, state, events : 'event list, _context) = async {
| Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i,e.EventType,e.Data,e.Meta,e.CorrelationId,e.CausationId,e.Timestamp) :> FsCodec.ITimelineEvent<'Format>
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion+i) (codec.Encode(context,e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue events)
match store.TrySync token.streamName (log : ILogger) trySyncValue events with
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (Seq.ofList successorEvents) }
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync
| ConcurrentArraySyncResult.Written events -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }
| ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }

type Resolver<'event, 'state, 'context>(store : VolatileStore, fold, initial) =
let category = Category<'event,'state,'context>(store, fold, initial)
type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IUnionEncoder<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context
let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName
member __.Resolve(target : Target, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context) =
member __.Resolve(target : Target, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match resolveTarget target, option with
| sn,(None|Some AllowStale) -> resolveStream sn context
| sn,Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ open Swensen.Unquote
open Equinox.MemoryStore

let createMemoryStore () =
new VolatileStore()
new VolatileStore<_>()

let createServiceMemory log store =
let resolveStream (id,opt) = Resolver(store, Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).Resolve(id,?option=opt)
let resolveStream (id,opt) = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).Resolve(id,?option=opt)
Backend.Cart.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down

0 comments on commit 259ad25

Please sign in to comment.