diff --git a/flake.nix b/flake.nix index 5da444a..74d95e6 100644 --- a/flake.nix +++ b/flake.nix @@ -35,7 +35,7 @@ default = pkgs.buildGoModule { pname = "prisme"; version = "0.11.0"; - vendorHash = "sha256-ZztZLzkKDhka9kZRSThccP+hcQH9qD5xY5rIRhm+H3A="; + vendorHash = "sha256-ozHf3CAdR9rwhpckEz8aA2wDXFOfmvM3IUBK1Ez3fJs="; src = ./.; # Skip go test. diff --git a/go.mod b/go.mod index 506995f..e396834 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang-migrate/migrate/v4 v4.17.0 github.com/google/uuid v1.5.0 github.com/google/wire v0.5.0 - github.com/negrel/ringo v0.3.0 + github.com/negrel/ringo v0.5.0 github.com/oschwald/maxminddb-golang v1.12.0 github.com/rs/zerolog v1.31.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 47a6169..ed2b915 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,8 @@ github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= -github.com/negrel/ringo v0.3.0 h1:LtSdWHCOR1AjIE5welBJb96rMtx0kifCHw3RuaWt2e4= -github.com/negrel/ringo v0.3.0/go.mod h1:cDSDvU1fY2PcKCOj0OB53CBqSt1m1uKWUqcIJlaCvL4= +github.com/negrel/ringo v0.5.0 h1:o29JXp+aWN5b8o+2U40JhXUXx7FVHnQ3O66h9E8RKgw= +github.com/negrel/ringo v0.5.0/go.mod h1:cDSDvU1fY2PcKCOj0OB53CBqSt1m1uKWUqcIJlaCvL4= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= diff --git a/pkg/services/eventstore/clickhouse_service.go b/pkg/services/eventstore/clickhouse_service.go index a7310df..62550be 100644 --- a/pkg/services/eventstore/clickhouse_service.go +++ b/pkg/services/eventstore/clickhouse_service.go @@ -15,7 +15,6 @@ import ( // ProvideClickhouseService is a wire provider for a clickhouse based event // storage service. func ProvideClickhouseService(ch clickhouse.Ch, logger zerolog.Logger) Service { - service := &ClickhouseService{ logger: logger, conn: ch.Conn, @@ -23,10 +22,20 @@ func ProvideClickhouseService(ch clickhouse.Ch, logger zerolog.Logger) Service { maxBatchTimeout: config.ParseDurationEnvOrDefault("PRISME_EVENTSTORE_MAX_BATCH_TIMEOUT", 1*time.Minute), } service.pageViewRingBuf = ringo.NewWaiter( - ringo.NewManyToOne[*event.PageView](int(service.maxBatchSize * 10)), + ringo.NewManyToOne( + int(service.maxBatchSize*10), + ringo.WithManyToOneCollisionHandler[*event.PageView](ringo.CollisionHandlerFunc(func(_ any) { + service.logger.Warn().Msg("pageview events ring buffer collision detected, consider increasing PRISME_EVENTSTORE_MAX_BATCH_SIZE") + })), + ), ) service.customEventRingBuf = ringo.NewWaiter( - ringo.NewManyToOne[*event.Custom](int(service.maxBatchSize * 10)), + ringo.NewManyToOne( + int(service.maxBatchSize*10), + ringo.WithManyToOneCollisionHandler[*event.Custom](ringo.CollisionHandlerFunc(func(_ any) { + service.logger.Warn().Msg("custom events ring buffer collision detected, consider increasing PRISME_EVENTSTORE_MAX_BATCH_SIZE") + })), + ), ) logger = logger.With().