Skip to content

Drop all post-terminal commands & sort activation jobs #502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ http = "0.2"
hyper = "0.14"
itertools = "0.10"
lazy_static = "1.4"
lru = "0.9"
lru = "0.10"
mockall = "0.11"
nix = "0.26"
once_cell = "1.5"
Expand Down
43 changes: 31 additions & 12 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
common::v1::Payload,
enums::v1::EventType,
enums::v1::{CommandType, EventType},
failure::v1::Failure,
history::v1::{history_event, ActivityTaskCancelRequestedEventAttributes, History},
query::v1::WorkflowQuery,
Expand Down Expand Up @@ -148,10 +148,11 @@ async fn legacy_query(#[case] include_history: bool) {
}

#[rstest::rstest]
#[case::one_query(1)]
#[case::multiple_queries(3)]
#[tokio::test]
async fn new_queries(#[case] num_queries: usize) {
async fn new_queries(
#[values(1, 3)] num_queries: usize,
#[values(false, true)] query_results_after_complete: bool,
) {
let wfid = "fake_wf_id";
let query_resp = "response";
let t = canned_histories::single_timer("1");
Expand All @@ -174,7 +175,18 @@ async fn new_queries(#[case] num_queries: usize) {
}]);
let mut mock_client = mock_workflow_client();
mock_client.expect_respond_legacy_query().times(0);
let mut mock = single_hist_mock_sg(wfid, t, tasks, mock_client, true);
let mut mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client());
mh.completion_asserts = Some(Box::new(move |c| {
// If the completion is the one ending the workflow, make sure it includes the query resps
if c.commands[0].command_type() == CommandType::CompleteWorkflowExecution {
assert_eq!(c.query_responses.len(), num_queries);
} else if c.commands[0].command_type() == CommandType::StartTimer {
// first reply, no queries here.
} else {
panic!("Unexpected command in response")
}
}));
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| wc.max_cached_workflows = 10);
let core = mock_worker(mock);

Expand Down Expand Up @@ -203,8 +215,13 @@ async fn new_queries(#[case] num_queries: usize) {
}
);
}
let mut qresults: Vec<_> = (1..=num_queries)
.map(|i| {

let mut commands = vec![];
if query_results_after_complete {
commands.push(CompleteWorkflowExecution { result: None }.into());
}
for i in 1..=num_queries {
commands.push(
QueryResult {
query_id: format!("q{i}"),
variant: Some(
Expand All @@ -214,13 +231,15 @@ async fn new_queries(#[case] num_queries: usize) {
.into(),
),
}
.into()
})
.collect();
qresults.push(CompleteWorkflowExecution { result: None }.into());
.into(),
);
}
if !query_results_after_complete {
commands.push(CompleteWorkflowExecution { result: None }.into());
}
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
qresults,
commands,
))
.await
.unwrap();
Expand Down
108 changes: 107 additions & 1 deletion core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use temporal_sdk_core_protos::{
workflow_commands::{
ActivityCancellationType, CancelTimer, CompleteWorkflowExecution,
ContinueAsNewWorkflowExecution, FailWorkflowExecution, RequestCancelActivity,
ScheduleActivity,
ScheduleActivity, SetPatchMarker,
},
workflow_completion::WorkflowActivationCompletion,
},
Expand Down Expand Up @@ -2431,3 +2431,109 @@ async fn core_internal_flags() {
core.complete_execution(&act.run_id).await;
core.shutdown().await;
}

#[tokio::test]
async fn post_terminal_commands_are_discarded() {
crate::telemetry::test_telem_console();
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_workflow_execution_completed();

let mut mh = MockPollCfg::from_resp_batches(
"fake_wf_id",
t,
[ResponseType::ToTaskNum(1), ResponseType::AllHistory],
mock_workflow_client(),
);
mh.completion_asserts = Some(Box::new(|c| {
// Only the complete execution command should actually be sent
assert_eq!(c.commands.len(), 1);
}));
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let act = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
act.run_id,
vec![
CompleteWorkflowExecution { result: None }.into(),
start_timer_cmd(1, Duration::from_secs(1)),
],
))
.await
.unwrap();

// This just ensures applying the complete history w/ the completion command works, though
// there's no activation.
let act = core.poll_workflow_activation().await;
assert_matches!(act.unwrap_err(), PollWfError::ShutDown);

core.shutdown().await;
}

// Lang expects to always see jobs in this order:
// patches, signals, everything else, queries
#[tokio::test]
async fn jobs_are_in_appropriate_order() {
let p1 = "patchy-mc-patchface";
let p2 = "enchi-the-kitty";
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_has_change_marker(p1, false);
let timer_started_event_id = t.add_by_type(EventType::TimerStarted);
t.add_timer_fired(timer_started_event_id, "1".to_string());
t.add_we_signaled("yummy-salmon", vec![]);
t.add_full_wf_task();
t.add_has_change_marker(p2, false);
t.add_workflow_execution_completed();

let mh = MockPollCfg::from_resp_batches(
"fake_wf_id",
t,
[ResponseType::AllHistory],
mock_workflow_client(),
);
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let act = core.poll_workflow_activation().await.unwrap();
// Patch notifications always come first
assert_matches!(
act.jobs[0].variant.as_ref().unwrap(),
workflow_activation_job::Variant::NotifyHasPatch(_)
);
assert_matches!(
act.jobs[1].variant.as_ref().unwrap(),
workflow_activation_job::Variant::StartWorkflow(_)
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
act.run_id,
vec![
SetPatchMarker {
patch_id: p1.to_string(),
deprecated: false,
}
.into(),
start_timer_cmd(1, Duration::from_secs(1)),
],
))
.await
.unwrap();
let act = core.poll_workflow_activation().await.unwrap();
assert_matches!(
act.jobs[0].variant.as_ref().unwrap(),
workflow_activation_job::Variant::NotifyHasPatch(_)
);
assert_matches!(
act.jobs[1].variant.as_ref().unwrap(),
workflow_activation_job::Variant::SignalWorkflow(_)
);
assert_matches!(
act.jobs[2].variant.as_ref().unwrap(),
workflow_activation_job::Variant::FireTimer(_)
);
}
113 changes: 111 additions & 2 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ use futures_util::{future::abortable, stream};
use prost_types::TimestampError;
use std::{
cell::RefCell,
cmp::Ordering,
collections::VecDeque,
fmt::Debug,
future::Future,
mem::discriminant,
ops::DerefMut,
rc::Rc,
result,
Expand Down Expand Up @@ -274,7 +276,9 @@ impl Workflows {
};
Span::current().record("run_id", al.run_id());
match al {
ActivationOrAuto::LangActivation(act) | ActivationOrAuto::ReadyForQueries(act) => {
ActivationOrAuto::LangActivation(mut act)
| ActivationOrAuto::ReadyForQueries(mut act) => {
sort_act_jobs(&mut act);
debug!(activation=%act, "Sending activation to lang");
break Ok(act);
}
Expand Down Expand Up @@ -978,7 +982,7 @@ fn validate_completion(
match completion.status {
Some(workflow_activation_completion::Status::Successful(success)) => {
// Convert to wf commands
let commands = success
let mut commands = success
.commands
.into_iter()
.map(|c| c.try_into())
Expand All @@ -1005,6 +1009,16 @@ fn validate_completion(
});
}

// Any non-query-response commands after a terminal command should be ignored
if let Some(term_cmd_pos) = commands.iter().position(|c| c.is_terminal()) {
// Query responses are just fine, so keep them.
let queries = commands
.split_off(term_cmd_pos + 1)
.into_iter()
.filter(|c| matches!(c, WFCommand::QueryResponse(_)));
commands.extend(queries);
}

Ok(ValidatedCompletion::Success {
run_id: completion.run_id,
commands,
Expand Down Expand Up @@ -1157,6 +1171,23 @@ impl TryFrom<WorkflowCommand> for WFCommand {
}
}

impl WFCommand {
/// Returns true if the command is one which ends the workflow:
/// * Completed
/// * Failed
/// * Cancelled
/// * Continue-as-new
pub fn is_terminal(&self) -> bool {
matches!(
self,
WFCommand::CompleteWorkflow(_)
| WFCommand::FailWorkflow(_)
| WFCommand::CancelWorkflow(_)
| WFCommand::ContinueAsNew(_)
)
}
}

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
enum CommandID {
Timer(u32),
Expand Down Expand Up @@ -1273,3 +1304,81 @@ impl LocalActivityRequestSink for LAReqSink {
res
}
}

/// Sorts jobs in an activation to be in the order lang expects:
/// `patches -> signals -> other -> queries`
fn sort_act_jobs(wfa: &mut WorkflowActivation) {
wfa.jobs.sort_by(|j1, j2| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm this is a "stable" sort (meaning equal things aren't reordered)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's stable.

// Unwrapping is fine here since we'll never issue empty variants
let j1v = j1.variant.as_ref().unwrap();
let j2v = j2.variant.as_ref().unwrap();
if discriminant(j1v) == discriminant(j2v) {
return Ordering::Equal;
}
fn variant_ordinal(v: &workflow_activation_job::Variant) -> u8 {
match v {
workflow_activation_job::Variant::NotifyHasPatch(_) => 1,
workflow_activation_job::Variant::SignalWorkflow(_) => 2,
workflow_activation_job::Variant::QueryWorkflow(_) => 4,
_ => 3,
}
}
variant_ordinal(j1v).cmp(&variant_ordinal(j2v))
})
}

#[cfg(test)]
mod tests {
use super::*;
use itertools::Itertools;

#[test]
fn jobs_sort() {
let mut act = WorkflowActivation {
jobs: vec![
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::NotifyHasPatch(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::FireTimer(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::ResolveActivity(
Default::default(),
)),
},
],
..Default::default()
};
sort_act_jobs(&mut act);
let variants = act
.jobs
.into_iter()
.map(|j| j.variant.unwrap())
.collect_vec();
assert_matches!(
variants.as_slice(),
&[
workflow_activation_job::Variant::NotifyHasPatch(_),
workflow_activation_job::Variant::SignalWorkflow(_),
workflow_activation_job::Variant::FireTimer(_),
workflow_activation_job::Variant::ResolveActivity(_),
workflow_activation_job::Variant::QueryWorkflow(_)
]
)
}
}
10 changes: 5 additions & 5 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use temporal_sdk_core_protos::{
common::NamespacedWorkflowExecution,
workflow_activation::{
resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
workflow_activation_job::Variant, WorkflowActivation, WorkflowActivationJob,
workflow_activation_job::Variant, WorkflowActivation,
},
workflow_commands::{workflow_command, ContinueAsNewWorkflowExecution},
workflow_completion::WorkflowActivationCompletion,
Expand Down Expand Up @@ -391,10 +391,10 @@ impl WorkflowHalf {

// If the activation is to start a workflow, create a new workflow driver for it,
// using the function associated with that workflow id
if let Some(WorkflowActivationJob {
variant: Some(Variant::StartWorkflow(sw)),
}) = activation.jobs.get(0)
{
if let Some(sw) = activation.jobs.iter().find_map(|j| match j.variant {
Some(Variant::StartWorkflow(ref sw)) => Some(sw),
_ => None,
}) {
let workflow_type = &sw.workflow_type;
let wf_fns_borrow = self.workflow_fns.borrow();
let wf_function = wf_fns_borrow
Expand Down
Loading