Skip to content

Ensure namespace exists on worker startup #750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult;

/// Errors thrown by [crate::Worker::validate]
#[derive(thiserror::Error, Debug)]
pub enum WorkerValidationError {
/// The namespace provided to the worker does not exist on the server.
#[error("Namespace {namespace} was not found or otherwise could not be described: {source:?}")]
NamespaceDescribeError {
source: tonic::Status,
namespace: String,
},
}

/// Errors thrown by [crate::Worker::poll_workflow_activation]
#[derive(thiserror::Error, Debug)]
pub enum PollWfError {
Expand Down
10 changes: 9 additions & 1 deletion core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ pub mod telemetry;
pub mod worker;

use crate::{
errors::{CompleteActivityError, CompleteWfError, PollActivityError, PollWfError},
errors::{
CompleteActivityError, CompleteWfError, PollActivityError, PollWfError,
WorkerValidationError,
},
worker::WorkerConfig,
};
use temporal_sdk_core_protos::coresdk::{
Expand All @@ -16,6 +19,11 @@ use temporal_sdk_core_protos::coresdk::{
/// and is bound to a specific task queue.
#[async_trait::async_trait]
pub trait Worker: Send + Sync {
/// Validate that the worker can properly connect to server, plus any other validation that
/// needs to be done asynchronously. Lang SDKs should call this function once before calling
/// any others.
async fn validate(&self) -> Result<(), WorkerValidationError>;

/// Ask the worker for some work, returning a [WorkflowActivation]. It is then the language
/// SDK's responsibility to call the appropriate workflow code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter", "registry"] }
url = "2.2"
uuid = { version = "1.1", features = ["v4"] }
zip = { version = "1.2", optional = true }
zip = { version = "2.0", optional = true }

# 1st party local deps
[dependencies.temporal-sdk-core-api]
Expand Down
6 changes: 5 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::{
},
worker::client::WorkerClientBag,
};
use anyhow::bail;
use futures::Stream;
use std::sync::Arc;
use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics};
Expand Down Expand Up @@ -82,7 +83,10 @@ where
{
let client = init_worker_client(&worker_config, *client.into().into_inner());
if client.namespace() != worker_config.namespace {
panic!("Passed in client is not bound to the same namespace as the worker");
bail!("Passed in client is not bound to the same namespace as the worker");
}
if client.namespace() == "" {
bail!("Namespace cannot be empty");
}
let client_ident = client.get_options().identity.clone();
let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
Expand Down
13 changes: 11 additions & 2 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub(crate) mod mocks;
use parking_lot::RwLock;
use std::sync::Arc;
use temporal_client::{Client, RetryClient, SlotManager, WorkflowService};
use temporal_client::{Client, Namespace, RetryClient, SlotManager, WorkflowService};
use temporal_sdk_core_protos::{
coresdk::workflow_commands::QueryResult,
temporal::api::{
Expand Down Expand Up @@ -145,9 +145,10 @@ pub(crate) trait WorkerClient: Sync + Send {
task_token: TaskToken,
query_result: QueryResult,
) -> Result<RespondQueryTaskCompletedResponse>;
async fn describe_namespace(&self) -> Result<DescribeNamespaceResponse>;

fn replace_client(&self, new_client: RetryClient<Client>);
fn capabilities(&self) -> Option<get_system_info_response::Capabilities>;
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
}
Expand Down Expand Up @@ -375,6 +376,14 @@ impl WorkerClient for WorkerClientBag {
.into_inner())
}

async fn describe_namespace(&self) -> Result<DescribeNamespaceResponse> {
temporal_client::WorkflowClientTrait::describe_namespace(
&self.cloned_client(),
Namespace::Name(self.namespace.clone()),
)
.await
}

fn replace_client(&self, new_client: RetryClient<Client>) {
let mut replaceable_client = self.replaceable_client.write();
*replaceable_client = new_client;
Expand Down
9 changes: 5 additions & 4 deletions core/src/worker/client/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ mockall::mock! {
) -> impl Future<Output = Result<RespondQueryTaskCompletedResponse>> + Send + 'b
where 'a: 'b, Self: 'b;

fn replace_client(&self, new_client: RetryClient<Client>);

fn capabilities(&self) -> Option<get_system_info_response::Capabilities>;
fn describe_namespace<'a, 'b>(&self) ->
impl Future<Output = Result<DescribeNamespaceResponse>> + Send + 'b
where 'a: 'b, Self: 'b;

fn replace_client(&self, new_client: RetryClient<Client>);
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;

fn is_mock(&self) -> bool;
}
}
35 changes: 25 additions & 10 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub(crate) use activities::{
};
pub(crate) use workflow::{wft_poller::new_wft_poller, LEGACY_QUERY_ID};

use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics, WorkerKey};

use crate::{
abstractions::{dbg_panic, MeteredPermitDealer},
errors::CompleteWfError,
Expand Down Expand Up @@ -51,6 +49,7 @@ use std::{
Arc,
},
};
use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics, WorkerKey};
use temporal_sdk_core_protos::{
coresdk::{
activity_result::activity_execution_result,
Expand All @@ -70,6 +69,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;

use temporal_sdk_core_api::errors::WorkerValidationError;
#[cfg(test)]
use {
crate::{
Expand All @@ -83,7 +83,7 @@ use {
/// A worker polls on a certain task queue
pub struct Worker {
config: WorkerConfig,
wf_client: Arc<dyn WorkerClient>,
client: Arc<dyn WorkerClient>,
/// Registration key to enable eager workflow start for this worker
worker_key: Mutex<Option<WorkerKey>>,
/// Manages all workflows and WFT processing
Expand All @@ -105,6 +105,11 @@ pub struct Worker {

#[async_trait::async_trait]
impl WorkerTrait for Worker {
async fn validate(&self) -> Result<(), WorkerValidationError> {
self.verify_namespace_exists().await?;
Ok(())
}

async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError> {
self.next_workflow_activation().await
}
Expand Down Expand Up @@ -174,7 +179,7 @@ impl WorkerTrait for Worker {
self.shutdown_token.cancel();
// First, disable Eager Workflow Start
if let Some(key) = *self.worker_key.lock() {
self.wf_client.workers().unregister(key);
self.client.workers().unregister(key);
}
// Second, we want to stop polling of both activity and workflow tasks
if let Some(atm) = self.at_task_mgr.as_ref() {
Expand Down Expand Up @@ -226,11 +231,11 @@ impl Worker {
pub fn replace_client(&self, new_client: ConfiguredClient<TemporalServiceClientWithMetrics>) {
// Unregister worker from current client, register in new client at the end
let mut worker_key = self.worker_key.lock();
let slot_provider = (*worker_key).and_then(|k| self.wf_client.workers().unregister(k));
self.wf_client
let slot_provider = (*worker_key).and_then(|k| self.client.workers().unregister(k));
self.client
.replace_client(super::init_worker_client(&self.config, new_client));
*worker_key = slot_provider
.and_then(|slot_provider| self.wf_client.workers().register(slot_provider));
*worker_key =
slot_provider.and_then(|slot_provider| self.client.workers().register(slot_provider));
}

#[cfg(test)]
Expand Down Expand Up @@ -410,7 +415,7 @@ impl Worker {
let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
Self {
worker_key,
wf_client: client.clone(),
client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
config.clone(),
Expand Down Expand Up @@ -607,7 +612,7 @@ impl Worker {
}

if let Some(atm) = &self.at_task_mgr {
atm.complete(task_token, status, &*self.wf_client).await;
atm.complete(task_token, status, &*self.client).await;
} else {
error!(
"Tried to complete activity {} on a worker that does not have an activity manager",
Expand Down Expand Up @@ -697,6 +702,16 @@ impl Worker {
fn notify_local_result(&self, run_id: &str, res: LocalResolution) {
self.workflows.notify_of_local_result(run_id, res);
}

async fn verify_namespace_exists(&self) -> Result<(), WorkerValidationError> {
if let Err(e) = self.client.describe_namespace().await {
return Err(WorkerValidationError::NamespaceDescribeError {
source: e,
namespace: self.config.namespace.clone(),
});
}
Ok(())
}
}

pub(crate) struct PostActivateHookData<'a> {
Expand Down
32 changes: 32 additions & 0 deletions tests/integ_tests/worker_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use assert_matches::assert_matches;
use temporal_sdk_core::{init_worker, CoreRuntime};
use temporal_sdk_core_api::{errors::WorkerValidationError, worker::WorkerConfigBuilder, Worker};
use temporal_sdk_core_test_utils::{get_integ_server_options, get_integ_telem_options};

#[tokio::test]
async fn worker_validation_fails_on_nonexistent_namespace() {
let opts = get_integ_server_options();
let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap();
let retrying_client = opts
.connect_no_namespace(runtime.telemetry().get_temporal_metric_meter())
.await
.unwrap();

let worker = init_worker(
&runtime,
WorkerConfigBuilder::default()
.namespace("i_dont_exist")
.task_queue("Wheee!")
.worker_build_id("blah")
.build()
.unwrap(),
retrying_client,
)
.unwrap();

let res = worker.validate().await;
assert_matches!(
res,
Err(WorkerValidationError::NamespaceDescribeError { .. })
);
}
1 change: 1 addition & 0 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod integ_tests {
mod queries_tests;
mod update_tests;
mod visibility_tests;
mod worker_tests;
mod workflow_tests;

use std::str::FromStr;
Expand Down
Loading