Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
440 changes: 305 additions & 135 deletions temporalio/Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions temporalio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ license-file = "LICENSE"
derive_builder = "0.20"
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug", "try_into"] }
thiserror = "2"
tonic = "0.13"
tonic-build = "0.13"
opentelemetry = { version = "0.30", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"
tonic = "0.14"
tonic-prost = "0.14"
tonic-prost-build = "0.14"
opentelemetry = { version = "0.31", features = ["metrics"] }
prost = "0.14"
prost-types = { version = "0.7", package = "prost-wkt-types" }

[workspace.lints.rust]
unreachable_pub = "warn"
15 changes: 7 additions & 8 deletions temporalio/ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ futures = "0.3"
log = "0.4"
magnus = "0.7"
parking_lot = "0.12"
prost = "0.13"
prost = { workspace = true }
rb-sys = "0.9"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = ["envconfig"] }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.37"
temporalio-client = { version = "0.1.0", path = "./sdk-core/crates/client" }
temporalio-common = { version = "0.1.0", path = "./sdk-core/crates/common", features = ["envconfig"] }
temporalio-sdk-core = { version = "0.1.0", path = "./sdk-core/crates/sdk-core", features = ["ephemeral-server"] }
tokio = "1.47"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.13"
tonic = { workspace = true }
tracing = "0.1"
url = "2.2"
url = "2.5"
2 changes: 1 addition & 1 deletion temporalio/ext/sdk-core
Submodule sdk-core updated 367 files
154 changes: 83 additions & 71 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{collections::HashMap, future::Future, marker::PhantomData, time::Duration};

use temporal_client::{
ClientInitError, ClientKeepAliveConfig, ClientOptionsBuilder, ClientTlsConfig,
ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig,
TemporalServiceClientWithMetrics, TlsConfig,
use temporalio_client::{
ClientInitError, ClientKeepAliveOptions, ClientOptions, ClientTlsOptions, ConfiguredClient,
HttpConnectProxyOptions, RetryClient, RetryOptions, TemporalServiceClient, TlsOptions,
};

use magnus::{
Expand Down Expand Up @@ -52,7 +51,7 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
Ok(())
}

type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClient>>;

#[derive(DataTypeFunctions, TypedData)]
#[magnus(class = "Temporalio::Internal::Bridge::Client", free_immediately)]
Expand Down Expand Up @@ -86,10 +85,12 @@ impl Client {
runtime.handle.fork_check("create client")?;
let ruby = Ruby::get().expect("Ruby not available");
// Build options
let mut opts_build = ClientOptionsBuilder::default();
let tls = options.child(id!("tls"))?;
let headers = partition_grpc_headers(&ruby, options.member(id!("rpc_metadata"))?)?;
opts_build
let rpc_retry = options
.child(id!("rpc_retry"))?
.ok_or_else(|| error!("Missing rpc_retry"))?;
let tls = options.child(id!("tls"))?;
let opts = ClientOptions::builder()
.target_url(
Url::parse(
format!(
Expand All @@ -103,71 +104,82 @@ impl Client {
)
.client_name(options.member::<String>(id!("client_name"))?)
.client_version(options.member::<String>(id!("client_version"))?)
.headers(Some(headers.headers))
.binary_headers(Some(headers.binary_headers))
.api_key(options.member(id!("api_key"))?)
.identity(options.member::<String>(id!("identity"))?);
if let Some(tls) = tls {
opts_build.tls_cfg(TlsConfig {
client_tls_config: match (
tls.member::<Option<RString>>(id!("client_cert"))?,
tls.member::<Option<RString>>(id!("client_private_key"))?,
) {
(None, None) => None,
(Some(client_cert), Some(client_private_key)) => Some(ClientTlsConfig {
// These are unsafe because of lifetime issues, but we copy right away
client_cert: unsafe { client_cert.as_slice().to_vec() },
client_private_key: unsafe { client_private_key.as_slice().to_vec() },
}),
_ => {
return Err(error!(
"Must have both client cert and private key or neither"
));
}
.headers(headers.headers)
.binary_headers(headers.binary_headers)
.maybe_api_key(options.member::<Option<String>>(id!("api_key"))?)
.identity(options.member::<String>(id!("identity"))?)
.maybe_tls_options(if let Some(tls) = tls {
Some(TlsOptions {
client_tls_options: match (
tls.member::<Option<RString>>(id!("client_cert"))?,
tls.member::<Option<RString>>(id!("client_private_key"))?,
) {
(None, None) => None,
(Some(client_cert), Some(client_private_key)) => Some(ClientTlsOptions {
// These are unsafe because of lifetime issues, but we copy right away
client_cert: unsafe { client_cert.as_slice().to_vec() },
client_private_key: unsafe { client_private_key.as_slice().to_vec() },
}),
_ => {
return Err(error!(
"Must have both client cert and private key or neither"
));
}
},
server_root_ca_cert: tls
.member::<Option<RString>>(id!("server_root_ca_cert"))?
.map(|rstr| unsafe { rstr.as_slice().to_vec() }),
domain: tls.member(id!("domain"))?,
})
} else {
None
})
.retry_options(RetryOptions {
initial_interval: Duration::from_secs_f64(
rpc_retry.member(id!("initial_interval"))?,
),
randomization_factor: rpc_retry.member(id!("randomization_factor"))?,
multiplier: rpc_retry.member(id!("multiplier"))?,
max_interval: Duration::from_secs_f64(rpc_retry.member(id!("max_interval"))?),
max_elapsed_time: match rpc_retry.member::<f64>(id!("max_elapsed_time"))? {
// 0 means none
0.0 => None,
val => Some(Duration::from_secs_f64(val)),
},
server_root_ca_cert: tls
.member::<Option<RString>>(id!("server_root_ca_cert"))?
.map(|rstr| unsafe { rstr.as_slice().to_vec() }),
domain: tls.member(id!("domain"))?,
});
}
let rpc_retry = options
.child(id!("rpc_retry"))?
.ok_or_else(|| error!("Missing rpc_retry"))?;
opts_build.retry_config(RetryConfig {
initial_interval: Duration::from_secs_f64(rpc_retry.member(id!("initial_interval"))?),
randomization_factor: rpc_retry.member(id!("randomization_factor"))?,
multiplier: rpc_retry.member(id!("multiplier"))?,
max_interval: Duration::from_secs_f64(rpc_retry.member(id!("max_interval"))?),
max_elapsed_time: match rpc_retry.member::<f64>(id!("max_elapsed_time"))? {
// 0 means none
0.0 => None,
val => Some(Duration::from_secs_f64(val)),
},
max_retries: rpc_retry.member(id!("max_retries"))?,
});
if let Some(keep_alive) = options.child(id!("keep_alive"))? {
opts_build.keep_alive(Some(ClientKeepAliveConfig {
interval: Duration::from_secs_f64(keep_alive.member(id!("interval"))?),
timeout: Duration::from_secs_f64(keep_alive.member(id!("timeout"))?),
}));
}
if let Some(proxy) = options.child(id!("http_connect_proxy"))? {
opts_build.http_connect_proxy(Some(HttpConnectProxyOptions {
target_addr: proxy.member(id!("target_host"))?,
basic_auth: match (
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
) {
(None, None) => None,
(Some(user), Some(pass)) => Some((user, pass)),
_ => return Err(error!("Must have both basic auth and pass or neither")),
max_retries: rpc_retry.member(id!("max_retries"))?,
})
.keep_alive(
if let Some(keep_alive) = options.child(id!("keep_alive"))? {
Some(ClientKeepAliveOptions {
interval: Duration::from_secs_f64(keep_alive.member(id!("interval"))?),
timeout: Duration::from_secs_f64(keep_alive.member(id!("timeout"))?),
})
} else {
None
},
}));
}
let opts = opts_build
.build()
.map_err(|err| error!("Invalid client options: {}", err))?;
)
.maybe_http_connect_proxy(
if let Some(proxy) = options.child(id!("http_connect_proxy"))? {
Some(HttpConnectProxyOptions {
target_addr: proxy.member(id!("target_host"))?,
basic_auth: match (
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
proxy.member::<Option<String>>(id!("basic_auth_pass"))?,
) {
(None, None) => None,
(Some(user), Some(pass)) => Some((user, pass)),
_ => {
return Err(error!(
"Must have both basic auth and pass or neither"
));
}
},
})
} else {
None
},
)
.build();

// Create client
let callback = AsyncCallback::from_queue(queue);
Expand Down
26 changes: 25 additions & 1 deletion temporalio/ext/src/client_rpc_generated.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Generated code. DO NOT EDIT!

use magnus::{Error, Ruby};
use temporal_client::{CloudService, OperatorService, TestService, WorkflowService};
use temporalio_client::{CloudService, OperatorService, TestService, WorkflowService};

use super::{error, rpc_call};
use crate::{
Expand Down Expand Up @@ -80,6 +80,9 @@ impl Client {
"describe_task_queue" => {
rpc_call!(self, callback, call, WorkflowService, describe_task_queue)
}
"describe_worker" => {
rpc_call!(self, callback, call, WorkflowService, describe_worker)
}
"describe_worker_deployment" => rpc_call!(
self,
callback,
Expand Down Expand Up @@ -420,6 +423,13 @@ impl Client {
WorkflowService,
set_worker_deployment_current_version
),
"set_worker_deployment_manager" => rpc_call!(
self,
callback,
call,
WorkflowService,
set_worker_deployment_manager
),
"set_worker_deployment_ramping_version" => rpc_call!(
self,
callback,
Expand Down Expand Up @@ -704,6 +714,13 @@ impl Client {
CloudService,
rename_custom_search_attribute
),
"set_service_account_namespace_access" => rpc_call!(
self,
callback,
call,
CloudService,
set_service_account_namespace_access
),
"set_user_group_namespace_access" => rpc_call!(
self,
callback,
Expand Down Expand Up @@ -743,6 +760,13 @@ impl Client {
"update_user_group" => {
rpc_call!(self, callback, call, CloudService, update_user_group)
}
"validate_account_audit_log_sink" => rpc_call!(
self,
callback,
call,
CloudService,
validate_account_audit_log_sink
),
"validate_namespace_export_sink" => rpc_call!(
self,
callback,
Expand Down
2 changes: 1 addition & 1 deletion temporalio/ext/src/envconfig.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use magnus::{Error, RHash, RString, Ruby, class, function, prelude::*, scan_args};
use temporal_sdk_core_api::envconfig::{
use temporalio_common::envconfig::{
ClientConfig as CoreClientConfig, ClientConfigCodec,
ClientConfigProfile as CoreClientConfigProfile, ClientConfigTLS as CoreClientConfigTLS,
DataSource, LoadClientConfigOptions, LoadClientConfigProfileOptions,
Expand Down
2 changes: 1 addition & 1 deletion temporalio/ext/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use magnus::{
r_hash::ForEach,
value::{IntoId, Lazy, Qfalse, Qtrue},
};
use temporal_sdk_core_api::telemetry::metrics::{
use temporalio_common::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
};

Expand Down
36 changes: 23 additions & 13 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use std::str::FromStr;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::time::Duration;
use std::{future::Future, sync::Arc};
use temporal_sdk_core::telemetry::{
MetricsCallBuffer, build_otlp_metric_exporter, start_prometheus_metric_exporter,
};
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
use temporal_sdk_core_api::telemetry::{
use temporalio_common::telemetry::HistogramBucketOverrides;
use temporalio_common::telemetry::{
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, metrics::MetricCallBufferer,
};
use temporalio_sdk_core::telemetry::{
MetricsCallBuffer, build_otlp_metric_exporter, start_prometheus_metric_exporter,
};
use temporalio_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder};
use tracing::error as log_error;
use url::Url;

Expand Down Expand Up @@ -55,7 +55,7 @@ pub(crate) struct RuntimeHandle {
macro_rules! enter_sync {
($runtime:expr) => {
if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber);
temporalio_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber);
}
let _guard = $runtime.core.tokio_handle().enter();
};
Expand All @@ -71,12 +71,12 @@ pub(crate) enum AsyncCommand {
impl Runtime {
pub fn new(options: Struct) -> Result<Self, Error> {
// Build options
let mut opts_build = TelemetryOptionsBuilder::default();
let mut telemetry_opts_build = TelemetryOptionsBuilder::default();
let telemetry = options
.child(id!("telemetry"))?
.ok_or_else(|| error!("Missing telemetry options"))?;
if let Some(logging) = telemetry.child(id!("logging"))? {
opts_build.logging(
telemetry_opts_build.logging(
if let Some(_forward_to) = logging.member::<Option<Value>>(id!("forward_to"))? {
// TODO(cretz): This
return Err(error!("Forwarding not yet supported"));
Expand All @@ -90,14 +90,24 @@ impl Runtime {
// Set some metrics options now, but the metrics instance is late-bound
// after CoreRuntime created since it needs Tokio runtime
if let Some(metrics) = telemetry.child(id!("metrics"))? {
opts_build.attach_service_name(metrics.member(id!("attach_service_name"))?);
telemetry_opts_build.attach_service_name(metrics.member(id!("attach_service_name"))?);
if let Some(prefix) = metrics.member::<Option<String>>(id!("metric_prefix"))? {
opts_build.metric_prefix(prefix);
telemetry_opts_build.metric_prefix(prefix);
}
}
let opts = opts_build
let opts = RuntimeOptionsBuilder::default()
.telemetry_options(
telemetry_opts_build
.build()
.map_err(|err| error!("Invalid telemetry options: {}", err))?,
)
.heartbeat_interval(
options
.member::<Option<f64>>(id!("worker_heartbeat_interval"))?
.map(Duration::from_secs_f64),
)
.build()
.map_err(|err| error!("Invalid telemetry options: {}", err))?;
.map_err(|err| error!("Invalid runtime options: {}", err))?;

// Create core runtime
let mut core = CoreRuntime::new(opts, TokioRuntimeBuilder::default())
Expand Down
2 changes: 1 addition & 1 deletion temporalio/ext/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use magnus::{
DataTypeFunctions, Error, Ruby, TypedData, Value, class, function, method, prelude::*,
};
use parking_lot::Mutex;
use temporal_sdk_core::ephemeral_server::{
use temporalio_sdk_core::ephemeral_server::{
self, EphemeralExe, EphemeralExeVersion, TemporalDevServerConfigBuilder,
TestServerConfigBuilder,
};
Expand Down
Loading
Loading