Skip to content

Commit

Permalink
Grpc plugin archive storage support
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Putilov <to.merge@gmail.com>
  • Loading branch information
m8rge committed Jun 29, 2020
1 parent 83fabe3 commit e27cc98
Show file tree
Hide file tree
Showing 23 changed files with 1,802 additions and 105 deletions.
9 changes: 9 additions & 0 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -52,6 +53,14 @@ type memoryStore struct {
store *memory.Store
}

func (ns *memoryStore) ArchiveSpanReader() shared.ArchiveReader {
return nil
}

func (ns *memoryStore) ArchiveSpanWriter() shared.ArchiveWriter {
return nil
}

func (ns *memoryStore) DependencyReader() dependencystore.Reader {
return ns.store
}
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ A plugin must implement the StoragePlugin interface of:
type StoragePlugin interface {
SpanReader() spanstore.Reader
SpanWriter() spanstore.Writer
ArchiveSpanReader() shared.ArchiveReader
ArchiveSpanWriter() shared.ArchiveWriter
DependencyReader() dependencystore.Reader
}
```
Expand Down
63 changes: 63 additions & 0 deletions plugin/storage/grpc/archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"context"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// ArchiveWriter implements spanstore.Writer
type ArchiveWriter struct {
impl shared.ArchiveWriter
}

// WriteSpan saves the span
func (w *ArchiveWriter) WriteSpan(span *model.Span) error {
return w.impl.WriteArchiveSpan(span)
}

// ArchiveReader implements spanstore.Reader
type ArchiveReader struct {
impl shared.ArchiveReader
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (r *ArchiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
return r.impl.GetArchiveTrace(ctx, traceID)
}

// GetServices is not used by archive storage
func (r *ArchiveReader) GetServices(ctx context.Context) ([]string, error) {
panic("not implemented")
}

// GetOperations is not used by archive storage
func (r *ArchiveReader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
panic("not implemented")
}

// FindTraces is not used by archive storage
func (r *ArchiveReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
panic("not implemented")
}

// FindTraceIDs is not used by archive storage
func (r *ArchiveReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
panic("not implemented")
}
88 changes: 88 additions & 0 deletions plugin/storage/grpc/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var (
mockTraceID = model.NewTraceID(0, 123456)
mockSpan = &model.Span{
TraceID: mockTraceID,
SpanID: model.NewSpanID(1),
Process: &model.Process{},
}
)

func TestArchiveWriter_WriteSpan(t *testing.T) {
archiveWriter := new(mocks.ArchiveWriter)
archiveWriter.On("WriteArchiveSpan", mockSpan).Return(nil)
writer := &ArchiveWriter{impl: archiveWriter}

err := writer.WriteSpan(mockSpan)
assert.NoError(t, err)
}

func TestArchiveReader_GetTrace(t *testing.T) {
expected := &model.Trace{
Spans: []*model.Span{
mockSpan,
},
}
archiveReader := new(mocks.ArchiveReader)
archiveReader.On("GetArchiveTrace", mock.Anything, mockTraceID).Return(expected, nil)
reader := &ArchiveReader{impl: archiveReader}

trace, err := reader.GetTrace(context.Background(), mockTraceID)
assert.NoError(t, err)
assert.Equal(t, expected, trace)
}

func TestArchiveReader_FindTraceIDs(t *testing.T) {
assert.Panics(t, func() {
reader := ArchiveReader{impl: &mocks.ArchiveReader{}}
_, _ = reader.FindTraceIDs(context.Background(), nil)
})
}

func TestArchiveReader_FindTraces(t *testing.T) {
assert.Panics(t, func() {
reader := ArchiveReader{impl: &mocks.ArchiveReader{}}
_, _ = reader.FindTraces(context.Background(), nil)
})
}

func TestArchiveReader_GetOperations(t *testing.T) {
assert.Panics(t, func() {
reader := ArchiveReader{impl: &mocks.ArchiveReader{}}
_, _ = reader.GetOperations(context.Background(), spanstore.OperationQueryParameters{})
})
}

func TestArchiveReader_GetServices(t *testing.T) {
assert.Panics(t, func() {
reader := ArchiveReader{impl: &mocks.ArchiveReader{}}
_, _ = reader.GetServices(context.Background())
})
}
20 changes: 20 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"context"
"flag"

"github.com/spf13/viper"
Expand All @@ -23,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -88,3 +90,21 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store.DependencyReader(), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
supported, _ := f.store.ArchiveSpanReader().ArchiveSupported(context.Background())
if !supported {
return nil, storage.ErrArchiveStorageNotSupported
}
return &ArchiveReader{impl: f.store.ArchiveSpanReader()}, nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
supported, _ := f.store.ArchiveSpanReader().ArchiveSupported(context.Background())
if !supported {
return nil, storage.ErrArchiveStorageNotSupported
}
return &ArchiveWriter{impl: f.store.ArchiveSpanWriter()}, nil
}
65 changes: 64 additions & 1 deletion plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
grpcConfig "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/mocks"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
Expand All @@ -50,9 +52,19 @@ func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) {
type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader shared.ArchiveReader
archiveWriter shared.ArchiveWriter
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) ArchiveSpanReader() shared.ArchiveReader {
return mp.archiveReader
}

func (mp *mockPlugin) ArchiveSpanWriter() shared.ArchiveWriter {
return mp.archiveWriter
}

func (mp *mockPlugin) SpanReader() spanstore.Reader {
return mp.spanReader
}
Expand Down Expand Up @@ -81,6 +93,8 @@ func TestGRPCStorageFactory(t *testing.T) {
plugin: &mockPlugin{
spanWriter: new(spanStoreMocks.Writer),
spanReader: new(spanStoreMocks.Reader),
archiveReader: new(mocks.ArchiveReader),
archiveWriter: new(mocks.ArchiveWriter),
dependencyReader: new(dependencyStoreMocks.Reader),
},
}
Expand All @@ -98,14 +112,63 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCArchiveStorageFactory(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

archiveReader := new(mocks.ArchiveReader)
archiveReader.On("ArchiveSupported", mock.Anything).
Return(true, nil)
f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveReader: archiveReader,
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.NoError(t, err)
assert.IsType(t, &ArchiveReader{}, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.IsType(t, &ArchiveWriter{}, writer)
}

func TestGRPCArchiveStorageDisabledFactory(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

archiveReader := new(mocks.ArchiveReader)
archiveReader.On("ArchiveSupported", mock.Anything).
Return(false, nil)
f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveReader: archiveReader,
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
}

func TestWithConfiguration(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
err := command.ParseFlags([]string{
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
})
assert.NoError(t, err)
f.InitFromViper(v)
assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json")
Expand Down
1 change: 0 additions & 1 deletion plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,4 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel)

}
3 changes: 2 additions & 1 deletion plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
err := command.ParseFlags([]string{
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "noop-grpc-plugin")
Expand Down
20 changes: 20 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ message FindTraceIDsResponse {
];
}

// empty; extensible in the future
message ArchiveSupportedRequest {

}

message ArchiveSupportedResponse {
bool supported = 1;
}

service SpanWriterPlugin {
// spanstore/Writer
rpc WriteSpan(WriteSpanRequest) returns (WriteSpanResponse);
Expand All @@ -148,6 +157,17 @@ service SpanReaderPlugin {
rpc FindTraceIDs(FindTraceIDsRequest) returns (FindTraceIDsResponse);
}

service ArchiveSpanWriterPlugin {
// spanstore/Writer
rpc WriteArchiveSpan(WriteSpanRequest) returns (WriteSpanResponse);
}

service ArchiveSpanReaderPlugin {
// spanstore/Reader
rpc GetArchiveTrace(GetTraceRequest) returns (stream SpansResponseChunk);
rpc ArchiveSupported(ArchiveSupportedRequest) returns (ArchiveSupportedResponse);
}

service DependenciesReaderPlugin {
// dependencystore/Reader
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
Expand Down
Loading

0 comments on commit e27cc98

Please sign in to comment.