Skip to content

Commit

Permalink
fix caching leak issues
Browse files Browse the repository at this point in the history
  • Loading branch information
roycald245 committed Aug 15, 2024
1 parent 91ca2cf commit cdb0efd
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 25 deletions.
3 changes: 1 addition & 2 deletions processor/coralogixprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.0
require (
github.com/auxten/postgresql-parser v1.0.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/dgraph-io/ristretto v0.1.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/stretchr/testify v1.9.0
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.opentelemetry.io/collector/component v0.107.0
Expand Down Expand Up @@ -34,7 +34,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.2.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
9 changes: 2 additions & 7 deletions processor/coralogixprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions processor/coralogixprocessor/internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor/internal/cache"

import lru "github.com/hashicorp/golang-lru/v2"

// lruBlueprintCache implements Cache on top of an lru.Cache keyed by uint64 hashes.
type lruBlueprintCache[V any] struct {
// cache is the underlying LRU store that every method delegates to.
cache *lru.Cache[uint64, V]
}

// Compile-time check that *lruBlueprintCache satisfies the Cache interface.
var _ Cache[any] = (*lruBlueprintCache[any])(nil)

// Get looks up hash in the underlying LRU cache. It returns the stored value
// together with a flag reporting whether an entry for hash was found.
func (c *lruBlueprintCache[V]) Get(hash uint64) (V, bool) {
	value, found := c.cache.Get(hash)
	return value, found
}

// Add stores v under hash in the underlying LRU cache.
func (c *lruBlueprintCache[V]) Add(hash uint64, v V) {
	// The result of the underlying Add is discarded: this method has no
	// return value to carry it.
	_ = c.cache.Add(hash, v)
}

// Delete removes the entry stored under hash from the underlying LRU cache.
func (c *lruBlueprintCache[V]) Delete(hash uint64) {
	// The result of the underlying Remove is discarded: this method has no
	// return value to carry it.
	_ = c.cache.Remove(hash)
}

// NewLRUBlueprintCache builds a Cache backed by an LRU cache created with the
// given size. If lru.New fails, its error is returned unchanged and the
// returned Cache is nil.
func NewLRUBlueprintCache[V any](size int) (Cache[V], error) {
	inner, err := lru.New[uint64, V](size)
	if err != nil {
		return nil, err
	}

	wrapped := &lruBlueprintCache[V]{cache: inner}
	return wrapped, nil
}
15 changes: 15 additions & 0 deletions processor/coralogixprocessor/internal/cache/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache

// Cache is a cache using a uint64 as the key and any generic type as the value.
type Cache[V any] interface {
// Get returns the value for the given blueprint hash, and a boolean to indicate whether the hash was found.
// If the hash is not present, the zero value is returned.
Get(hash uint64) (V, bool)
// Add stores the value v under the given hash.
Add(hash uint64, v V)
// Delete removes the value stored under the given hash.
Delete(hash uint64)
}
24 changes: 8 additions & 16 deletions processor/coralogixprocessor/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@ package coralogixprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"context"
"regexp"
"strconv"
"strings"

postgresqlparser "github.com/auxten/postgresql-parser/pkg/sql/parser"
"github.com/auxten/postgresql-parser/pkg/sql/sem/tree"
postgresqlwalk "github.com/auxten/postgresql-parser/pkg/walk"
"github.com/cespare/xxhash/v2"
"github.com/dgraph-io/ristretto"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor/internal/cache"
mysqlparser "github.com/xwb1989/sqlparser"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"
"regexp"
"strconv"
"strings"
)

var DBTypes = []string{MySQL, PostgreSQL}
Expand All @@ -34,7 +33,7 @@ type coralogixProcessor struct {
config *Config
component.StartFunc
component.ShutdownFunc
cache *ristretto.Cache
cache cache.Cache[string]
logger *zap.Logger
}

Expand Down Expand Up @@ -184,7 +183,7 @@ func (sp *coralogixProcessor) processTraces(_ context.Context, td ptrace.Traces)
_, found := sp.cache.Get(hash)
if !found {
attributes.PutInt("sampling.priority", 100)
sp.cache.Set(hash, "", 8)
sp.cache.Add(hash, "")
}
}
attributes.PutStr("db.statement.blueprint", blueprintStr)
Expand All @@ -202,17 +201,10 @@ func newCoralogixProcessor(ctx context.Context, set processor.Settings, cfg *Con

if cfg.databaseBlueprintsConfig.sampling.enabled {

var cacheSize = fromConfigOrDefault(cfg.databaseBlueprintsConfig.sampling.maxCacheSizeMib*1024*1024, 1<<30) // Default to 1GB
// 8 bytes per entry, 1GB / 8 Bytes = 134,217,728 entries by default
// Num Counters recommended to be 10x the number of keys to track frequency of
var numCounters = fromConfigOrDefault(cfg.databaseBlueprintsConfig.sampling.maxCacheSizeMib/8, 1.5e6) // number of keys to track frequency of (1.5M).

var cacheSize = fromConfigOrDefault(cfg.databaseBlueprintsConfig.sampling.maxCacheSizeMib*1024*1024, 1<<30) / 8 // Default to 1GB
var err error
sp.cache, err = ristretto.NewCache(&ristretto.Config{
NumCounters: numCounters, // number of keys to track frequency of.
MaxCost: cacheSize, // maximum cost of cache.
BufferItems: 64, // number of keys per Get buffer.
})
sp.cache, err = cache.NewLRUBlueprintCache[string](int(cacheSize))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit cdb0efd

Please sign in to comment.