Skip to content

Commit cd07caa

Browse files
authored
Activity reset support (#337)
Fixes #254
1 parent 51b5f95 commit cd07caa

File tree

6 files changed

+125
-6
lines changed

6 files changed

+125
-6
lines changed

temporalio/lib/temporalio/client/async_activity_handle.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def initialize(client:, task_token:, id_reference:)
2929
# @param details [Array<Object>] Details of the heartbeat.
3030
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
3131
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
32+
# @raise [Error::AsyncActivityCanceledError] If the activity was canceled, paused, and/or reset.
3233
def heartbeat(*details, detail_hints: nil, rpc_options: nil)
3334
@client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new(
3435
task_token_or_id_reference:,

temporalio/lib/temporalio/error.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,13 @@ def initialize
9797

9898
# Error that occurs when an async activity handle tries to heartbeat and the activity is marked as canceled.
9999
class AsyncActivityCanceledError < Error
100+
# @return [Activity::CancellationDetails]
101+
attr_reader :details
102+
100103
# @!visibility private
101-
def initialize
104+
def initialize(details)
102105
super('Activity canceled')
106+
@details = details
103107
end
104108
end
105109

temporalio/lib/temporalio/internal/client/implementation.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require 'google/protobuf/well_known_types'
44
require 'securerandom'
5+
require 'temporalio/activity'
56
require 'temporalio/api'
67
require 'temporalio/client/activity_id_reference'
78
require 'temporalio/client/async_activity_handle'
@@ -829,9 +830,13 @@ def heartbeat_async_activity(input)
829830
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
830831
)
831832
end
832-
raise Error::AsyncActivityCanceledError if resp.cancel_requested
833+
return unless resp.cancel_requested || resp.activity_paused || resp.activity_reset
833834

834-
nil
835+
raise Error::AsyncActivityCanceledError, Activity::CancellationDetails.new(
836+
cancel_requested: resp.cancel_requested,
837+
paused: resp.activity_paused,
838+
reset: resp.activity_reset
839+
)
835840
end
836841

837842
def complete_async_activity(input)

temporalio/sig/temporalio/error.rbs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ module Temporalio
3535
def initialize: -> void
3636
end
3737

38+
class ScheduleAlreadyRunningError < Error
39+
def initialize: -> void
40+
end
41+
42+
class AsyncActivityCanceledError < Error
43+
attr_reader details: Activity::CancellationDetails
44+
45+
def initialize: (Activity::CancellationDetails details) -> void
46+
end
47+
3848
class RPCError < Error
3949
attr_reader code: Code::enum
4050

temporalio/test/worker_activity_test.rb

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,16 +562,27 @@ def test_worker_shutdown
562562
class AsyncCompletionActivity < Temporalio::Activity::Definition
563563
def initialize
564564
@task_token = Queue.new
565+
@id_ref = Queue.new
565566
end
566567

567568
def execute
568-
@task_token.push(Temporalio::Activity::Context.current.info.task_token)
569+
info = Temporalio::Activity::Context.current.info
570+
@task_token.push(info.task_token)
571+
@id_ref.push(Temporalio::Client::ActivityIDReference.new(
572+
workflow_id: info.workflow_id,
573+
run_id: info.workflow_run_id,
574+
activity_id: info.activity_id
575+
))
569576
raise Temporalio::Activity::CompleteAsyncError
570577
end
571578

572579
def wait_task_token
573580
@task_token.pop
574581
end
582+
583+
def wait_id_ref
584+
@id_ref.pop
585+
end
575586
end
576587

577588
def test_async_completion_success
@@ -629,6 +640,70 @@ def test_async_completion_cancel
629640
end
630641
end
631642

643+
def test_async_completion_cancel_details
644+
# Cancel
645+
act = AsyncCompletionActivity.new
646+
execute_activity(act, wait_for_cancellation: true) do |handle|
647+
task_token = act.wait_task_token
648+
handle.cancel
649+
assert_eventually do
650+
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
651+
env.client.async_activity_handle(task_token).heartbeat
652+
end
653+
assert err.details.cancel_requested?
654+
refute err.details.paused?
655+
refute err.details.reset?
656+
end
657+
end
658+
659+
# Pause
660+
act = AsyncCompletionActivity.new
661+
execute_activity(act, wait_for_cancellation: true) do
662+
id_ref = act.wait_id_ref
663+
env.client.workflow_service.pause_activity(Temporalio::Api::WorkflowService::V1::PauseActivityRequest.new(
664+
namespace: env.client.namespace,
665+
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
666+
workflow_id: id_ref.workflow_id,
667+
run_id: id_ref.run_id
668+
),
669+
identity: env.client.connection.options.identity,
670+
id: id_ref.activity_id,
671+
reason: 'my reason'
672+
))
673+
assert_eventually do
674+
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
675+
env.client.async_activity_handle(id_ref).heartbeat
676+
end
677+
refute err.details.cancel_requested?
678+
assert err.details.paused?
679+
refute err.details.reset?
680+
end
681+
end
682+
683+
# Reset
684+
act = AsyncCompletionActivity.new
685+
execute_activity(act, wait_for_cancellation: true) do
686+
id_ref = act.wait_id_ref
687+
env.client.workflow_service.reset_activity(Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new(
688+
namespace: env.client.namespace,
689+
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
690+
workflow_id: id_ref.workflow_id,
691+
run_id: id_ref.run_id
692+
),
693+
identity: env.client.connection.options.identity,
694+
id: id_ref.activity_id
695+
))
696+
assert_eventually do
697+
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
698+
env.client.async_activity_handle(id_ref).heartbeat
699+
end
700+
refute err.details.cancel_requested?
701+
refute err.details.paused?
702+
assert err.details.reset?
703+
end
704+
end
705+
end
706+
632707
def test_async_completion_timeout
633708
act = AsyncCompletionActivity.new
634709
execute_activity(act, start_to_close_timeout: 0.5, wait_for_cancellation: true) do

temporalio/test/worker_workflow_activity_test.rb

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def execute(swallow)
277277
raise unless swallow
278278

279279
det = Temporalio::Activity::Context.current.cancellation_details
280-
"canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}"
280+
"canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}, reset: #{det&.reset?}"
281281
end
282282
end
283283

@@ -312,7 +312,7 @@ def test_cancellation_pause
312312
reason: 'my reason'
313313
)
314314
env.client.workflow_service.pause_activity(req)
315-
assert_equal 'canceled - paused: true, requested: false', handle.result
315+
assert_equal 'canceled - paused: true, requested: false, reset: false', handle.result
316316
end
317317

318318
# Re-raise
@@ -344,4 +344,28 @@ def test_cancellation_pause
344344
end
345345
end
346346
end
347+
348+
def test_cancellation_reset
349+
queue = Queue.new
350+
execute_workflow(
351+
CancellationDetailsWorkflow, true,
352+
activities: [CancellationDetailsActivity.new(queue)]
353+
) do |handle|
354+
# Wait for activity to start
355+
activity_id = queue.pop(timeout: 10)
356+
assert activity_id
357+
# Send reset, and confirm we get what we expect
358+
req = Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new(
359+
namespace: env.client.namespace,
360+
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
361+
workflow_id: handle.id,
362+
run_id: handle.result_run_id
363+
),
364+
identity: env.client.connection.options.identity,
365+
id: activity_id
366+
)
367+
env.client.workflow_service.reset_activity(req)
368+
assert_equal 'canceled - paused: false, requested: false, reset: true', handle.result
369+
end
370+
end
347371
end

0 commit comments

Comments
 (0)