Skip to content

Commit

Permalink
Fixes #32961 - Groundwork for async batch triggering
Browse files Browse the repository at this point in the history
  • Loading branch information
adamruzicka authored and ezr-ondrej committed Dec 3, 2021
1 parent dc27d13 commit ed53d40
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 19 deletions.
21 changes: 21 additions & 0 deletions lib/smart_proxy_dynflow/action/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,25 @@ def rescue_strategy
Dynflow::Action::Rescue::Fail
end
end

class AsyncBatch < ::Dynflow::Action
include Dynflow::Action::WithSubPlans
include Dynflow::Action::WithPollingSubPlans

# { execution_plan_uuid => { :action_class => Klass, :input => input } }
def plan(launcher, input_hash)
plan_self :input_hash => input_hash,
:launcher => launcher.to_hash
end

def create_sub_plans
Proxy::Dynflow::TaskLauncher::Abstract
.new_from_hash(world, input[:launcher])
.launch_children(self, input[:input_hash])
end

def rescue_strategy
Dynflow::Action::Rescue::Fail
end
end
end
5 changes: 4 additions & 1 deletion lib/smart_proxy_dynflow/action/batch_callback.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module Proxy::Dynflow::Action
class BatchCallback < ::Dynflow::Action
def plan(input_hash, results)
plan_self :targets => input_hash, :results => results
callbacks = input_hash.reduce({}) do |acc, (key, value)|
acc.merge(key => value['action_input']['callback'])
end
plan_self :targets => callbacks, :results => results
end

def run
Expand Down
26 changes: 26 additions & 0 deletions lib/smart_proxy_dynflow/action/single_runner_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,30 @@ def rescue_strategy_for_self
Dynflow::Action::Rescue::Skip
end
end

class AsyncSingleRunnerBatch < AsyncBatch
def plan(launcher, input_hash)
results = super
plan_action BatchCallback, input_hash, results.output[:results]
end

def check_for_errors!(optional = true)
super unless optional
end

def on_finish
output[:results] = sub_plans.map(&:entry_action).reduce({}) do |acc, cur|
acc.merge(cur.execution_plan_id => cur.output)
end
end

def finalize
output.delete(:results)
check_for_errors!
end

def rescue_strategy_for_self
Dynflow::Action::Rescue::Skip
end
end
end
2 changes: 1 addition & 1 deletion lib/smart_proxy_dynflow/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Api < ::Sinatra::Base
post "/tasks/launch/?" do
params = MultiJson.load(request.body.read)
launcher = launcher_class(params).new(world, callback_host(params, request), params.fetch('options', {}))
launcher.launch!(params['input'])
plan = launcher.launch!(params['input'])
launcher.results.to_json
end

Expand Down
12 changes: 10 additions & 2 deletions lib/smart_proxy_dynflow/task_launcher/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ def launch!(_input)

def self.input_format; end

def to_hash
{:class => self.class.to_s, :callback => callback, :options => options}
end

def self.new_from_hash(world, hash)
::Dynflow::Utils.constantize(hash[:class]).new(world, hash[:callback], hash[:options])
end

private

def format_result(result)
Expand All @@ -34,9 +42,9 @@ def with_callback(input)
input.merge(:callback_host => callback)
end

def trigger(parent, klass, *input)
def trigger(parent, klass, *input, id: nil)
world.trigger do
world.plan_with_options(caller_action: parent, action_class: klass, args: input)
world.plan_with_options(caller_action: parent, action_class: klass, args: input, id: id)
end
end
end
Expand Down
29 changes: 18 additions & 11 deletions lib/smart_proxy_dynflow/task_launcher/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,43 @@ module Proxy::Dynflow
module TaskLauncher
class Batch < Abstract
def launch!(input)
trigger(nil, Proxy::Dynflow::Action::Batch, self, input)
plan = trigger(nil, action_class, self, input)
results[:parent] = format_result(plan)
end

def launch_children(parent, input_hash)
input_hash.each do |task_id, input|
input_hash.map do |task_id, input|
launcher = child_launcher(parent)
launcher.launch!(transform_input(input))
triggered = launcher.launch!(transform_input(input), id: task_id)
results[task_id] = launcher.results
triggered
end
end

def prepare_batch(input_hash)
success_tasks = input_hash.select do |task_id, _input|
results[task_id][:result] == 'success'
end
success_tasks.reduce({}) do |acc, (key, value)|
acc.merge(results[key][:task_id] => value['action_input']['callback'])
end
input_hash
end

private

def child_launcher(parent)
Single.new(world, callback, :parent => parent)
end

private

# Identity by default
def transform_input(input)
input
end

def action_class
Proxy::Dynflow::Action::Batch
end
end

class AsyncBatch < Batch
def action_class
Proxy::Dynflow::Action::AsyncBatch
end
end
end
end
4 changes: 2 additions & 2 deletions lib/smart_proxy_dynflow/task_launcher/group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ def self.runner_class
raise NotImplementedError
end

def launch!(input)
trigger(nil, Action::SingleRunnerBatch, self, input)
def action_class
Action::AsyncSingleRunnerBatch
end

def launch_children(parent, input_hash)
Expand Down
5 changes: 3 additions & 2 deletions lib/smart_proxy_dynflow/task_launcher/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ def self.input_format
{ :action_class => "MyActionClass", :action_input => {} }
end

def launch!(input)
def launch!(input, id: nil)
triggered = trigger(options[:parent],
action_class(input),
with_callback(input.fetch('action_input', {})))
with_callback(input.fetch('action_input', {})),
id: id)
@results = format_result(triggered)
triggered
end
Expand Down

0 comments on commit ed53d40

Please sign in to comment.