Skip to content

Commit bde0afb

Browse files
authored
Support user metadata in child workflow, timer, and activity (#846)
* code written, need to write tests * wrote tests * fix integration tests * remove experimental tag from SDK, take static_summary and static_details * another take instead of clone
1 parent 6155b09 commit bde0afb

File tree

19 files changed

+260
-23
lines changed

19 files changed

+260
-23
lines changed

core/src/core_tests/activity_tasks.rs

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::{
22
advance_fut, job_assert, prost_dur,
33
test_help::{
44
build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply,
5-
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply,
6-
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
7-
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
5+
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker,
6+
poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs,
7+
MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
88
},
99
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
1010
ActivityHeartbeat, Worker,
@@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{
4545
},
4646
temporal::api::{
4747
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
48-
enums::v1::EventType,
48+
enums::v1::{CommandType, EventType},
4949
history::v1::{
5050
history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes,
5151
},
52+
sdk::v1::UserMetadata,
5253
workflowservice::v1::{
5354
PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse,
5455
RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse,
5556
RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse,
5657
},
5758
},
58-
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
59+
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
5960
};
6061
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
6162
use tokio::{join, sync::Barrier, time::sleep};
@@ -1185,3 +1186,54 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
11851186
};
11861187
join!(shutdown_task, complete_task);
11871188
}
1189+
1190+
#[tokio::test]
1191+
async fn pass_activity_summary_to_metadata() {
1192+
let t = canned_histories::single_activity("1");
1193+
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
1194+
let wf_id = mock_cfg.hists[0].wf_id.clone();
1195+
let wf_type = DEFAULT_WORKFLOW_TYPE;
1196+
let expected_user_metadata = Some(UserMetadata {
1197+
summary: Some(b"activity summary".into()),
1198+
details: None,
1199+
});
1200+
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
1201+
asserts
1202+
.then(move |wft| {
1203+
assert_eq!(wft.commands.len(), 1);
1204+
assert_eq!(
1205+
wft.commands[0].command_type(),
1206+
CommandType::ScheduleActivityTask
1207+
);
1208+
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
1209+
})
1210+
.then(move |wft| {
1211+
assert_eq!(wft.commands.len(), 1);
1212+
assert_eq!(
1213+
wft.commands[0].command_type(),
1214+
CommandType::CompleteWorkflowExecution
1215+
);
1216+
});
1217+
});
1218+
1219+
let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
1220+
worker.register_wf(wf_type, |ctx: WfContext| async move {
1221+
ctx.activity(ActivityOptions {
1222+
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1223+
summary: Some("activity summary".to_string()),
1224+
..Default::default()
1225+
})
1226+
.await;
1227+
Ok(().into())
1228+
});
1229+
worker
1230+
.submit_wf(
1231+
wf_id.to_owned(),
1232+
wf_type.to_owned(),
1233+
vec![],
1234+
WorkflowOptions::default(),
1235+
)
1236+
.await
1237+
.unwrap();
1238+
worker.run_until_done().await.unwrap();
1239+
}

core/src/core_tests/child_workflows.rs

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
use crate::{
22
replay::DEFAULT_WORKFLOW_TYPE,
33
test_help::{
4-
build_fake_sdk, canned_histories, mock_sdk, mock_worker, single_hist_mock_sg, MockPollCfg,
5-
ResponseType,
4+
build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg,
5+
MockPollCfg, ResponseType,
66
},
77
worker::client::mocks::mock_workflow_client,
88
};
99
use temporal_client::WorkflowOptions;
1010
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult};
1111
use temporal_sdk_core_api::Worker;
12-
use temporal_sdk_core_protos::coresdk::{
13-
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
14-
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
15-
workflow_commands::{
16-
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
12+
use temporal_sdk_core_protos::{
13+
coresdk::{
14+
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
15+
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
16+
workflow_commands::{
17+
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
18+
},
19+
workflow_completion::WorkflowActivationCompletion,
1720
},
18-
workflow_completion::WorkflowActivationCompletion,
21+
temporal::api::{enums::v1::CommandType, sdk::v1::UserMetadata},
1922
};
2023
use tokio::join;
2124

@@ -220,3 +223,57 @@ async fn cancel_already_complete_child_ignored() {
220223
.await
221224
.unwrap();
222225
}
226+
227+
#[tokio::test]
228+
async fn pass_child_workflow_summary_to_metadata() {
229+
let wf_id = "1";
230+
let wf_type = DEFAULT_WORKFLOW_TYPE;
231+
let t = canned_histories::single_child_workflow(wf_id);
232+
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
233+
let expected_user_metadata = Some(UserMetadata {
234+
summary: Some(b"child summary".into()),
235+
details: Some(b"child details".into()),
236+
});
237+
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
238+
asserts
239+
.then(move |wft| {
240+
assert_eq!(wft.commands.len(), 1);
241+
assert_eq!(
242+
wft.commands[0].command_type(),
243+
CommandType::StartChildWorkflowExecution
244+
);
245+
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
246+
})
247+
.then(move |wft| {
248+
assert_eq!(wft.commands.len(), 1);
249+
assert_eq!(
250+
wft.commands[0].command_type(),
251+
CommandType::CompleteWorkflowExecution
252+
);
253+
});
254+
});
255+
256+
let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
257+
worker.register_wf(wf_type, move |ctx: WfContext| async move {
258+
ctx.child_workflow(ChildWorkflowOptions {
259+
workflow_id: wf_id.to_string(),
260+
workflow_type: "child".to_string(),
261+
static_summary: Some("child summary".to_string()),
262+
static_details: Some("child details".to_string()),
263+
..Default::default()
264+
})
265+
.start(&ctx)
266+
.await;
267+
Ok(().into())
268+
});
269+
worker
270+
.submit_wf(
271+
wf_id.to_owned(),
272+
wf_type.to_owned(),
273+
vec![],
274+
WorkflowOptions::default(),
275+
)
276+
.await
277+
.unwrap();
278+
worker.run_until_done().await.unwrap();
279+
}

core/src/core_tests/updates.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ async fn replay_with_signal_and_update_same_task() {
291291
StartTimer {
292292
seq: 1,
293293
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
294+
summary: None,
294295
}
295296
.into(),
296297
UpdateResponse {

core/src/core_tests/workers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async fn after_shutdown_of_worker_get_shutdown_err() {
4545
workflow_command::Variant::StartTimer(StartTimer {
4646
seq: 1,
4747
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
48+
summary: None,
4849
}),
4950
))
5051
.await
@@ -352,6 +353,7 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
352353
workflow_command::Variant::StartTimer(StartTimer {
353354
seq: 1,
354355
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
356+
summary: None,
355357
}),
356358
))
357359
.await

core/src/core_tests/workflow_tasks.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::{
2929
time::Duration,
3030
};
3131
use temporal_client::WorkflowOptions;
32-
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext};
32+
use temporal_sdk::{ActivityOptions, CancellableFuture, TimerOptions, WfContext};
3333
use temporal_sdk_core_api::{
3434
errors::PollWfError,
3535
worker::{
@@ -64,6 +64,7 @@ use temporal_sdk_core_protos::{
6464
history_event, TimerFiredEventAttributes,
6565
WorkflowPropertiesModifiedExternallyEventAttributes,
6666
},
67+
sdk::v1::UserMetadata,
6768
workflowservice::v1::{
6869
GetWorkflowExecutionHistoryResponse, RespondWorkflowTaskCompletedResponse,
6970
},
@@ -3086,3 +3087,50 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
30863087
// polling is not exceeding the task limit
30873088
assert_eq!(popped_tasks.load(Ordering::Relaxed), 10);
30883089
}
3090+
3091+
#[tokio::test]
3092+
async fn pass_timer_summary_to_metadata() {
3093+
let t = canned_histories::single_timer("1");
3094+
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
3095+
let wf_id = mock_cfg.hists[0].wf_id.clone();
3096+
let wf_type = DEFAULT_WORKFLOW_TYPE;
3097+
let expected_user_metadata = Some(UserMetadata {
3098+
summary: Some(b"timer summary".into()),
3099+
details: None,
3100+
});
3101+
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
3102+
asserts
3103+
.then(move |wft| {
3104+
assert_eq!(wft.commands.len(), 1);
3105+
assert_eq!(wft.commands[0].command_type(), CommandType::StartTimer);
3106+
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
3107+
})
3108+
.then(move |wft| {
3109+
assert_eq!(wft.commands.len(), 1);
3110+
assert_eq!(
3111+
wft.commands[0].command_type(),
3112+
CommandType::CompleteWorkflowExecution
3113+
);
3114+
});
3115+
});
3116+
3117+
let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
3118+
worker.register_wf(wf_type, |ctx: WfContext| async move {
3119+
ctx.timer(TimerOptions {
3120+
duration: Duration::from_secs(1),
3121+
summary: Some("timer summary".to_string()),
3122+
})
3123+
.await;
3124+
Ok(().into())
3125+
});
3126+
worker
3127+
.submit_wf(
3128+
wf_id.to_owned(),
3129+
wf_type.to_owned(),
3130+
vec![],
3131+
WorkflowOptions::default(),
3132+
)
3133+
.await
3134+
.unwrap();
3135+
worker.run_until_done().await.unwrap();
3136+
}

core/src/worker/workflow/machines/activity_state_machine.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use temporal_sdk_core_protos::{
3030
ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes,
3131
ActivityTaskTimedOutEventAttributes,
3232
},
33+
sdk::v1::UserMetadata,
3334
},
3435
};
3536

@@ -114,6 +115,10 @@ impl ActivityMachine {
114115
internal_flags: InternalFlagsRef,
115116
use_compatible_version: bool,
116117
) -> NewMachineWithCommand {
118+
let user_metadata = attrs.summary.clone().map(|x| UserMetadata {
119+
summary: Some(x),
120+
details: None,
121+
});
117122
let mut s = Self::from_parts(
118123
Created {}.into(),
119124
SharedState {
@@ -134,7 +139,7 @@ impl ActivityMachine {
134139
s.shared_state().attrs.clone(),
135140
use_compatible_version,
136141
)),
137-
user_metadata: Default::default(),
142+
user_metadata,
138143
};
139144
NewMachineWithCommand {
140145
command,

core/src/worker/workflow/machines/child_workflow_state_machine.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use temporal_sdk_core_protos::{
4141
ChildWorkflowExecutionStartedEventAttributes,
4242
StartChildWorkflowExecutionFailedEventAttributes,
4343
},
44+
sdk::v1::UserMetadata,
4445
},
4546
};
4647

@@ -449,6 +450,17 @@ impl ChildWorkflowMachine {
449450
cancelled_before_sent: false,
450451
},
451452
);
453+
let mut attribs = attribs;
454+
let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some()
455+
{
456+
Some(UserMetadata {
457+
summary: attribs.static_summary.take(),
458+
details: attribs.static_details.take(),
459+
})
460+
} else {
461+
None
462+
};
463+
let attribs = attribs;
452464
OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule)
453465
.expect("Scheduling child workflows doesn't fail");
454466
let cmd = Command {
@@ -457,7 +469,7 @@ impl ChildWorkflowMachine {
457469
attribs,
458470
use_compatible_version,
459471
)),
460-
user_metadata: Default::default(),
472+
user_metadata,
461473
};
462474
NewMachineWithCommand {
463475
command: cmd,

core/src/worker/workflow/machines/timer_state_machine.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use temporal_sdk_core_protos::{
1717
command::v1::Command,
1818
enums::v1::{CommandType, EventType},
1919
history::v1::{history_event, TimerFiredEventAttributes},
20+
sdk::v1::UserMetadata,
2021
},
2122
};
2223

@@ -73,13 +74,19 @@ pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand {
7374
impl TimerMachine {
7475
/// Create a new timer and immediately schedule it
7576
fn new_scheduled(attribs: StartTimer) -> (Self, Command) {
77+
let mut attribs = attribs;
78+
let user_metadata = attribs.summary.take().map(|x| UserMetadata {
79+
summary: Some(x),
80+
details: None,
81+
});
82+
let attribs = attribs;
7683
let mut s = Self::new(attribs);
7784
OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule)
7885
.expect("Scheduling timers doesn't fail");
7986
let cmd = Command {
8087
command_type: CommandType::StartTimer as i32,
81-
attributes: Some(s.shared_state().attrs.into()),
82-
user_metadata: Default::default(),
88+
attributes: Some(s.shared_state().attrs.clone().into()),
89+
user_metadata,
8390
};
8491
(s, cmd)
8592
}

sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ message StartTimer {
4646
// Lang's incremental sequence number, used as the operation identifier
4747
uint32 seq = 1;
4848
google.protobuf.Duration start_to_fire_timeout = 2;
49+
// Summary that is stored as user_metadata
50+
temporal.api.common.v1.Payload summary = 3;
4951
}
5052

5153
message CancelTimer {
@@ -88,6 +90,8 @@ message ScheduleActivity {
8890
bool do_not_eagerly_execute = 14;
8991
// Whether this activity should run on a worker with a compatible build id or not.
9092
coresdk.common.VersioningIntent versioning_intent = 15;
93+
// Summary that is stored as user_metadata
94+
temporal.api.common.v1.Payload summary = 16;
9195
}
9296

9397
message ScheduleLocalActivity {
@@ -253,6 +257,10 @@ message StartChildWorkflowExecution {
253257
child_workflow.ChildWorkflowCancellationType cancellation_type = 18;
254258
// Whether this child should run on a worker with a compatible build id or not.
255259
coresdk.common.VersioningIntent versioning_intent = 19;
260+
// Static summary of the child workflow
261+
temporal.api.common.v1.Payload static_summary = 20;
262+
// Static details of the child workflow
263+
temporal.api.common.v1.Payload static_details = 21;
256264
}
257265

258266
// Cancel a child workflow

0 commit comments

Comments
 (0)