Skip to content

Commit

Permalink
[serverless] Extract EventBridge from SNS/SQS Payloads and Create Eve…
Browse files Browse the repository at this point in the history
…ntBridge Spans (#29551)
  • Loading branch information
nhulston authored and grantseltzer committed Oct 4, 2024
1 parent cad2451 commit 5e02463
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 13 deletions.
45 changes: 36 additions & 9 deletions pkg/serverless/invocationlifecycle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func (lp *LifecycleProcessor) initFromSNSEvent(event events.SNSEvent) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, sns)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSNSEventArn(event))

// Check for EventBridge event wrapped by the SNS message
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Records[0].SNS.Message), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
lp.createWrappedEventBridgeSpan(eventBridgeEvent)
}
}
}

func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
Expand All @@ -154,19 +162,26 @@ func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
lp.addTag(tagFunctionTriggerEventSource, sqs)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSQSEventARN(event))

// test for SNS
// Check for SNS event wrapped by the SQS body
var snsEntity events.SNSEntity
if err := json.Unmarshal([]byte(event.Records[0].Body), &snsEntity); err != nil {
return
if err := json.Unmarshal([]byte(event.Records[0].Body), &snsEntity); err == nil {
if strings.ToLower(snsEntity.Type) == "notification" && snsEntity.TopicArn != "" {
lp.createWrappedSNSSpan(snsEntity)
return
}
}

isSNS := strings.ToLower(snsEntity.Type) == "notification" && snsEntity.TopicArn != ""

if !isSNS {
return
// Check for EventBridge event wrapped by the SQS body
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Records[0].Body), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
lp.createWrappedEventBridgeSpan(eventBridgeEvent)
}
}
}

// sns span
// createWrappedSNSSpan creates an inferred span for SNS that is wrapped by SQS.
func (lp *LifecycleProcessor) createWrappedSNSSpan(snsEntity events.SNSEntity) {
lp.requestHandler.inferredSpans[1] = &inferredspan.InferredSpan{
CurrentInvocationStartTime: time.Unix(lp.requestHandler.inferredSpans[0].Span.Start, 0),
Span: &pb.Span{
Expand All @@ -179,9 +194,21 @@ func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
snsEvent.Records[0].SNS = snsEntity

lp.requestHandler.inferredSpans[1].EnrichInferredSpanWithSNSEvent(snsEvent)

lp.requestHandler.inferredSpans[1].Span.Duration = lp.GetInferredSpan().Span.Start - lp.requestHandler.inferredSpans[1].Span.Start
}

// createWrappedEventBridgeSpan creates an inferred span for EventBridge
// that is wrapped by SQS or SNS.
func (lp *LifecycleProcessor) createWrappedEventBridgeSpan(eventBridgeEvent events.EventBridgeEvent) {
lp.requestHandler.inferredSpans[1] = &inferredspan.InferredSpan{
CurrentInvocationStartTime: time.Unix(lp.requestHandler.inferredSpans[0].Span.Start, 0),
Span: &pb.Span{
SpanID: inferredspan.GenerateSpanId(),
},
}

lp.requestHandler.inferredSpans[1].EnrichInferredSpanWithEventBridgeEvent(eventBridgeEvent)
lp.requestHandler.inferredSpans[1].Span.Duration = lp.GetInferredSpan().Span.Start - lp.requestHandler.inferredSpans[1].Span.Start
}

func (lp *LifecycleProcessor) initFromLambdaFunctionURLEvent(event events.LambdaFunctionURLRequest, region string, accountID string, functionName string) {
Expand Down
82 changes: 82 additions & 0 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,88 @@ func TestTriggerTypesLifecycleEventForEventBridge(t *testing.T) {
}, testProcessor.GetTags())
}

func TestTriggerTypesLifecycleEventForEventBridgeSQS(t *testing.T) {
startInvocationTime := time.Now()
duration := 1 * time.Second
endInvocationTime := startInvocationTime.Add(duration)

var tracePayload *api.Payload

startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("eventbridgesqs.json"),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
StartTime: startInvocationTime,
}

testProcessor := &LifecycleProcessor{
DetectLambdaLibrary: func() bool { return false },
ProcessTrace: func(payload *api.Payload) { tracePayload = payload },
InferredSpansEnabled: true,
requestHandler: &RequestHandler{
executionInfo: &ExecutionStartInfo{
TraceID: 123,
SamplingPriority: 1,
},
},
}

testProcessor.OnInvokeStart(startDetails)
testProcessor.OnInvokeEnd(&InvocationEndDetails{
RequestID: "test-request-id",
EndTime: endInvocationTime,
IsError: false,
})

spans := tracePayload.TracerPayload.Chunks[0].Spans
assert.Equal(t, 3, len(spans))
eventBridgeSpan, sqsSpan := spans[1], spans[2]
assert.Equal(t, "eventbridge", eventBridgeSpan.Service)
assert.Equal(t, "test-bus", eventBridgeSpan.Resource)
assert.Equal(t, "sqs", sqsSpan.Service)
assert.Equal(t, "test-queue", sqsSpan.Resource)
}

func TestTriggerTypesLifecycleEventForEventBridgeSNS(t *testing.T) {
startInvocationTime := time.Now()
duration := 1 * time.Second
endInvocationTime := startInvocationTime.Add(duration)

var tracePayload *api.Payload

startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("eventbridgesns.json"),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
StartTime: startInvocationTime,
}

testProcessor := &LifecycleProcessor{
DetectLambdaLibrary: func() bool { return false },
ProcessTrace: func(payload *api.Payload) { tracePayload = payload },
InferredSpansEnabled: true,
requestHandler: &RequestHandler{
executionInfo: &ExecutionStartInfo{
TraceID: 123,
SamplingPriority: 1,
},
},
}

testProcessor.OnInvokeStart(startDetails)
testProcessor.OnInvokeEnd(&InvocationEndDetails{
RequestID: "test-request-id",
EndTime: endInvocationTime,
IsError: false,
})

spans := tracePayload.TracerPayload.Chunks[0].Spans
assert.Equal(t, 3, len(spans))
eventBridgeSpan, snsSpan := spans[1], spans[2]
assert.Equal(t, "eventbridge", eventBridgeSpan.Service)
assert.Equal(t, "test-bus", eventBridgeSpan.Resource)
assert.Equal(t, "sns", snsSpan.Service)
assert.Equal(t, "test-notifier", snsSpan.Resource)
}

// Helper function for reading test file
func getEventFromFile(filename string) []byte {
event, err := os.ReadFile("../trace/testdata/event_samples/" + filename)
Expand Down
28 changes: 24 additions & 4 deletions pkg/serverless/trace/propagation/carriers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

const (
awsTraceHeader = "AWSTraceHeader"
datadogSQSHeader = "_datadog"
awsTraceHeader = "AWSTraceHeader"
datadogTraceHeader = "_datadog"

rootPrefix = "Root="
parentPrefix = "Parent="
Expand Down Expand Up @@ -110,9 +110,20 @@ func extractTraceContextfromAWSTraceHeader(value string) (*TraceContext, error)
// sqsMessageCarrier returns the tracer.TextMapReader used to extract trace
// context from the events.SQSMessage type.
func sqsMessageCarrier(event events.SQSMessage) (tracer.TextMapReader, error) {
if attr, ok := event.MessageAttributes[datadogSQSHeader]; ok {
// Check if this is a normal SQS message
if attr, ok := event.MessageAttributes[datadogTraceHeader]; ok {
return sqsMessageAttrCarrier(attr)
}

// Check if this is an EventBridge event sent through SQS
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Body), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
return eventBridgeCarrier(eventBridgeEvent)
}
}

// Check if this is an SNS event sent through SQS
return snsSqsMessageCarrier(event)
}

Expand Down Expand Up @@ -164,7 +175,16 @@ func snsSqsMessageCarrier(event events.SQSMessage) (tracer.TextMapReader, error)
// snsEntityCarrier returns the tracer.TextMapReader used to extract trace
// context from the attributes of an events.SNSEntity type.
func snsEntityCarrier(event events.SNSEntity) (tracer.TextMapReader, error) {
msgAttrs, ok := event.MessageAttributes[datadogSQSHeader]
// Check if this is an EventBridge event sent through SNS
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Message), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
return eventBridgeCarrier(eventBridgeEvent)
}
}

// If not, check if this is a regular SNS message with Datadog trace information
msgAttrs, ok := event.MessageAttributes[datadogTraceHeader]
if !ok {
return nil, errorNoDDContextFound
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/serverless/trace/propagation/carriers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ func TestSnsEntityCarrier(t *testing.T) {
expMap map[string]string
expErr string
}{
{
name: "eventbridge-through-sns",
event: events.SNSEntity{
Message: `{"detail":{"_datadog":{"x-datadog-trace-id":"123456789","x-datadog-parent-id":"987654321","x-datadog-sampling-priority":"1"}}}`,
},
expMap: map[string]string{
"x-datadog-trace-id": "123456789",
"x-datadog-parent-id": "987654321",
"x-datadog-sampling-priority": "1",
},
expErr: "",
},
{
name: "no-msg-attrs",
event: events.SNSEntity{},
Expand Down Expand Up @@ -689,6 +701,18 @@ func TestSqsMessageCarrier(t *testing.T) {
expMap: headersMapAll,
expErr: nil,
},
{
name: "eventbridge-through-sqs",
event: events.SQSMessage{
Body: `{"detail":{"_datadog":{"x-datadog-trace-id":"123456789","x-datadog-parent-id":"987654321","x-datadog-sampling-priority":"1"}}}`,
},
expMap: map[string]string{
"x-datadog-trace-id": "123456789",
"x-datadog-parent-id": "987654321",
"x-datadog-sampling-priority": "1",
},
expErr: nil,
},
}

for _, tc := range testcases {
Expand Down
17 changes: 17 additions & 0 deletions pkg/serverless/trace/testdata/event_samples/eventbridgesns.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"Records":[
{
"SNS":{
"MessageID":"12345678-90abc-def-1234-567890abcdef",
"Type":"Notification",
"TopicArn":"arn:aws:sns:us-east-1:123456789012:test-notifier",
"MessageAttributes":{

},
"Timestamp":"2024-09-16T19:44:01.713Z",
"Subject":"",
"Message":"{\"version\":\"0\",\"id\":\"12345678-90abc-def-1234-567890abcdef\",\"detail-type\":\"TestDetail\",\"source\":\"com.test.source\",\"account\":\"12345667890\",\"time\":\"2024-09-16T19:44:01Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"foo\":\"bar\",\"_datadog\":{\"x-datadog-trace-id\":\"12345\",\"x-datadog-parent-id\":\"67890\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-start-time\":\"1726515840997\",\"x-datadog-resource-name\":\"test-bus\",\"x-datadog-tags\":\"_dd.p.dm=-1,_dd.p.tid=123567890\"}}}"
}
}
]
}
19 changes: 19 additions & 0 deletions pkg/serverless/trace/testdata/event_samples/eventbridgesqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"Records":[
{
"ReceiptHandle":"AQEB9RCmPUwKay0Fttcd7JEN1XPUwBq4ixSPWCQ5ne4x2r4SOQmyBy45h08wPSLe3ZXXXqjpAawK0J91O6wu/DsBHFZnYL2CIBbYhnZsYkwiO8XwsDQrf1ZSTTFH7eGwHuVQ2BsX7O+a9m+5THfXl6e7kBhfNTkATxstbr2iVRObgkvmiI9DdoBCsWBHqn8Z48j28ExS4Ov3i1olku6DcTnq6WxBGPMIYz3qX2LEnDFGNwnL6Ldzi/R4C7BJ8qMvsQeXFFAfGuWNjQsO6PKDhKo1eAEzozlcQd5sDtflIeMsNhfi3LusSPudncQ+zS9qUOWKgezKZqVBLbea4Mt1XIpe/e4WL2DVFfU5IE4cjsxrGEF9v2hcGelCrRexEqy+BVi0NLdwyO6R5L1GfU/1NJUVEE9o8wEqtC+0lrwG8xC6eS0=",
"Body":"{\"version\":\"0\",\"id\":\"103310e6-f267-750d-8cdd-6bee88ad2c9c\",\"detail-type\":\"TestDetail\",\"source\":\"com.test.source\",\"account\":\"12345\",\"time\":\"2024-09-16T19:00:27Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"foo\":\"bar\",\"_datadog\":{\"x-datadog-trace-id\":\"12345\",\"x-datadog-parent-id\":\"67890\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-start-time\":\"1726513226645\",\"x-datadog-resource-name\":\"test-bus\",\"x-datadog-tags\":\"_dd.p.dm=-1,_dd.p.tid=1234567800000000\"}}}",
"Attributes":{
"ApproximateReceiveCount":"1",
"SentTimestamp":"1726513227336",
"SenderId":"AIDAIOA2GYWSHW4E2VXIO",
"ApproximateFirstReceiveTimestamp":"1726513227350"
},
"MessageAttributes":{

},
"eventSource": "aws:sqs",
"EventSourceARN":"arn:aws:sqs:us-east-1:123456789012:test-queue"
}
]
}
1 change: 1 addition & 0 deletions pkg/serverless/trigger/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ type SNSEntity struct {
MessageAttributes map[string]interface{}
Timestamp time.Time
Subject string
Message string
}

// SQSEvent mirrors events.SQSEvent type, removing unused fields.
Expand Down

0 comments on commit 5e02463

Please sign in to comment.