Skip to content

Interceptor header and OpenTelemetry support #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
with:
go-version: "1.18"
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poetry install --no-root -E opentelemetry
- run: poe lint
- run: poe build-develop
- run: poe test -s -o log_cli_level=DEBUG
Expand Down Expand Up @@ -117,7 +117,7 @@ jobs:

# Prepare
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poetry install --no-root -E opentelemetry

# Add the source dist only for Linux x64 for now
- if: ${{ matrix.package-suffix == 'linux-amd64' }}
Expand Down Expand Up @@ -164,7 +164,7 @@ jobs:

# Prepare
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poetry install --no-root -E opentelemetry

# Build and fix the wheel
- run: poetry run cibuildwheel --output-dir dist --arch aarch64
Expand All @@ -175,4 +175,3 @@ jobs:
with:
name: packages-linux-aarch64
path: dist

12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,18 @@ cancellation of all outstanding activities.
The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
respect cancellation, the shutdown may never complete.

### OpenTelemetry Support

OpenTelemetry support requires the optional `opentelemetry` dependencies which are part of the `opentelemetry` extra.
When using `pip`, running

pip install temporalio[opentelemetry]

will install needed dependencies. Then the `temporalio.contrib.opentelemetry.TracingInterceptor` can be created and set
as an interceptor on the `interceptors` argument of `Client.connect`. When set, spans will be created for all client
calls and for all activity and workflow invocations on the worker, spans will be created and properly serialized through
the server to give one proper trace for a workflow execution.

## Development

The Python SDK is built to work with Python 3.7 and newer. It is built using
Expand Down
297 changes: 218 additions & 79 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ script = "build.py"
[tool.poetry.dependencies]
dacite = "^1.6.0"
grpcio = "^1.47.0"
opentelemetry-api = { version = "^1.11.1", optional = true }
opentelemetry-sdk = { version = "^1.11.1", optional = true }
protobuf = "^3.20.1"
python = "^3.7"
types-protobuf = "^3.19.21"
Expand All @@ -51,6 +53,9 @@ setuptools-rust = "^1.3.0"
toml = "^0.10.2"
twine = "^4.0.1"

[tool.poetry.extras]
opentelemetry = ["opentelemetry-api", "opentelemetry-sdk"]

[tool.poe.tasks]
build-develop = ["build-bridge-develop"]
build-bridge-develop = "python scripts/setup_bridge.py develop"
Expand All @@ -67,7 +72,7 @@ lint = [
# TODO(cretz): Why does pydocstyle complain about @overload missing docs after
# https://github.com/PyCQA/pydocstyle/pull/511?
lint-docs = "pydocstyle --ignore-decorators=overload"
lint-types = "mypy ."
lint-types = "mypy --namespace-packages ."
test = "pytest"

# Install local, run single pytest with env var, uninstall local
Expand Down Expand Up @@ -133,7 +138,11 @@ add_ignore = [
add-package = ["temporalio"]
docformat = "google"
html-output = "build/apidocs"
intersphinx = ["https://docs.python.org/3/objects.inv", "https://googleapis.dev/python/protobuf/latest/objects.inv"]
intersphinx = [
"https://docs.python.org/3/objects.inv",
"https://googleapis.dev/python/protobuf/latest/objects.inv",
"https://opentelemetry-python.readthedocs.io/en/latest/objects.inv",
]
privacy = [
"PRIVATE:temporalio.bridge",
"HIDDEN:temporalio.worker.activity",
Expand Down
36 changes: 14 additions & 22 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
Expand All @@ -235,7 +234,6 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
Expand All @@ -260,7 +258,6 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
Expand All @@ -284,7 +281,6 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[Any, Any]:
Expand All @@ -306,7 +302,6 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[Any, Any]:
Expand All @@ -328,7 +323,6 @@ async def start_workflow(
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
search_attributes: Search attributes for the workflow.
header: Header for the workflow.
start_signal: If present, this signal is sent as signal-with-start
instead of traditional workflow start.
start_signal_args: Arguments for start_signal if start_signal
Expand Down Expand Up @@ -366,7 +360,7 @@ async def start_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
header=header,
headers=None,
start_signal=start_signal,
start_signal_args=start_signal_args,
ret_type=ret_type,
Expand All @@ -389,7 +383,6 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
Expand All @@ -414,7 +407,6 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
Expand All @@ -439,7 +431,6 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
Expand All @@ -463,7 +454,6 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> Any:
Expand All @@ -485,7 +475,6 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> Any:
Expand All @@ -511,7 +500,6 @@ async def execute_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
header=header,
start_signal=start_signal,
start_signal_args=start_signal_args,
)
Expand Down Expand Up @@ -997,6 +985,7 @@ async def query(
args=temporalio.common._arg_or_args(arg, args),
reject_condition=reject_condition
or self._client._config["default_workflow_query_reject_condition"],
headers=None,
ret_type=ret_type,
)
)
Expand Down Expand Up @@ -1075,6 +1064,7 @@ async def signal(
signal
),
args=temporalio.common._arg_or_args(arg, args),
headers=None,
)
)

Expand Down Expand Up @@ -1298,7 +1288,7 @@ class StartWorkflowInput:
cron_schedule: str
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
header: Optional[Mapping[str, Any]]
headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for changing this to headers

start_signal: Optional[str]
start_signal_args: Iterable[Any]
# Type may be absent
Expand Down Expand Up @@ -1331,6 +1321,7 @@ class QueryWorkflowInput:
query: str
args: Iterable[Any]
reject_condition: Optional[temporalio.common.QueryRejectCondition]
headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]]
# Type may be absent
ret_type: Optional[Type]

Expand All @@ -1343,6 +1334,7 @@ class SignalWorkflowInput:
run_id: Optional[str]
signal: str
args: Iterable[Any]
headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]]


@dataclass
Expand Down Expand Up @@ -1439,6 +1431,7 @@ async def describe_workflow(
self, input: DescribeWorkflowInput
) -> WorkflowDescription:
"""Called for every :py:meth:`WorkflowHandle.describe` call."""
return await self.next.describe_workflow(input)

async def query_workflow(self, input: QueryWorkflowInput) -> Any:
"""Called for every :py:meth:`WorkflowHandle.query` call."""
Expand Down Expand Up @@ -1530,11 +1523,8 @@ async def start_workflow(
temporalio.converter.encode_search_attributes(
input.search_attributes, req.search_attributes
)
if input.header is not None:
for k, v in input.header.items():
req.header.fields[k] = (await self._client.data_converter.encode([v]))[
0
]
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.header.fields)

# Start with signal or just normal start
resp: Union[
Expand Down Expand Up @@ -1597,8 +1587,7 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=input.id,
run_id=input.run_id or "",
)
# TODO(cretz): Headers here and elsewhere
),
)
if input.reject_condition:
req.query_reject_condition = cast(
Expand All @@ -1610,6 +1599,8 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
req.query.query_args.payloads.extend(
await self._client.data_converter.encode(input.args)
)
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.query.header.fields)
resp = await self._client.service.query_workflow(req, retry=True)
if resp.HasField("query_rejected"):
raise WorkflowQueryRejectedError(
Expand Down Expand Up @@ -1639,12 +1630,13 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
signal_name=input.signal,
identity=self._client.identity,
request_id=str(uuid.uuid4()),
# TODO(cretz): Headers here and elsewhere
)
if input.args:
req.input.payloads.extend(
await self._client.data_converter.encode(input.args)
)
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.header.fields)
await self._client.service.signal_workflow_execution(req, retry=True)

async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
Expand Down
21 changes: 20 additions & 1 deletion temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import IntEnum
from typing import Any, Iterable, List, Mapping, Optional, Union
from typing import Any, Iterable, List, Mapping, Optional, Text, Union

import google.protobuf.internal.containers
from typing_extensions import TypeAlias

import temporalio.api.common.v1
Expand Down Expand Up @@ -149,3 +150,21 @@ def _arg_or_args(arg: Any, args: Iterable[Any]) -> Iterable[Any]:
raise ValueError("Cannot have arg and args")
args = [arg]
return args


def _apply_headers(
source: Optional[Mapping[str, temporalio.api.common.v1.Payload]],
dest: google.protobuf.internal.containers.MessageMap[
Text, temporalio.api.common.v1.Payload
],
) -> None:
if source is None:
return
# Due to how protobuf maps of messages work, we cannot just set these or
# "update" these, instead they expect a shallow copy
# TODO(cretz): We could make this cheaper where we use it by precreating the
# command, but that forces proto commands to be embedded into interceptor
# inputs.
for k, v in source.items():
# This does not copy bytes, just messages
dest[k].CopyFrom(v)
1 change: 1 addition & 0 deletions temporalio/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Extra modules that may have optional dependencies."""
Loading