Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Codec support for MemoryStore #173

Merged
merged 3 commits into from
Oct 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Cosmos`: Exposed a `Connector.CreateClient` for interop with V2 ChangeFeedProcessor and `Propulsion.Cosmos` [#171](https://github.com/jet/equinox/pull/171)
- `MemoryStore`: Supports custom Codec logic (can use `FsCodec.Box.Codec` as default) [#173](https://github.com/jet/equinox/pull/173)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type StreamResolver(storage) =
snapshot: (('event -> bool) * ('state -> 'event))) =
match storage with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ exception MissingArg of string

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type StorageConfig =
| Memory of Equinox.MemoryStore.VolatileStore
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<byte[]>
| Es of Equinox.EventStore.Context * Equinox.EventStore.CachingStrategy option * unfolds: bool
| Cosmos of Equinox.Cosmos.Gateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * containerId: string

Expand Down
5 changes: 3 additions & 2 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ let fold, initial = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial
let snapshot = Domain.Cart.Folds.isOrigin, Domain.Cart.Folds.compact

let createMemoryStore () =
new VolatileStore ()
// we want to validate that the JSON UTF8 is working happily
new VolatileStore<byte[]>()
let createServiceMemory log store =
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, fold, initial).Resolve(id,?option=opt))
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))

let codec = Domain.Cart.Events.codec

Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ open Xunit
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial

let createMemoryStore () =
new MemoryStore.VolatileStore()
new MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, fold, initial).Resolve)
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let codec = Domain.ContactPreferences.Events.codec
let resolveStreamGesWithOptimizedStorageSemantics gateway =
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ let fold, initial = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial
let snapshot = Domain.Favorites.Folds.isOrigin, Domain.Favorites.Folds.compact

let createMemoryStore () =
new MemoryStore.VolatileStore()
new MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.Favorites.Service(log, MemoryStore.Resolver(store, fold, initial).Resolve)
Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let codec = Domain.Favorites.Events.codec
let createServiceGes gateway log =
Expand Down
13 changes: 7 additions & 6 deletions samples/Tutorial/Counter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
(* Events are things that have already happened,
they always exist in the past, and should always be past tense verbs*)

(* A counter going up might clear to 0, but a counter going down might clear to 100. *)
type Cleared = { value : int }
type Event =
| Incremented
| Decremented
| Cleared of int // NOTE int payload will need to be wrapped in a record if using .Cosmos and/or .EventSore
(* A counter going up might clear to 0,
but a counter going down might clear to 100. *)
| Cleared of Cleared
interface TypeShape.UnionContract.IUnionContract

type State = State of int
let initial : State = State 0
Expand All @@ -25,7 +26,7 @@ let evolve state event =
match event, state with
| Incremented, State s -> State (s + 1)
| Decremented, State s -> State (s - 1)
| Cleared x , _ -> State x
| Cleared { value = x }, _ -> State x

(*fold is just folding the evolve function over all events to get the current state
It's equivalent to Linq's Aggregate function *)
Expand All @@ -46,7 +47,7 @@ let decide command (State state) =
| Decrement ->
if state <= 0 then [] else [Decremented]
| Clear i ->
if state = i then [] else [Cleared i]
if state = i then [] else [Cleared {value = i}]

type Service(log, resolveStream, ?maxAttempts) =
let (|AggregateId|) (id : string) = Equinox.AggregateId("Counter", id)
Expand All @@ -66,7 +67,7 @@ type Service(log, resolveStream, ?maxAttempts) =
read instanceId

let store = Equinox.MemoryStore.VolatileStore()
let resolve = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
let resolve = Equinox.MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let service = Service(log, resolve, maxAttempts=3)
Expand Down
17 changes: 14 additions & 3 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
type Event =
| Added of string
| Removed of string
// No IUnionContract or Codec required as we're using MemoryStore in this example
// No IUnionContract or Codec required as we're using a custom encoder in this example
// interface TypeShape.UnionContract.IUnionContract

let initial : string list = []
let evolve state = function
Expand Down Expand Up @@ -90,9 +91,19 @@ let clientAFavoritesStreamId = Equinox.AggregateId(categoryName,"ClientA")
// For test purposes, we use the in-memory store
let store = Equinox.MemoryStore.VolatileStore()

let codec =
// For this example, we hand-code; normally one uses one of the FsCodec auto codecs, which codegen something similar
let encode = function
| Added x -> "Add",box x
| Removed x -> "Remove",box x
let tryDecode : string*obj -> Event option = function
| "Add", (:? string as x) -> Added x |> Some
| "Remove", (:? string as x) -> Removed x |> Some
| _ -> None
FsCodec.Codec.Create(encode,tryDecode)
// Each store has a Resolver which provides an IStream instance which binds to a specific stream in a specific store
// ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above
let stream streamName = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve(streamName)
let stream streamName = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve(streamName)

// We hand the streamId to the resolver
let clientAStream = stream clientAFavoritesStreamId
Expand Down Expand Up @@ -137,7 +148,7 @@ type Service(log, resolveStream) =
let stream = streamHandlerFor clientId
stream.Read

let resolveStream = Equinox.MemoryStore.Resolver(store, fold, initial).Resolve
let resolveStream = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve

let service = Service(log, resolveStream)

Expand Down
3 changes: 3 additions & 0 deletions src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<!-- only uses FsCodec.Box, which happens to be houses in the NewtonsotJson package-->
<PackageReference Include="FsCodec.NewtonsoftJson" Version="1.2.0" />
</ItemGroup>

</Project>
81 changes: 33 additions & 48 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Equinox.MemoryStore

open Equinox
open Equinox.Core
open Serilog
open System.Runtime.InteropServices

/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
Expand All @@ -19,87 +18,73 @@ type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict of int
[<NoEquality; NoComparison>]
type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't

// Maintains a dictionary of boxed typed arrays, raising exceptions if an attempt to extract a value encounters a mismatched type
type VolatileStore() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,obj>()
let mkBadValueException (log : ILogger) streamName (value : obj) =
let desc = match value with null -> "null" | v -> v.GetType().FullName
let ex : exn = invalidOp (sprintf "Could not read stream %s, value was a: %s" streamName desc)
log.Error<_,_>(ex, "Read Bad Value {StreamName} {Value}", streamName, value)
ex
let mkWrongVersionException (log : ILogger) streamName (expected : int) (value: obj) =
let ex : exn = WrongVersionException (streamName, expected, value)
log.Warning<_,_,_>(ex, "Unexpected Stored Value {StreamName} {Expected} {Value}", streamName, expected, value)
ex
member private __.Unpack<'event> log streamName (x : obj): 'event array =
match x with
| :? ('event array) as value -> value
| value -> raise (mkBadValueException log streamName value)
member private __.Pack (events : 'event seq) : obj =
Array.ofSeq events |> box
/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,FsCodec.ITimelineEvent<'Format>[]>()

/// Loads state from a given stream
member __.TryLoad streamName log =
match streams.TryGetValue streamName with
| false, _ -> None
| true, packed -> __.Unpack log streamName packed |> Some
member __.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed

/// Attempts a synchronization operation - yields conflicting value if sync function decides there is a conflict
member __.TrySync streamName (log : ILogger) (trySyncValue : 'events array -> ConcurrentDictionarySyncResult<'event seq>) (events: 'event seq)
: ConcurrentArraySyncResult<'event array> =
let seedStream _streamName = __.Pack events
let updatePackedValue streamName (packedCurrentValue : obj) =
let currentValue = __.Unpack log streamName packedCurrentValue
member __.TrySync
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult<FsCodec.ITimelineEvent<'Format>[]>,
events: FsCodec.ITimelineEvent<'Format>[])
: ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]> =
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise (mkWrongVersionException log streamName expectedVersion packedCurrentValue)
| ConcurrentDictionarySyncResult.Written value -> __.Pack value
try
let boxedSyncedValue = streams.AddOrUpdate(streamName, seedStream, updatePackedValue)
ConcurrentArraySyncResult.Written (unbox boxedSyncedValue)
with WrongVersionException(_, _, conflictingValue) ->
ConcurrentArraySyncResult.Conflict (unbox conflictingValue)
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict

type Token = { streamVersion: int; streamName: string }

/// Internal implementation detail of MemoryStreamStore
module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
{ value = box { streamName = streamName; streamVersion = streamVersion }
version = int64 streamVersion }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial
let tokenOfArray streamName (value: 'event array) = Array.length value - 1 |> streamTokenOfIndex streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value - 1 |> streamTokenOfIndex streamName
/// Represent a known array of events (without a known folded State)
let ofEventArray streamName fold initial (events: 'event array) = tokenOfArray streamName events, fold initial (Seq.ofArray events)
/// Represent a known array of Events together with the associated state
let ofEventArrayAndKnownState streamName fold (state: 'state) (events: 'event array) = tokenOfArray streamName events, fold state events
let ofEventArrayAndKnownState streamName fold (state: 'state) (events: 'event seq) = tokenOfSeq streamName events, fold state events

/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state, 'context>(store : VolatileStore, fold, initial) =
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IUnionEncoder<'event,'Format,'context>, fold, initial) =
let (|Decode|) = Array.choose codec.TryDecode
interface ICategory<'event, 'state, string, 'context> with
member __.Load(log, streamName, _opt) = async {
match store.TryLoad<'event> streamName log with
member __.Load(_log, streamName, _opt) = async {
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
| Some events -> return Token.ofEventArray streamName fold initial events }
member __.TrySync(log : ILogger, Token.Unpack token, state, events : 'event list, _context) = async {
| Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i,e.EventType,e.Data,e.Meta,e.CorrelationId,e.CausationId,e.Timestamp) :> FsCodec.ITimelineEvent<'Format>
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion+i) (codec.Encode(context,e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue events)
match store.TrySync token.streamName (log : ILogger) trySyncValue events with
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (Seq.ofList successorEvents) }
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync
| ConcurrentArraySyncResult.Written events -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }
| ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }

type Resolver<'event, 'state, 'context>(store : VolatileStore, fold, initial) =
let category = Category<'event,'state,'context>(store, fold, initial)
type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IUnionEncoder<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context
let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName
member __.Resolve(target : Target, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context) =
member __.Resolve(target : Target, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match resolveTarget target, option with
| sn,(None|Some AllowStale) -> resolveStream sn context
| sn,Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ open Swensen.Unquote
open Equinox.MemoryStore

let createMemoryStore () =
new VolatileStore()
new VolatileStore<_>()

let createServiceMemory log store =
let resolveStream (id,opt) = Resolver(store, Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).Resolve(id,?option=opt)
let resolveStream (id,opt) = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).Resolve(id,?option=opt)
Backend.Cart.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down