-
Notifications
You must be signed in to change notification settings - Fork 104
Activity support #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5b6bba0
1e5aa06
6dcd10b
5f10f29
f22646e
de00f1f
042734f
6b66c12
dd484a9
419f2f7
9538734
2fb2e21
cbc1321
2697d41
d595c04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,298 @@ | ||
"""Functions that can be called inside of activities. | ||
|
||
Most of these functions use :py:mod:`contextvars` to obtain the current activity | ||
in context. This is already set before the start of the activity. Activities | ||
that make calls that do not automatically propagate the context, such as calls | ||
in another thread, should not use the calls herein unless the context is | ||
explicitly propagated. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
import contextvars | ||
import logging | ||
import threading | ||
from dataclasses import dataclass | ||
from datetime import datetime, timedelta | ||
from typing import ( | ||
Any, | ||
Callable, | ||
Iterable, | ||
Mapping, | ||
MutableMapping, | ||
NoReturn, | ||
Optional, | ||
Tuple, | ||
) | ||
|
||
import temporalio.api.common.v1 | ||
import temporalio.common | ||
import temporalio.exceptions | ||
|
||
|
||
@dataclass(frozen=True)
class Info:
    """Information about the running activity.

    Retrieved inside an activity via :py:func:`info`.
    """

    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    header: Mapping[str, temporalio.api.common.v1.Payload]
    heartbeat_details: Iterable[Any]
    heartbeat_timeout: Optional[timedelta]
    is_local: bool
    retry_policy: Optional[temporalio.common.RetryPolicy]
    schedule_to_close_timeout: Optional[timedelta]
    scheduled_time: datetime
    start_to_close_timeout: Optional[timedelta]
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str
    # TODO(cretz): Consider putting identity on here for "worker_id" for logger?

    def _logger_details(self) -> Mapping[str, Any]:
        """Build the subset of activity info that is attached to log output.

        Returns:
            A newly created dict on every call. Note that the workflow
            namespace is exposed under the key ``namespace``.
        """
        return {
            "activity_id": self.activity_id,
            "activity_type": self.activity_type,
            "attempt": self.attempt,
            "namespace": self.workflow_namespace,
            "task_queue": self.task_queue,
            "workflow_id": self.workflow_id,
            "workflow_run_id": self.workflow_run_id,
            "workflow_type": self.workflow_type,
        }
|
||
|
||
# Context variable that holds the _Context of the activity running in the
# current context. It is created without a default, so readers in this module
# pass an explicit default to ``get``.
_current_context: contextvars.ContextVar[_Context] = contextvars.ContextVar("activity")
|
||
|
||
@dataclass
class _Context:
    """Per-activity state stored in the module's activity context variable."""

    # Callable returning the Info of the running activity
    info: Callable[[], Info]
    # This is optional because during interceptor init it is not present
    heartbeat: Optional[Callable[..., None]]
    cancelled_event: _CompositeEvent
    worker_shutdown_event: _CompositeEvent
    # Lazily populated cache backing the ``logger_details`` property
    _logger_details: Optional[Mapping[str, Any]] = None

    @staticmethod
    def current() -> _Context:
        """Return the context set for the current activity.

        Raises:
            RuntimeError: When no activity context has been set.
        """
        context = _current_context.get(None)
        if context is None:
            raise RuntimeError("Not in activity context")
        return context

    @staticmethod
    def set(context: _Context) -> None:
        """Store ``context`` in the activity context variable."""
        _current_context.set(context)

    @property
    def logger_details(self) -> Mapping[str, Any]:
        """Logger details for this activity, computed once and then cached."""
        if self._logger_details is None:
            self._logger_details = self.info()._logger_details()
        return self._logger_details
|
||
|
||
@dataclass
class _CompositeEvent:
    """Pairs a thread event with an optional asyncio event.

    The thread event is what ``is_set`` and ``wait_sync`` consult; the async
    event, when present, is additionally set by ``set`` and awaited by
    ``wait``.
    """

    # This should always be present, but is sometimes lazily set internally
    thread_event: Optional[threading.Event]
    # Async event only for async activities
    async_event: Optional[asyncio.Event]

    def set(self) -> None:
        """Set the thread event and, if present, the async event.

        Raises:
            RuntimeError: When the thread event is missing.
        """
        if self.thread_event is None:
            raise RuntimeError("Missing event")
        self.thread_event.set()
        # NOTE(review): asyncio.Event is documented as not thread-safe;
        # presumably callers invoke this on the event loop's thread - confirm.
        if self.async_event is not None:
            self.async_event.set()

    def is_set(self) -> bool:
        """Whether the thread event has been set.

        Raises:
            RuntimeError: When the thread event is missing.
        """
        if self.thread_event is None:
            raise RuntimeError("Missing event")
        return self.thread_event.is_set()

    async def wait(self) -> None:
        """Asynchronously wait on the async event.

        Raises:
            RuntimeError: When there is no async event.
        """
        if self.async_event is None:
            raise RuntimeError("not in async activity")
        await self.async_event.wait()

    def wait_sync(self, timeout: Optional[float] = None) -> None:
        """Block on the thread event, optionally up to ``timeout`` seconds.

        The boolean result of the underlying wait is discarded; use
        ``is_set`` afterwards to tell a timeout from the event being set.

        Raises:
            RuntimeError: When the thread event is missing.
        """
        if self.thread_event is None:
            raise RuntimeError("Missing event")
        self.thread_event.wait(timeout)
|
||
|
||
def in_activity() -> bool:
    """Whether the current code is inside an activity.

    Returns:
        True if in an activity, False otherwise.
    """
    return _current_context.get(None) is not None
|
||
|
||
def info() -> Info:
    """Current activity's info.

    Returns:
        Info for the currently running activity.

    Raises:
        RuntimeError: When not in an activity.
    """
    return _Context.current().info()
|
||
|
||
def heartbeat(*details: Any) -> None:
    """Send a heartbeat for the current activity.

    Args:
        details: Values passed through unchanged to the context's heartbeat
            callable.

    Raises:
        RuntimeError: When not in an activity, or when the context has no
            heartbeat callable yet (i.e. before interceptor init completed).
    """
    # NOTE(review): Per the review thread on this function, heartbeat does not
    # currently raise when the activity has been cancelled, so synchronous
    # activities have to check for cancellation explicitly. Whether this call
    # should raise on cancellation (and how that would look for local
    # activities) was left open for further discussion.
    heartbeat_fn = _Context.current().heartbeat
    if heartbeat_fn is None:
        raise RuntimeError("Can only execute heartbeat after interceptor init")
    heartbeat_fn(*details)
|
||
|
||
def is_cancelled() -> bool:
    """Whether a cancellation was ever requested on this activity.

    Returns:
        True if the activity has had a cancellation request, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
    return _Context.current().cancelled_event.is_set()
|
||
|
||
async def wait_for_cancelled() -> None:
    """Asynchronously wait for this activity to get a cancellation request.

    Raises:
        RuntimeError: When not in an async activity.
    """
    # NOTE(review): A rename of this function was proposed in review; the
    # author kept the current name on the grounds that it matches existing
    # Python APIs. The proposed alternative names are not visible here.
    await _Context.current().cancelled_event.wait()
|
||
|
||
def wait_for_cancelled_sync(timeout: Optional[float] = None) -> None:
    """Synchronously block while waiting for a cancellation request on this
    activity.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`, but
    the wait result is not returned; call :py:func:`is_cancelled` afterwards
    to distinguish a timeout from a cancellation.

    Args:
        timeout: Max amount of time, in seconds, to wait for cancellation.
            None waits without a time limit.

    Raises:
        RuntimeError: When not in an activity.
    """
    _Context.current().cancelled_event.wait_sync(timeout)
|
||
|
||
def is_worker_shutdown() -> bool:
    """Whether shutdown has been invoked on the worker.

    Returns:
        True if shutdown has been called on the worker, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
    return _Context.current().worker_shutdown_event.is_set()
|
||
|
||
async def wait_for_worker_shutdown() -> None:
    """Asynchronously wait for shutdown to be called on the worker.

    Raises:
        RuntimeError: When not in an async activity.
    """
    await _Context.current().worker_shutdown_event.wait()
|
||
|
||
def wait_for_worker_shutdown_sync(timeout: Optional[float] = None) -> None:
    """Synchronously block while waiting for shutdown to be called on the
    worker.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`, but
    the wait result is not returned; call :py:func:`is_worker_shutdown`
    afterwards to distinguish a timeout from a shutdown.

    Args:
        timeout: Max amount of time, in seconds, to wait for shutdown to be
            called on the worker. None waits without a time limit.

    Raises:
        RuntimeError: When not in an activity.
    """
    _Context.current().worker_shutdown_event.wait_sync(timeout)
|
||
|
||
def raise_complete_async() -> NoReturn:
    """Raise an error that says the activity will be completed
    asynchronously.

    Raises:
        _CompleteAsyncError: Always.
    """
    raise _CompleteAsyncError()
|
||
|
||
class _CompleteAsyncError(temporalio.exceptions.TemporalError):
    """Error raised by :py:func:`raise_complete_async`."""

    # TODO(review): The review thread on this class agreed to change its base
    # class when async activity completion is implemented, since the class is
    # private. The proposed base class name is not visible here - confirm
    # against the original discussion.
|
||
|
||
class LoggerAdapter(logging.LoggerAdapter):
    """Adapter that adds details to the log about the running activity.

    Both behaviors below only apply while an activity context is set; outside
    of one, records pass through with just the base adapter processing.

    Attributes:
        activity_info_on_message: Boolean for whether a string representation of
            a dict of some activity info will be appended to each message.
            Default is True.
        activity_info_on_extra: Boolean for whether an ``activity_info`` value
            will be added to the ``extra`` dictionary, making it present on the
            ``LogRecord.__dict__`` for use by others. Default is True.
    """

    # NOTE(review): The review thread suggested making the message format
    # configurable as a possible future extension; for now the suffix can only
    # be switched on or off via ``activity_info_on_message``.

    def __init__(
        self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]
    ) -> None:
        """Create the logger adapter.

        Args:
            logger: Underlying logger to wrap.
            extra: Extra values handed to the base adapter; None is treated as
                an empty dict.
        """
        super().__init__(logger, extra or {})
        self.activity_info_on_message = True
        self.activity_info_on_extra = True

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """Override to add activity details.

        Args:
            msg: Log message.
            kwargs: Keyword arguments of the logging call.

        Returns:
            Tuple of the possibly suffixed message and the possibly updated
            keyword arguments.
        """
        msg, kwargs = super().process(msg, kwargs)
        # Only look up the context if at least one of the two features is on
        if self.activity_info_on_message or self.activity_info_on_extra:
            context = _current_context.get(None)
            if context is not None:
                if self.activity_info_on_message:
                    msg = f"{msg} ({context.logger_details})"
                if self.activity_info_on_extra:
                    # Extra can be absent or None, this handles both. It is
                    # copied because the base adapter may hand back its own
                    # shared ``extra`` dict, and writing into that would leak
                    # this activity's info into later, unrelated records.
                    extra = dict(kwargs.get("extra", None) or {})
                    extra["activity_info"] = context.info()
                    kwargs["extra"] = extra
        return (msg, kwargs)

    @property
    def base_logger(self) -> logging.Logger:
        """Underlying logger usable for actions such as adding
        handlers/formatters.
        """
        return self.logger
|
||
|
||
#: Logger that will have contextual activity details embedded.
logger = LoggerAdapter(logging.getLogger(__name__), None)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR, but before the release we'll need to publish these docs somewhere.
It'd be nice to see them rendered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. We need to have a larger conversation about docs on Python. I would also welcome input on any projects that y'all consider to have good docs and I can emulate those. Unfortunately, the Python ecosystem is very poor with regard to easily navigable API references.
In the meantime, if you want to view them locally, run `poe gen-docs` and open `docs/_build/index.html`. It's just a limited set of API docs at the moment.