Skip to content

Update SDK Core and PyO3 #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
632 changes: 274 additions & 358 deletions temporalio/bridge/Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ crate-type = ["cdylib"]

[dependencies]
log = "0.4"
parking_lot = "0.12"
prost = "0.9"
prost-types = "0.9"
pyo3 = { version = "0.15", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.15", features = ["tokio-runtime"] }
pyo3 = { version = "0.16", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.16", features = ["tokio-runtime"] }
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core" }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 104 files
22 changes: 18 additions & 4 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use parking_lot::RwLock;
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use temporal_client::{
ClientOptions, ClientOptionsBuilder, ConfiguredClient, RetryClient, RetryConfig, TlsConfig,
Expand Down Expand Up @@ -51,12 +53,20 @@ struct ClientRetryConfig {

pub fn connect_client(py: Python, config: ClientConfig) -> PyResult<&PyAny> {
// TODO(cretz): Add metrics_meter?
let headers = if config.static_headers.is_empty() {
None
} else {
Some(Arc::new(RwLock::new(config.static_headers.clone())))
};
let opts: ClientOptions = config.try_into()?;
future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts.connect_no_namespace(None).await.map_err(|err| {
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
})?,
retry_client: opts
.connect_no_namespace(None, headers)
.await
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
})?,
})
})
}
Expand Down Expand Up @@ -96,9 +106,14 @@ impl ClientRef {
"get_search_attributes" => {
rpc_call!(retry_client, retry, get_search_attributes, req)
}
"get_system_info" => rpc_call!(retry_client, retry, get_system_info, req),
"get_workflow_execution_history" => {
rpc_call!(retry_client, retry, get_workflow_execution_history, req)
}
// TODO(cretz): Fix when https://github.com/temporalio/sdk-core/issues/335 fixed
// "get_workflow_execution_history_reverse" => {
// rpc_call!(retry_client, retry, get_workflow_execution_history_reverse, req)
// }
"list_archived_workflow_executions" => {
rpc_call!(retry_client, retry, list_archived_workflow_executions, req)
}
Expand Down Expand Up @@ -238,7 +253,6 @@ impl TryFrom<ClientConfig> for ClientOptions {
)
.client_name(opts.client_name)
.client_version(opts.client_version)
.static_headers(opts.static_headers)
.identity(opts.identity)
.worker_binary_id(opts.worker_binary_id)
.retry_config(
Expand Down
77 changes: 59 additions & 18 deletions temporalio/bridge/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use temporal_sdk_core::{telemetry_init, TelemetryOptions, TelemetryOptionsBuilder};
use temporal_sdk_core::{
telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions,
TelemetryOptionsBuilder, TraceExporter,
};
use url::Url;

#[pyclass]
pub struct TelemetryRef {
// TODO(cretz): This is private
// telemetry: &'static temporal_sdk_core::telemetry::GlobalTelemDat,
// telemetry: &'static temporal_sdk_core::telemetry::GlobalTelemDat,
}

#[derive(FromPyObject)]
pub struct TelemetryConfig {
otel_collector_url: Option<String>,
tracing_filter: Option<String>,
otel_tracing: Option<OtelCollectorConfig>,
log_console: bool,
log_forwarding_level: Option<String>,
prometheus_export_bind_address: Option<String>,
otel_metrics: Option<OtelCollectorConfig>,
prometheus_metrics: Option<PrometheusMetricsConfig>,
}

#[derive(FromPyObject)]
pub struct OtelCollectorConfig {
url: String,
headers: HashMap<String, String>,
}

#[derive(FromPyObject)]
pub struct PrometheusMetricsConfig {
bind_address: String,
}

pub fn init_telemetry(config: TelemetryConfig) -> PyResult<TelemetryRef> {
Expand All @@ -34,28 +51,52 @@ impl TryFrom<TelemetryConfig> for TelemetryOptions {

fn try_from(conf: TelemetryConfig) -> PyResult<Self> {
let mut build = TelemetryOptionsBuilder::default();
if let Some(ref v) = conf.otel_collector_url {
build.otel_collector_url(
Url::parse(v)
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
);
}
if let Some(v) = conf.tracing_filter {
build.tracing_filter(v);
}
if let Some(v) = conf.otel_tracing {
build.tracing(TraceExporter::Otel(v.try_into()?));
}
if let Some(ref v) = conf.log_forwarding_level {
build.log_forwarding_level(
log::LevelFilter::from_str(v)
.map_err(|err| PyValueError::new_err(format!("Invalid log level: {}", err)))?,
);
if conf.log_console {
return Err(PyValueError::new_err(
"Cannot have log forwarding level and log console",
));
}
build.logging(Logger::Forward(log::LevelFilter::from_str(v).map_err(
|err| PyValueError::new_err(format!("Invalid log level: {}", err)),
)?));
} else if conf.log_console {
build.logging(Logger::Console);
}
if let Some(ref v) = conf.prometheus_export_bind_address {
build.prometheus_export_bind_address(SocketAddr::from_str(v).map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
})?);
if let Some(v) = conf.otel_metrics {
if conf.prometheus_metrics.is_some() {
return Err(PyValueError::new_err(
"Cannot have OTel and Prometheus metrics",
));
}
build.metrics(MetricsExporter::Otel(v.try_into()?));
} else if let Some(v) = conf.prometheus_metrics {
build.metrics(MetricsExporter::Prometheus(
SocketAddr::from_str(&v.bind_address).map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
})?,
));
}
build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))
}
}

impl TryFrom<OtelCollectorConfig> for OtelCollectorOptions {
type Error = PyErr;

fn try_from(conf: OtelCollectorConfig) -> PyResult<Self> {
Ok(OtelCollectorOptions {
url: Url::parse(&conf.url)
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
headers: conf.headers,
})
}
}
25 changes: 22 additions & 3 deletions temporalio/bridge/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Telemetry for SDK Core."""

from __future__ import annotations

import warnings
from dataclasses import dataclass
from typing import Optional
from typing import Mapping, Optional

import temporalio.bridge.temporal_sdk_bridge

Expand All @@ -11,10 +13,27 @@
class TelemetryConfig:
"""Python representation of the Rust struct for configuring telemetry."""

otel_collector_url: Optional[str] = None
tracing_filter: Optional[str] = None
otel_tracing: Optional[OtelCollectorConfig] = None
log_console: bool = False
log_forwarding_level: Optional[str] = None
prometheus_export_bind_address: Optional[str] = None
otel_metrics: Optional[OtelCollectorConfig] = None
prometheus_metrics: Optional[PrometheusMetricsConfig] = None


@dataclass
class OtelCollectorConfig:
"""Python representation of the Rust struct for configuring OTel."""

url: str
headers: Mapping[str, str]


@dataclass
class PrometheusMetricsConfig:
"""Python representation of the Rust struct for configuring Prometheus."""

bind_address: str


_inited = False
Expand Down
11 changes: 11 additions & 0 deletions temporalio/workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,22 @@ def __init__(self, config: ConnectConfig) -> None:
wsv1.GetSearchAttributesRequest,
wsv1.GetSearchAttributesResponse,
)
self.get_system_info = self._new_call(
"get_system_info",
wsv1.GetSystemInfoRequest,
wsv1.GetSystemInfoResponse,
)
self.get_workflow_execution_history = self._new_call(
"get_workflow_execution_history",
wsv1.GetWorkflowExecutionHistoryRequest,
wsv1.GetWorkflowExecutionHistoryResponse,
)
# TODO(cretz): Fix when https://github.com/temporalio/sdk-core/issues/335 fixed
# self.get_workflow_execution_history_reverse = self._new_call(
# "get_workflow_execution_history_reverse",
# wsv1.GetWorkflowExecutionHistoryReverseRequest,
# wsv1.GetWorkflowExecutionHistoryReverseResponse,
# )
self.list_archived_workflow_executions = self._new_call(
"list_archived_workflow_executions",
wsv1.ListArchivedWorkflowExecutionsRequest,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def test_all_grpc_calls_present(client: Client):
# Collect gRPC service calls with a fake channel
channel = CallCollectingChannel()
temporalio.api.workflowservice.v1.WorkflowServiceStub(channel)
# TODO(cretz): Remove once get_system_info is in core
del channel.calls["get_system_info"]
# TODO(cretz): Remove once https://github.com/temporalio/sdk-core/issues/335 fixed
del channel.calls["get_workflow_execution_history_reverse"]

assert channel.calls == workflow_service_calls

Expand Down