Skip to content

Phase 2 - Initial workflow implementation #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ __pycache__
/dist
/docs/_autosummary
/docs/_build
/docs/_build_pydoctor
temporalio/api/*
!temporalio/api/__init__.py
temporalio/bridge/proto/*
Expand Down
367 changes: 323 additions & 44 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Modules

temporalio.client
temporalio.worker
temporalio.workflow
temporalio.activity
temporalio.converter
temporalio.exceptions
Expand Down
427 changes: 400 additions & 27 deletions poetry.lock

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ black = "^21.12b0"
furo = "^2022.3.4"
grpcio-tools = "^1.43.0"
isort = "^5.10.1"
mypy = "^0.931"
mypy = "^0.950"
mypy-protobuf = "^3.2.0"
pydocstyle = "^6.1.1"
pydoctor = "^22.5.1"
pytest = "^7.1.1"
pytest-asyncio = "^0.18.0"
pytest-timeout = "^2.1.0"
Expand All @@ -56,14 +57,17 @@ build-bridge-develop = "python scripts/setup_bridge.py develop"
fix-wheel = "python scripts/fix_wheel.py"
format = [{cmd = "black ."}, {cmd = "isort ."}]
gen-docs = "sphinx-build docs docs/_build"
gen-docs-pydoctor = "pydoctor"
gen-protos = "python scripts/gen_protos.py"
lint = [
{cmd = "black --check ."},
{cmd = "isort --check-only ."},
{ref = "lint-types"},
{ref = "lint-docs"},
]
lint-docs = "pydocstyle"
# TODO(cretz): Why does pydocstyle complain about @overload missing docs after
# https://github.com/PyCQA/pydocstyle/pull/511?
lint-docs = "pydocstyle --ignore-decorators=overload"
lint-types = "mypy ."
test = "pytest"

Expand All @@ -86,7 +90,7 @@ asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
timeout = 60
timeout = 600
timeout_func_only = true

[tool.isort]
Expand All @@ -111,6 +115,15 @@ add_ignore = [
"D205", "D415"
]

[tool.pydoctor]
add-package = ["temporalio"]
docformat = "google"
html-output = "docs/_build_pydoctor"
intersphinx = ["https://docs.python.org/3/objects.inv", "https://googleapis.dev/python/protobuf/latest/objects.inv"]
privacy = ["PRIVATE:temporalio.bridge", "HIDDEN:**.*_pb2*"]
project-name = "Temporal"
sidebar-expand-depth = 40

[build-system]
build-backend = "poetry.core.masonry.api"
requires = ["poetry-core>=1.0.0", "setuptools", "wheel", "setuptools-rust"]
8 changes: 8 additions & 0 deletions scripts/gen_protos.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,17 @@ def fix_generated_output(base_path: Path):
f.write(content)
# Write init
with (base_path / "__init__.py").open("w") as f:
# Imports
message_names = []
for stem, messages in imports.items():
for message in messages:
f.write(f"from .{stem} import {message}\n")
message_names.append(message)
# __all__
if message_names:
f.write(
f'\n__all__ = [\n "' + '",\n "'.join(message_names) + '",\n]\n'
)


if __name__ == "__main__":
Expand Down
56 changes: 46 additions & 10 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import asyncio
import contextvars
import inspect
import logging
import threading
from dataclasses import dataclass
Expand Down Expand Up @@ -57,15 +58,8 @@ def defn(fn: Optional[ActivityFunc] = None, *, name: Optional[str] = None):
"""

def with_name(name: str, fn: ActivityFunc) -> ActivityFunc:
# Validate the activity
if not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
# Set the name
setattr(fn, "__temporal_activity_name", name)
# This performs validation
_Definition._apply_to_callable(fn, name)
return fn

# If name option is available, return decorator function
Expand Down Expand Up @@ -320,7 +314,7 @@ def process(
) -> Tuple[Any, MutableMapping[str, Any]]:
"""Override to add activity details."""
msg, kwargs = super().process(msg, kwargs)
if self.activity_info_on_extra or self.activity_info_on_extra:
if self.activity_info_on_message or self.activity_info_on_extra:
context = _current_context.get(None)
if context:
if self.activity_info_on_message:
Expand All @@ -342,3 +336,45 @@ def base_logger(self) -> logging.Logger:

#: Logger that will have contextual activity details embedded.
logger = LoggerAdapter(logging.getLogger(__name__), None)


@dataclass
class _Definition:
name: str
fn: Callable
is_async: bool

@staticmethod
def from_callable(fn: Callable) -> Optional[_Definition]:
return getattr(fn, "__temporal_activity_definition", None)

@staticmethod
def must_from_callable(fn: Callable) -> _Definition:
ret = _Definition.from_callable(fn)
if ret:
return ret
fn_name = getattr(fn, "__name__", "<unknown>")
raise TypeError(
f"Activity {fn_name} missing attributes, was it decorated with @activity.defn?"
)

@staticmethod
def _apply_to_callable(fn: Callable, activity_name: str) -> None:
# Validate the activity
if hasattr(fn, "__temporal_activity_definition"):
raise ValueError("Function already contains activity definition")
elif not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
setattr(
fn,
"__temporal_activity_definition",
_Definition(
name=activity_name,
fn=fn,
is_async=inspect.iscoroutinefunction(fn),
),
)
38 changes: 19 additions & 19 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions temporalio/bridge/proto/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from .core_interface_pb2 import ActivityHeartbeat, ActivityTaskCompletion

__all__ = [
"ActivityHeartbeat",
"ActivityTaskCompletion",
]
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 69 files
+5 −4 .buildkite/docker/docker-compose.yaml
+1 −0 .gitignore
+1 −1 bridge-ffi/include/sdk-core-bridge.h
+9 −9 bridge-ffi/src/lib.rs
+14 −14 bridge-ffi/src/wrappers.rs
+4 −6 client/Cargo.toml
+129 −156 client/src/lib.rs
+66 −1 client/src/metrics.rs
+0 −171 client/src/mocks.rs
+6 −6 client/src/raw.rs
+51 −51 client/src/retry.rs
+1 −1 core-api/Cargo.toml
+1 −8 core-api/src/lib.rs
+9 −0 core-api/src/worker.rs
+2 −3 core/Cargo.toml
+2 −2 core/benches/workflow_replay.rs
+10 −3 core/src/abstractions.rs
+43 −50 core/src/core_tests/activity_tasks.rs
+10 −11 core/src/core_tests/child_workflows.rs
+12 −22 core/src/core_tests/determinism.rs
+29 −38 core/src/core_tests/local_activities.rs
+9 −9 core/src/core_tests/mod.rs
+293 −24 core/src/core_tests/queries.rs
+10 −40 core/src/core_tests/workers.rs
+262 −59 core/src/core_tests/workflow_tasks.rs
+36 −58 core/src/lib.rs
+11 −0 core/src/pending_activations.rs
+4 −6 core/src/pollers/mod.rs
+19 −14 core/src/pollers/poll_buffer.rs
+6 −20 core/src/replay/mod.rs
+1 −1 core/src/telemetry/mod.rs
+91 −45 core/src/test_help/mod.rs
+11 −8 core/src/worker/activities.rs
+28 −25 core/src/worker/activities/activity_heartbeat_manager.rs
+58 −6 core/src/worker/activities/local_activities.rs
+209 −0 core/src/worker/client.rs
+86 −0 core/src/worker/client/mocks.rs
+111 −51 core/src/worker/mod.rs
+14 −14 core/src/workflow/history_update.rs
+1 −1 core/src/workflow/machines/mod.rs
+14 −3 core/src/workflow/machines/workflow_machines.rs
+5 −2 core/src/workflow/mod.rs
+41 −2 core/src/workflow/workflow_tasks/cache_manager.rs
+86 −13 core/src/workflow/workflow_tasks/concurrency_manager.rs
+266 −127 core/src/workflow/workflow_tasks/mod.rs
+1 −1 protos/local/temporal/sdk/core/bridge/bridge.proto
+3 −7 sdk-core-protos/src/history_info.rs
+1 −0 sdk/Cargo.toml
+223 −0 sdk/src/activity_context.rs
+8 −2 sdk/src/interceptors.rs
+176 −134 sdk/src/lib.rs
+1 −1 test-utils/Cargo.toml
+3 −3 test-utils/src/histfetch.rs
+64 −57 test-utils/src/lib.rs
+13 −6 tests/integ_tests/client_tests.rs
+7 −4 tests/integ_tests/heartbeat_tests.rs
+4 −2 tests/integ_tests/polling_tests.rs
+23 −14 tests/integ_tests/queries_tests.rs
+98 −19 tests/integ_tests/workflow_tests.rs
+57 −34 tests/integ_tests/workflow_tests/activities.rs
+4 −5 tests/integ_tests/workflow_tests/cancel_wf.rs
+25 −8 tests/integ_tests/workflow_tests/continue_as_new.rs
+33 −17 tests/integ_tests/workflow_tests/local_activities.rs
+2 −28 tests/integ_tests/workflow_tests/replay.rs
+1 −1 tests/integ_tests/workflow_tests/resets.rs
+6 −3 tests/integ_tests/workflow_tests/timers.rs
+4 −4 tests/integ_tests/workflow_tests/upsert_search_attrs.rs
+78 −6 tests/load_tests.rs
+5 −5 tests/main.rs
12 changes: 6 additions & 6 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::time::Duration;
use temporal_client::{
ConfiguredClient, RetryConfig, RetryGateway, ServerGatewayOptions, ServerGatewayOptionsBuilder,
TlsConfig, WorkflowService, WorkflowServiceClientWithMetrics,
ClientOptions, ClientOptionsBuilder, ConfiguredClient, RetryClient, RetryConfig, TlsConfig,
WorkflowService, WorkflowServiceClientWithMetrics,
};
use tonic;
use url::Url;

pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException);

type Client = RetryGateway<ConfiguredClient<WorkflowServiceClientWithMetrics>>;
type Client = RetryClient<ConfiguredClient<WorkflowServiceClientWithMetrics>>;

#[pyclass]
pub struct ClientRef {
Expand Down Expand Up @@ -51,7 +51,7 @@ struct ClientRetryConfig {

pub fn connect_client(py: Python, config: ClientConfig) -> PyResult<&PyAny> {
// TODO(cretz): Add metrics_meter?
let opts: ServerGatewayOptions = config.try_into()?;
let opts: ClientOptions = config.try_into()?;
future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts.connect_no_namespace(None).await.map_err(|err| {
Expand Down Expand Up @@ -226,11 +226,11 @@ where
}
}

impl TryFrom<ClientConfig> for temporal_client::ServerGatewayOptions {
impl TryFrom<ClientConfig> for ClientOptions {
type Error = PyErr;

fn try_from(opts: ClientConfig) -> PyResult<Self> {
let mut gateway_opts = ServerGatewayOptionsBuilder::default();
let mut gateway_opts = ClientOptionsBuilder::default();
gateway_opts
.target_url(
Url::parse(&opts.target_url)
Expand Down
Loading