Skip to content

Commit

Permalink
Propagate trace inforation from incoming headers (dapr#7932)
Browse files Browse the repository at this point in the history
* Propagate trace inforation from incoming headers

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* Remove unrelated change

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* Remove unrelated change

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* Propagate tracestate headers from incoming metadata

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* Linter fix

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* traceparent supersedes traceid

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

* traceparent supersedes traceid

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>

---------

Signed-off-by: Yevgen Polyak <yevgen.polyak@gmail.com>
Co-authored-by: Yevgen Polyak <yevgen.polyak@elh-54wd4m.tail71e11.ts.net>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 284211f commit d697612
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 19 deletions.
12 changes: 6 additions & 6 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequ

if !rawPayload {
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
traceID, traceState := diag.TraceIDAndStateFromSpan(span)

envelope, err := runtimePubsub.NewCloudEvent(&runtimePubsub.CloudEvent{
Source: a.Universal.AppID(),
Topic: in.GetTopic(),
DataContentType: in.GetDataContentType(),
Data: body,
TraceID: corID,
TraceID: traceID,
TraceState: traceState,
Pubsub: in.GetPubsubName(),
}, in.GetMetadata())
Expand Down Expand Up @@ -399,7 +399,7 @@ func (a *api) BulkPublishEventAlpha1(ctx context.Context, in *runtimev1pb.BulkPu
if !rawPayload {
// Extract trace context from context.
_, childSpan := diag.StartGRPCProducerSpanChildFromParent(ctx, span, "/dapr.proto.runtime.v1.Dapr/BulkPublishEventAlpha1/")
corID, traceState := diag.TraceIDAndStateFromSpan(childSpan)
traceID, traceState := diag.TraceIDAndStateFromSpan(childSpan)

// For multiple events in a single bulk call traceParent is different for each event.
// Populate W3C traceparent to cloudevent envelope
Expand All @@ -410,7 +410,7 @@ func (a *api) BulkPublishEventAlpha1(ctx context.Context, in *runtimev1pb.BulkPu
Topic: topic,
DataContentType: entries[i].ContentType,
Data: entries[i].Event,
TraceID: corID,
TraceID: traceID,
TraceState: traceState,
Pubsub: pubsubName,
}, entries[i].Metadata)
Expand Down Expand Up @@ -998,8 +998,8 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
outboxEnabled := a.outbox.Enabled(in.GetStoreName())
if outboxEnabled {
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.outbox.PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
traceID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.outbox.PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), traceID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.AppID(), err)
apiServerLogger.Debug(nerr)
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,13 +1118,13 @@ func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {

if !rawPayload {
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
traceID, traceState := diag.TraceIDAndStateFromSpan(span)
envelope, err := runtimePubsub.NewCloudEvent(&runtimePubsub.CloudEvent{
Source: a.universal.AppID(),
Topic: topic,
DataContentType: contentType,
Data: body,
TraceID: corID,
TraceID: traceID,
TraceState: traceState,
Pubsub: pubsubName,
}, metadata)
Expand Down Expand Up @@ -1272,7 +1272,7 @@ func (a *api) onBulkPublish(reqCtx *fasthttp.RequestCtx) {
if !rawPayload {
for i := range entries {
childSpan := diag.StartProducerSpanChildFromParent(reqCtx, span)
corID, traceState := diag.TraceIDAndStateFromSpan(childSpan)
traceID, traceState := diag.TraceIDAndStateFromSpan(childSpan)
// For multiple events in a single bulk call traceParent is different for each event.
// Populate W3C traceparent to cloudevent envelope
spanMap[i] = childSpan
Expand All @@ -1283,7 +1283,7 @@ func (a *api) onBulkPublish(reqCtx *fasthttp.RequestCtx) {
Topic: topic,
DataContentType: entries[i].ContentType,
Data: entries[i].Event,
TraceID: corID,
TraceID: traceID,
TraceState: traceState,
Pubsub: pubsubName,
}, entries[i].Metadata)
Expand Down Expand Up @@ -1595,8 +1595,8 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
outboxEnabled := a.outbox.Enabled(storeName)
if outboxEnabled {
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.outbox.PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
traceID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.outbox.PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), traceID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.universal.AppID(), err)
universalFastHTTPErrorResponder(reqCtx, nerr)
Expand Down
22 changes: 20 additions & 2 deletions pkg/channel/testing/channel_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/diagnostics/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,18 @@ func StartInternalCallbackSpan(ctx context.Context, spanName string, parent trac
}

func TraceIDAndStateFromSpan(span trace.Span) (string, string) {
var corID, traceState string
var traceID, traceState string

if span != nil {
sc := span.SpanContext()

if !sc.Equal(trace.SpanContext{}) {
corID = SpanContextToW3CString(sc)
traceID = SpanContextToW3CString(sc)
}
if sc.TraceState().Len() > 0 {
traceState = TraceStateToW3CString(sc)
}
}

return corID, traceState
return traceID, traceState
}
5 changes: 3 additions & 2 deletions pkg/runtime/subscription/bulksubscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,8 +1480,9 @@ func (m *mockSubscribePubSub) Publish(ctx context.Context, req *contribpubsub.Pu
var err error
if handler, ok := m.handlers[req.Topic]; ok {
pubsubMsg := &contribpubsub.NewMessage{
Data: req.Data,
Topic: req.Topic,
Data: req.Data,
Topic: req.Topic,
Metadata: req.Metadata,
}
handler(context.Background(), pubsubMsg)
} else if bulkHandler, ok := m.bulkHandlers[req.Topic]; ok {
Expand Down
69 changes: 69 additions & 0 deletions pkg/runtime/subscription/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"testing"

"github.com/phayes/freeport"
Expand Down Expand Up @@ -803,3 +804,71 @@ func TestOnNewPublishedMessageGRPC(t *testing.T) {
})
}
}

func TestTracingOnNewPublishedMessage(t *testing.T) {
t.Run("succeeded to publish message with TraceParent in metadata", func(t *testing.T) {
comp := &mockSubscribePubSub{}
require.NoError(t, comp.Init(context.Background(), contribpubsub.Metadata{}))

resp := contribpubsub.AppResponse{
Status: contribpubsub.Success,
}

respB, _ := json.Marshal(resp)
fakeResp := invokev1.NewInvokeMethodResponse(200, "OK", nil).
WithRawDataBytes(respB).
WithContentType("application/json")
defer fakeResp.Close()

for _, rawPayload := range []bool{false, true} {
mockAppChannel := new(channelt.MockAppChannel)
mockAppChannel.Init()
mockAppChannel.On("InvokeMethod", mock.MatchedBy(matchContextInterface), mock.Anything).Return(fakeResp, nil)

ps, err := New(Options{
Resiliency: resiliency.New(log),
IsHTTP: true,
Channels: new(channels.Channels).WithAppChannel(mockAppChannel),
PubSub: &runtimePubsub.PubsubItem{Component: comp},
AppID: TestRuntimeConfigID,
PubSubName: "testpubsub",
Topic: "topic0",
Route: runtimePubsub.Subscription{
Metadata: map[string]string{"rawPayload": strconv.FormatBool(rawPayload)},
Rules: []*runtimePubsub.Rule{
{Path: "orders"},
},
DeadLetterTopic: "topic1",
},
})
require.NoError(t, err)
t.Cleanup(ps.Stop)

traceparent := "00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01"
traceid := "00-80e1afed08e019fc1110464cfa66635c-7a085853722dc6d2-01"
tracestate := "abc=xyz"
err = comp.Publish(context.TODO(), &contribpubsub.PublishRequest{
PubsubName: "testpubsub",
Topic: "topic0",
Data: []byte(`{"orderId":"1"}`),
Metadata: map[string]string{contribpubsub.TraceParentField: traceparent, contribpubsub.TraceIDField: traceid, contribpubsub.TraceStateField: tracestate},
})
require.NoError(t, err)
reqs := mockAppChannel.GetInvokedRequest()
reqMetadata := mockAppChannel.GetInvokedRequestMetadata()
mockAppChannel.AssertNumberOfCalls(t, "InvokeMethod", 1)
assert.Contains(t, reqMetadata["orders"][contribpubsub.TraceParentField], traceparent)
assert.Contains(t, reqMetadata["orders"][contribpubsub.TraceStateField], tracestate)
if rawPayload {
assert.Contains(t, string(reqs["orders"]), `{"data_base64":"eyJvcmRlcklkIjoiMSJ9"`)
// traceparent also included as part of a CloudEvent
assert.Contains(t, string(reqs["orders"]), traceparent)
assert.Contains(t, string(reqs["orders"]), tracestate)
// traceid is superseded by traceparent
assert.NotContains(t, string(reqs["orders"]), traceid)
} else {
assert.Contains(t, string(reqs["orders"]), `{"orderId":"1"}`)
}
}
})
}
31 changes: 31 additions & 0 deletions pkg/runtime/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func New(opts Options) (*Subscription, error) {
data := msg.Data
if rawPayload {
cloudEvent = contribpubsub.FromRawPayload(msg.Data, msgTopic, name)
if traceid, ok := msg.Metadata[contribpubsub.TraceIDField]; ok {
cloudEvent[contribpubsub.TraceIDField] = traceid
}
if traceparent, ok := msg.Metadata[contribpubsub.TraceParentField]; ok {
cloudEvent[contribpubsub.TraceParentField] = traceparent
// traceparent supersedes traceid
cloudEvent[contribpubsub.TraceIDField] = traceparent
}
if tracestate, ok := msg.Metadata[contribpubsub.TraceStateField]; ok {
cloudEvent[contribpubsub.TraceStateField] = tracestate
}
data, err = json.Marshal(cloudEvent)
if err != nil {
log.Errorf("error serializing cloud event in pubsub %s and topic %s: %s", name, msgTopic, err)
Expand All @@ -162,6 +173,7 @@ func New(opts Options) (*Subscription, error) {
return err
}
} else {
// all messages consumed with "rawPayload=false" are deserialized as a CloudEvent, even when the payload is not a CloudEvent
err = json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.Errorf("error deserializing cloud event in pubsub %s and topic %s: %s", name, msgTopic, err)
Expand All @@ -175,6 +187,25 @@ func New(opts Options) (*Subscription, error) {
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, name, strings.ToLower(string(contribpubsub.Retry)), "", msgTopic, 0)
return err
}

// fallback to message metadata to propagate the tracing information
if _, ok := cloudEvent[contribpubsub.TraceIDField]; !ok {
if traceid, ok := msg.Metadata[contribpubsub.TraceIDField]; ok {
cloudEvent[contribpubsub.TraceIDField] = traceid
}
}
if _, ok := cloudEvent[contribpubsub.TraceParentField]; !ok {
if traceparent, ok := msg.Metadata[contribpubsub.TraceParentField]; ok {
cloudEvent[contribpubsub.TraceParentField] = traceparent
// traceparent supersedes traceid
cloudEvent[contribpubsub.TraceIDField] = traceparent
}
}
if _, ok := cloudEvent[contribpubsub.TraceStateField]; !ok {
if tracestate, ok := msg.Metadata[contribpubsub.TraceStateField]; ok {
cloudEvent[contribpubsub.TraceStateField] = tracestate
}
}
}

if contribpubsub.HasExpired(cloudEvent) {
Expand Down

0 comments on commit d697612

Please sign in to comment.