Skip to content

Commit

Permalink
p8s added
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Oct 24, 2022
1 parent 563dab1 commit 693b321
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 5 deletions.
2 changes: 2 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (s *api) observerState(c *fiber.Ctx) error {
func NewApi(config Config, observer Observer) Api {
app := fiber.New(fiber.Config{DisableStartupMessage: true})

app.Use(NewMetricMiddleware(app, config, observer))

api := &api{
app: app,
config: config,
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type ConfigAPI struct {
Port int `yaml:"port"`
}

type ConfigMetric struct {
Path string `yaml:"path"`
}

type Config struct {
Hosts []string `yaml:"hosts"`
Username string `yaml:"username"`
Expand All @@ -39,6 +43,7 @@ type Config struct {
ConnectTimeout time.Duration `yaml:"connectTimeout"`
Dcp ConfigDCP `yaml:"dcp"`
Api ConfigAPI `yaml:"api"`
Metric ConfigMetric `yaml:"metric"`
}

func Options(opts *config.Options) {
Expand Down
4 changes: 3 additions & 1 deletion dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ dcp:
memberNumber: 1
totalMembers: 1
api:
port: 8080`
port: 8080
metric:
path: /metrics`

tmpFile, err := os.CreateTemp("", "*.yml")

Expand Down
4 changes: 3 additions & 1 deletion example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ dcp:
memberNumber: 1
totalMembers: 1
api:
port: 8080
port: 8080
metric:
path: /metrics
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/Trendyol/go-dcp-client
go 1.19

require (
github.com/ansrivas/fiberprometheus/v2 v2.4.1
github.com/couchbase/gocbcore/v10 v10.2.0
github.com/gofiber/fiber/v2 v2.39.0
github.com/gookit/config/v2 v2.1.7
Expand All @@ -15,14 +16,17 @@ require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Microsoft/hcsshim v0.9.4 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/containerd v1.6.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gofiber/adaptor/v2 v2.1.25 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand All @@ -35,6 +39,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
Expand All @@ -45,6 +50,10 @@ require (
github.com/opencontainers/runc v1.1.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
16 changes: 15 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C
github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.4.1 h1:V87ahTcU/I4c8tD6GKiuyyB0Z82dw2VVqLDgBtUcUgc=
github.com/ansrivas/fiberprometheus/v2 v2.4.1/go.mod h1:ATJ3l0sufyoZBz+TEohAyQJqbgUSQaPwCHNL/L67Wnw=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/Nj9VFpLOpjS5yuumk=
Expand All @@ -119,6 +121,7 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
Expand All @@ -140,7 +143,6 @@ github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInq
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down Expand Up @@ -420,6 +422,9 @@ github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/adaptor/v2 v2.1.25 h1:K2Ef2a7mUsCfL/oJdzbjyMXchGYuUUwIVXrYVm+P+xs=
github.com/gofiber/adaptor/v2 v2.1.25/go.mod h1:gOxtwMVqUStB5goAYtKd+hSvGupdd+aRIafZHPLNaUk=
github.com/gofiber/fiber/v2 v2.36.0/go.mod h1:tgCr+lierLwLoVHHO/jn3Niannv34WRkQETU8wiL9fQ=
github.com/gofiber/fiber/v2 v2.39.0 h1:uhWpYQ6EHN8J7FOPYbI2hrdBD/KNZBC5CjbuOd4QUt4=
github.com/gofiber/fiber/v2 v2.39.0/go.mod h1:Cmuu+elPYGqlvQvdKyjtYsjGMi69PDp8a1AY2I5B2gM=
github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU=
Expand Down Expand Up @@ -650,6 +655,7 @@ github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vq
github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -787,10 +793,13 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34=
github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20180110214958-89604d197083/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
Expand All @@ -800,6 +809,8 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
Expand All @@ -811,6 +822,7 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
Expand Down Expand Up @@ -899,6 +911,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.38.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I=
github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc=
github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
Expand Down Expand Up @@ -1234,6 +1247,7 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220405210540-1e041c57c461/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
124 changes: 124 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package godcpclient

import (
"github.com/ansrivas/fiberprometheus/v2"
"github.com/gofiber/fiber/v2"
"github.com/prometheus/client_golang/prometheus"
"log"
"strconv"
)

type MetricCollector struct {
observer Observer

mutation *prometheus.Desc
deletion *prometheus.Desc
expiration *prometheus.Desc

currentSeqNo *prometheus.Desc
startSeqNo *prometheus.Desc
endSeqNo *prometheus.Desc
}

func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(s, ch)
}

func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {
metric := s.observer.GetMetric()

ch <- prometheus.MustNewConstMetric(
s.mutation,
prometheus.CounterValue,
metric.TotalMutations,
)

ch <- prometheus.MustNewConstMetric(
s.deletion,
prometheus.CounterValue,
metric.TotalDeletions,
)

ch <- prometheus.MustNewConstMetric(
s.expiration,
prometheus.CounterValue,
metric.TotalExpirations,
)

for vbId, state := range s.observer.GetState() {
ch <- prometheus.MustNewConstMetric(
s.currentSeqNo,
prometheus.CounterValue,
float64(state.SeqNo),
strconv.Itoa(int(vbId)),
)

ch <- prometheus.MustNewConstMetric(
s.startSeqNo,
prometheus.CounterValue,
float64(state.StartSeqNo),
strconv.Itoa(int(vbId)),
)

ch <- prometheus.MustNewConstMetric(
s.endSeqNo,
prometheus.CounterValue,
float64(state.EndSeqNo),
strconv.Itoa(int(vbId)),
)
}
}

func NewMetricMiddleware(app *fiber.App, config Config, observer Observer) func(ctx *fiber.Ctx) error {
err := prometheus.DefaultRegisterer.Register(&MetricCollector{
observer: observer,

mutation: prometheus.NewDesc(
prometheus.BuildFQName(Name, "mutation", "total"),
"Mutation count",
nil,
nil,
),
deletion: prometheus.NewDesc(
prometheus.BuildFQName(Name, "deletion", "total"),
"Deletion count",
nil,
nil,
),
expiration: prometheus.NewDesc(
prometheus.BuildFQName(Name, "expiration", "total"),
"Expiration count",
nil,
nil,
),
currentSeqNo: prometheus.NewDesc(
prometheus.BuildFQName(Name, "seq_no", "current"),
"Current seq no",
[]string{"vbId"},
nil,
),
startSeqNo: prometheus.NewDesc(
prometheus.BuildFQName(Name, "start_seq_no", "current"),
"Start seq no",
[]string{"vbId"},
nil,
),
endSeqNo: prometheus.NewDesc(
prometheus.BuildFQName(Name, "end_seq_no", "current"),
"End seq no",
[]string{"vbId"},
nil,
),
})

if err != nil {
panic(err)
}

fiberPrometheus := fiberprometheus.New(config.UserAgent)
fiberPrometheus.RegisterAt(app, config.Metric.Path)

log.Printf("Metric middleware registered on path %s", config.Metric.Path)

return fiberPrometheus.Middleware
}
44 changes: 42 additions & 2 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ type Observer interface {
SeqNoAdvanced(advanced DcpSeqNoAdvanced)
GetState() map[uint16]*ObserverState
SetState(map[uint16]*ObserverState)
GetMetric() ObserverMetric
}

type ObserverMetric struct {
TotalMutations float64
TotalDeletions float64
TotalExpirations float64
}

type ObserverState struct {
Expand All @@ -32,6 +39,9 @@ type observer struct {
stateLock sync.Mutex
state map[uint16]*ObserverState

metricLock sync.Mutex
metric ObserverMetric

listener Listener

vbIds []uint16
Expand Down Expand Up @@ -64,6 +74,12 @@ func (so *observer) Mutation(mutation DcpMutation) {
if so.listener != nil {
so.listener(mutation, nil)
}

so.metricLock.Lock()

so.metric.TotalMutations++

so.metricLock.Unlock()
}

func (so *observer) Deletion(deletion DcpDeletion) {
Expand All @@ -76,6 +92,12 @@ func (so *observer) Deletion(deletion DcpDeletion) {
if so.listener != nil {
so.listener(deletion, nil)
}

so.metricLock.Lock()

so.metric.TotalDeletions++

so.metricLock.Unlock()
}

func (so *observer) Expiration(expiration DcpExpiration) {
Expand All @@ -88,6 +110,12 @@ func (so *observer) Expiration(expiration DcpExpiration) {
if so.listener != nil {
so.listener(expiration, nil)
}

so.metricLock.Lock()

so.metric.TotalExpirations++

so.metricLock.Unlock()
}

func (so *observer) End(dcpEnd DcpStreamEnd, err error) {
Expand Down Expand Up @@ -164,11 +192,23 @@ func (so *observer) SetState(state map[uint16]*ObserverState) {
so.state = state
}

func (so *observer) GetMetric() ObserverMetric {
so.metricLock.Lock()
defer so.metricLock.Unlock()

return so.metric
}

func NewObserver(vbIds []uint16, listener Listener) Observer {
return &observer{
stateLock: sync.Mutex{},
state: map[uint16]*ObserverState{},
listener: listener,
vbIds: vbIds,

metricLock: sync.Mutex{},
metric: ObserverMetric{},

listener: listener,

vbIds: vbIds,
}
}

0 comments on commit 693b321

Please sign in to comment.