Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SAMPLING_STORAGE_TYPE environment variable #3573

Merged
merged 4 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
// Initialize implements strategystore.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error {
if ssFactory == nil {
return errors.New("lock or SamplingStore nil. Please configure a backend that supports adaptive sampling")
return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling")
}

var err error
Expand Down
18 changes: 18 additions & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewFactory(config FactoryConfig) (*Factory, error) {
for _, storageType := range f.SpanWriterTypes {
uniqueTypes[storageType] = struct{}{}
}
// skip SamplingStorageType if it is empty. See CreateSamplingStoreFactory for details
if f.SamplingStorageType != "" {
uniqueTypes[f.SamplingStorageType] = struct{}{}
}
f.factories = make(map[string]storage.Factory)
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
Expand Down Expand Up @@ -162,6 +166,20 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateSamplingStoreFactory creates a distributedlock.Lock and samplingstore.Store for use with adaptive sampling
func (f *Factory) CreateSamplingStoreFactory() (storage.SamplingStoreFactory, error) {
// if a sampling storage type was specified then use it, otherwise search all factories
// for compatibility
if f.SamplingStorageType != "" {
factory, ok := f.factories[f.SamplingStorageType]
if !ok {
return nil, fmt.Errorf("no %s backend registered for sampling store", f.SamplingStorageType)
}
ss, ok := factory.(storage.SamplingStoreFactory)
if !ok {
return nil, fmt.Errorf("storage factory of type %s does not support sampling store", f.SamplingStorageType)
}
return ss, nil
}

for _, factory := range f.factories {
ss, ok := factory.(storage.SamplingStoreFactory)
if ok {
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ const (
// DependencyStorageTypeEnvVar is the name of the env var that defines the type of backend used for dependencies storage.
DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE"

// SamplingStorageTypeEnvVar is the name of the env var that defines the type of backend used for sampling data storage when using adaptive sampling.
SamplingStorageTypeEnvVar = "SAMPLING_STORAGE_TYPE"

spanStorageFlag = "--span-storage.type"
)

// FactoryConfig tells the Factory which types of backends it needs to create for different storage types.
type FactoryConfig struct {
SpanWriterTypes []string
SpanReaderType string
SamplingStorageType string
DependenciesStorageType string
DownsamplingRatio float64
DownsamplingHashSalt string
Expand Down Expand Up @@ -73,11 +77,13 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
if depStorageType == "" {
depStorageType = spanWriterTypes[0]
}
samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar)
// TODO support explicit configuration for readers
return FactoryConfig{
SpanWriterTypes: spanWriterTypes,
SpanReaderType: spanWriterTypes[0],
DependenciesStorageType: depStorageType,
SamplingStorageType: samplingStorageType,
}
}

Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/factory_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
func clearEnv() {
os.Setenv(SpanStorageTypeEnvVar, "")
os.Setenv(DependencyStorageTypeEnvVar, "")
os.Setenv(SamplingStorageTypeEnvVar, "")
}

func TestFactoryConfigFromEnv(t *testing.T) {
Expand All @@ -37,15 +38,18 @@ func TestFactoryConfigFromEnv(t *testing.T) {
assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0])
assert.Equal(t, cassandraStorageType, f.SpanReaderType)
assert.Equal(t, cassandraStorageType, f.DependenciesStorageType)
assert.Empty(t, f.SamplingStorageType)

os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType)
os.Setenv(DependencyStorageTypeEnvVar, memoryStorageType)
os.Setenv(SamplingStorageTypeEnvVar, cassandraStorageType)

f = FactoryConfigFromEnvAndCLI(nil, &bytes.Buffer{})
assert.Equal(t, 1, len(f.SpanWriterTypes))
assert.Equal(t, elasticsearchStorageType, f.SpanWriterTypes[0])
assert.Equal(t, elasticsearchStorageType, f.SpanReaderType)
assert.Equal(t, memoryStorageType, f.DependenciesStorageType)
assert.Equal(t, cassandraStorageType, f.SamplingStorageType)

os.Setenv(SpanStorageTypeEnvVar, elasticsearchStorageType+","+kafkaStorageType)

Expand Down
21 changes: 20 additions & 1 deletion plugin/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,20 +296,39 @@ func TestCreateError(t *testing.T) {
}
}

func CreateSamplingStoreFactory(t *testing.T) {
func TestCreateSamplingStoreFactory(t *testing.T) {
f, err := NewFactory(defaultCfg())
require.NoError(t, err)
assert.NotEmpty(t, f.factories)
assert.NotEmpty(t, f.factories[cassandraStorageType])

// if not specified sampling store is chosen from available factories
ssFactory, err := f.CreateSamplingStoreFactory()
assert.Equal(t, f.factories[cassandraStorageType], ssFactory)
assert.NoError(t, err)

// if not specified and there's no compatible factories then return nil
delete(f.factories, cassandraStorageType)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Nil(t, ssFactory)
assert.NoError(t, err)

// if an incompatible factory is specified return err
cfg := defaultCfg()
cfg.SamplingStorageType = "elasticsearch"
f, err = NewFactory(cfg)
require.NoError(t, err)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Nil(t, ssFactory)
assert.EqualError(t, err, "storage factory of type elasticsearch does not support sampling store")

// if a compatible factory is specified then return it
cfg.SamplingStorageType = "cassandra"
f, err = NewFactory(cfg)
require.NoError(t, err)
ssFactory, err = f.CreateSamplingStoreFactory()
assert.Equal(t, ssFactory, f.factories["cassandra"])
assert.NoError(t, err)
}

type configurable struct {
Expand Down