Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion temporalio/Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ target :lib do

ignore 'lib/temporalio/api', 'lib/temporalio/internal/bridge/api'

library 'uri'
library 'uri', 'objspace'

configure_code_diagnostics do |hash|
# TODO(cretz): Fix as more protos are generated
Expand Down
156 changes: 76 additions & 80 deletions temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,86 +162,9 @@ def initialize(details)
end

def activate(activation)
# Run inside of scheduler
run_in_scheduler { activate_internal(activation) }
end

def add_command(command)
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen

@commands << command
end

def instance
@instance or raise 'Instance accessed before created'
end

def search_attributes
# Lazy on first access
@search_attributes ||= SearchAttributes._from_proto(
@init_job.search_attributes, disable_mutations: true, never_nil: true
) || raise
end

def memo
# Lazy on first access
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
end

def now
# Create each time
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
end

def illegal_call_tracing_disabled(&)
@tracer.disable(&)
end

def patch(patch_id:, deprecated:)
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
# command.
patch_id = patch_id.to_s
@patches_memoized ||= {}
@patches_memoized.fetch(patch_id) do
patched = !replaying || @patches_notified.include?(patch_id)
@patches_memoized[patch_id] = patched
if patched
add_command(
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
)
)
end
patched
end
end

def metric_meter
@metric_meter ||= ReplaySafeMetric::Meter.new(
@runtime_metric_meter.with_additional_attributes(
{
namespace: info.namespace,
task_queue: info.task_queue,
workflow_type: info.workflow_type
}
)
)
end

private

def run_in_scheduler(&)
# Run inside of scheduler (removed on ensure)
Fiber.set_scheduler(@scheduler)
if @tracer
@tracer.enable(&)
else
yield
end
ensure
Fiber.set_scheduler(nil)
end

def activate_internal(activation)
# Reset some activation state
@commands = []
@current_activation_error = nil
Expand All @@ -266,8 +189,12 @@ def activate_internal(activation)
# the first activation)
@primary_fiber ||= schedule(top_level: true) { run_workflow }

# Run the event loop
@scheduler.run_until_all_yielded
# Run the event loop in the tracer if it exists
if @tracer
@tracer.enable { @scheduler.run_until_all_yielded }
else
@scheduler.run_until_all_yielded
end
rescue Exception => e # rubocop:disable Lint/RescueException
on_top_level_exception(e)
end
Expand Down Expand Up @@ -306,8 +233,77 @@ def activate_internal(activation)
ensure
@commands = nil
@current_activation_error = nil
Fiber.set_scheduler(nil)
end

def add_command(command)
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen

@commands << command
end

def instance
@instance or raise 'Instance accessed before created'
end

def search_attributes
# Lazy on first access
@search_attributes ||= SearchAttributes._from_proto(
@init_job.search_attributes, disable_mutations: true, never_nil: true
) || raise
end

def memo
# Lazy on first access
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
end

def now
# Create each time
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
end

def illegal_call_tracing_disabled(&)
if @tracer
@tracer.disable_temporarily(&)
else
yield
end
end

def patch(patch_id:, deprecated:)
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
# command.
patch_id = patch_id.to_s
@patches_memoized ||= {}
@patches_memoized.fetch(patch_id) do
patched = !replaying || @patches_notified.include?(patch_id)
@patches_memoized[patch_id] = patched
if patched
add_command(
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
)
)
end
patched
end
end

def metric_meter
@metric_meter ||= ReplaySafeMetric::Meter.new(
@runtime_metric_meter.with_additional_attributes(
{
namespace: info.namespace,
task_queue: info.task_queue,
workflow_type: info.workflow_type
}
)
)
end

private

def create_instance
# Convert workflow arguments
@workflow_arguments = convert_args(payload_array: @init_job.arguments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def initialize(illegal_calls)
when :all
''
when Temporalio::Worker::IllegalWorkflowCallValidator
disable do
disable_temporarily do
vals.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
class_name:, method_name: tp.callee_id, trace_point: tp
))
Expand All @@ -98,7 +98,7 @@ def initialize(illegal_calls)
when true
''
when Temporalio::Worker::IllegalWorkflowCallValidator
disable do
disable_temporarily do
per_method.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
class_name:, method_name: tp.callee_id, trace_point: tp
))
Expand All @@ -118,8 +118,11 @@ def initialize(illegal_calls)
end

def enable(&block)
# We've seen leaking issues in Ruby 3.2 where the TracePoint inadvertently remains enabled even for threads
# that it was not started on. So we will check the thread ourselves.
# This is not reentrant and not expected to be called as such. We've seen leaking issues in Ruby 3.2 where
# the TracePoint inadvertently remains enabled even for threads that it was not started on. So we will check
# the thread ourselves. We also use the "enabled thread" concept for disabling checks too, see
# disable_temporarily for more details.

@enabled_thread = Thread.current
@tracepoint.enable do
block.call
Expand All @@ -128,13 +131,17 @@ def enable(&block)
end
end

def disable(&block)
def disable_temporarily(&)
# An earlier version of this used @tracepoint.disable, but in some versions of Ruby, the observed behavior
# is confusingly not reentrant or at least not predictable. Therefore, instead of calling
# @tracepoint.disable, we are just unsetting the enabled thread. This means the tracer is still running, but
# no checks are performed. This is effectively a no-op if tracing was never enabled.

previous_thread = @enabled_thread
@tracepoint.disable do
block.call
ensure
@enabled_thread = previous_thread
end
@enabled_thread = nil
yield
ensure
@enabled_thread = previous_thread
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ module Temporalio

def metric_meter: -> Temporalio::Metric::Meter

def run_in_scheduler: [T] { -> T } -> T

def activate_internal: (untyped activation) -> untyped

def create_instance: -> Temporalio::Workflow::Definition

def apply: (untyped job) -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Temporalio
) -> void

def enable: [T] { -> T } -> T
def disable: [T] { -> T } -> T
def disable_temporarily: [T] { -> T } -> T
end
end
end
Expand Down
Loading
Loading