Skip to content

Commit

Permalink
feat: add list of providers to the geoip processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rogercoll committed May 29, 2024
1 parent 49c277b commit 4450aa8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 22 deletions.
48 changes: 45 additions & 3 deletions processor/geoipprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package geoipprocessor // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -38,14 +39,55 @@ func createDefaultConfig() component.Config {
return &Config{}
}

func createGeoIPProviders(
ctx context.Context,
set processor.CreateSettings,
config *Config,
factories map[string]provider.GeoIPProviderFactory,
) ([]provider.GeoIPProvider, error) {
providers := make([]provider.GeoIPProvider, 0, len(config.Providers))

for key, cfg := range config.Providers {
factory := factories[key]
if factory == nil {
return nil, fmt.Errorf("geoIP provider factory not found for key: %q", key)
}

provider, err := factory.CreateGeoIPProvider(ctx, set, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create provider for key %q: %w", key, err)
}

providers = append(providers, provider)

}

return providers, nil
}

func createMetricsProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) {
return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processMetrics, processorhelper.WithCapabilities(processorCapabilities))
geoCfg := cfg.(*Config)
providers, err := createGeoIPProviders(ctx, set, geoCfg, providerFactories)
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(providers).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processTraces, processorhelper.WithCapabilities(processorCapabilities))
geoCfg := cfg.(*Config)
providers, err := createGeoIPProviders(ctx, set, geoCfg, providerFactories)
if err != nil {
return nil, err
}
return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(providers).processTraces, processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor().processLogs, processorhelper.WithCapabilities(processorCapabilities))
geoCfg := cfg.(*Config)
providers, err := createGeoIPProviders(ctx, set, geoCfg, providerFactories)
if err != nil {
return nil, err
}
return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(providers).processLogs, processorhelper.WithCapabilities(processorCapabilities))
}
14 changes: 11 additions & 3 deletions processor/geoipprocessor/geoip_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@ package geoipprocessor // import "github.com/open-telemetry/opentelemetry-collec
import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type geoIPProcessor struct{}
// GeoProviders will be used by the Processor retrieve geographical metadata given an IP address.
type GeoProviders = []provider.GeoIPProvider

func newGeoIPProcessor() *geoIPProcessor {
return &geoIPProcessor{}
type geoIPProcessor struct {
providers GeoProviders
}

func newGeoIPProcessor(providers GeoProviders) *geoIPProcessor {
return &geoIPProcessor{
providers,
}
}

func (g *geoIPProcessor) processMetrics(_ context.Context, ms pmetric.Metrics) (pmetric.Metrics, error) {
Expand Down
72 changes: 56 additions & 16 deletions processor/geoipprocessor/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -41,30 +42,69 @@ func (fm *ProviderFactoryMock) CreateGeoIPProvider(ctx context.Context, settings
return fm.CreateGeoIPProviderF(ctx, settings, cfg)
}

var baseMockProvider = ProviderMock{
LocationF: func(context.Context, net.IP) (attribute.Set, error) {
return attribute.Set{}, nil
},
}

var baseMockFactory = ProviderFactoryMock{
CreateDefaultConfigF: func() provider.Config {
type emptyConfig struct{}
return &emptyConfig{}
},
CreateGeoIPProviderF: func(ctx context.Context, settings processor.CreateSettings, cfg provider.Config) (provider.GeoIPProvider, error) {
return &baseMockProvider, nil
},
}

func TestLoadConfig_MockProvider(t *testing.T) {
mockProvider := ProviderMock{
LocationF: func(context.Context, net.IP) (attribute.Set, error) {
return attribute.Set{}, nil
},
}
mockFactory := ProviderFactoryMock{
CreateDefaultConfigF: func() provider.Config {
type SampleConfig struct {
Database string `mapstructure:"database"`
}
return &SampleConfig{}
},
CreateGeoIPProviderF: func(ctx context.Context, settings processor.CreateSettings, cfg provider.Config) (provider.GeoIPProvider, error) {
return &mockProvider, nil
},
baseMockFactory.CreateDefaultConfigF = func() provider.Config {
type SampleConfig struct {
Database string `mapstructure:"database"`
}
return &SampleConfig{}
}

factories, err := otelcoltest.NopFactories()
require.NoError(t, err)

providerFactories["mock"] = &mockFactory
providerFactories["mock"] = &baseMockFactory
factory := NewFactory()
factories.Processors[metadata.Type] = factory
_, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "config-mockProvider.yaml"), factories)
assert.NoError(t, err)
}

func TestGeoProviderLocation(t *testing.T) {
exampleIP := net.IPv4(240, 0, 0, 0)
baseMockProvider.LocationF = func(ctx context.Context, ip net.IP) (attribute.Set, error) {
// dummy provider that only returns data if the IP is 240.0.0.0
if ip.Equal(exampleIP) {
return attribute.NewSet(
attribute.String("geo.city_name", "Barcelona"),
attribute.String("geo.country_name", "Spain"),
), nil
}
return attribute.NewSet(), nil
}
factory := NewFactory()
config := factory.CreateDefaultConfig()
geoCfg := config.(*Config)
geoCfg.Providers = make(map[string]provider.Config, 1)
geoCfg.Providers["mock"] = &baseMockFactory

providers, err := createGeoIPProviders(context.Background(), processortest.NewNopCreateSettings(), geoCfg, providerFactories)
if err != nil {
t.Fatal(err)
}

processor := newGeoIPProcessor(providers)
assert.Equal(t, 1, len(processor.providers))

attributes, err := processor.providers[0].Location(context.Background(), exampleIP)
assert.NoError(t, err)
value, has := attributes.Value("geo.city_name")
require.True(t, has)
require.Equal(t, "Barcelona", value.AsString())
}

0 comments on commit 4450aa8

Please sign in to comment.