Packaging, dependency updates, and other errata #8

Merged 4 commits on Mar 17, 2022
47 changes: 43 additions & 4 deletions .github/workflows/ci.yml
@@ -13,7 +13,7 @@ jobs:
       fail-fast: true
       matrix:
         python: ["3.7", "3.10"]
-        os: [ubuntu-latest] # TODO: macos-latest, windows-latest
+        os: [ubuntu-latest, macos-latest, windows-latest]
Review comment (Member Author): Opened issue #9 for macOS arm support and #10 for more generic manylinux support

     runs-on: ${{ matrix.os }}
     steps:
       - name: Print build information
@@ -27,12 +27,51 @@ jobs:
       - uses: actions/setup-python@v1
         with:
           python-version: ${{ matrix.python }}
-      # Needed to tests since they use external server
+      # Needed for tests since they use external server
       - uses: actions/setup-go@v2
         with:
           go-version: "1.17"
       - run: python -m pip install --upgrade wheel poetry poethepoet
       - run: poetry install
       - run: poe lint
-      - run: poe build
-      - run: poe test
+      - run: poe build-develop
+      - run: poe test -s -o log_cli_level=DEBUG

+  # Compile the binaries and upload artifacts
+  compile-binaries:
+    strategy:
+      fail-fast: true
+      matrix:
+        include:
+          - os: ubuntu-latest
+            package-suffix: linux-amd64
+          - os: macos-latest
+            package-suffix: macos-amd64
+          - os: windows-latest
+            package-suffix: windows-amd64
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v2
+        with:
+          submodules: recursive
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+      - uses: actions/setup-python@v1
+        with:
+          python-version: "3.10"
+      # Needed for tests since they use external server
+      - uses: actions/setup-go@v2
+        with:
+          go-version: "1.17"
+      - run: python -m pip install --upgrade wheel poetry poethepoet
+      - run: poetry install
+      - run: poe gen-protos
+      - run: poetry build
+      - run: poe fix-wheel
+      - run: poe test-dist-single
+      - uses: actions/upload-artifact@v2
+        with:
+          name: packages-${{ matrix.package-suffix }}
+          path: dist

3 changes: 3 additions & 0 deletions .gitignore
@@ -1,10 +1,13 @@
.venv
__pycache__
/build
/dist
/docs/_build
temporalio/api/*
!temporalio/api/__init__.py
temporalio/bridge/proto/*
!temporalio/bridge/proto/__init__.py
temporalio/bridge/target/
temporalio/bridge/temporal_sdk_bridge*
/tests/helpers/golangserver/golangserver
/tests/helpers/golangworker/golangworker
191 changes: 189 additions & 2 deletions README.md
@@ -4,6 +4,193 @@

The Python SDK is under development. There are no compatibility guarantees nor proper documentation pages at this time.

## Usage

### Installation

Install the `temporalio` package from [PyPI](https://pypi.org/project/temporalio). If using `pip` directly, this might
look like:

    python -m pip install temporalio
Review comment (Member Author): I plan on publishing this once we merge


### Client

A client can be created and used to start a workflow like so:

```python
from temporalio.client import Client

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow("my workflow name", "some arg", id="my-workflow-id", task_queue="my-task-queue")

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")
```

Some things to note about the above code:

* A `Client` does not have an explicit "close"
* Positional arguments can be passed to `start_workflow`
* The `handle` represents the workflow that was started and can be used for more than just getting the result
* Since we are just getting the handle and waiting on the result, we could have called `client.execute_workflow` which
  does the same thing
* Clients can have many more options not shown here (e.g. data converters and interceptors)

#### Data Conversion

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` client parameter.

The default data converter supports converting multiple types including:

* `None`
* `bytes`
* `google.protobuf.message.Message` - As JSON when encoding, but has the ability to decode binary proto from other languages
* Anything that [`json.dump`](https://docs.python.org/3/library/json.html#json.dump) supports

As a special case in the default converter, [data classes](https://docs.python.org/3/library/dataclasses.html) are
automatically [converted to dictionaries](https://docs.python.org/3/library/dataclasses.html#dataclasses.asdict) before
encoding as JSON. Since Python is a dynamic language, when decoding via
[`json.load`](https://docs.python.org/3/library/json.html#json.load), the type is not known at runtime so, for example,
a JSON object will be a `dict`. As a special case, if the parameter type hint is a data class for a JSON payload, it is
decoded into an instance of that data class (properly recursing into child data classes).
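
The data class round trip described above can be illustrated with the standard library alone. This is a minimal sketch, not the SDK's converter: `from_dict` is a hypothetical helper, and it assumes every nested data class field is annotated directly with the class (no string annotations, no `Optional` or `List` wrappers).

```python
import dataclasses
import json
from dataclasses import dataclass


@dataclass
class Address:
    city: str


@dataclass
class Person:
    name: str
    address: Address


def from_dict(cls, data):
    """Hypothetical helper: rebuild a data class from a decoded JSON object."""
    kwargs = {}
    for field in dataclasses.fields(cls):
        value = data[field.name]
        # Recurse when the field's type hint is itself a data class
        if dataclasses.is_dataclass(field.type) and isinstance(value, dict):
            value = from_dict(field.type, value)
        kwargs[field.name] = value
    return cls(**kwargs)


# Encoding: data class -> dict -> JSON
encoded = json.dumps(dataclasses.asdict(Person("Ada", Address("London"))))

# Decoding without a type hint yields plain dictionaries
untyped = json.loads(encoded)

# Decoding with the data class as the "hint" yields an instance again
typed = from_dict(Person, untyped)
print(type(untyped).__name__, typed)
```

Without the type hint the caller only gets a `dict`, which is why the parameter annotation matters when a data class is expected.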

### Activities

#### Activity-only Worker

An activity-only worker can be started like so:

```python
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker

async def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"


async def main(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")

    # Run the worker until the event is set
    worker = Worker(client, task_queue="my-task-queue", activities={"say-hello-activity": say_hello_activity})
    async with worker:
        await stop_event.wait()
```

Some things to note about the above code:

* This creates/uses the same client that is used for starting workflows
* The `say_hello_activity` is `async` which is the recommended activity type (see "Types of Activities" below)
* The created worker only runs activities, not workflows
* Activities are passed as a mapping with the key as a string activity name and the value as a callable
* While this example accepts a stop event and uses `async with`, `run()` and `shutdown()` may be used instead
* Workers can have many more options not shown here (e.g. data converters and interceptors)

#### Types of Activities

There are 3 types of activity callables accepted and described below: asynchronous, synchronous multithreaded, and
synchronous multiprocess/other. Only positional parameters are allowed in activity callables.

##### Asynchronous Activities

Asynchronous activities, i.e. functions using `async def`, are the recommended activity type. When using asynchronous
activities no special worker parameters are needed.

Cancellation for asynchronous activities is done via
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). An activity must heartbeat to
receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
"Heartbeating and Cancellation" later).

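How `asyncio.Task.cancel` surfaces inside a coroutine can be shown without the SDK. The sketch below uses only `asyncio`; `slow_activity` and `run_and_cancel` are hypothetical names, and the explicit `task.cancel()` call stands in for what a worker would do when a cancellation request arrives.

```python
import asyncio


async def slow_activity(log: list) -> str:
    try:
        # Stand-in for long-running work
        await asyncio.sleep(60)
        return "done"
    except asyncio.CancelledError:
        # Clean up, then re-raise so the cancellation is not swallowed
        log.append("cleaned up")
        raise


async def run_and_cancel() -> list:
    log = []
    task = asyncio.create_task(slow_activity(log))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()  # stand-in for the worker cancelling the activity task
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


print(asyncio.run(run_and_cancel()))
```

Re-raising after cleanup keeps the task in the cancelled state, which is the reason catching `asyncio.CancelledError` without re-raising is discouraged.
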
##### Synchronous Activities

Synchronous activities, i.e. functions that do not have `async def`, can be used with workers, but the
`activity_executor` worker parameter must be set with a `concurrent.futures.Executor` instance to use for executing the
activities.

Cancellation for synchronous activities is done in the background and the activity must choose to listen for it and
react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about
cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).

###### Synchronous Multithreaded Activities

If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
are considered multithreaded activities. Besides `activity_executor`, no other worker parameters are required for
synchronous multithreaded activities.
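
Because a running thread cannot be interrupted from outside, a synchronous function has to check for cancellation itself. The following is a minimal sketch using only the standard library: the `threading.Event` is an assumed stand-in for whatever cancellation signal the activity polls (in a real activity that could be `temporalio.activity.is_cancelled()`), and `count_until_cancelled` is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def count_until_cancelled(cancelled: threading.Event, limit: int) -> int:
    """Synchronous function that checks for cancellation between work units."""
    done = 0
    while done < limit and not cancelled.is_set():
        done += 1  # stand-in for one unit of blocking work
    return done


cancelled = threading.Event()
with ThreadPoolExecutor(max_workers=1) as executor:
    # Not cancelled: the function runs to completion
    finished = executor.submit(count_until_cancelled, cancelled, 5).result()

    # Cancellation requested: the function notices the flag and stops early
    cancelled.set()
    stopped = executor.submit(count_until_cancelled, cancelled, 5).result()

print(finished, stopped)
```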

###### Synchronous Multiprocess/Other Activities

If `activity_executor` is set to a `concurrent.futures.Executor` instance that is _not_ an instance of
`concurrent.futures.ThreadPoolExecutor` (e.g. a `concurrent.futures.ProcessPoolExecutor`), then the synchronous
activities are considered multiprocess/other activities.

These require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be
set to an instance of `temporalio.worker.SharedStateManager`. The most common implementation can be created by passing a
`multiprocessing.managers.SyncManager` (i.e. result of `multiprocessing.Manager()`) to
`temporalio.worker.SharedStateManager.create_from_multiprocessing()`.

Also, all of these activity functions must be
["picklable"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled).
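
A quick way to see what "picklable" means in practice is to try pickling the callable. This sketch uses only the standard library; `is_picklable` is a hypothetical helper and `say_hello` is an illustrative function, neither is part of the SDK.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def say_hello(name: str) -> str:
    return f"Hello, {name}!"


# Functions are pickled by reference to their module and qualified name, so a
# function defined at module level can be found again in another process.
# A lambda has no importable name, so pickling it fails.
print(is_picklable(say_hello), is_picklable(lambda name: name))
```

In practice this means activity functions for a process-based executor should be defined at the top level of an importable module rather than as lambdas or nested functions.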

#### Activity Context

During activity execution, an implicit activity context is set as a
[context variable](https://docs.python.org/3/library/contextvars.html). The context variable itself is not visible, but
calls in the `temporalio.activity` package make use of it. Specifically:

* `in_activity()` - Whether an activity context is present
* `info()` - Returns the immutable info of the currently running activity
* `heartbeat(*details)` - Record a heartbeat
* `is_cancelled()` - Whether a cancellation has been requested on this activity
* `wait_for_cancelled()` - `async` call to wait for cancellation request
* `wait_for_cancelled_sync(timeout)` - Synchronous blocking call to wait for cancellation request
* `is_worker_shutdown()` - Whether the worker has started graceful shutdown
* `wait_for_worker_shutdown()` - `async` call to wait for start of graceful worker shutdown
* `wait_for_worker_shutdown_sync(timeout)` - Synchronous blocking call to wait for start of graceful worker shutdown
* `raise_complete_async()` - Raise an error that this activity will be completed asynchronously (i.e. after return of
  the activity function in a separate client call)

With the exception of `in_activity()`, if any of the functions are called outside of an activity context, an error
occurs. Synchronous activities cannot call any of the `async` functions.
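
The pattern of a hidden context variable behind module-level functions can be sketched with `contextvars` alone. This is an illustration of the pattern, not the SDK's code: `_current_info`, `run_in_activity_context`, and the `dict` used as activity info are assumptions, and `in_activity`/`info` here are simplified stand-ins for the functions listed above.

```python
import contextvars

# Module-private variable: callers only ever use the helper functions below
_current_info = contextvars.ContextVar("current_info", default=None)


def in_activity() -> bool:
    """Whether an activity context is present."""
    return _current_info.get() is not None


def info() -> dict:
    """Return the current activity info, or fail outside an activity."""
    current = _current_info.get()
    if current is None:
        raise RuntimeError("Not in activity context")
    return current


def run_in_activity_context(fn, activity_info: dict):
    """Run fn with the context variable set, as a worker might do."""
    ctx = contextvars.copy_context()

    def runner():
        _current_info.set(activity_info)
        return fn()

    return ctx.run(runner)


def my_activity() -> str:
    return info()["activity_type"]


print(in_activity())  # outside any activity context
print(run_in_activity_context(my_activity, {"activity_type": "say-hello"}))
print(in_activity())  # the value set inside the copied context did not leak
```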

##### Heartbeating and Cancellation

In order for an activity to be notified of cancellation requests, it must invoke `temporalio.activity.heartbeat()`.
It is strongly recommended that all but the fastest executing activities call this function regularly. "Types of
Activities" has specifics on cancellation for asynchronous and synchronous activities.

In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and
is retried, `temporalio.activity.info().heartbeat_details` will return an iterable containing `123` and `456` on the
next run.
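
Heartbeat details are commonly used to resume work after a retry. The sketch below shows that idea without importing the SDK: `process_items` is a hypothetical function that receives the heartbeat callable and the previous details as parameters. In a real activity these would presumably be `temporalio.activity.heartbeat` and `temporalio.activity.info().heartbeat_details`; `fake_heartbeat` only simulates the server keeping the latest details.

```python
from typing import Callable, Iterable, List


def process_items(
    items: List[str],
    heartbeat: Callable[..., None],
    heartbeat_details: Iterable[int],
    fail_at: int = -1,
) -> List[str]:
    """Process items in order, resuming after the last heartbeated index."""
    details = list(heartbeat_details)
    # On the first attempt there are no details, so start from the beginning
    start = details[0] + 1 if details else 0
    processed = []
    for index in range(start, len(items)):
        if index == fail_at:
            raise RuntimeError("simulated failure")
        processed.append(items[index].upper())
        heartbeat(index)  # record progress after each completed item
    return processed


# Simulated server-side storage of the latest heartbeat details
recorded: List[int] = []


def fake_heartbeat(*details: int) -> None:
    recorded[:] = details


# First attempt fails partway through, the retry resumes where it left off
try:
    process_items(["a", "b", "c", "d"], fake_heartbeat, [], fail_at=2)
except RuntimeError:
    pass
print(process_items(["a", "b", "c", "d"], fake_heartbeat, recorded))
```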

##### Worker Shutdown

An activity can react to a worker shutdown by using `is_worker_shutdown` or one of the `wait_for_worker_shutdown`
functions.

When the `graceful_shutdown_timeout` worker parameter is given a `datetime.timedelta`, on shutdown the worker will
notify activities of the graceful shutdown. Once that timeout has passed (or if it wasn't set), the worker will perform
cancellation of all outstanding activities.

The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
respect cancellation, the shutdown may never complete.
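
The shutdown sequence described above (notify, wait for the graceful timeout, cancel, then wait for completion) can be sketched with plain `asyncio`. This is not the worker's implementation: `shutdown_worker`, `polite_activity`, and `stubborn_activity` are hypothetical names, and an `asyncio.Event` stands in for the shutdown notification.

```python
import asyncio


async def polite_activity(shutdown: asyncio.Event) -> str:
    # Reacts to the shutdown notification and finishes on its own
    await shutdown.wait()
    return "stopped gracefully"


async def stubborn_activity(shutdown: asyncio.Event) -> str:
    # Ignores the shutdown notification; only cancellation stops it
    await asyncio.sleep(3600)
    return "finished"


async def shutdown_worker(graceful_timeout: float) -> list:
    shutdown = asyncio.Event()
    tasks = [
        asyncio.create_task(polite_activity(shutdown)),
        asyncio.create_task(stubborn_activity(shutdown)),
    ]
    await asyncio.sleep(0)  # let both activities start

    # 1. Notify activities that graceful shutdown has begun
    shutdown.set()
    # 2. Give them the graceful timeout to finish on their own
    _, pending = await asyncio.wait(tasks, timeout=graceful_timeout)
    # 3. Cancel whatever is still running, then wait for everything
    for task in pending:
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        "cancelled" if isinstance(r, asyncio.CancelledError) else r
        for r in results
    ]


print(asyncio.run(shutdown_worker(graceful_timeout=0.1)))
```

If `stubborn_activity` also caught and suppressed the cancellation, the final `gather` would wait for the full sleep, which mirrors the warning that shutdown may never complete.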

## Development

The Python SDK is built to work with Python 3.7 and newer. It is built using
[SDK Core](https://github.com/temporalio/sdk-core/) which is written in Rust.

### Local development environment

- Install the system dependencies:
@@ -19,7 +206,7 @@ The Python SDK is under development. There are no compatibility guarantees nor p
poetry config virtualenvs.in-project true
```

-- Install the package dependencies:
+- Install the package dependencies (requires Rust):

```bash
poetry install
@@ -28,7 +215,7 @@ The Python SDK is under development. There are no compatibility guarantees nor p
- Build the project (requires Rust):

```bash
-poe build
+poe build-develop
```

- Run the tests (requires Go):
28 changes: 28 additions & 0 deletions build.py
@@ -0,0 +1,28 @@
"""Additional setup options for Poetry build."""

import shutil

from setuptools_rust import Binding, RustExtension


def build(setup_kwargs):
    """Additional setup options for Poetry build."""
    setup_kwargs.update(
        # Same as in scripts/setup_bridge.py, but we cannot import that here
        # because it's not in the sdist
        rust_extensions=[
            RustExtension(
                "temporalio.bridge.temporal_sdk_bridge",
                path="temporalio/bridge/Cargo.toml",
                binding=Binding.PyO3,
                py_limited_api=True,
                features=["pyo3/abi3-py37"],
            )
        ],
        zip_safe=False,
        # We have to remove packages and package data due to duplicate files
        # being generated in the wheel
        packages=[],
        package_data={},
    )
    shutil.rmtree("temporalio.egg-info", ignore_errors=True)
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -48,7 +48,7 @@
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
-html_theme = "sphinx_rtd_theme"
+html_theme = "furo"

# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,