Activity support #7

Merged
merged 15 commits into from
Mar 14, 2022
4 changes: 2 additions & 2 deletions .gitignore
@@ -6,5 +6,5 @@ temporalio/api/*
temporalio/bridge/proto/*
!temporalio/bridge/proto/__init__.py
temporalio/bridge/target/
/tests/fixtures/golangserver/golangserver
/tests/fixtures/golangworker/golangworker
/tests/helpers/golangserver/golangserver
/tests/helpers/golangworker/golangworker
12 changes: 12 additions & 0 deletions docs/api.rst
@@ -11,6 +11,18 @@ Client
.. automodule:: temporalio.client
:members:

Worker
Member

Not for this PR, but before the release we'll need to publish these docs somewhere.
It'd be nice to see them rendered.

Member Author

Agreed. We need to have a larger conversation about docs on Python. I would also welcome input on any projects that y'all consider to have good docs so I can emulate them. Unfortunately, the Python ecosystem is very poor with regard to easily navigable API references.

In the meantime, if you want to view them locally, run poe gen-docs and open docs/_build/index.html. It's just a limited set of API docs atm.

------

.. automodule:: temporalio.worker
:members:

Activity
--------

.. automodule:: temporalio.activity
:members:

Converters
----------

4 changes: 4 additions & 0 deletions docs/conf.py
@@ -69,3 +69,7 @@
autodoc_preserve_defaults = True

autodoc_member_order = "bysource"

autodoc_default_options = {
"special-members": "__aenter__,__aexit__,__init__",
}
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -12,6 +12,7 @@ repository = "https://github.com/temporalio/sdk-python"
documentation = "https://docs.temporal.io/docs/python"

[tool.poetry.dependencies]
dacite = "^1.6.0"
grpcio = "^1.43.0"
python = "^3.7"
types-protobuf = "^3.19.6"
1 change: 1 addition & 0 deletions scripts/gen-protos.py
@@ -30,6 +30,7 @@

pyi_fixes = [
partial(re.compile(r"temporal\.api\.").sub, r"temporalio.api."),
partial(re.compile(r"temporal\.sdk\.core\.").sub, r"temporalio.bridge.proto."),
]

find_message_re = re.compile(r"_sym_db\.RegisterMessage\(([^\)\.]+)\)")
298 changes: 298 additions & 0 deletions temporalio/activity.py
@@ -0,0 +1,298 @@
"""Functions that can be called inside of activities.

Most of these functions use :py:mod:`contextvars` to obtain the current activity
in context. This is already set before the start of the activity. Activities
that make calls that do not automatically propagate the context, such as calls
in another thread, should not use the calls herein unless the context is
explicitly propagated.
"""

from __future__ import annotations

import asyncio
import contextvars
import logging
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import (
Any,
Callable,
Iterable,
Mapping,
MutableMapping,
NoReturn,
Optional,
Tuple,
)

import temporalio.api.common.v1
import temporalio.common
import temporalio.exceptions


@dataclass(frozen=True)
class Info:
"""Information about the running activity.

Retrieved inside an activity via :py:func:`info`.
"""

activity_id: str
activity_type: str
attempt: int
current_attempt_scheduled_time: datetime
header: Mapping[str, temporalio.api.common.v1.Payload]
heartbeat_details: Iterable[Any]
heartbeat_timeout: Optional[timedelta]
is_local: bool
retry_policy: Optional[temporalio.common.RetryPolicy]
schedule_to_close_timeout: Optional[timedelta]
scheduled_time: datetime
start_to_close_timeout: Optional[timedelta]
started_time: datetime
task_queue: str
task_token: bytes
workflow_id: str
workflow_namespace: str
workflow_run_id: str
workflow_type: str
# TODO(cretz): Consider putting identity on here for "worker_id" for logger?

def _logger_details(self) -> Mapping[str, Any]:
return {
"activity_id": self.activity_id,
"activity_type": self.activity_type,
"attempt": self.attempt,
"namespace": self.workflow_namespace,
"task_queue": self.task_queue,
"workflow_id": self.workflow_id,
"workflow_run_id": self.workflow_run_id,
"workflow_type": self.workflow_type,
}


_current_context: contextvars.ContextVar[_Context] = contextvars.ContextVar("activity")


@dataclass
class _Context:
info: Callable[[], Info]
# This is optional because during interceptor init it is not present
heartbeat: Optional[Callable[..., None]]
cancelled_event: _CompositeEvent
worker_shutdown_event: _CompositeEvent
_logger_details: Optional[Mapping[str, Any]] = None

@staticmethod
def current() -> _Context:
context = _current_context.get(None)
if not context:
raise RuntimeError("Not in activity context")
return context

@staticmethod
def set(context: _Context) -> None:
_current_context.set(context)

@property
def logger_details(self) -> Mapping[str, Any]:
if self._logger_details is None:
self._logger_details = self.info()._logger_details()
return self._logger_details


@dataclass
class _CompositeEvent:
# This should always be present, but is sometimes lazily set internally
thread_event: Optional[threading.Event]
# Async event only for async activities
async_event: Optional[asyncio.Event]

def set(self) -> None:
if not self.thread_event:
raise RuntimeError("Missing event")
self.thread_event.set()
if self.async_event:
self.async_event.set()

def is_set(self) -> bool:
if not self.thread_event:
raise RuntimeError("Missing event")
return self.thread_event.is_set()

async def wait(self) -> None:
if not self.async_event:
raise RuntimeError("not in async activity")
await self.async_event.wait()

def wait_sync(self, timeout: Optional[float] = None) -> None:
if not self.thread_event:
raise RuntimeError("Missing event")
self.thread_event.wait(timeout)


def in_activity() -> bool:
"""Whether the current code is inside an activity.

Returns:
True if in an activity, False otherwise.
"""
return _current_context.get(None) is not None


def info() -> Info:
"""Current activity's info.

Returns:
Info for the currently running activity.

Raises:
RuntimeError: When not in an activity.
"""
return _Context.current().info()


def heartbeat(*details: Any) -> None:
"""Send a heartbeat for the current activity.

Raises:
RuntimeError: When not in an activity.
Member

So if an activity is canceled the heartbeat still succeeds? In Go the context is invalidated, but context is a pretty standard abstraction in Go. I think most developers are going to ignore cancellation or workflow completion in this case.

Member Author

For async in Python, the standard abstraction is to use https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel to represent cancel which can be caught, etc. However, for "sync" activities, yes, the way I currently developed it requires explicit cancellation checking. In retrospect this isn't good enough because it doesn't throw.

I think I will have this call throw when the activity is cancelled. Note that heartbeat is async due to buffering, so cancellation may not throw until the next heartbeat. @bergundy - can you clarify what TS does here for heartbeat on a cancelled activity? Do you see any problems with me throwing when the activity is cancelled here?
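
To make the async/sync difference in this comment concrete, here is a minimal sketch using only the standard library. It is not the SDK's implementation: `async_activity`, `sync_activity`, and the two demo helpers are hypothetical names. It shows that async code receives cancellation as an exception at an await point, while sync code sees nothing unless it checks an event.

```python
import asyncio
import threading


async def async_activity() -> str:
    """Async code sees cancellation as an exception raised at an await point."""
    try:
        await asyncio.sleep(60)
        return "completed"
    except asyncio.CancelledError:
        # Real code would usually clean up and re-raise; the sketch returns a
        # marker so the outcome is easy to observe.
        return "cancelled"


def sync_activity(cancelled: threading.Event) -> str:
    """Sync code gets no exception, so it has to poll (or wait on) an event."""
    for _ in range(1000):
        # Event.wait returns True as soon as the event is set
        if cancelled.wait(timeout=0.01):
            return "cancelled"
    return "completed"


async def run_async_demo() -> str:
    task = asyncio.ensure_future(async_activity())
    await asyncio.sleep(0)  # Let the task start and reach its first await
    task.cancel()
    return await task


def run_sync_demo() -> str:
    cancelled = threading.Event()
    # Simulate a cancellation request arriving from another thread
    threading.Timer(0.05, cancelled.set).start()
    return sync_activity(cancelled)
```

A sync activity that never checks the event runs to completion, which is the gap the comment above describes.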

Member Author (@cretz, Mar 9, 2022)

OK, here's some notes on what I think I learned here:

  • TypeScript
    • The heartbeat call does not throw and you will never know an activity cancel request happened unless you opt-in to knowing or are calling a cancellation aware call
    • A promise is resolved on the first cancellation request and is not reset if ignored/swallowed. So you can't swallow the first cancel and wait for the next.
  • Go
    • The heartbeat call does not throw, you leverage the Go context to know if cancelled
    • Like TypeScript, there is no way to swallow one cancellation and get notified about the next
    • Unlike TypeScript, there is nothing you can check to see if a cancellation was requested. Checking the context alone doesn't tell you it isn't, say, a worker stop.
  • Java
    • The heartbeat call does throw, and properly resets itself for more cancellation requests if you swallow it (at least from what I am seeing)
    • Heartbeat is the only path for cancellation. There is nothing you can check or wait on to see if cancellation was requested

Some of those make sense in their async or sync environments. But Python is both, so what do we do in Python? Notes:

  • Does temporalio.activity.heartbeat throw?
    • Thinking yes, but since it's asynchronously sent in the background, it can only throw on the next call to heartbeat after the cancel is returned
    • This will have to throw a different error than asyncio.CancelledError, maybe called ActivityCancelledError or similar
      • I am thinking we want the cancellation reason on this too of which there are 4:
        • cancel requested
        • not found - gone from server, so the error is ignored anyways
        • timed out - being handled elsewhere, so the error will be ignored anyways
        • heartbeat failure - only really occurs if the details can't be converted on the previous heartbeat call
  • Is asyncio.Task.cancel still used?
    • Thinking yes, but it's a different exception type and won't have the reason
      • Maybe make the activity cancelled error available for the reason?
  • Should we allow resetting of is_cancelled and heartbeat if the exception is swallowed?
    • Thinking yes, but it's manual. So instead of like Java where the heartbeat call won't continue to throw on error, this will unless you manually reset the cancellation
    • The reason for doing this is to make the event simple
  • This is all made more difficult by multiprocess support
    • It was simple unidirectional heartbeating, but now I've learned that unlike TypeScript (my previous model here), heartbeating has to be bidirectional to report errors

Still working on the best solution here...
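
The option floated in the notes above could be sketched roughly as follows. This is an illustration of the idea under discussion, not what the PR implements: `ActivityCancelledError` is only a name proposed in the thread, and `CancelReason`, `Heartbeater`, `on_server_cancel`, and `reset_cancellation` are hypothetical. The sketch assumes heartbeats are buffered, so a cancel reported by the server only surfaces on the next `heartbeat` call and keeps surfacing until it is reset manually.

```python
import enum
import threading
from typing import Any, List, Optional, Tuple


class CancelReason(enum.Enum):
    # The four reasons listed in the notes above
    CANCEL_REQUESTED = "cancel requested"
    NOT_FOUND = "not found"
    TIMED_OUT = "timed out"
    HEARTBEAT_FAILURE = "heartbeat failure"


class ActivityCancelledError(Exception):
    """Hypothetical error, distinct from asyncio.CancelledError, carrying a reason."""

    def __init__(self, reason: CancelReason) -> None:
        super().__init__(f"Activity cancelled: {reason.value}")
        self.reason = reason


class Heartbeater:
    """Buffers heartbeats; a cancel from the server surfaces on the next call."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.pending: List[Tuple[Any, ...]] = []
        self._cancel_reason: Optional[CancelReason] = None

    def heartbeat(self, *details: Any) -> None:
        with self._lock:
            if self._cancel_reason is not None:
                # Keeps raising on every call until reset_cancellation()
                raise ActivityCancelledError(self._cancel_reason)
            self.pending.append(details)

    def on_server_cancel(self, reason: CancelReason) -> None:
        # Would be called by the background sender when the server reports a cancel
        with self._lock:
            self._cancel_reason = reason

    def reset_cancellation(self) -> None:
        # Manual reset, so a swallowed cancel does not raise again
        with self._lock:
            self._cancel_reason = None
```

The manual reset mirrors the "it's manual" point above: unlike the Java behavior described, the error repeats until the caller explicitly clears it.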

Member Author

After discussion with @bergundy, we have resolved to... have further discussions, heh. While heartbeat throwing may be OK, we have to discuss how that looks in local activities. We also need to discuss the confusion when your await call could throw one type of cancel and your heartbeat() call could throw another. I suspect this same concern will come up in .NET, where both async and non-async code are heavily used in the ecosystem.

"""
heartbeat_fn = _Context.current().heartbeat
if not heartbeat_fn:
raise RuntimeError("Can only execute heartbeat after interceptor init")
heartbeat_fn(*details)


def is_cancelled() -> bool:
"""Whether a cancellation was ever requested on this activity.

Returns:
True if the activity has had a cancellation request, False otherwise.

Raises:
RuntimeError: When not in an activity.
"""
return _Context.current().cancelled_event.is_set()


async def wait_for_cancelled() -> None:
Member

Can we rename this to something like until_cancelled, await wait... doesn't read well.

Member Author

This matches Python APIs like asyncio.wait and asyncio.wait_for, both of which you'd also call with await in front.

Member

wait_for was introduced before Python even had async/await; it used to be yield from wait_for...

Member Author

It still is a known word. until_ sounds a bit strange. Is there any other better phrasing? Remember, we also have the synchronous one (until_cancelled_sync() is a bit rough too).

I don't expect people to await this anyways, I expect them to pass the task to wait/wait_for with several others. But I am ok changing the name if we can come up with something better.
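
The usage pattern described here, racing a cancellation wait against other work with `asyncio.wait`, might look like the sketch below. An `asyncio.Event` stands in for `temporalio.activity.wait_for_cancelled()` so the example runs without a worker; `do_work`, `run_until_cancelled`, and `demo` are hypothetical names.

```python
import asyncio


async def do_work() -> str:
    await asyncio.sleep(60)
    return "done"


async def run_until_cancelled(cancelled: asyncio.Event) -> str:
    work = asyncio.ensure_future(do_work())
    # Stand-in for asyncio.ensure_future(activity.wait_for_cancelled())
    cancel_wait = asyncio.ensure_future(cancelled.wait())
    done, pending = await asyncio.wait(
        [work, cancel_wait], return_when=asyncio.FIRST_COMPLETED
    )
    # Stop whichever task lost the race and let it finish unwinding
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return work.result() if work in done else "cancelled"


async def demo() -> str:
    cancelled = asyncio.Event()
    # Simulate a cancellation request arriving shortly after start
    asyncio.get_running_loop().call_later(0.05, cancelled.set)
    return await run_until_cancelled(cancelled)
```

In this shape the function name is passed to `ensure_future` rather than awaited directly, which is why the `await wait_for...` reading concern raised above may matter less in practice.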

Member

In TS we simply call this cancelled but I'm fine with what we have here.

"""Asynchronously wait for this activity to get a cancellation request.

Raises:
RuntimeError: When not in an async activity.
"""
await _Context.current().cancelled_event.wait()


def wait_for_cancelled_sync(timeout: Optional[float] = None) -> None:
"""Synchronously block while waiting for a cancellation request on this
activity.

This is essentially a wrapper around :py:meth:`threading.Event.wait`.

Args:
timeout: Max amount of time to wait for cancellation.

Raises:
RuntimeError: When not in an activity.
"""
_Context.current().cancelled_event.wait_sync(timeout)


def is_worker_shutdown() -> bool:
"""Whether shutdown has been invoked on the worker.

Returns:
True if shutdown has been called on the worker, False otherwise.

Raises:
RuntimeError: When not in an activity.
"""
return _Context.current().worker_shutdown_event.is_set()


async def wait_for_worker_shutdown() -> None:
"""Asynchronously wait for shutdown to be called on the worker.

Raises:
RuntimeError: When not in an async activity.
"""
await _Context.current().worker_shutdown_event.wait()


def wait_for_worker_shutdown_sync(timeout: Optional[float] = None) -> None:
"""Synchronously block while waiting for shutdown to be called on the
worker.

This is essentially a wrapper around :py:meth:`threading.Event.wait`.

Args:
timeout: Max amount of time to wait for shutdown to be called on the
worker.

Raises:
RuntimeError: When not in an activity.
"""
_Context.current().worker_shutdown_event.wait_sync(timeout)


def raise_complete_async() -> NoReturn:
"""Raise an error that says the activity will be completed
asynchronously.
"""
raise _CompleteAsyncError()


class _CompleteAsyncError(temporalio.exceptions.TemporalError):
Member

Not blocking this PR, but do you think we should make this class inherit from BaseException so it's less likely to be caught by application code? (I saw that asyncio's CancelledError subclasses that.)

Member Author (@cretz, Mar 14, 2022)

> Not blocking this PR, do you think we should make this class inherit from BaseException so it's less likely to be caught by application code?

Yes, I will change this since it's a private exception class anyways. I will do this when implementing async activity completion.

> I saw that asyncio CancelledError subclasses that

Fun fact, not in Python 3.7 which is why I catch it separately.
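
For context on the "catch it separately" remark: `asyncio.CancelledError` derives from `Exception` on Python 3.7 and from `BaseException` from 3.8 on, so a bare `except Exception` treats cancellation differently depending on the version. A minimal sketch of a version-independent handler follows; `run_guarded` is a hypothetical helper, not code from this PR.

```python
import asyncio
from typing import Any, Awaitable


async def run_guarded(awaitable: Awaitable[Any]) -> str:
    try:
        await awaitable
        return "ok"
    except asyncio.CancelledError:
        # Listed first so cancellation is classified the same way on 3.7,
        # where it would otherwise fall into the Exception branch below
        return "cancelled"
    except Exception as err:
        return f"failed: {err}"


async def fail() -> None:
    raise ValueError("boom")


async def cancel_demo() -> str:
    task = asyncio.ensure_future(run_guarded(asyncio.sleep(60)))
    await asyncio.sleep(0)  # Let the task start and reach its await
    task.cancel()
    return await task
```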

pass


class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running activity.

Attributes:
activity_info_on_message: Boolean for whether a string representation of
a dict of some activity info will be appended to each message.
Default is True.
activity_info_on_extra: Boolean for whether an ``activity_info`` value
will be added to the ``extra`` dictionary, making it present on the
``LogRecord.__dict__`` for use by others.
"""

def __init__(
self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]
) -> None:
"""Create the logger adapter."""
super().__init__(logger, extra or {})
self.activity_info_on_message = True
self.activity_info_on_extra = True

def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
) -> Tuple[Any, MutableMapping[str, Any]]:
"""Override to add activity details."""
msg, kwargs = super().process(msg, kwargs)
if self.activity_info_on_message or self.activity_info_on_extra:
context = _current_context.get(None)
if context:
if self.activity_info_on_message:
msg = f"{msg} ({context.logger_details})"
Member

This should probably be interceptable.

Member Author (@cretz, Mar 8, 2022)

It is disableable. And of course all logs are interceptable via various means. You're just looking at default behavior.

Member

I didn't look at the rest of the codebase, is this logger used by the framework or just user code?

Member Author

Both (we log stuff using the activity logger in the worker in a couple places to have that context). To disable this appending, anyone can call temporalio.activity.logger.activity_info_on_message = False. I don't know of a clearer way to do this and still be idiomatic Python.

I considered a logger() that lazily builds a logger for each activity context, but that isn't how Python prefers it. They like a global variable anyone can attach formatters/handlers to and configure.

Member

If you wanted to make it fully extensible you could make the log message formatter configurable but this is fine for now for sure.
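
To illustrate the two switches discussed in this thread, here is a self-contained stand-in built on the standard `logging` module. `SketchAdapter` and `ListHandler` are hypothetical; the adapter takes a fixed details dict instead of reading the activity context, so it only approximates the PR's `LoggerAdapter`.

```python
import logging
from typing import Any, Dict, List, MutableMapping, Tuple


class SketchAdapter(logging.LoggerAdapter):
    """Stand-in for the PR's adapter, with a fixed details dict."""

    def __init__(self, logger: logging.Logger, details: Dict[str, Any]) -> None:
        super().__init__(logger, {})
        self.details = details
        self.activity_info_on_message = True

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        if self.activity_info_on_message:
            msg = f"{msg} ({self.details})"
        # Extra can be absent or None, this handles both
        extra = kwargs.get("extra") or {}
        extra["activity_info"] = self.details
        kwargs["extra"] = extra
        return msg, kwargs


class ListHandler(logging.Handler):
    """Collects records so the effect of the adapter can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def demo() -> Tuple[List[str], Any]:
    base = logging.getLogger("sketch.activity")
    base.setLevel(logging.INFO)
    base.propagate = False
    handler = ListHandler()
    base.addHandler(handler)
    log = SketchAdapter(base, {"activity_id": "a1"})
    log.info("first")
    # Same switch the thread mentions for temporalio.activity.logger
    log.activity_info_on_message = False
    log.info("second")
    base.removeHandler(handler)
    messages = [record.getMessage() for record in handler.records]
    # Values passed via "extra" become attributes on the LogRecord
    return messages, getattr(handler.records[0], "activity_info")
```

With the message suffix disabled, a custom formatter or handler can still read `activity_info` from the record, which is one of the "various means" of interception mentioned above.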

if self.activity_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["activity_info"] = context.info()
kwargs["extra"] = extra
return (msg, kwargs)

@property
def base_logger(self) -> logging.Logger:
"""Underlying logger usable for actions such as adding
handlers/formatters.
"""
return self.logger


#: Logger that will have contextual activity details embedded.
logger = LoggerAdapter(logging.getLogger(__name__), None)
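
The module docstring of temporalio/activity.py warns that calls made from another thread do not see the activity context unless it is propagated explicitly. A minimal standard-library sketch of that propagation follows; `current_activity`, `read_activity`, `run_in_thread`, and `demo` are hypothetical names, and the "unset in a plain thread" behavior assumes a default CPython build where new threads do not inherit the caller's context.

```python
import contextvars
import threading
from typing import Callable, List, Optional, Tuple

# Stand-in for the module's private context variable
current_activity = contextvars.ContextVar("activity")


def read_activity() -> Optional[str]:
    return current_activity.get(None)


def run_in_thread(target: Callable[[], Optional[str]]) -> Optional[str]:
    results: List[Optional[str]] = []
    thread = threading.Thread(target=lambda: results.append(target()))
    thread.start()
    thread.join()
    return results[0]


def demo() -> Tuple[Optional[str], Optional[str]]:
    current_activity.set("my-activity")
    # Plain thread: on default builds it starts with an empty context
    without_ctx = run_in_thread(read_activity)
    # Explicit propagation: copy the caller's context and run inside it
    ctx = contextvars.copy_context()
    with_ctx = run_in_thread(lambda: ctx.run(read_activity))
    return without_ctx, with_ctx
```

The same `copy_context().run(...)` wrapping would apply to work submitted to an executor, since the submitted callable otherwise runs outside the caller's context.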