Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into remove_jaeger_end…
Browse files Browse the repository at this point in the history
…point_in_hotrod
  • Loading branch information
eundoosong committed Oct 12, 2018
2 parents 87116e9 + 732ef79 commit 30d87fc
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ build-crossdock-fresh: build-crossdock-linux
install-tools:
go get -u github.com/wadey/gocovmerge
go get -u golang.org/x/tools/cmd/cover
go get -u github.com/golang/lint/golint
go get -u golang.org/x/lint/golint
go get -u github.com/sectioneight/md-to-godoc
go get -u github.com/securego/gosec/cmd/gosec/...
go get -u honnef.co/go/tools/cmd/gosimple
Expand Down
16 changes: 8 additions & 8 deletions cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
)

const (
collectorQueueSize = "collector.queue-size"
collectorNumWorkers = "collector.num-workers"
collectorWriteCacheTTL = "collector.write-cache-ttl"
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorZipkinHTTPort = "collector.zipkin.http-port"
collectorQueueSize = "collector.queue-size"
collectorNumWorkers = "collector.num-workers"
collectorWriteCacheTTL = "collector.write-cache-ttl"
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorZipkinHTTPPort = "collector.zipkin.http-port"
// CollectorDefaultHealthCheckHTTPPort is the default HTTP Port for health check
CollectorDefaultHealthCheckHTTPPort = 14269
)
Expand All @@ -53,7 +53,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorPort, 14267, "The tchannel port for the collector service")
flags.Int(collectorHTTPPort, 14268, "The http port for the collector service")
flags.Int(collectorZipkinHTTPort, 0, "The http port for the Zipkin collector service e.g. 9411")
flags.Int(collectorZipkinHTTPPort, 0, "The http port for the Zipkin collector service e.g. 9411")
}

// InitFromViper initializes CollectorOptions with properties from viper
Expand All @@ -62,6 +62,6 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPPort)
return cOpts
}
49 changes: 49 additions & 0 deletions plugin/storage/es/dependencystore/dbmodel/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import "github.com/jaegertracing/jaeger/model"

// FromDomainDependencies converts model dependencies to database representation
func FromDomainDependencies(dLinks []model.DependencyLink) []DependencyLink {
if dLinks == nil {
return nil
}
ret := make([]DependencyLink, len(dLinks))
for i, d := range dLinks {
ret[i] = DependencyLink{
CallCount: d.CallCount,
Parent: d.Parent,
Child: d.Child,
}
}
return ret
}

// ToDomainDependencies converts database representation of dependencies to model
func ToDomainDependencies(dLinks []DependencyLink) []model.DependencyLink {
if dLinks == nil {
return nil
}
ret := make([]model.DependencyLink, len(dLinks))
for i, d := range dLinks {
ret[i] = model.DependencyLink{
CallCount: d.CallCount,
Parent: d.Parent,
Child: d.Child,
}
}
return ret
}
51 changes: 51 additions & 0 deletions plugin/storage/es/dependencystore/dbmodel/converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestConvertDependencies(t *testing.T) {
tests := []struct {
dLinks []model.DependencyLink
}{
{
dLinks: []model.DependencyLink{{CallCount: 1, Parent: "foo", Child: "bar"}},
},
{
dLinks: []model.DependencyLink{{CallCount: 3, Parent: "foo"}},
},
{
dLinks: []model.DependencyLink{},
},
{
dLinks: nil,
},
}

for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
got := FromDomainDependencies(test.dLinks)
a := ToDomainDependencies(got)
assert.Equal(t, test.dLinks, a)
})
}
}
30 changes: 30 additions & 0 deletions plugin/storage/es/dependencystore/dbmodel/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import "time"

// TimeDependencies encapsulates dependencies created at a given time
type TimeDependencies struct {
Timestamp time.Time `json:"timestamp"`
Dependencies []DependencyLink `json:"dependencies"`
}

// DependencyLink shows dependencies between services
type DependencyLink struct {
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
}
16 changes: 6 additions & 10 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel"
)

const (
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
)

type timeToDependencies struct {
Timestamp time.Time `json:"timestamp"`
Dependencies []model.DependencyLink `json:"dependencies"`
}

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
Expand Down Expand Up @@ -78,8 +74,8 @@ func (s *DependencyStore) createIndex(indexName string) error {

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
BodyJson(&timeToDependencies{Timestamp: ts,
Dependencies: dependencies,
BodyJson(&dbmodel.TimeDependencies{Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
}).Add()
}

Expand All @@ -95,17 +91,17 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
return nil, errors.Wrap(err, "Failed to search for dependencies")
}

var retDependencies []model.DependencyLink
var retDependencies []dbmodel.DependencyLink
hits := searchResult.Hits.Hits
for _, hit := range hits {
source := hit.Source
var tToD timeToDependencies
var tToD dbmodel.TimeDependencies
if err := json.Unmarshal(*source, &tToD); err != nil {
return nil, errors.New("Unmarshalling ElasticSearch documents failed")
}
retDependencies = append(retDependencies, tToD.Dependencies...)
}
return retDependencies, nil
return dbmodel.ToDomainDependencies(retDependencies), nil
}

func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
MaxLookback: cfg.GetMaxSpanAge(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/es/spanstore/dbmodel/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,9 @@ type KeyValue struct {
Type ValueType `json:"type,omitempty"`
Value interface{} `json:"value"`
}

// Service is the JSON struct for service:operation documents in ElasticSearch
type Service struct {
ServiceName string `json:"serviceName"`
OperationName string `json:"operationName"`
}
14 changes: 7 additions & 7 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type SpanReader struct {
logger *zap.Logger
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxLookback time.Duration
maxSpanAge time.Duration
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix string
serviceIndexPrefix string
Expand All @@ -99,7 +99,7 @@ type SpanReader struct {
type SpanReaderParams struct {
Client es.Client
Logger *zap.Logger
MaxLookback time.Duration
MaxSpanAge time.Duration
MetricsFactory metrics.Factory
serviceOperationStorage *ServiceOperationStorage
IndexPrefix string
Expand All @@ -120,8 +120,8 @@ func newSpanReader(p SpanReaderParams) *SpanReader {
ctx: ctx,
client: p.Client,
logger: p.Logger,
maxLookback: p.MaxLookback,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, metrics.NullFactory, p.Logger, 0), // the decorator takes care of metrics
maxSpanAge: p.MaxSpanAge,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: p.IndexPrefix + spanIndex,
serviceIndexPrefix: p.IndexPrefix + serviceIndex,
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
Expand All @@ -131,7 +131,7 @@ func newSpanReader(p SpanReaderParams) *SpanReader {
// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
currentTime := time.Now()
traces, err := s.multiRead([]string{traceID.String()}, currentTime.Add(-s.maxLookback), currentTime)
traces, err := s.multiRead([]string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -184,14 +184,14 @@ func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time,
// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
currentTime := time.Now()
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
currentTime := time.Now()
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
}

Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func withSpanReader(fn func(r *spanReaderTest)) {
reader: newSpanReader(SpanReaderParams{
Client: client,
Logger: zap.NewNop(),
MaxLookback: 0,
MaxSpanAge: 0,
IndexPrefix: "",
TagDotReplacement: "@",
}),
Expand All @@ -109,7 +109,7 @@ func TestNewSpanReader(t *testing.T) {
reader := NewSpanReader(SpanReaderParams{
Client: client,
Logger: zap.NewNop(),
MaxLookback: 0,
MaxSpanAge: 0,
MetricsFactory: metrics.NullFactory,
IndexPrefix: ""})
assert.NotNil(t, reader)
Expand All @@ -129,7 +129,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
r := newSpanReader(SpanReaderParams{
Client: client,
Logger: zap.NewNop(),
MaxLookback: 0,
MaxSpanAge: 0,
IndexPrefix: testCase.prefix})
assert.Equal(t, testCase.expected+spanIndex, r.spanIndexPrefix)
assert.Equal(t, testCase.expected+serviceIndex, r.serviceIndexPrefix)
Expand Down
15 changes: 11 additions & 4 deletions plugin/storage/es/spanstore/service_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package spanstore

import (
"context"
"fmt"
"hash/fnv"
"time"

"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

Expand Down Expand Up @@ -49,7 +50,6 @@ type ServiceOperationStorage struct {
func NewServiceOperationStorage(
ctx context.Context,
client es.Client,
metricsFactory metrics.Factory,
logger *zap.Logger,
cacheTTL time.Duration,
) *ServiceOperationStorage {
Expand All @@ -69,12 +69,12 @@ func NewServiceOperationStorage(
// Write saves a service to operation pair.
func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span) {
// Insert serviceName:operationName document
service := Service{
service := dbmodel.Service{
ServiceName: jsonSpan.Process.ServiceName,
OperationName: jsonSpan.OperationName,
}

cacheKey := service.hashCode()
cacheKey := hashCode(service)
if !keyInCache(cacheKey, s.serviceCache) {
s.client.Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add()
writeCache(cacheKey, s.serviceCache)
Expand Down Expand Up @@ -142,3 +142,10 @@ func getOperationsAggregation() elastic.Query {
Field(operationNameField).
Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838
}

func hashCode(s dbmodel.Service) string {
h := fnv.New64a()
h.Write([]byte(s.ServiceName))
h.Write([]byte(s.OperationName))
return fmt.Sprintf("%x", h.Sum64())
}
4 changes: 2 additions & 2 deletions plugin/storage/es/spanstore/service_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestWriteService(t *testing.T) {
indexService.On("Index", stringMatcher(indexName)).Return(indexService)
indexService.On("Type", stringMatcher(serviceType)).Return(indexService)
indexService.On("Id", stringMatcher(serviceHash)).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("dbmodel.Service")).Return(indexService)
indexService.On("Add")

w.client.On("Index").Return(indexService)
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestWriteServiceError(t *testing.T) {
indexService.On("Index", stringMatcher(indexName)).Return(indexService)
indexService.On("Type", stringMatcher(serviceType)).Return(indexService)
indexService.On("Id", stringMatcher(serviceHash)).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("dbmodel.Service")).Return(indexService)
indexService.On("Add")

w.client.On("Index").Return(indexService)
Expand Down
Loading

0 comments on commit 30d87fc

Please sign in to comment.