Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow storing tags as object fields in ES - better kibana support #1018

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions model/converter/json/domain_span_compare_test.go

This file was deleted.

1 change: 0 additions & 1 deletion model/converter/thrift/jaeger/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func ToDomainSpan(jSpan *jaeger.Span, jProcess *jaeger.Process) *model.Span {
return toDomain{}.ToDomainSpan(jSpan, jProcess)
}

// toDomain is a private struct that namespaces some conversion functions. It has access to its own private utility functions
type toDomain struct{}

func (td toDomain) ToDomain(jSpans []*jaeger.Span, jProcess *jaeger.Process) []*model.Span {
Expand Down
22 changes: 22 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Configuration struct {
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
}

// ClientBuilder creates new es.Client
Expand All @@ -51,6 +54,9 @@ type ClientBuilder interface {
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetIndexPrefix() string
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -165,6 +171,22 @@ func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetTagsFilePath returns a path to file containing tag keys
func (c *Configuration) GetTagsFilePath() string {
return c.TagsFilePath
}

// GetAllTagsAsFields returns true if all tags should be stored as object fields
func (c *Configuration) GetAllTagsAsFields() bool {
return c.AllTagsAsFields
}

// GetTagDotReplacement returns character is used to replace dots in tag keys, when
// the tag is stored as object field.
func (c *Configuration) GetTagDotReplacement() string {
return c.TagDotReplacement
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
Expand Down
50 changes: 48 additions & 2 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package es

import (
"bufio"
"flag"
"os"
"path/filepath"
"strings"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -74,16 +78,58 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanReader(f.primaryClient, f.logger, cfg.GetMaxSpanAge(), f.metricsFactory, cfg.GetIndexPrefix()), nil
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
MaxLookback: cfg.GetMaxSpanAge(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavolloffay why did we rename it? It's confusing that config params do not match the storage params.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we renamed anything in this PR

The flag name is --es.max-span-age and configuration cfg.GetMaxSpanAge(). MaxLookback is only used in reader maybe also writer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should change everything to maxSpanAge

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change it, there are also other refactoring opportunities

IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas(), cfg.GetIndexPrefix()), nil
var tags []string
if cfg.GetTagsFilePath() != "" {
var err error
if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil {
f.logger.Error("Could not open file with tags", zap.Error(err))
return nil, err
}
}
return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
NumShards: f.primaryConfig.GetNumShards(),
NumReplicas: f.primaryConfig.GetNumReplicas(),
IndexPrefix: f.primaryConfig.GetIndexPrefix(),
AllTagsAsFields: f.primaryConfig.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: f.primaryConfig.GetTagDotReplacement(),
}), nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()), nil
}

func loadTagsFromFile(filePath string) ([]string, error) {
file, err := os.Open(filepath.Clean(filePath))
if err != nil {
return nil, err
}
defer file.Close()

scanner := bufio.NewScanner(file)
var tags []string
for scanner.Scan() {
line := scanner.Text()
if tag := strings.TrimSpace(line); tag != "" {
tags = append(tags, tag)
}
}
return tags, nil
}
43 changes: 43 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

Expand Down Expand Up @@ -66,3 +67,45 @@ func TestElasticsearchFactory(t *testing.T) {
_, err = f.CreateDependencyReader()
assert.NoError(t, err)
}

func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f := NewFactory()
mockConf := &mockClientBuilder{}
mockConf.TagsFilePath = "fixtures/tags_foo.txt"
f.primaryConfig = mockConf
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
r, err := f.CreateSpanWriter()
require.Error(t, err)
assert.Nil(t, r)
}

func TestLoadTagsFromFile(t *testing.T) {
tests := []struct {
path string
tags []string
error bool
}{
{
path: "fixtures/do_not_exists.txt",
error: true,
},
{
path: "fixtures/tags_01.txt",
tags: []string{"foo", "bar", "space"},
},
{
path: "fixtures/tags_02.txt",
tags: nil,
},
}

for _, test := range tests {
tags, err := loadTagsFromFile(test.path)
if test.error {
require.Error(t, err)
assert.Nil(t, tags)
} else {
assert.Equal(t, test.tags, tags)
}
}
}
3 changes: 3 additions & 0 deletions plugin/storage/es/fixtures/tags_01.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
foo
bar
space
4 changes: 4 additions & 0 deletions plugin/storage/es/fixtures/tags_02.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@




20 changes: 20 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -75,6 +79,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkWorkers: 1,
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
TagDotReplacement: "@",
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -146,6 +151,18 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
flagSet.Bool(
nsConfig.namespace+suffixTagsAsFieldsAll,
nsConfig.AllTagsAsFields,
"Store all span and process tags as object fields. If true "+suffixTagsFile+" is ignored. Binary tags are always stored as nested objects.")
flagSet.String(
nsConfig.namespace+suffixTagsFile,
nsConfig.TagsFilePath,
"Optional path to a file containing tag keys which will be stored as object fields. Each key should be on a separate line.")
flagSet.String(
nsConfig.namespace+suffixTagDeDotChar,
nsConfig.TagDotReplacement,
"The character used to replace dots (\".\") in tag keys stored as object fields.")
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -169,6 +186,9 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
}

// GetPrimary returns primary configuration.
Expand Down
89 changes: 89 additions & 0 deletions plugin/storage/es/spanstore/dbmodel/fixtures/domain_01.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAI=",
"operationName": "test-general-conversion",
"references": [
{
"refType": "CHILD_OF",
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAM="
},
{
"refType": "FOLLOWS_FROM",
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAQ="
},
{
"refType": "CHILD_OF",
"traceId": "AAAAAAAAAAAAAAAAAAAA/w==",
"spanId": "AAAAAAAAAP8="
}
],
"flags": 1,
"startTime": "2017-01-26T16:46:31.639875-05:00",
"duration": "5000ns",
"tags": [
{
"key": "peer.service",
"vType": "STRING",
"vStr": "service-y"
},
{
"key": "peer.ipv4",
"vType": "INT64",
"vInt64": 23456
},
{
"key": "error",
"vType": "BOOL",
"vBool": true
},
{
"key": "temperature",
"vType": "FLOAT64",
"vFloat64": 72.5
},
{
"key": "blob",
"vType": "BINARY",
"vBinary": "AAAwOQ=="
}
],
"logs": [
{
"timestamp": "2017-01-26T16:46:31.639875-05:00",
"fields": [
{
"key": "event",
"vType": "INT64",
"vInt64": 123415
}
]
},
{
"timestamp": "2017-01-26T16:46:31.639875-05:00",
"fields": [
{
"key": "x",
"vType": "STRING",
"vStr": "y"
}
]
}
],
"process": {
"serviceName": "service-x",
"tags": [
{
"key": "peer.ipv4",
"vType": "INT64",
"vInt64": 23456
},
{
"key": "error",
"vType": "BOOL",
"vBool": true
}
]
}
}
Loading