Skip to content

Commit

Permalink
Decouple instrumentation from SDK (#983)
Browse files Browse the repository at this point in the history
* Remove otel/sdk dependency from grpctrace

Use otel/trace/testtrace instead and cleanup testing code.

* Update httptrace to not depend on the SDK

Update testing to use api/trace/testtrace instead.

* Add changes to Changelog

* Restore check for `http.local` attr on `http.getconn`
  • Loading branch information
MrAlias authored Jul 29, 2020
1 parent 42c2a86 commit d6bf2fb
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 306 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Use `global.Handle` for span export errors in the OTLP exporter. (#946)
- Correct Go language formatting in the README documentation. (#961)
- Remove default SDK dependencies from the `go.opentelemetry.io/otel/api` package. (#977)
- Remove default SDK dependencies from the `go.opentelemetry.io/otel/instrumentation` package. (#983)
- Move documented examples for `go.opentelemetry.io/otel/instrumentation/grpctrace` interceptors into Go example tests. (#984)

## [0.9.0] - 2020-07-20
Expand Down
219 changes: 75 additions & 144 deletions instrumentation/grpctrace/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace/testtrace"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -32,19 +33,30 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/api/kv"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type testExporter struct {
mu sync.Mutex
spanMap map[string]*export.SpanData
type SpanRecorder struct {
mu sync.RWMutex
spans map[string]*testtrace.Span
}

func (t *testExporter) ExportSpan(ctx context.Context, s *export.SpanData) {
t.mu.Lock()
defer t.mu.Unlock()
t.spanMap[s.Name] = s
func NewSpanRecorder() *SpanRecorder {
return &SpanRecorder{spans: make(map[string]*testtrace.Span)}
}

func (sr *SpanRecorder) OnStart(span *testtrace.Span) {}

func (sr *SpanRecorder) OnEnd(span *testtrace.Span) {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.spans[span.Name()] = span
}

func (sr *SpanRecorder) Get(name string) (*testtrace.Span, bool) {
sr.mu.RLock()
defer sr.mu.RUnlock()
s, ok := sr.spans[name]
return s, ok
}

type mockUICInvoker struct {
Expand All @@ -69,18 +81,13 @@ func (mm *mockProtoMessage) ProtoMessage() {
}

func TestUnaryClientInterceptor(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
},
))

clientConn, err := grpc.Dial("fake:connection", grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to create client connection: %v", err)
}

sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/client")
unaryInterceptor := UnaryClientInterceptor(tracer)

Expand Down Expand Up @@ -210,68 +217,24 @@ func TestUnaryClientInterceptor(t *testing.T) {
}

for _, check := range checks {
err = unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)
if err != nil {
t.Errorf("failed to run unary interceptor: %v", err)
if !assert.NoError(t, unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)) {
continue
}

spanData, ok := exp.spanMap[check.name]
if !ok {
t.Errorf("no span data found for name < %s >", check.name)
span, ok := sr.Get(check.name)
if !assert.True(t, ok, "missing span %q", check.name) {
continue
}
assert.Equal(t, check.expectedAttr, span.Attributes())
assert.Equal(t, check.eventsAttr, eventAttrMap(span.Events()))
}
}

attrs := spanData.Attributes
if len(check.expectedAttr) > len(attrs) {
t.Errorf("attributes received are less than expected attributes, received %d, expected %d",
len(attrs), len(check.expectedAttr))
}
for _, attr := range attrs {
expectedAttr, ok := check.expectedAttr[attr.Key]
if ok {
if expectedAttr != attr.Value {
t.Errorf("name: %s invalid %s found. expected %s, actual %s", check.name, string(attr.Key),
expectedAttr.AsString(), attr.Value.AsString())
}
delete(check.expectedAttr, attr.Key)
} else {
t.Errorf("attribute %s not found in expected attributes map", string(attr.Key))
}
}

// Check if any expected attr not seen
if len(check.expectedAttr) > 0 {
for attr := range check.expectedAttr {
t.Errorf("missing attribute %s in span", string(attr))
}
}

events := spanData.MessageEvents
if len(check.eventsAttr) > len(events) {
t.Errorf("events received are less than expected events, received %d, expected %d",
len(events), len(check.eventsAttr))
}
for event := 0; event < len(check.eventsAttr); event++ {
for _, attr := range events[event].Attributes {
expectedAttr, ok := check.eventsAttr[event][attr.Key]
if ok {
if attr.Value != expectedAttr {
t.Errorf("invalid value for attribute %s in events, expected %s actual %s",
string(attr.Key), attr.Value.AsString(), expectedAttr.AsString())
}
delete(check.eventsAttr[event], attr.Key)
} else {
t.Errorf("attribute in event %s not found in expected attributes map", string(attr.Key))
}
}
if len(check.eventsAttr[event]) > 0 {
for attr := range check.eventsAttr[event] {
t.Errorf("missing attribute %s in span event", string(attr))
}
}
}
func eventAttrMap(events []testtrace.Event) []map[kv.Key]kv.Value {
maps := make([]map[kv.Key]kv.Value, len(events))
for i, event := range events {
maps[i] = event.Attributes
}
return maps
}

type mockClientStream struct {
Expand All @@ -287,26 +250,23 @@ func (mockClientStream) Header() (metadata.MD, error) { return nil, nil }
func (mockClientStream) Trailer() metadata.MD { return nil }

func TestStreamClientInterceptor(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
},
))
clientConn, err := grpc.Dial("fake:connection", grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to create client connection: %v", err)
}

// tracer
sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/Server")
streamCI := StreamClientInterceptor(tracer)

var mockClStr mockClientStream
method := "/github.com.serviceName/bar"
name := "github.com.serviceName/bar"

streamClient, err := streamCI(context.Background(),
streamClient, err := streamCI(
context.Background(),
&grpc.StreamDesc{ServerStreams: true},
clientConn,
method,
Expand All @@ -317,16 +277,11 @@ func TestStreamClientInterceptor(t *testing.T) {
opts ...grpc.CallOption) (grpc.ClientStream, error) {
mockClStr = mockClientStream{Desc: desc, Ctx: ctx}
return mockClStr, nil
})

if err != nil {
t.Fatalf("failed to initialize grpc stream client: %v", err)
}

// no span exported while stream is open
if _, ok := exp.spanMap[name]; ok {
t.Fatalf("span shouldn't end while stream is open")
}
},
)
require.NoError(t, err, "initialize grpc stream client")
_, ok := sr.Get(name)
require.False(t, ok, "span should ended while stream is open")

req := &mockProtoMessage{}
reply := &mockProtoMessage{}
Expand All @@ -343,53 +298,36 @@ func TestStreamClientInterceptor(t *testing.T) {
_ = streamClient.RecvMsg(reply)

// added retry because span end is called in separate go routine
var spanData *export.SpanData
var span *testtrace.Span
for retry := 0; retry < 5; retry++ {
ok := false
exp.mu.Lock()
spanData, ok = exp.spanMap[name]
exp.mu.Unlock()
span, ok = sr.Get(name)
if ok {
break
}
time.Sleep(time.Second * 1)
}
if spanData == nil {
t.Fatalf("no span data found for name < %s >", name)
require.True(t, ok, "missing span %s", name)

expectedAttr := map[kv.Key]kv.Value{
standard.RPCSystemKey: kv.StringValue("grpc"),
standard.RPCServiceKey: kv.StringValue("github.com.serviceName"),
standard.RPCMethodKey: kv.StringValue("bar"),
standard.NetPeerIPKey: kv.StringValue("fake"),
standard.NetPeerPortKey: kv.StringValue("connection"),
}
assert.Equal(t, expectedAttr, span.Attributes())

attrs := spanData.Attributes
expectedAttr := map[kv.Key]string{
standard.RPCSystemKey: "grpc",
standard.RPCServiceKey: "github.com.serviceName",
standard.RPCMethodKey: "bar",
standard.NetPeerIPKey: "fake",
standard.NetPeerPortKey: "connection",
}

for _, attr := range attrs {
expected, ok := expectedAttr[attr.Key]
if ok {
if expected != attr.Value.AsString() {
t.Errorf("name: %s invalid %s found. expected %s, actual %s", name, string(attr.Key),
expected, attr.Value.AsString())
}
}
}

events := spanData.MessageEvents
if len(events) != 20 {
t.Fatalf("incorrect number of events expected 20 got %d", len(events))
}
events := span.Events()
require.Len(t, events, 20)
for i := 0; i < 20; i += 2 {
msgID := i/2 + 1
validate := func(eventName string, attrs []kv.KeyValue) {
for _, attr := range attrs {
if attr.Key == standard.RPCMessageTypeKey && attr.Value.AsString() != eventName {
t.Errorf("invalid event on index: %d expecting %s event, receive %s event", i, eventName, attr.Value.AsString())
validate := func(eventName string, attrs map[kv.Key]kv.Value) {
for k, v := range attrs {
if k == standard.RPCMessageTypeKey && v.AsString() != eventName {
t.Errorf("invalid event on index: %d expecting %s event, receive %s event", i, eventName, v.AsString())
}
if attr.Key == standard.RPCMessageIDKey && attr.Value != kv.IntValue(msgID) {
t.Errorf("invalid id for message event expected %d received %d", msgID, attr.Value.AsInt32())
if k == standard.RPCMessageIDKey && v != kv.IntValue(msgID) {
t.Errorf("invalid id for message event expected %d received %d", msgID, v.AsInt32())
}
}
}
Expand All @@ -402,37 +340,30 @@ func TestStreamClientInterceptor(t *testing.T) {
}

func TestServerInterceptorError(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, err := sdktrace.NewProvider(
sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
)
require.NoError(t, err)

sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/Server")
usi := UnaryServerInterceptor(tracer)
deniedErr := status.Error(codes.PermissionDenied, "PERMISSION_DENIED_TEXT")
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, deniedErr
}
_, err = usi(context.Background(), &mockProtoMessage{}, &grpc.UnaryServerInfo{}, handler)
_, err := usi(context.Background(), &mockProtoMessage{}, &grpc.UnaryServerInfo{}, handler)
require.Error(t, err)
assert.Equal(t, err, deniedErr)

span, ok := exp.spanMap[""]
span, ok := sr.Get("")
if !ok {
t.Fatalf("failed to export error span")
}
assert.Equal(t, span.StatusCode, codes.PermissionDenied)
assert.Contains(t, deniedErr.Error(), span.StatusMessage)
assert.Len(t, span.MessageEvents, 2)
assert.Equal(t, []kv.KeyValue{
kv.String("message.type", "SENT"),
kv.Int("message.id", 1),
kv.Int("message.uncompressed_size", 26),
}, span.MessageEvents[1].Attributes)
assert.Equal(t, span.StatusCode(), codes.PermissionDenied)
assert.Contains(t, deniedErr.Error(), span.StatusMessage())
assert.Len(t, span.Events(), 2)
assert.Equal(t, map[kv.Key]kv.Value{
kv.Key("message.type"): kv.StringValue("SENT"),
kv.Key("message.id"): kv.IntValue(1),
kv.Key("message.uncompressed_size"): kv.IntValue(26),
}, span.Events()[1].Attributes)
}

func TestParseFullMethod(t *testing.T) {
Expand Down
Loading

0 comments on commit d6bf2fb

Please sign in to comment.