Phase 2 - Initial workflow implementation #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lookin' real nice
temporalio/worker/worker.py
The number of threads here should be independent of (and probably lower than) max WFTs. There shouldn't be any blocking stuff going on in these threads, so it ought to be fine for them to be sliced up among a number of active workflows
> There shouldn't be any blocking stuff going on in these threads
There is in our case because this is how we do deadlock detection. Essentially, every WFT runs in its own thread. Of course WFTs should only take a couple milliseconds so that is harmless, but technically 5 WFTs running means 5 threads running.
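The thread-per-WFT model described here can be sketched roughly as follows. This is an illustrative toy, not the SDK's code; `run_with_deadlock_detection` and the timeout value are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def run_with_deadlock_detection(task, timeout: float = 2.0):
    """Run a workflow task in its own thread; if it doesn't finish within
    the timeout, treat it as a potential deadlock."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(task)
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        raise RuntimeError("Potential deadlock: workflow task exceeded timeout")
    finally:
        # Don't block shutdown waiting on a possibly-deadlocked thread.
        pool.shutdown(wait=False)
```

Because each task occupies a thread for its whole (normally millisecond-scale) duration, five concurrent WFTs really do mean five live threads.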
Hmm.. do you need a thread per concurrent running workflow? Is it just easier to manage? Why not use a single thread?
> do you need a thread per concurrent running workflow?
No. It's a choice.
> Is it just easier to manage? Why not use a single thread?
We could use a single thread, but now you've removed all concurrency from workflow tasks for a worker. The goal of a thread is to detect deadlocks and support concurrent workflow tasks. I could definitely reduce the number here or even use a single thread if we wanted.
Note that the current model supports things like using non-asyncio HTTP calls inside payload codec.
My main concern is that you could run into problems with consuming too many threads and the associated overhead, as is the case with Java, which uses the same approach.
Deadlock detection is important, but it exists to solve a problem that should be rather temporary. Personally, I think it's probably OK to lock up a few other in-flight workflows for a second because of a deadlock in one; the user is going to need to address that immediately in any case. Whereas the thread-per-workflow approach is possibly a problem for the happy path.
I can understand that. Any suggestion on what the default max-threads should be? I can make it something like 5. Does that sound good? I have confirmed Java uses max-concurrent-WFT to determine thread pool size, though I understand we may not want to mimic Java here.
(of course many users might be better off passing in their own thread pools they can share across workers, both activity and workflow)
After further discussion, I'll put this at some low number like 4. The GIL prevents most practical concurrency anyway (though newer Python versions are working on improving this).
I'm not concerned with thread consumption, since we don't actually use a thread per workflow for caching purposes like Java does.
I do tend to agree with @Sushisource, though, that there's not much benefit to using more than one thread here, since activations will be processed serially anyway given that there's no I/O in workflow code.
Threads will buy you some concurrency in case the workflow runs CPU-intensive work (which it technically shouldn't). I don't see any harm in setting this to a very low value like Chad suggests, but I wouldn't tie it to the max concurrent WFTs, since there is work that can be done concurrently before we call into user code, like payload codecs.
I have changed this to the CPU count or 4, whichever is higher.
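The default described above ("CPU count or 4, whichever is higher") amounts to something like this sketch; the variable names are illustrative:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# os.cpu_count() can return None on exotic platforms, hence the fallback.
default_workers = max(os.cpu_count() or 1, 4)
executor = ThreadPoolExecutor(max_workers=default_workers)
```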
)
else:
    handle._resolve_failure(
        RuntimeError("Unknown child start fail cause: {job.failed.cause}")
- RuntimeError("Unknown child start fail cause: {job.failed.cause}")
+ RuntimeError(f"Unknown child start fail cause: {job.failed.cause}")
Done
Is there some lint rule that can identify this?
Hrmm, maybe. Currently I only use a style linter and MyPy and a doc linter. I don't have a general purpose linter for general Python suggestions. I will see if I can find a well-accepted one.
I checked `pylint` and confirmed it wouldn't catch this. I suppose there would be too many false positives with normal brace use in string literals.
I also gave `pylint` a general look, and it doesn't give us more than MyPy really; it's actually worse (it can't recognize stub files). And Black solves the formatting. So I think we're as good as we can reasonably get with linting currently.
I apologize for the force pushes. They are minor attempts to fix whatever I have done recently to make tests flaky on top of the custom-signal/query-handler registration that I committed.
Great readme!
command.schedule_local_activity.original_schedule_time.CopyFrom(
    local_backoff.original_schedule_time
)
# TODO(cretz): Remove when https://github.com/temporalio/sdk-core/issues/316 fixed
I fixed this already
Thanks! Will remove this on my next core update
Is there a place where you prevent generating new commands after a terminal workflow command has been emitted?
name: Optional[str] = None,
dynamic: Optional[bool] = False,
Could simplify with a special value: `name: Union[Literal[CatchAll], str]`, and get rid of `dynamic`.
Not really, because this is the final destination of the overloads. And one overload is `name=` and the other is `dynamic=`. So I have to accept both parameters in the final target method, because it has to be a superset of the overload parameters. Unless you are saying `@workflow.signal(name=CatchAll)` is more user-friendly than `@workflow.signal(dynamic=True)`.
Note, non-`@overload` functions are not documented in the API docs. This is basically the best way to do mutually exclusive parameters.
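To illustrate the pattern being discussed, here is a minimal sketch of mutually exclusive decorator parameters via `@overload`. This is a toy, not the SDK's actual implementation; the `signal` function and the `__signal_name__`/`__signal_dynamic__` attributes are hypothetical names used only for illustration:

```python
from typing import Callable, Optional, overload

# Only the @overload stubs appear in API docs/type checking; the final
# implementation must accept the superset of all overload parameters.
@overload
def signal(fn: Callable) -> Callable: ...
@overload
def signal(*, name: str) -> Callable[[Callable], Callable]: ...
@overload
def signal(*, dynamic: bool) -> Callable[[Callable], Callable]: ...

def signal(
    fn: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    dynamic: bool = False,
):
    # Mutual exclusion is enforced at runtime, not by the type checker.
    if name is not None and dynamic:
        raise ValueError("Cannot provide both 'name' and 'dynamic'")

    def decorator(f: Callable) -> Callable:
        f.__signal_name__ = name if name is not None else f.__name__
        f.__signal_dynamic__ = dynamic
        return f

    # Support both bare @signal and parameterized @signal(...) usage.
    return decorator if fn is None else decorator(fn)
```

Type checkers then accept `@signal`, `@signal(name="x")`, and `@signal(dynamic=True)`, while rejecting a call that mixes `name` and `dynamic` at the overload level.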
I was thinking `@workflow.signal(name=CatchAll)` is more friendly; it also implies that you can combine named and catch-all signal handlers.
I disagree that it's more friendly (and it'd be `@workflow.signal(name=workflow.CatchAll)`). They are different, mutually exclusive parameters. Also, you can't combine them, because the dynamic catch-all one is given a name and then the args (we actually enforce at runtime that they accept `self`, a name, then positional varargs).
After discussion it was decided not to reuse `name` as a way to mark this signal dynamic/catch-all.
Not directly. Rather, I am forcing the loop to stop, which should prevent any other command generation, in the two places that can generate terminal workflow commands - workflow function return or signal handler failure. See the
Well, it looks like commands can still be generated after a terminal command. So I am going to add something that removes non-query commands after workflow completion.
EDIT: I have pushed changes to cull them before returning.
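The culling described in that edit can be sketched as a filter over the pending command list: once a terminal command has been seen, drop every subsequent non-query command. This is a hypothetical sketch with illustrative command-type names, not the SDK's actual data structures:

```python
# Hypothetical terminal command types; in the real SDK these would be the
# protobuf command variants that end a workflow.
TERMINAL = {"complete_workflow", "fail_workflow", "cancel_workflow", "continue_as_new"}

def cull_commands(commands: list[dict]) -> list[dict]:
    """Keep all commands up to and including the first terminal one,
    then keep only query responses after it."""
    culled, seen_terminal = [], False
    for cmd in commands:
        if seen_terminal and cmd["type"] != "respond_to_query":
            continue  # drop non-query commands emitted after completion
        culled.append(cmd)
        if cmd["type"] in TERMINAL:
            seen_terminal = True
    return culled
```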
tests/worker/test_workflow.py
with pytest.raises(WorkflowFailureError) as err:
    await client.execute_workflow(
        CancelLocalActivityWorkflow.run,
        False,
nit: use keyword argument so it's clear what is False
We don't support keyword arguments in SDKs. Were this something besides a test case, I'd make params a dataclass.
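The dataclass approach mentioned there can be sketched as follows; the class and field names are illustrative, not taken from the PR:

```python
from dataclasses import dataclass

# Wrapping workflow parameters in a dataclass makes the single positional
# argument self-describing, without needing keyword-argument support in
# the SDK's cross-language invocation path.
@dataclass
class CancelLocalActivityParams:
    wait_for_cancel: bool = False

# The call site now reads clearly even though only one positional
# argument is passed to the workflow.
params = CancelLocalActivityParams(wait_for_cancel=False)
```

The workflow then takes `params` as its one argument, and adding a field later is backward-compatible in a way that adding a positional parameter is not.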
assert "never_completing_coroutine" in trace

# TODO:
There's probably more, I'd review the other SDKs' tests to see if there's anything important we're missing.
# * Custom workflow runner that also confirms WorkflowInstanceDetails can be pickled
# * Deadlock detection
# * Non-query commands after workflow complete
Test for query after workflow completed / failed
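A rough shape for such a test, using a stand-in handle class (`FakeHandle` and its fields are illustrative, not the SDK's API): queries should remain answerable after the workflow has finished.

```python
import asyncio

class FakeHandle:
    """Stands in for a handle to a workflow that has already completed."""

    def __init__(self) -> None:
        self.status = "COMPLETED"

    async def query(self, name: str) -> str:
        # A real worker would replay history to answer the query even
        # though the workflow is no longer running.
        return f"{name}:{self.status}"

result = asyncio.run(FakeHandle().query("status"))
assert result == "status:COMPLETED"
```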
Last call on this PR. While I will be adding more tests throughout the day and maybe also updating the core submodule, it is basically ready to merge (with all outstanding phase 2 TODOs to become issues).
Merging. All outstanding tasks have been created as issues.
Phase 2 items included:
Phase 2 TODO:
- Payload codec (EDIT: Done)
- Child/external workflow signal (EDIT: Done)
- External workflow cancel (EDIT: Done)
- Workflow logger (EDIT: Done)
- In-workflow signal/query handlers (EDIT: Done)
- Continue as new (EDIT: Done)
- Stack trace query (EDIT: Done, albeit ugly)

Notes: