Add CosmosStore.Prometheus #266

Merged: 18 commits, Dec 3, 2020
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -24,12 +24,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
 - Rename `Equinox.Cosmos.Context` -> `Equinox.CosmosStore.CosmosStoreContext`
 - Rename `Equinox.Cosmos.Resolver` -> `Equinox.CosmosStore.CosmosStoreCategory`
 - Rename `Equinox.Cosmos.Connector` -> `Equinox.CosmosStore.CosmosStoreClientFactory`
-- Remove exceptions from 304/404 paths when reading Tip [#257](https://github.com/jet/equinox/pull/257)
+- Remove exceptions from 302/404 paths when reading Tip [#257](https://github.com/jet/equinox/pull/257)
 - Reorganized `QueryRetryPolicy` to handle `IAsyncEnumerable` coming in Cosmos SDK V4 [#246](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach)
 - Added Secondary store fallback for Event loading, enabling Streams to be hot-migrated (archived to a secondary/clone, then pruned from the primary/active) between Primary and Secondary stores [#247](https://github.com/jet/equinox/pull/247), [#259](https://github.com/jet/equinox/pull/259)
 - Replaced `BatchingPolicy`, `RetryPolicy` with `TipOptions`, `QueryOptions` to better align with Cosmos SDK V4 [#253](https://github.com/jet/equinox/pull/253)
 - Added support for accumulating events in Tip [#251](https://github.com/jet/equinox/pull/251) see also [#110](https://github.com/jet/equinox/pull/110)
 - Added support for pruning events in Tip [#258](https://github.com/jet/equinox/pull/258) see also [#233](https://github.com/jet/equinox/pull/233)
+- Added Prometheus integration package [#266](https://github.com/jet/equinox/pull/266) see also [#267](https://github.com/jet/equinox/pull/267)
 - target `EventStore.Client` v `20.6` (instead of v `5.0.x`) [#224](https://github.com/jet/equinox/pull/224)
 - Retarget `netcoreapp2.1` apps to `netcoreapp3.1` with `SystemTextJson`
 - Retarget Todobackend to `aspnetcore` v `3.1`
6 changes: 6 additions & 0 deletions Equinox.sln
@@ -88,6 +88,8 @@ ProjectSection(SolutionItems) = preProject
 diagrams\CosmosCode.puml = diagrams\CosmosCode.puml
 EndProjectSection
 EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.CosmosStore.Prometheus", "src\Equinox.CosmosStore.Prometheus\Equinox.CosmosStore.Prometheus.fsproj", "{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}"
+EndProject
 Global
 GlobalSection(SolutionConfigurationPlatforms) = preSolution
 Debug|Any CPU = Debug|Any CPU
@@ -190,6 +192,10 @@ Global
 {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU
 {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU
 {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU
+{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Debug|Any CPU.Build.0 = Debug|Any CPU
+{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Release|Any CPU.ActiveCfg = Release|Any CPU
+{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Release|Any CPU.Build.0 = Release|Any CPU
 EndGlobalSection
 GlobalSection(SolutionProperties) = preSolution
 HideSolutionNode = FALSE
1 change: 1 addition & 0 deletions README.md
@@ -137,6 +137,7 @@ The components within this repository are delivered as multi-targeted Nuget pack
 - `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
 - `Equinox.EventStore` [![EventStore NuGet](https://img.shields.io/nuget/v/Equinox.EventStore.svg)](https://www.nuget.org/packages/Equinox.EventStore/): [EventStoreDB](https://eventstore.org/) Adapter designed to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.EventStore) on `Equinox.Core`, `EventStore.Client >= 20.6`, `FSharp.Control.AsyncSeq >= 2.0.23`)
 - `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/) [*NOTE: `Equinox.Cosmos` from the `/v2` branch is the only version on NuGet atm*](https://github.com/jet/equinox/pull/250#issuecomment-706031334): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos >= 3.9`, `FsCodec.NewtonsoftJson`, `FSharp.Control.AsyncSeq >= 2.0.23`)
+- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
 - `Equinox.SqlStreamStore` [![SqlStreamStore NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore/): [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore) Adapter derived from `Equinox.EventStore` - provides core facilities (but does not connect to a specific database; see sibling `SqlStreamStore`.* packages). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore) on `Equinox.Core`, `FsCodec`, `SqlStreamStore >= 1.2.0-beta.8`, `FSharp.Control.AsyncSeq`)
 - `Equinox.SqlStreamStore.MsSql` [![MsSql NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.MsSql.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore.MsSql/): [SqlStreamStore.MsSql](https://sqlstreamstore.readthedocs.io/en/latest/sqlserver) Sql Server `Connector` implementation for `Equinox.SqlStreamStore` package). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MsSql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MsSql >= 1.2.0-beta.8`)
 - `Equinox.SqlStreamStore.MySql` [![MySql NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.MySql.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore.MySql/): `SqlStreamStore.MySql` MySQL `Connector` implementation for `Equinox.SqlStreamStore` package). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MySql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MySql >= 1.2.0-beta.8`)
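
Note: the README entry for `Equinox.CosmosStore.Prometheus` describes the package as a `Serilog.Core.ILogEventSink`, so wiring it up should amount to registering the sink on a Serilog logger. The following is a minimal sketch of that, not taken from the PR: it assumes the `Serilog`, `Equinox.CosmosStore` and `Equinox.CosmosStore.Prometheus` packages are referenced, that the `app` constructor argument of `LogSink` is a string used as the `app` label value, and the helper name `createStoreLog` is hypothetical.

```fsharp
// Sketch only: requires package references to Serilog and Equinox.CosmosStore.Prometheus
open Serilog

/// Builds a logger whose events are also inspected by the Prometheus sink.
/// `appName` is assumed to be a string that ends up as the "app" label value.
let createStoreLog (appName : string) : ILogger =
    LoggerConfiguration()
        // LoggerSinkConfiguration.Sink accepts any Serilog.Core.ILogEventSink
        .WriteTo.Sink(Equinox.CosmosStore.Prometheus.LogSink(appName))
        .CreateLogger()
    :> ILogger
```

Metrics are only observed for log events that pass through this logger, so it would need to be the logger handed to the store for its operations.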
1 change: 1 addition & 0 deletions build.proj
@@ -17,6 +17,7 @@
 <Exec Command="dotnet pack src/Equinox $(Cfg) $(PackOptions)" />
 <Exec Command="dotnet pack src/Equinox.Core $(Cfg) $(PackOptions)" />
 <Exec Command="dotnet pack src/Equinox.CosmosStore $(Cfg) $(PackOptions)" />
+<Exec Command="dotnet pack src/Equinox.CosmosStore.Prometheus $(Cfg) $(PackOptions)" />
 <Exec Command="dotnet pack src/Equinox.EventStore $(Cfg) $(PackOptions)" />
 <Exec Command="dotnet pack src/Equinox.MemoryStore $(Cfg) $(PackOptions)" />
 <Exec Command="dotnet pack src/Equinox.SqlStreamStore $(Cfg) $(PackOptions)" />
32 changes: 16 additions & 16 deletions samples/Store/Integration/LogIntegration.fs
@@ -28,23 +28,23 @@ module EquinoxCosmosInterop =
     [<NoEquality; NoComparison>]
     type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with
         override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
-    let flatten (evt : Log.Event) : FlatMetric =
+    let flatten (evt : Log.Metric) : FlatMetric =
         let action, metric, batches, ru =
             match evt with
-            | Log.Tip m -> "CosmosTip", m, None, m.ru
-            | Log.TipNotFound m -> "CosmosTip404", m, None, m.ru
-            | Log.TipNotModified m -> "CosmosTip302", m, None, m.ru
-            | Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
-            | Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
-            | Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
-            | Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
-            | Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru
-            | Log.SyncConflict m -> "CosmosSync409", m, None, m.ru
-            | Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru
-            | Log.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
-            | Log.Delete m -> "CosmosDelete", m, None, m.ru
-            | Log.Trim m -> "CosmosTrim", m, None, m.ru
-            | Log.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
+            | Log.Metric.Tip m -> "CosmosTip", m, None, m.ru
+            | Log.Metric.TipNotFound m -> "CosmosTip404", m, None, m.ru
+            | Log.Metric.TipNotModified m -> "CosmosTip302", m, None, m.ru
+            | Log.Metric.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
+            | Log.Metric.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
+            | Log.Metric.QueryResponse (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
+            | Log.Metric.QueryResponse (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
+            | Log.Metric.SyncSuccess m -> "CosmosSync200", m, None, m.ru
+            | Log.Metric.SyncConflict m -> "CosmosSync409", m, None, m.ru
+            | Log.Metric.SyncResync m -> "CosmosSyncResync", m, None, m.ru
+            | Log.Metric.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
+            | Log.Metric.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
+            | Log.Metric.Delete m -> "CosmosDelete", m, None, m.ru
+            | Log.Metric.Trim m -> "CosmosTrim", m, None, m.ru
         { action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches
           interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }

@@ -67,7 +67,7 @@ type SerilogMetricsExtractor(emit : string -> unit) =
         logEvent.Properties
         |> Seq.tryPick (function
             | KeyValue (k, SerilogScalar (:? Equinox.EventStore.Log.Event as m)) -> Some <| Choice1Of3 (k,m)
-            | KeyValue (k, SerilogScalar (:? Equinox.CosmosStore.Core.Log.Event as m)) -> Some <| Choice2Of3 (k,m)
+            | KeyValue (k, SerilogScalar (:? Equinox.CosmosStore.Core.Log.Metric as m)) -> Some <| Choice2Of3 (k,m)
             | _ -> None)
         |> Option.defaultValue (Choice3Of3 ())
     let handleLogEvent logEvent =
117 changes: 117 additions & 0 deletions src/Equinox.CosmosStore.Prometheus/CosmosStorePrometheus.fs
@@ -0,0 +1,117 @@
namespace Equinox.CosmosStore.Prometheus

module private Impl =

    let baseName stat = "equinox_" + stat
    let baseDesc desc = "Equinox CosmosDB " + desc

module private Histograms =

    let labelNames = [| "facet"; "op"; "app"; "db"; "con"; "cat" |]
    let private mkHistogram (cfg : Prometheus.HistogramConfiguration) name desc =
        let h = Prometheus.Metrics.CreateHistogram(name, desc, cfg)
        fun (facet : string, op : string) app (db, con, cat : string) s ->
            h.WithLabels(facet, op, app, db, con, cat).Observe(s)
    // Given we also have summary metrics with equivalent labels, we focus the bucketing on LAN latencies
    let private sHistogram =
        let sBuckets = [| 0.0005; 0.001; 0.002; 0.004; 0.008; 0.016; 0.5; 1.; 2.; 4.; 8. |]
        let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames)
        mkHistogram sCfg
    let private ruHistogram =
        let ruBuckets = Prometheus.Histogram.ExponentialBuckets(1., 2., 11) // 1 .. 1024
        let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames)
        mkHistogram ruCfg
    let sAndRuPair stat desc =
        let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
        let observeS = sHistogram (baseName + "_seconds") (baseDesc + " latency")
        let observeRu = ruHistogram (baseName + "_ru") (baseDesc + " charge")
        fun (facet, op) app (db, con, cat, s : System.TimeSpan, ru) ->
            observeS (facet, op) app (db, con, cat) s.TotalSeconds
            observeRu (facet, op) app (db, con, cat) ru

module private Summaries =

    let labelNames = [| "facet"; "app"; "db"; "con" |]
    let private mkSummary (cfg : Prometheus.SummaryConfiguration) name desc =
        let s = Prometheus.Metrics.CreateSummary(name, desc, cfg)
        fun (facet : string) app (db, con) o -> s.WithLabels(facet, app, db, con).Observe(o)
    let config =
        let inline qep q e = Prometheus.QuantileEpsilonPair(q, e)
        let objectives = [| qep 0.50 0.05; qep 0.95 0.01; qep 0.99 0.01 |]
        Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames, MaxAge = System.TimeSpan.FromMinutes 1.)
    let sAndRuPair stat desc =
        let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
        let observeS = mkSummary config (baseName + "_seconds") (baseDesc + " latency")
        let observeRu = mkSummary config (baseName + "_ru") (baseDesc + " charge")
        fun facet app (db, con, s : System.TimeSpan, ru) ->
            observeS facet app (db, con) s.TotalSeconds
            observeRu facet app (db, con) ru

module private Counters =

    let labelNames = [| "facet"; "op"; "outcome"; "app"; "db"; "con"; "cat" |]
    let private mkCounter (cfg : Prometheus.CounterConfiguration) name desc =
        let h = Prometheus.Metrics.CreateCounter(name, desc, cfg)
        fun (facet : string, op : string, outcome : string) app (db, con, cat) c ->
            h.WithLabels(facet, op, outcome, app, db, con, cat).Inc(c)
    let config = Prometheus.CounterConfiguration(LabelNames = labelNames)
    let total stat desc =
        let name = Impl.baseName (stat + "_total")
        let desc = Impl.baseDesc desc
        mkCounter config name desc
    let eventsAndBytesPair stat desc =
        let observeE = total (stat + "_events") (desc + "Events")
        let observeB = total (stat + "_bytes") (desc + "Bytes")
        fun ctx app (db, con, cat, e, b) ->
            observeE ctx app (db, con, cat) e
            match b with None -> () | Some b -> observeB ctx app (db, con, cat) b

module private Stats =

    let opHistogram = Histograms.sAndRuPair "op" "Operation"
    let roundtripHistogram = Histograms.sAndRuPair "roundtrip" "Fragment"
    let opSummary = Summaries.sAndRuPair "op_summary" "Operation Summary"
    let roundtripSummary = Summaries.sAndRuPair "roundtrip_summary" "Fragment Summary"
    let payloadCounters = Counters.eventsAndBytesPair "payload" "Payload, "
    let cacheCounter = Counters.total "cache" "Cache"

    let observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) =
        opHistogram (facet, op) app (db, con, cat, s, ru)
        opSummary facet app (db, con, s, ru)
    let observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, count, bytes) =
        observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru)
        payloadCounters (facet, op, outcome) app (db, con, cat, float count, if bytes = -1 then None else Some (float bytes))

    let inline (|CatSRu|) ({ interval = i; ru = ru } : Equinox.CosmosStore.Core.Log.Measurement as m) =
        let cat, _id = FsCodec.StreamName.splitCategoryAndId (FSharp.UMX.UMX.tag m.stream)
        m.database, m.container, cat, i.Elapsed, ru
    let observeRes (facet, _op as stat) app (CatSRu (db, con, cat, s, ru)) =
        roundtripHistogram stat app (db, con, cat, s, ru)
        roundtripSummary facet app (db, con, s, ru)
    let observe_ stat app (CatSRu (db, con, cat, s, ru)) =
        observeLatencyAndCharge stat app (db, con, cat, s, ru)
    let observe (facet, op, outcome) app (CatSRu (db, con, cat, s, ru) as m) =
        observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
    let observeTip (facet, op, outcome, cacheOutcome) app (CatSRu (db, con, cat, s, ru) as m) =
        observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
        cacheCounter (facet, op, cacheOutcome) app (db, con, cat) 1.

open Equinox.CosmosStore.Core.Log

type LogSink(app) =
    interface Serilog.Core.ILogEventSink with
        member __.Emit logEvent = logEvent |> function
            | MetricEvent cm -> cm |> function
                | Op (Operation.Tip, m) -> Stats.observeTip ("query", "tip", "ok", "200") app m
                | Op (Operation.Tip404, m) -> Stats.observeTip ("query", "tip", "ok", "404") app m
                | Op (Operation.Tip302, m) -> Stats.observeTip ("query", "tip", "ok", "302") app m
                | Op (Operation.Query, m) -> Stats.observe ("query", "query", "ok") app m
                | QueryRes (_direction, m) -> Stats.observeRes ("query", "queryPage") app m
                | Op (Operation.Write, m) -> Stats.observe ("transact", "sync", "ok") app m
                | Op (Operation.Conflict, m) -> Stats.observe ("transact", "conflict", "conflict") app m
                | Op (Operation.Resync, m) -> Stats.observe ("transact", "resync", "conflict") app m
                | Op (Operation.Prune, m) -> Stats.observe_ ("prune", "pruneQuery") app m
                | PruneRes ( m) -> Stats.observeRes ("prune", "pruneQueryPage") app m
                | Op (Operation.Delete, m) -> Stats.observe ("prune", "delete", "ok") app m
                | Op (Operation.Trim, m) -> Stats.observe ("prune", "trim", "ok") app m
            | _ -> ()
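
Note: the metric names this file registers are assembled from `Impl.baseName` plus per-kind suffixes, which makes them hard to read off at a glance. The sketch below re-derives them with plain string functions that mirror the helpers above; it has no Prometheus dependency, and `sAndRuNames`, `totalName` and `metricNames` are hypothetical names introduced only for illustration.

```fsharp
// Mirrors Impl.baseName from the file above
let baseName stat = "equinox_" + stat

// Histograms and summaries are each registered as a latency/charge pair
let sAndRuNames stat = [ baseName stat + "_seconds"; baseName stat + "_ru" ]

// Counters.total appends "_total" to the stat before the prefix is applied
let totalName stat = baseName (stat + "_total")

// Same stat names as passed in the Stats module
let metricNames =
    [ yield! sAndRuNames "op"
      yield! sAndRuNames "roundtrip"
      yield! sAndRuNames "op_summary"
      yield! sAndRuNames "roundtrip_summary"
      yield totalName "payload_events"
      yield totalName "payload_bytes"
      yield totalName "cache" ]

metricNames |> List.iter (printfn "%s")
```

Per the label arrays in the file, the histograms carry `facet, op, app, db, con, cat`, the summaries only `facet, app, db, con`, and the counters add `outcome` to the histogram label set.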
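
Note: the RU histogram uses `Prometheus.Histogram.ExponentialBuckets(1., 2., 11)`, which the source comment summarises as `1 .. 1024`. As a rough illustration of what that means for an observation, the sketch below computes the same geometric series in plain F# and finds the first bucket a value falls into. `exponentialBuckets` and `firstBucket` are hypothetical stand-ins written for this note, not prometheus-net APIs.

```fsharp
/// Geometric series of upper bounds: start, start*factor, ... (count entries)
let exponentialBuckets (start : float) (factor : float) (count : int) : float[] =
    Array.init count (fun i -> start * (factor ** float i))

/// Upper bound of the first bucket containing the value.
/// Prometheus bucket bounds are inclusive ("le"); None means only the implicit +Inf bucket matches.
let firstBucket (buckets : float[]) (value : float) : float option =
    buckets |> Array.tryFind (fun le -> value <= le)

// Same arguments as the ruHistogram configuration above
let ruBuckets = exponentialBuckets 1. 2. 11

printfn "%A" ruBuckets
printfn "%A" (firstBucket ruBuckets 3.5)
printfn "%A" (firstBucket ruBuckets 2000.)
```

A 3.5 RU charge is first counted in the bucket with upper bound 4, while anything above 1024 RU is only visible in the `+Inf` bucket, so very expensive operations are not resolved further by this histogram.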