diff --git a/pkg/grpc/api.go b/pkg/grpc/api.go index 06ea8cae785..c4b7e991afd 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/api.go @@ -920,7 +920,9 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu outboxEnabled := a.pubsubAdapter.Outbox().Enabled(in.StoreName) if outboxEnabled { - trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.StoreName, operations, a.UniversalAPI.AppID) + span := diagUtils.SpanFromContext(ctx) + corID, traceState := diag.TraceIDAndStateFromSpan(span) + trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.StoreName, operations, a.UniversalAPI.AppID, corID, traceState) if err != nil { err = status.Errorf(codes.Internal, messages.ErrPublishOutbox, err.Error()) apiServerLogger.Debug(err) diff --git a/pkg/http/api.go b/pkg/http/api.go index 0b43613fd40..4c2380cbff0 100644 --- a/pkg/http/api.go +++ b/pkg/http/api.go @@ -2037,7 +2037,9 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) { outboxEnabled := a.pubsubAdapter.Outbox().Enabled(storeName) if outboxEnabled { - trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID) + span := diagUtils.SpanFromContext(reqCtx) + corID, traceState := diag.TraceIDAndStateFromSpan(span) + trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID, corID, traceState) if err != nil { msg := NewErrorResponse( "ERR_PUBLISH_OUTBOX", diff --git a/pkg/outbox/outbox.go b/pkg/outbox/outbox.go index 8c2665d3f32..c6498d7f657 100644 --- a/pkg/outbox/outbox.go +++ b/pkg/outbox/outbox.go @@ -24,6 +24,6 @@ import ( type Outbox interface { AddOrUpdateOutbox(stateStore v1alpha1.Component) Enabled(stateStore string) bool - PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error) + PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) SubscribeToInternalTopics(ctx context.Context, appID string) error } diff --git a/pkg/runtime/pubsub/outbox.go b/pkg/runtime/pubsub/outbox.go index a8ef55f0169..81c3844dd16 100644 --- a/pkg/runtime/pubsub/outbox.go +++ b/pkg/runtime/pubsub/outbox.go @@ -129,7 +129,7 @@ func transaction() (state.TransactionalStateOperation, error) { } // PublishInternal publishes the state to an internal topic for outbox processing -func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error) { +func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) { o.lock.RLock() c, ok := o.outboxStores[stateStore] o.lock.RUnlock() @@ -156,10 +156,12 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope } ce := &CloudEvent{ - ID: tr.GetKey(), - Source: source, - Pubsub: c.outboxPubsub, - Data: ceData, + ID: tr.GetKey(), + Source: source, + Pubsub: c.outboxPubsub, + Data: ceData, + TraceID: traceID, + TraceState: traceState, } if sr.ContentType != nil { @@ -220,6 +222,8 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string stateKey := o.cloudEventExtractorFn(cloudEvent, contribPubsub.IDField) data := []byte(o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataField)) contentType := o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataContentTypeField) + traceID := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceIDField) + traceState := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceStateField) store, ok := o.getStateFn(stateStore) if !ok { @@ -268,6 +272,8 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string Pubsub: c.publishPubSub, Source: appID, Topic: c.publishTopic, + TraceID: traceID, + TraceState: traceState, }, nil) if err != nil { return err diff --git a/pkg/runtime/pubsub/outbox_test.go b/pkg/runtime/pubsub/outbox_test.go index f8d553fb5f8..27199e832e4 100644 --- a/pkg/runtime/pubsub/outbox_test.go +++ b/pkg/runtime/pubsub/outbox_test.go @@ -254,7 +254,7 @@ func TestPublishInternal(t *testing.T) { Key: "key", Value: "test", }, - }, "testapp") + }, "testapp", "", "") assert.NoError(t, err) }) @@ -309,7 +309,7 @@ func TestPublishInternal(t *testing.T) { Value: "test", ContentType: &contentType, }, - }, "testapp") + }, "testapp", "", "") assert.NoError(t, err) }) @@ -322,7 +322,7 @@ func TestPublishInternal(t *testing.T) { Key: "key", Value: "test", }, - }, "testapp") + }, "testapp", "", "") assert.Error(t, err) }) @@ -359,7 +359,7 @@ func TestPublishInternal(t *testing.T) { }, }) - _, err := o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{}, "testapp") + _, err := o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{}, "testapp", "", "") assert.NoError(t, err) }) @@ -401,14 +401,14 @@ func TestPublishInternal(t *testing.T) { Key: "1", Value: "hello", }, - }, "testapp") + }, "testapp", "", "") assert.Error(t, err) }) } func TestSubscribeToInternalTopics(t *testing.T) { - t.Run("correct configuration", func(t *testing.T) { + t.Run("correct configuration with trace", func(t *testing.T) { o := newTestOutbox().(*outboxImpl) o.cloudEventExtractorFn = extractCloudEventProperty @@ -432,6 +432,14 @@ func TestSubscribeToInternalTopics(t *testing.T) { close(externalCalledCh) } + ce := map[string]string{} + json.Unmarshal(pr.Data, &ce) + + traceID := ce[contribPubsub.TraceIDField] + traceState := ce[contribPubsub.TraceStateField] + assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceID) + assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceState) + return psMock.Publish(ctx, pr) } o.getPubsubFn = func(s string) (contribPubsub.PubSub, bool) { @@ -480,7 +488,7 @@ func TestSubscribeToInternalTopics(t *testing.T) { Key: "1", Value: "hello", }, - }, appID) + }, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01") if pErr != nil { errCh <- pErr @@ -559,7 +567,7 @@ func TestSubscribeToInternalTopics(t *testing.T) { Key: "1", Value: "hello", }, - }, appID) + }, appID, "", "") assert.Error(t, pErr) assert.Len(t, trs, 0) @@ -634,7 +642,7 @@ func TestSubscribeToInternalTopics(t *testing.T) { Key: "1", Value: "hello", }, - }, appID) + }, appID, "", "") if pErr != nil { errCh <- pErr @@ -759,7 +767,7 @@ func TestSubscribeToInternalTopics(t *testing.T) { Key: "1", Value: "hello", }, - }, appID) + }, appID, "", "") if pErr != nil { errCh <- pErr diff --git a/pkg/testing/pubsubadapter_mock.go b/pkg/testing/pubsubadapter_mock.go index d83f55a36cb..55d1cb58ed3 100644 --- a/pkg/testing/pubsubadapter_mock.go +++ b/pkg/testing/pubsubadapter_mock.go @@ -56,7 +56,7 @@ func (o *outboxMock) Enabled(stateStore string) bool { return false } -func (o *outboxMock) PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error) { +func (o *outboxMock) PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) { return nil, nil }