Activity support #7
temporalio/bridge/src/client.rs
@@ -0,0 +1,296 @@
use pyo3::exceptions::{PyRuntimeError, PyValueError};
This was just extracted from lib.rs, nothing much changed. No need for deep review.
// TODO(cretz): Python won't let self take ownership. How to do this best?
// Make self.worker an option and move out? I don't think it's necessarily
// clear we can use https://pyo3.rs/v0.15.1/class/protocols.html#garbage-collector-integration-1.
// fn finalize_shutdown<'p>(self, py: Python<'p>) -> PyResult<&'p PyAny> {
Currently this requires Python to give up ownership of this object back to Rust, which is a bit rough right now. @Sushisource has provided a solution that does manual reference counting which, if Core must continue to require this object to be moved back into Rust for finalize_shutdown, I fear we may end up using. The other option here is to use unsafe and have Python promise not to use this reference after the change. I need to dig into options here, but I left it off for this review.
This might change a bit with the changes Roey wanted to make to shutdown (which he's working on), but I don't think it affects this specific thing.
Anyway, using https://doc.rust-lang.org/std/sync/struct.Arc.html#method.try_unwrap and waiting on some condition that wakes every time a ref is dropped is, I think, perfectly fine; there's really no reason to be afraid of that. Alternatively, or in addition, you can ensure from higher up that this just shouldn't be called until the worker is done being used otherwise. Definitely better than using unsafe, which there's no need for.
To clarify, I am not afraid of using it, I just didn't want to have to. But it sounds like I must.
I've started working on refactoring Core shutdown.
In the "new" version there are 2 methods:

- initiate_method - not async, signals that worker shutdown has been requested
- finalize_shutdown - basically an async drop; AFAICT this is optional

The worker can be considered shut down once it returns a shutdown error from both poll methods.
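For illustration only, here is a minimal Python-side sketch of sequencing that two-step shutdown. Every name in it (initiate_shutdown, poll_workflow_activation, poll_activity_task, finalize_shutdown as exposed to Python, and PollShutdownError) is an assumption used for the sketch, not this PR's actual API.

```python
# Hypothetical sketch: signal shutdown, drain both pollers until they report
# shutdown, then optionally run the async finalization step.
import asyncio


class PollShutdownError(Exception):
    """Stand-in for the error the bridge's poll calls raise once shut down."""


async def shutdown_worker(bridge_worker) -> None:
    # 1. Non-async call signaling that shutdown has been requested.
    bridge_worker.initiate_shutdown()

    async def drain(poll) -> None:
        while True:
            try:
                await poll()
            except PollShutdownError:
                return

    # 2. The worker can be considered shut down once both poll methods have
    #    returned the shutdown error.
    await asyncio.gather(
        drain(bridge_worker.poll_workflow_activation),
        drain(bridge_worker.poll_activity_task),
    )

    # 3. Optional async finalization ("async drop").
    await bridge_worker.finalize_shutdown()
```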
Ugh, that upends my efforts here on getting shutdown right by waiting on the right things. But ok.
As for finalize_shutdown, is there any way you can make the regular Core shutdown do as much of this as possible and make this really optional? Having to move ownership from Python back to Rust for this is rough. Python owns the object and the drop.
For now I will be leaving finalize_shutdown uncalled until these things come about, and I may have to rework shutdown anyways.
@@ -307,9 +314,44 @@ def get_workflow_handle(
        first_execution_run_id=first_execution_run_id,
    )

@overload
def get_activity_completion_handle(
These are placeholders just to capture the ideas that @bergundy and I discussed around using @overload. The full implementation is not present.
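As a rough sketch of the @overload idea being captured here: two typed call shapes backed by one runtime implementation. The split between a task token and workflow/run/activity IDs, and the ActivityCompletionHandle type, are assumptions for illustration, not the PR's final API.

```python
# Hypothetical sketch of @overload for get_activity_completion_handle.
from dataclasses import dataclass
from typing import Optional, overload


@dataclass
class ActivityCompletionHandle:
    task_token: Optional[bytes] = None
    workflow_id: Optional[str] = None
    run_id: Optional[str] = None
    activity_id: Optional[str] = None


class Client:
    @overload
    def get_activity_completion_handle(
        self, *, task_token: bytes
    ) -> ActivityCompletionHandle: ...

    @overload
    def get_activity_completion_handle(
        self, *, workflow_id: str, run_id: str, activity_id: str
    ) -> ActivityCompletionHandle: ...

    def get_activity_completion_handle(self, **kwargs) -> ActivityCompletionHandle:
        # Overloads only guide type checkers; one implementation handles both forms.
        return ActivityCompletionHandle(**kwargs)
```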
heartbeat_fn = heartbeat
temporalio.activity._Context.set(
    temporalio.activity._Context(
        info=lambda: info,
Hrmm, this removes the ability for an interceptor to override this for non-async activities, but do I want to make a whole threadsafe-and-picklable info call?
@@ -0,0 +1,223 @@
package main
This file was mostly just moved and is a helper for running workflows in certain ways for test cases. It can be skipped by most reviewers.
assert info.heartbeat_details == []
assert info.heartbeat_timeout is None
# TODO(cretz): Broken?
# assert info.schedule_to_close_timeout is None
I need to open a server issue about this. This is being set to a few nanos more than start_to_close_timeout by the server for some reason.
# The function must be picklable for use in process
# executors, so we perform this eager check to fail at
# registration time
# TODO(cretz): Is this too expensive/unnecessary?
try:
    pickle.dumps(activity)
except Exception as err:
    raise TypeError(
        f"Activity {name} must be picklable when using a process executor"
    ) from err
Do we really need this? If users want to spawn within their activities, they can always do that. Not sure we need to support this form out-of-the-box. If it's asked for a bunch, maybe we add it then. Process management always ends up being a huge pain.
It's mostly harmless to support. They are still doing the process management, but it makes for a nice setup if you support the common Python concurrency approaches (asyncio, threading, multiprocessing) out of the box. This eager check is just to save them if they are using multiprocessing. If we had to do process management I'd agree we shouldn't support it, but in this file we just make sure our stuff works across processes; we aren't managing processes.
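For context, a rough sketch of the kind of out-of-the-box setup being described, where the user owns the ProcessPoolExecutor and just hands it to the worker. The constructor arguments shown (activity_executor in particular), the activities mapping shape, and the connect/run calls are assumptions about the API shape, not necessarily what this PR implements.

```python
# Illustrative only: parameter and method names here are assumptions.
import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker


def compose_greeting(name: str) -> str:
    # A plain synchronous activity; it must be a picklable, module-level
    # function to run in a process pool, which the eager check above verifies.
    return f"Hello, {name}!"


async def main() -> None:
    client = await Client.connect("http://localhost:7233")  # illustrative target
    with concurrent.futures.ProcessPoolExecutor() as executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            activities={"compose_greeting": compose_greeting},
            activity_executor=executor,  # assumed name for the executor option
        )
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```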
It's definitely not an MVP feature, but since all of this work has been put in, let's keep it.
There's a bit more to it than just spawning a thread or process (or using an executor): @cretz added heartbeat and cancellation support for sync activities, which would be much harder for users to implement themselves, and they'll look to us to provide an official solution.
I've managed to review all but test_worker.py, I'll get back to it later.
@@ -11,6 +11,18 @@ Client
.. automodule:: temporalio.client
   :members:

Worker
Not for this PR, but before the release we'll need to publish these docs somewhere.
It'd be nice to see them rendered.
Agreed. We need to have a larger conversation about docs on Python. I would also welcome input on any projects that y'all consider to have good docs, and I can emulate those. Unfortunately, the Python ecosystem is very, very poor with regard to easily navigable API references.
In the meantime, if you want to view them locally, run poe gen-docs and open docs/_build/index.html. It's just a limited set of API docs atm.
    return _Context.current().cancelled_event.thread_event.is_set()


async def wait_for_cancelled() -> None:
Can we rename this to something like until_cancelled? await wait... doesn't read well.
This matches Python APIs like asyncio.wait and asyncio.wait_for, both of which you'd call with await in front of too.
wait_for was introduced before Python even had async/await; it used to be yield from wait_for...
It still is a known word. until_ sounds a bit strange. Is there any better phrasing? Remember, we also have the synchronous one (until_cancelled_sync() is a bit rough too).
I don't expect people to await this anyways; I expect them to pass the task to wait/wait_for with several others. But I am ok changing the name if we can come up with something better.
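To illustrate the "pass the task to wait/wait_for with several others" usage, a small sketch. The do_real_work coroutine is made up; wait_for_cancelled is the function under discussion.

```python
# Sketch: race real work against the activity's cancellation signal.
import asyncio

import temporalio.activity


async def do_real_work() -> str:
    await asyncio.sleep(30)
    return "done"


async def my_activity() -> str:
    work = asyncio.create_task(do_real_work())
    cancelled = asyncio.create_task(temporalio.activity.wait_for_cancelled())
    done, pending = await asyncio.wait(
        {work, cancelled}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    if work in done:
        return work.result()
    # Cancellation won the race; surface it as a normal asyncio cancel.
    raise asyncio.CancelledError()
```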
In TS we simply call this cancelled, but I'm fine with what we have here.
)
elif (
    isinstance(err, asyncio.CancelledError)
    and running_activity.cancelled_by_request
Note that core might cancel for various reasons, like the workflow already being complete.
Can you clarify the suggestion here?
I just meant that cancellation is also a mechanism for core to propagate heartbeat errors back to the activity.
The fact that core cancelled the activity doesn't mean it completes as cancelled; it might need to be considered a failure (I think we overlooked that in TS too - we didn't use cancels like we do now when that code was written).
I see that core sends a cancel with one of three enum reasons, and the non-cancelled-request ones won't do anything with the error anyways (the activity is already considered handled elsewhere). So is the concern that I use the term _by_request here, or that I should return a different error than a cancelled error for a different reason? Can you clarify the suggestion?
In node I ended up treating all of those as cancel requests.
I only treat cancel due to failed heartbeat converter as activity failure.
Same here
"""Send a heartbeat for the current activity. | ||
|
||
Raises: | ||
RuntimeError: When not in an activity. |
So if an activity is canceled the heartbeat still succeeds? In Go the context is invalidated, but context is a pretty standard abstraction in Go. I think most developers are going to ignore cancellation or workflow completion in this case.
For async in Python, the standard abstraction is to use https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel to represent cancel which can be caught, etc. However, for "sync" activities, yes, the way I currently developed it requires explicit cancellation checking. In retrospect this isn't good enough because it doesn't throw.
I think I will have this call throw when the activity is cancelled. Note that heartbeat is async due to buffering, so cancellation may not throw until the next heartbeat. @bergundy - can you clarify what TS does here for heartbeat on a cancelled activity? Do you see any problems with me throwing when the activity is cancelled here?
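A rough sketch of that asymmetry as currently described: async activities see cancellation as asyncio.CancelledError thrown into the task, while sync activities have to check explicitly. The is_cancelled name on the sync side is an assumption for illustration.

```python
# Sketch of async vs sync cancellation handling in an activity.
import asyncio
import time

import temporalio.activity


async def async_activity() -> str:
    try:
        await asyncio.sleep(60)
        return "done"
    except asyncio.CancelledError:
        # Cancellation arrives via asyncio.Task.cancel and can be caught here.
        raise


def sync_activity() -> str:
    for _ in range(60):
        if temporalio.activity.is_cancelled():  # assumed explicit check
            raise RuntimeError("activity cancelled")
        temporalio.activity.heartbeat()
        time.sleep(1)
    return "done"
```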
OK, here's some notes on what I think I learned here:
- TypeScript
  - The heartbeat call does not throw and you will never know an activity cancel request happened unless you opt in to knowing or are calling a cancellation-aware call
  - A promise is resolved on the first cancellation request and is not reset if ignored/swallowed. So you can't swallow the first cancel and wait for the next.
- Go
  - The heartbeat call does not throw; you leverage the Go context to know if cancelled
  - Like TypeScript, there is no way to swallow one cancellation and get notified about the next
  - Unlike TypeScript, there is nothing you can check to see if a cancellation was requested. Checking the context alone doesn't tell you it isn't, say, a worker stop.
- Java
  - The heartbeat call does throw, and properly resets itself for more cancellation requests if you swallow it (at least from what I am seeing)
  - Heartbeat is the only path for cancellation. There is nothing you can check or wait on to see if cancellation was requested
Some of those make sense in their async or sync environments. But Python is both, so what do we do in Python? Notes:
- Does temporalio.activity.heartbeat throw?
  - Thinking yes, but since it's asynchronously sent in the background, it can only throw on the next call to heartbeat after the cancel is returned
  - This will have to throw a different error than asyncio.CancelledError, maybe called ActivityCancelledError or similar
  - I am thinking we want the cancellation reason on this too, of which there are 4:
    - cancel requested
    - not found - gone from server, so the error is ignored anyways
    - timed out - being handled elsewhere, so the error will be ignored anyways
    - heartbeat failure - only really occurs if the details can't be converted on the previous heartbeat call
- Is asyncio.Task.cancel still used?
  - Thinking yes, but it's a different exception type and won't have the reason
    - Maybe make the activity cancelled error available for the reason?
- Should we allow resetting of the is_cancelled and heartbeat if the exception is swallowed?
  - Thinking yes, but it's manual. So instead of like Java where the heartbeat call won't continue to throw on error, this will unless you manually reset the cancellation
  - The reason for doing this is to make the event simple
- This is all made more difficult by multiprocess support
  - It was simple unidirectional heartbeating, but now I've learned that unlike TypeScript (my previous model here), heartbeating has to be bidirectional to report errors
Still working on the best solution here...
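To make the proposal above concrete, a hypothetical sketch of the error-plus-reason shape being considered. None of these names, and none of the bookkeeping, is decided API; it only mirrors the notes: a dedicated cancellation error carrying one of the four reasons, raised from heartbeat() on the call after the cancel is reported.

```python
# Hypothetical sketch of the proposed cancellation error and reason.
from enum import Enum
from typing import Optional


class ActivityCancelReason(Enum):
    CANCEL_REQUESTED = "cancel_requested"
    NOT_FOUND = "not_found"
    TIMED_OUT = "timed_out"
    HEARTBEAT_FAILURE = "heartbeat_failure"


class ActivityCancelledError(Exception):
    """Hypothetical error carrying the cancellation reason."""

    def __init__(self, reason: ActivityCancelReason) -> None:
        super().__init__(f"Activity cancelled: {reason.value}")
        self.reason = reason


# Stand-in for internal bookkeeping: the cancel reported against the previous
# (buffered) heartbeat, if any.
_pending_cancel: Optional[ActivityCancelReason] = None


def heartbeat(*details) -> None:
    # Because heartbeats are buffered and sent in the background, a cancel can
    # only surface on the next heartbeat call after it is reported.
    global _pending_cancel
    if _pending_cancel is not None:
        reason, _pending_cancel = _pending_cancel, None
        raise ActivityCancelledError(reason)
    # ... buffer the details for the background heartbeater ...
```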
After discussion with @bergundy, we have resolved to...have further discussions heh. While heartbeat throwing may be ok, we have to discuss how that looks in local activities. Also need to discuss the confusion when your await call could throw one type of cancel or your heartbeat() call could throw another type. I suspect this same concern will come up in .Net where you have async and non-async and both are fairly heavily used in the ecosystem.
default_heartbeat_throttle_interval_millis: int


class Worker:
Could calling this Worker create collisions in code completion, as this class is purely an implementation one?
Not really, because it's in the "bridge" package that no customer-facing code sees/uses. Technically I suppose when you start typing "Wo" in your IDE, all things called Worker from all packages appear (same for "Client" and similar). I could prefix this with an underscore if this is a concern.
Looking good to me
    raise _CompleteAsyncError()


class _CompleteAsyncError(temporalio.exceptions.TemporalError):
Not blocking this PR, but do you think we should make this class inherit from BaseException so it's less likely to be caught by application code? (I saw that asyncio's CancelledError subclasses that.)
> Not blocking this PR, do you think we should make this class inherit from BaseException so it's less likely to be caught by application code?

Yes, I will change this since it's a private exception class anyways. I will do this when implementing async activity completion.

> I saw that asyncio CancelledError subclasses that

Fun fact, not in Python 3.7, which is why I catch it separately.
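A quick illustration of why BaseException helps here: a broad except Exception in application code will not swallow a BaseException-derived control-flow sentinel. The surrounding code is made up; in the PR the class currently subclasses temporalio.exceptions.TemporalError.

```python
# Sketch: BaseException-derived sentinels pass through `except Exception`.
class _CompleteAsyncError(BaseException):
    """Private sentinel raised to signal async activity completion."""


def user_activity_body() -> None:
    try:
        raise _CompleteAsyncError()
    except Exception:
        # Common catch-all in application code; does NOT catch the sentinel.
        print("swallowed")


try:
    user_activity_body()
except _CompleteAsyncError:
    print("sentinel reached the worker as intended")
```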
Don't think I have any blocking comments here.
Merging since there are no large outstanding issues.
What was changed

- temporalio/worker.py and tests/test_worker.py; everything else is built to support those two (especially the latter, as that gives an idea of what the API looks like from a customer POV, though I admittedly added a helper or two to keep tests small).

All feedback welcome!