Skip to content

Commit

Permalink
OTEL annotates gRPC responses (temporalio#7165)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

Added support for our OTEL interceptor to parse tags from the gRPC
response (not just the request).

## Why?
<!-- Tell your future self why have you made these changes -->

Without annotating the responses, traces from calls like
`PollWorkflowTaskQueue` cannot be queried for since the ID is only on
the response but not the request.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

Verified in Tempo:

<img width="1573" alt="image"
src="https://github.com/user-attachments/assets/96a1f1b3-99f5-48f4-b603-b0b66f158538"
/>


## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

There is one other caller of `Extract`, but they are only feeding in the
request payload. So no behavior change is expected.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Jan 28, 2025
1 parent 519fb71 commit c0a7f99
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 71 deletions.
81 changes: 43 additions & 38 deletions cmd/tools/genrpcserverinterceptors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
)

const maxRequestDepth = 5
const maxMessageDepth = 5

type (
requestData struct {
messageData struct {
Type string

WorkflowIdGetter string
Expand All @@ -57,7 +57,7 @@ type (
grpcServerData struct {
Server string
Imports []string
Requests []requestData
Messages []messageData
}
)

Expand Down Expand Up @@ -125,9 +125,9 @@ import (
"go.temporal.io/server/common/log/tag"
)
func (wt *WorkflowTags) extractFrom{{.Server}}Request(req any) []tag.Tag {
switch r := req.(type) {
{{- range .Requests}}
func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
switch r := message.(type) {
{{- range .Messages}}
case {{.Type}}:
{{- if or .TaskTokenGetter .WorkflowIdGetter .RunIdGetter}}
{{- if .TaskTokenGetter}}
Expand Down Expand Up @@ -162,55 +162,60 @@ func writeGrpcServerData(w io.Writer, grpcServerT reflect.Type, tmpl string) {
if rpcT.NumIn() < 2 {
continue
}
requestT := rpcT.In(1) // Assume request is always the second parameter.

rd := workflowTagGetters(requestT, 0)
rd.Type = requestT.String()
sd.Requests = append(sd.Requests, rd)
requestT := rpcT.In(1) // Assume request is always the second parameter.
requestMd := workflowTagGetters(requestT, 0)
requestMd.Type = requestT.String()
sd.Messages = append(sd.Messages, requestMd)

respT := rpcT.Out(0) // Assume response is always the first parameter.
responseMd := workflowTagGetters(respT, 0)
responseMd.Type = respT.String()
sd.Messages = append(sd.Messages, responseMd)
}

fatalIfErr(template.Must(template.New("code").Parse(tmpl)).Execute(w, sd))
}

//nolint:revive // cognitive complexity 37 (> max enabled 25)
func workflowTagGetters(requestT reflect.Type, depth int) requestData {
rd := requestData{}
if depth > maxRequestDepth {
return rd
func workflowTagGetters(messageType reflect.Type, depth int) messageData {
pd := messageData{}
if depth > maxMessageDepth {
return pd
}

switch {
case requestT.AssignableTo(executionGetterT):
rd.WorkflowIdGetter = "GetExecution().GetWorkflowId()"
rd.RunIdGetter = "GetExecution().GetRunId()"
case requestT.AssignableTo(workflowExecutionGetterT):
rd.WorkflowIdGetter = "GetWorkflowExecution().GetWorkflowId()"
rd.RunIdGetter = "GetWorkflowExecution().GetRunId()"
case requestT.AssignableTo(taskTokenGetterT):
case messageType.AssignableTo(executionGetterT):
pd.WorkflowIdGetter = "GetExecution().GetWorkflowId()"
pd.RunIdGetter = "GetExecution().GetRunId()"
case messageType.AssignableTo(workflowExecutionGetterT):
pd.WorkflowIdGetter = "GetWorkflowExecution().GetWorkflowId()"
pd.RunIdGetter = "GetWorkflowExecution().GetRunId()"
case messageType.AssignableTo(taskTokenGetterT):
for _, ert := range excludeTaskTokenTypes {
if requestT.AssignableTo(ert) {
return rd
if messageType.AssignableTo(ert) {
return pd
}
}
rd.TaskTokenGetter = "GetTaskToken()"
pd.TaskTokenGetter = "GetTaskToken()"
default:
// Might be one of these, both, or neither.
if requestT.AssignableTo(workflowIdGetterT) {
rd.WorkflowIdGetter = "GetWorkflowId()"
if messageType.AssignableTo(workflowIdGetterT) {
pd.WorkflowIdGetter = "GetWorkflowId()"
}
if requestT.AssignableTo(runIdGetterT) {
rd.RunIdGetter = "GetRunId()"
if messageType.AssignableTo(runIdGetterT) {
pd.RunIdGetter = "GetRunId()"
}
}

// Iterates over fields in order they defined in proto file, not proto index.
// Order is important because the first match wins.
for fieldNum := 0; fieldNum < requestT.Elem().NumField(); fieldNum++ {
if (rd.WorkflowIdGetter != "" && rd.RunIdGetter != "") || rd.TaskTokenGetter != "" {
for fieldNum := 0; fieldNum < messageType.Elem().NumField(); fieldNum++ {
if (pd.WorkflowIdGetter != "" && pd.RunIdGetter != "") || pd.TaskTokenGetter != "" {
break
}

nestedRequest := requestT.Elem().Field(fieldNum)
nestedRequest := messageType.Elem().Field(fieldNum)
if nestedRequest.Type.Kind() != reflect.Ptr {
continue
}
Expand All @@ -223,17 +228,17 @@ func workflowTagGetters(requestT reflect.Type, depth int) requestData {

nestedRd := workflowTagGetters(nestedRequest.Type, depth+1)
// First match wins: if getter is already set, it won't be overwritten.
if rd.WorkflowIdGetter == "" && nestedRd.WorkflowIdGetter != "" {
rd.WorkflowIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.WorkflowIdGetter)
if pd.WorkflowIdGetter == "" && nestedRd.WorkflowIdGetter != "" {
pd.WorkflowIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.WorkflowIdGetter)
}
if rd.RunIdGetter == "" && nestedRd.RunIdGetter != "" {
rd.RunIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.RunIdGetter)
if pd.RunIdGetter == "" && nestedRd.RunIdGetter != "" {
pd.RunIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.RunIdGetter)
}
if rd.TaskTokenGetter == "" && nestedRd.TaskTokenGetter != "" {
rd.TaskTokenGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.TaskTokenGetter)
if pd.TaskTokenGetter == "" && nestedRd.TaskTokenGetter != "" {
pd.TaskTokenGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.TaskTokenGetter)
}
}
return rd
return pd
}

func callWithFile(generator func(io.Writer, reflect.Type), server reflect.Type, outPath string, licenseText string) {
Expand Down
88 changes: 86 additions & 2 deletions common/rpc/interceptor/logtags/admin_service_server_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c0a7f99

Please sign in to comment.