Skip to content

Commit

Permalink
Provide Span.Kind for TracingInterceptor (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
northpowered authored Aug 1, 2023
1 parent 3fec58f commit 393e5c8
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ def _start_as_current_span(
*,
attributes: opentelemetry.util.types.Attributes,
input: Optional[_InputWithHeaders] = None,
kind: opentelemetry.trace.SpanKind,
) -> Iterator[None]:
with self.tracer.start_as_current_span(name, attributes=attributes):
with self.tracer.start_as_current_span(name, attributes=attributes, kind=kind):
if input:
input.headers = self._context_to_headers(input.headers)
yield None
Expand Down Expand Up @@ -190,6 +191,7 @@ def _completed_workflow_span(
attributes=params.attributes,
links=links,
start_time=params.time_ns,
kind=params.kind,
)
context = opentelemetry.trace.set_span_in_context(span, context)
if params.exception:
Expand Down Expand Up @@ -218,6 +220,7 @@ async def start_workflow(
f"{prefix}:{input.workflow}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().start_workflow(input)

Expand All @@ -226,6 +229,7 @@ async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> A
f"QueryWorkflow:{input.query}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().query_workflow(input)

Expand All @@ -236,6 +240,7 @@ async def signal_workflow(
f"SignalWorkflow:{input.signal}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().signal_workflow(input)

Expand All @@ -261,6 +266,7 @@ async def execute_activity(
"temporalRunID": info.workflow_run_id,
"temporalActivityID": info.activity_id,
},
kind=opentelemetry.trace.SpanKind.SERVER,
):
return await super().execute_activity(input)

Expand All @@ -283,6 +289,7 @@ class _CompletedWorkflowSpanParams:
time_ns: int
link_context: Optional[_CarrierDict]
exception: Optional[Exception]
kind: opentelemetry.trace.SpanKind


_interceptor_context_key = opentelemetry.context.create_key(
Expand Down Expand Up @@ -334,8 +341,10 @@ async def execute_workflow(
:py:meth:`temporalio.worker.WorkflowInboundInterceptor.execute_workflow`.
"""
with self._top_level_workflow_context(success_is_complete=True):
# Entrypoint of workflow should be `server` in OTel
self._completed_span(
f"RunWorkflow:{temporalio.workflow.info().workflow_type}"
f"RunWorkflow:{temporalio.workflow.info().workflow_type}",
kind=opentelemetry.trace.SpanKind.SERVER,
)
return await super().execute_workflow(input)

Expand All @@ -355,6 +364,7 @@ async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> Non
self._completed_span(
f"HandleSignal:{input.signal}",
link_context_carrier=link_context_carrier,
kind=opentelemetry.trace.SpanKind.SERVER,
)
await super().handle_signal(input)

Expand Down Expand Up @@ -388,6 +398,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
link_context_carrier=link_context_carrier,
# Create even on replay for queries
new_span_even_on_replay=True,
kind=opentelemetry.trace.SpanKind.SERVER,
)
return await super().handle_query(input)
finally:
Expand Down Expand Up @@ -437,6 +448,7 @@ def _top_level_workflow_context(
self._completed_span(
f"CompleteWorkflow:{temporalio.workflow.info().workflow_type}",
exception=exception,
kind=opentelemetry.trace.SpanKind.INTERNAL,
)
opentelemetry.context.detach(token)

Expand Down Expand Up @@ -468,6 +480,7 @@ def _completed_span(
new_span_even_on_replay: bool = False,
additional_attributes: opentelemetry.util.types.Attributes = None,
exception: Optional[Exception] = None,
kind: opentelemetry.trace.SpanKind = opentelemetry.trace.SpanKind.INTERNAL,
) -> None:
# If there is no span on the context, we do not create a span
if opentelemetry.trace.get_current_span() is opentelemetry.trace.INVALID_SPAN:
Expand Down Expand Up @@ -499,6 +512,7 @@ def _completed_span(
time_ns=temporalio.workflow.time_ns(),
link_context=link_context_carrier,
exception=exception,
kind=kind,
)
)

Expand Down Expand Up @@ -535,7 +549,9 @@ async def signal_child_workflow(
) -> None:
# Create new span and put on outbound input
self.root._completed_span(
f"SignalChildWorkflow:{input.signal}", add_to_outbound=input
f"SignalChildWorkflow:{input.signal}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.SERVER,
)
await super().signal_child_workflow(input)

Expand All @@ -544,7 +560,9 @@ async def signal_external_workflow(
) -> None:
# Create new span and put on outbound input
self.root._completed_span(
f"SignalExternalWorkflow:{input.signal}", add_to_outbound=input
f"SignalExternalWorkflow:{input.signal}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
await super().signal_external_workflow(input)

Expand All @@ -553,7 +571,9 @@ def start_activity(
) -> temporalio.workflow.ActivityHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartActivity:{input.activity}", add_to_outbound=input
f"StartActivity:{input.activity}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return super().start_activity(input)

Expand All @@ -562,7 +582,9 @@ async def start_child_workflow(
) -> temporalio.workflow.ChildWorkflowHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartChildWorkflow:{input.workflow}", add_to_outbound=input
f"StartChildWorkflow:{input.workflow}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return await super().start_child_workflow(input)

Expand All @@ -571,7 +593,9 @@ def start_local_activity(
) -> temporalio.workflow.ActivityHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartActivity:{input.activity}", add_to_outbound=input
f"StartActivity:{input.activity}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return super().start_local_activity(input)

Expand Down

0 comments on commit 393e5c8

Please sign in to comment.