Skip to content

Commit 98b7722

Browse files
Add workflow start delay option
1 parent c4fb094 commit 98b7722

File tree

6 files changed

+46
-22
lines changed

6 files changed

+46
-22
lines changed

lib/temporal/client.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def initialize(config)
3838
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
3939
# @option options [Hash] :headers
4040
# @option options [Hash] :search_attributes
41+
# @option options [Integer] :start_delay determines the amount of seconds to wait before initiating a Workflow
4142
#
4243
# @return [String] workflow's run ID
4344
def start_workflow(workflow, *input, options: {}, **args)
@@ -64,6 +65,7 @@ def start_workflow(workflow, *input, options: {}, **args)
6465
headers: config.header_propagator_chain.inject(execution_options.headers),
6566
memo: execution_options.memo,
6667
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
68+
start_delay: execution_options.start_delay
6769
)
6870
else
6971
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
@@ -82,7 +84,8 @@ def start_workflow(workflow, *input, options: {}, **args)
8284
memo: execution_options.memo,
8385
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
8486
signal_name: signal_name,
85-
signal_input: signal_input
87+
signal_input: signal_input,
88+
start_delay: execution_options.start_delay
8689
)
8790
end
8891

lib/temporal/connection/grpc.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ def start_workflow_execution(
122122
headers: nil,
123123
cron_schedule: nil,
124124
memo: nil,
125-
search_attributes: nil
125+
search_attributes: nil,
126+
start_delay: nil
126127
)
127128
request = Temporalio::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
128129
identity: identity,
@@ -139,6 +140,7 @@ def start_workflow_execution(
139140
workflow_execution_timeout: execution_timeout,
140141
workflow_run_timeout: run_timeout,
141142
workflow_task_timeout: task_timeout,
143+
workflow_start_delay: start_delay,
142144
request_id: SecureRandom.uuid,
143145
header: Temporalio::Api::Common::V1::Header.new(
144146
fields: to_payload_map(headers || {})
@@ -375,7 +377,8 @@ def signal_with_start_workflow_execution(
375377
headers: nil,
376378
cron_schedule: nil,
377379
memo: nil,
378-
search_attributes: nil
380+
search_attributes: nil,
381+
start_delay: nil
379382
)
380383
proto_header_fields = if headers.nil?
381384
to_payload_map({})
@@ -402,6 +405,7 @@ def signal_with_start_workflow_execution(
402405
workflow_execution_timeout: execution_timeout,
403406
workflow_run_timeout: run_timeout,
404407
workflow_task_timeout: task_timeout,
408+
workflow_start_delay: start_delay,
405409
request_id: SecureRandom.uuid,
406410
header: Temporalio::Api::Common::V1::Header.new(
407411
fields: proto_header_fields

lib/temporal/execution_options.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
module Temporal
55
class ExecutionOptions
6-
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes
6+
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes,
7+
:start_delay
78

89
def initialize(object, options, defaults = nil)
910
# Options are treated as overrides and take precedence
@@ -15,6 +16,7 @@ def initialize(object, options, defaults = nil)
1516
@headers = options[:headers] || {}
1617
@memo = options[:memo] || {}
1718
@search_attributes = options[:search_attributes] || {}
19+
@start_delay = options[:start_delay] || 0
1820

1921
# For Temporal::Workflow and Temporal::Activity use defined values as the next option
2022
if has_executable_concern?(object)

spec/unit/lib/temporal/client_spec.rb

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,21 @@ def inject!(header)
5252
subject.start_workflow(TestStartWorkflow, 42)
5353
expect(connection)
5454
.to have_received(:start_workflow_execution)
55-
.with(
56-
namespace: 'default-test-namespace',
57-
workflow_id: an_instance_of(String),
58-
workflow_name: 'TestStartWorkflow',
59-
task_queue: 'default-test-task-queue',
60-
input: [42],
61-
task_timeout: Temporal.configuration.timeouts[:task],
62-
run_timeout: Temporal.configuration.timeouts[:run],
63-
execution_timeout: Temporal.configuration.timeouts[:execution],
64-
workflow_id_reuse_policy: nil,
65-
headers: { 'test' => 'asdf' },
66-
memo: {},
67-
search_attributes: {},
68-
)
55+
.with(
56+
namespace: 'default-test-namespace',
57+
workflow_id: an_instance_of(String),
58+
workflow_name: 'TestStartWorkflow',
59+
task_queue: 'default-test-task-queue',
60+
input: [42],
61+
task_timeout: Temporal.configuration.timeouts[:task],
62+
run_timeout: Temporal.configuration.timeouts[:run],
63+
execution_timeout: Temporal.configuration.timeouts[:execution],
64+
workflow_id_reuse_policy: nil,
65+
headers: { 'test' => 'asdf' },
66+
memo: {},
67+
search_attributes: {},
68+
start_delay: 0
69+
)
6970
end
7071
end
7172

@@ -94,6 +95,7 @@ def inject!(header)
9495
headers: {},
9596
memo: {},
9697
search_attributes: {},
98+
start_delay: 0
9799
)
98100
end
99101

@@ -109,6 +111,7 @@ def inject!(header)
109111
workflow_id_reuse_policy: :reject,
110112
memo: { 'MemoKey1' => 'MemoValue1' },
111113
search_attributes: { 'SearchAttribute1' => 256 },
114+
start_delay: 10
112115
}
113116
)
114117

@@ -127,6 +130,7 @@ def inject!(header)
127130
headers: { 'Foo' => 'Bar' },
128131
memo: { 'MemoKey1' => 'MemoValue1' },
129132
search_attributes: { 'SearchAttribute1' => 256 },
133+
start_delay: 10
130134
)
131135
end
132136

@@ -154,6 +158,7 @@ def inject!(header)
154158
headers: {},
155159
memo: {},
156160
search_attributes: {},
161+
start_delay: 0
157162
)
158163
end
159164

@@ -175,6 +180,7 @@ def inject!(header)
175180
headers: {},
176181
memo: {},
177182
search_attributes: {},
183+
start_delay: 0
178184
)
179185
end
180186

@@ -198,6 +204,7 @@ def inject!(header)
198204
headers: {},
199205
memo: {},
200206
search_attributes: {},
207+
start_delay: 0
201208
)
202209
end
203210
end
@@ -225,6 +232,7 @@ def inject!(header)
225232
headers: {},
226233
memo: {},
227234
search_attributes: {},
235+
start_delay: 0
228236
)
229237
end
230238
end
@@ -255,6 +263,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
255263
search_attributes: {},
256264
signal_name: 'the question',
257265
signal_input: expected_signal_argument,
266+
start_delay: 0
258267
)
259268
end
260269

spec/unit/lib/temporal/execution_options_spec.rb

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
9999
task_queue: 'test-task-queue',
100100
retry_policy: { interval: 1, backoff: 2, max_attempts: 5 },
101101
timeouts: { start_to_close: 10 },
102-
headers: { 'TestHeader' => 'Test' }
102+
headers: { 'TestHeader' => 'Test' },
103+
start_delay: 10
103104
}
104105
end
105-
106+
106107
it 'is initialized with full options' do
107108
expect(subject.name).to eq(options[:name])
108109
expect(subject.namespace).to eq(options[:namespace])
@@ -113,12 +114,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
113114
expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts])
114115
expect(subject.timeouts).to eq(options[:timeouts])
115116
expect(subject.headers).to eq(options[:headers])
117+
expect(subject.start_delay).to eq(options[:start_delay])
116118
end
117119
end
118-
120+
119121
context 'when retry policy options are invalid' do
120122
let(:options) { { retry_policy: { max_attempts: 10 } } }
121-
123+
122124
it 'raises' do
123125
expect { subject }.to raise_error(
124126
Temporal::RetryPolicy::InvalidRetryPolicy,

spec/unit/lib/temporal/grpc_spec.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class TestDeserializer
6262
execution_timeout: 1,
6363
run_timeout: 2,
6464
task_timeout: 3,
65+
start_delay: 10,
6566
memo: {},
6667
search_attributes: {
6768
'foo-int-attribute' => 256,
@@ -86,6 +87,7 @@ class TestDeserializer
8687
expect(request.workflow_execution_timeout.seconds).to eq(1)
8788
expect(request.workflow_run_timeout.seconds).to eq(2)
8889
expect(request.workflow_task_timeout.seconds).to eq(3)
90+
expect(request.workflow_start_delay.seconds).to eq(10)
8991
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
9092
expect(request.search_attributes.indexed_fields).to eq({
9193
'foo-int-attribute' => Temporalio::Api::Common::V1::Payload.new(data: '256', metadata: { 'encoding' => 'json/plain' }),
@@ -134,6 +136,7 @@ class TestDeserializer
134136
execution_timeout: 1,
135137
run_timeout: 2,
136138
task_timeout: 3,
139+
start_delay: 10,
137140
workflow_id_reuse_policy: :allow,
138141
signal_name: 'the question',
139142
signal_input: 'what do you get if you multiply six by nine?'
@@ -149,6 +152,7 @@ class TestDeserializer
149152
expect(request.workflow_execution_timeout.seconds).to eq(1)
150153
expect(request.workflow_run_timeout.seconds).to eq(2)
151154
expect(request.workflow_task_timeout.seconds).to eq(3)
155+
expect(request.workflow_start_delay.seconds).to eq(10)
152156
expect(request.signal_name).to eq('the question')
153157
expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"')
154158
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)

0 commit comments

Comments
 (0)