Skip to content

Commit e4823d0

Browse files
authored
Update Core with worker heartbeating support (#368)
1 parent c84290a commit e4823d0

File tree

34 files changed

+650
-309
lines changed

34 files changed

+650
-309
lines changed

temporalio/Cargo.lock

Lines changed: 305 additions & 135 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ license-file = "LICENSE"
1515
derive_builder = "0.20"
1616
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug", "try_into"] }
1717
thiserror = "2"
18-
tonic = "0.13"
19-
tonic-build = "0.13"
20-
opentelemetry = { version = "0.30", features = ["metrics"] }
21-
prost = "0.13"
22-
prost-types = "0.13"
18+
tonic = "0.14"
19+
tonic-prost = "0.14"
20+
tonic-prost-build = "0.14"
21+
opentelemetry = { version = "0.31", features = ["metrics"] }
22+
prost = "0.14"
23+
prost-types = { version = "0.7", package = "prost-wkt-types" }
2324

2425
[workspace.lints.rust]
2526
unreachable_pub = "warn"

temporalio/ext/Cargo.toml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@ futures = "0.3"
1515
log = "0.4"
1616
magnus = "0.7"
1717
parking_lot = "0.12"
18-
prost = "0.13"
18+
prost = { workspace = true }
1919
rb-sys = "0.9"
20-
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
21-
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
22-
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = ["envconfig"] }
23-
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
24-
tokio = "1.37"
20+
temporalio-client = { version = "0.1.0", path = "./sdk-core/crates/client" }
21+
temporalio-common = { version = "0.1.0", path = "./sdk-core/crates/common", features = ["envconfig"] }
22+
temporalio-sdk-core = { version = "0.1.0", path = "./sdk-core/crates/sdk-core", features = ["ephemeral-server"] }
23+
tokio = "1.47"
2524
tokio-stream = "0.1"
2625
tokio-util = "0.7"
27-
tonic = "0.13"
26+
tonic = { workspace = true }
2827
tracing = "0.1"
29-
url = "2.2"
28+
url = "2.5"

temporalio/ext/sdk-core

Submodule sdk-core updated 367 files

temporalio/ext/src/client.rs

Lines changed: 83 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use std::{collections::HashMap, future::Future, marker::PhantomData, time::Duration};
22

3-
use temporal_client::{
4-
ClientInitError, ClientKeepAliveConfig, ClientOptionsBuilder, ClientTlsConfig,
5-
ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig,
6-
TemporalServiceClientWithMetrics, TlsConfig,
3+
use temporalio_client::{
4+
ClientInitError, ClientKeepAliveOptions, ClientOptions, ClientTlsOptions, ConfiguredClient,
5+
HttpConnectProxyOptions, RetryClient, RetryOptions, TemporalServiceClient, TlsOptions,
76
};
87

98
use magnus::{
@@ -52,7 +51,7 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
5251
Ok(())
5352
}
5453

55-
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
54+
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClient>>;
5655

5756
#[derive(DataTypeFunctions, TypedData)]
5857
#[magnus(class = "Temporalio::Internal::Bridge::Client", free_immediately)]
@@ -86,10 +85,12 @@ impl Client {
8685
runtime.handle.fork_check("create client")?;
8786
let ruby = Ruby::get().expect("Ruby not available");
8887
// Build options
89-
let mut opts_build = ClientOptionsBuilder::default();
90-
let tls = options.child(id!("tls"))?;
9188
let headers = partition_grpc_headers(&ruby, options.member(id!("rpc_metadata"))?)?;
92-
opts_build
89+
let rpc_retry = options
90+
.child(id!("rpc_retry"))?
91+
.ok_or_else(|| error!("Missing rpc_retry"))?;
92+
let tls = options.child(id!("tls"))?;
93+
let opts = ClientOptions::builder()
9394
.target_url(
9495
Url::parse(
9596
format!(
@@ -103,71 +104,82 @@ impl Client {
103104
)
104105
.client_name(options.member::<String>(id!("client_name"))?)
105106
.client_version(options.member::<String>(id!("client_version"))?)
106-
.headers(Some(headers.headers))
107-
.binary_headers(Some(headers.binary_headers))
108-
.api_key(options.member(id!("api_key"))?)
109-
.identity(options.member::<String>(id!("identity"))?);
110-
if let Some(tls) = tls {
111-
opts_build.tls_cfg(TlsConfig {
112-
client_tls_config: match (
113-
tls.member::<Option<RString>>(id!("client_cert"))?,
114-
tls.member::<Option<RString>>(id!("client_private_key"))?,
115-
) {
116-
(None, None) => None,
117-
(Some(client_cert), Some(client_private_key)) => Some(ClientTlsConfig {
118-
// These are unsafe because of lifetime issues, but we copy right away
119-
client_cert: unsafe { client_cert.as_slice().to_vec() },
120-
client_private_key: unsafe { client_private_key.as_slice().to_vec() },
121-
}),
122-
_ => {
123-
return Err(error!(
124-
"Must have both client cert and private key or neither"
125-
));
126-
}
107+
.headers(headers.headers)
108+
.binary_headers(headers.binary_headers)
109+
.maybe_api_key(options.member::<Option<String>>(id!("api_key"))?)
110+
.identity(options.member::<String>(id!("identity"))?)
111+
.maybe_tls_options(if let Some(tls) = tls {
112+
Some(TlsOptions {
113+
client_tls_options: match (
114+
tls.member::<Option<RString>>(id!("client_cert"))?,
115+
tls.member::<Option<RString>>(id!("client_private_key"))?,
116+
) {
117+
(None, None) => None,
118+
(Some(client_cert), Some(client_private_key)) => Some(ClientTlsOptions {
119+
// These are unsafe because of lifetime issues, but we copy right away
120+
client_cert: unsafe { client_cert.as_slice().to_vec() },
121+
client_private_key: unsafe { client_private_key.as_slice().to_vec() },
122+
}),
123+
_ => {
124+
return Err(error!(
125+
"Must have both client cert and private key or neither"
126+
));
127+
}
128+
},
129+
server_root_ca_cert: tls
130+
.member::<Option<RString>>(id!("server_root_ca_cert"))?
131+
.map(|rstr| unsafe { rstr.as_slice().to_vec() }),
132+
domain: tls.member(id!("domain"))?,
133+
})
134+
} else {
135+
None
136+
})
137+
.retry_options(RetryOptions {
138+
initial_interval: Duration::from_secs_f64(
139+
rpc_retry.member(id!("initial_interval"))?,
140+
),
141+
randomization_factor: rpc_retry.member(id!("randomization_factor"))?,
142+
multiplier: rpc_retry.member(id!("multiplier"))?,
143+
max_interval: Duration::from_secs_f64(rpc_retry.member(id!("max_interval"))?),
144+
max_elapsed_time: match rpc_retry.member::<f64>(id!("max_elapsed_time"))? {
145+
// 0 means none
146+
0.0 => None,
147+
val => Some(Duration::from_secs_f64(val)),
127148
},
128-
server_root_ca_cert: tls
129-
.member::<Option<RString>>(id!("server_root_ca_cert"))?
130-
.map(|rstr| unsafe { rstr.as_slice().to_vec() }),
131-
domain: tls.member(id!("domain"))?,
132-
});
133-
}
134-
let rpc_retry = options
135-
.child(id!("rpc_retry"))?
136-
.ok_or_else(|| error!("Missing rpc_retry"))?;
137-
opts_build.retry_config(RetryConfig {
138-
initial_interval: Duration::from_secs_f64(rpc_retry.member(id!("initial_interval"))?),
139-
randomization_factor: rpc_retry.member(id!("randomization_factor"))?,
140-
multiplier: rpc_retry.member(id!("multiplier"))?,
141-
max_interval: Duration::from_secs_f64(rpc_retry.member(id!("max_interval"))?),
142-
max_elapsed_time: match rpc_retry.member::<f64>(id!("max_elapsed_time"))? {
143-
// 0 means none
144-
0.0 => None,
145-
val => Some(Duration::from_secs_f64(val)),
146-
},
147-
max_retries: rpc_retry.member(id!("max_retries"))?,
148-
});
149-
if let Some(keep_alive) = options.child(id!("keep_alive"))? {
150-
opts_build.keep_alive(Some(ClientKeepAliveConfig {
151-
interval: Duration::from_secs_f64(keep_alive.member(id!("interval"))?),
152-
timeout: Duration::from_secs_f64(keep_alive.member(id!("timeout"))?),
153-
}));
154-
}
155-
if let Some(proxy) = options.child(id!("http_connect_proxy"))? {
156-
opts_build.http_connect_proxy(Some(HttpConnectProxyOptions {
157-
target_addr: proxy.member(id!("target_host"))?,
158-
basic_auth: match (
159-
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
160-
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
161-
) {
162-
(None, None) => None,
163-
(Some(user), Some(pass)) => Some((user, pass)),
164-
_ => return Err(error!("Must have both basic auth and pass or neither")),
149+
max_retries: rpc_retry.member(id!("max_retries"))?,
150+
})
151+
.keep_alive(
152+
if let Some(keep_alive) = options.child(id!("keep_alive"))? {
153+
Some(ClientKeepAliveOptions {
154+
interval: Duration::from_secs_f64(keep_alive.member(id!("interval"))?),
155+
timeout: Duration::from_secs_f64(keep_alive.member(id!("timeout"))?),
156+
})
157+
} else {
158+
None
165159
},
166-
}));
167-
}
168-
let opts = opts_build
169-
.build()
170-
.map_err(|err| error!("Invalid client options: {}", err))?;
160+
)
161+
.maybe_http_connect_proxy(
162+
if let Some(proxy) = options.child(id!("http_connect_proxy"))? {
163+
Some(HttpConnectProxyOptions {
164+
target_addr: proxy.member(id!("target_host"))?,
165+
basic_auth: match (
166+
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
167+
proxy.member::<Option<String>>(id!("basic_auth_pass"))?,
168+
) {
169+
(None, None) => None,
170+
(Some(user), Some(pass)) => Some((user, pass)),
171+
_ => {
172+
return Err(error!(
173+
"Must have both basic auth and pass or neither"
174+
));
175+
}
176+
},
177+
})
178+
} else {
179+
None
180+
},
181+
)
182+
.build();
171183

172184
// Create client
173185
let callback = AsyncCallback::from_queue(queue);

temporalio/ext/src/client_rpc_generated.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Generated code. DO NOT EDIT!
22

33
use magnus::{Error, Ruby};
4-
use temporal_client::{CloudService, OperatorService, TestService, WorkflowService};
4+
use temporalio_client::{CloudService, OperatorService, TestService, WorkflowService};
55

66
use super::{error, rpc_call};
77
use crate::{
@@ -80,6 +80,9 @@ impl Client {
8080
"describe_task_queue" => {
8181
rpc_call!(self, callback, call, WorkflowService, describe_task_queue)
8282
}
83+
"describe_worker" => {
84+
rpc_call!(self, callback, call, WorkflowService, describe_worker)
85+
}
8386
"describe_worker_deployment" => rpc_call!(
8487
self,
8588
callback,
@@ -420,6 +423,13 @@ impl Client {
420423
WorkflowService,
421424
set_worker_deployment_current_version
422425
),
426+
"set_worker_deployment_manager" => rpc_call!(
427+
self,
428+
callback,
429+
call,
430+
WorkflowService,
431+
set_worker_deployment_manager
432+
),
423433
"set_worker_deployment_ramping_version" => rpc_call!(
424434
self,
425435
callback,
@@ -704,6 +714,13 @@ impl Client {
704714
CloudService,
705715
rename_custom_search_attribute
706716
),
717+
"set_service_account_namespace_access" => rpc_call!(
718+
self,
719+
callback,
720+
call,
721+
CloudService,
722+
set_service_account_namespace_access
723+
),
707724
"set_user_group_namespace_access" => rpc_call!(
708725
self,
709726
callback,
@@ -743,6 +760,13 @@ impl Client {
743760
"update_user_group" => {
744761
rpc_call!(self, callback, call, CloudService, update_user_group)
745762
}
763+
"validate_account_audit_log_sink" => rpc_call!(
764+
self,
765+
callback,
766+
call,
767+
CloudService,
768+
validate_account_audit_log_sink
769+
),
746770
"validate_namespace_export_sink" => rpc_call!(
747771
self,
748772
callback,

temporalio/ext/src/envconfig.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22

33
use magnus::{Error, RHash, RString, Ruby, class, function, prelude::*, scan_args};
4-
use temporal_sdk_core_api::envconfig::{
4+
use temporalio_common::envconfig::{
55
ClientConfig as CoreClientConfig, ClientConfigCodec,
66
ClientConfigProfile as CoreClientConfigProfile, ClientConfigTLS as CoreClientConfigTLS,
77
DataSource, LoadClientConfigOptions, LoadClientConfigProfileOptions,

temporalio/ext/src/metric.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use magnus::{
99
r_hash::ForEach,
1010
value::{IntoId, Lazy, Qfalse, Qtrue},
1111
};
12-
use temporal_sdk_core_api::telemetry::metrics::{
12+
use temporalio_common::telemetry::metrics::{
1313
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
1414
};
1515

temporalio/ext/src/runtime.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ use std::str::FromStr;
1010
use std::sync::mpsc::{Receiver, Sender, channel};
1111
use std::time::Duration;
1212
use std::{future::Future, sync::Arc};
13-
use temporal_sdk_core::telemetry::{
14-
MetricsCallBuffer, build_otlp_metric_exporter, start_prometheus_metric_exporter,
15-
};
16-
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
17-
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
18-
use temporal_sdk_core_api::telemetry::{
13+
use temporalio_common::telemetry::HistogramBucketOverrides;
14+
use temporalio_common::telemetry::{
1915
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
2016
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, metrics::MetricCallBufferer,
2117
};
18+
use temporalio_sdk_core::telemetry::{
19+
MetricsCallBuffer, build_otlp_metric_exporter, start_prometheus_metric_exporter,
20+
};
21+
use temporalio_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder};
2222
use tracing::error as log_error;
2323
use url::Url;
2424

@@ -55,7 +55,7 @@ pub(crate) struct RuntimeHandle {
5555
macro_rules! enter_sync {
5656
($runtime:expr) => {
5757
if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
58-
temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber);
58+
temporalio_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber);
5959
}
6060
let _guard = $runtime.core.tokio_handle().enter();
6161
};
@@ -71,12 +71,12 @@ pub(crate) enum AsyncCommand {
7171
impl Runtime {
7272
pub fn new(options: Struct) -> Result<Self, Error> {
7373
// Build options
74-
let mut opts_build = TelemetryOptionsBuilder::default();
74+
let mut telemetry_opts_build = TelemetryOptionsBuilder::default();
7575
let telemetry = options
7676
.child(id!("telemetry"))?
7777
.ok_or_else(|| error!("Missing telemetry options"))?;
7878
if let Some(logging) = telemetry.child(id!("logging"))? {
79-
opts_build.logging(
79+
telemetry_opts_build.logging(
8080
if let Some(_forward_to) = logging.member::<Option<Value>>(id!("forward_to"))? {
8181
// TODO(cretz): This
8282
return Err(error!("Forwarding not yet supported"));
@@ -90,14 +90,24 @@ impl Runtime {
9090
// Set some metrics options now, but the metrics instance is late-bound
9191
// after CoreRuntime created since it needs Tokio runtime
9292
if let Some(metrics) = telemetry.child(id!("metrics"))? {
93-
opts_build.attach_service_name(metrics.member(id!("attach_service_name"))?);
93+
telemetry_opts_build.attach_service_name(metrics.member(id!("attach_service_name"))?);
9494
if let Some(prefix) = metrics.member::<Option<String>>(id!("metric_prefix"))? {
95-
opts_build.metric_prefix(prefix);
95+
telemetry_opts_build.metric_prefix(prefix);
9696
}
9797
}
98-
let opts = opts_build
98+
let opts = RuntimeOptionsBuilder::default()
99+
.telemetry_options(
100+
telemetry_opts_build
101+
.build()
102+
.map_err(|err| error!("Invalid telemetry options: {}", err))?,
103+
)
104+
.heartbeat_interval(
105+
options
106+
.member::<Option<f64>>(id!("worker_heartbeat_interval"))?
107+
.map(Duration::from_secs_f64),
108+
)
99109
.build()
100-
.map_err(|err| error!("Invalid telemetry options: {}", err))?;
110+
.map_err(|err| error!("Invalid runtime options: {}", err))?;
101111

102112
// Create core runtime
103113
let mut core = CoreRuntime::new(opts, TokioRuntimeBuilder::default())

temporalio/ext/src/testing.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use magnus::{
44
DataTypeFunctions, Error, Ruby, TypedData, Value, class, function, method, prelude::*,
55
};
66
use parking_lot::Mutex;
7-
use temporal_sdk_core::ephemeral_server::{
7+
use temporalio_sdk_core::ephemeral_server::{
88
self, EphemeralExe, EphemeralExeVersion, TemporalDevServerConfigBuilder,
99
TestServerConfigBuilder,
1010
};

0 commit comments

Comments
 (0)