From ff293ee834c1422e0f9e1be5d11b6add34f3a12c Mon Sep 17 00:00:00 2001 From: Maciej Skierkowski Date: Sat, 11 Apr 2015 14:28:51 -0700 Subject: [PATCH] refactor workflow runtime/definition --- lib/factor/workflow/definition.rb | 137 ++++++++++++++++++++++++++++++ lib/factor/workflow/runtime.rb | 129 ++-------------------------- 2 files changed, 143 insertions(+), 123 deletions(-) create mode 100644 lib/factor/workflow/definition.rb diff --git a/lib/factor/workflow/definition.rb b/lib/factor/workflow/definition.rb new file mode 100644 index 0000000..2c85930 --- /dev/null +++ b/lib/factor/workflow/definition.rb @@ -0,0 +1,137 @@ +# encoding: UTF-8 + +require 'factor/commands/base' +require 'factor/common/deep_struct' +require 'factor/common/blocker' +require 'factor/workflow/service_address' +require 'factor/workflow/exec_handler' +require 'factor/connector/runtime' +require 'factor/connector/registry' +require 'factor/connector/definition' + +module Factor + module Workflow + class Definition + attr_accessor :credentials + + def initialize(credentials, options={}) + @workflow_spec = {} + @workflows = {} + @logger = options[:logger] if options[:logger] + @credentials = credentials + @workflow_filename = options[:workflow_filename] + @unload = false + end + + def stop + @unload = true + end + + def listen(service_ref, params = {}, &block) + address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params) + line = caller.first.split(":")[1] + id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}" + + done = false + + connector_runtime.callback do |response| + message = response[:message] + type = response[:type] + + case type + when 'trigger' + success "[#{id}] Triggered" + block.call(Factor::Common.simple_object_convert(response[:payload])) if block + when 'log' + log_callback("[#{id}] #{message}",response[:status]) + when 'fail' + message = response[:message] || 'unkonwn error' + error "[#{id}] Failed: #{message}" + exec.fail_block.call(message) if exec.fail_block + done = true + end + end + + success "[#{id}] Starting" + listener_instance = connector_runtime.start_listener(address.path, params) + success "[#{id}] Started" + + Thread.new do + Factor::Common::Blocker.block_until { done || @unload } + + success "[#{id}] Stopping" + listener_instance = connector_runtime.stop_listener + success "[#{id}] Stopped" + end + + exec + end + + def run(service_ref, params = {}, &block) + address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params) + line = caller.first.split(":")[1] + id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}" + + connector_runtime.callback do |response| + message = response[:message] + type = response[:type] + + case type + when 'log' + log_callback("[#{id}] #{message}",response[:status]) + when 'fail' + error_message = response[:message] || "unknown error" + error "[#{id}] Failed: #{error_message}" + exec.fail_block.call(message) if exec.fail_block + when 'response' + success "[#{id}] Completed" + payload = response[:payload] || {} + block.call(Factor::Common.simple_object_convert(payload)) if block + end + end + + success "[#{id}] Starting" + listener_instance = connector_runtime.run(address.path, params_and_creds) + exec + end + + def success(message) + @logger.success message + end + + def info(message) + @logger.info message + end + + def warn(message) + @logger.warn message + end + + def error(message) + @logger.error message + end + + private + + def initialize_connector_runtime(service_ref, params={}) + address = Factor::Workflow::ServiceAddress.new(service_ref) + service_credentials = @credentials[address.service.to_sym] || {} + exec = Factor::Workflow::ExecHandler.new(service_ref, params) + connector_class = Factor::Connector::Registry.get(address.service) + connector_runtime = Factor::Connector::Runtime.new(connector_class) + params_and_creds = Factor::Common::DeepStruct.new(params.merge(service_credentials)).to_h + + [address, connector_runtime, exec, params_and_creds] + end + + def log_callback(message,status) + case status + when 'info' then info message + when 'warn' then warn message + when 'error' then error message + when 'debug' then error message + end + end + end + end +end diff --git a/lib/factor/workflow/runtime.rb b/lib/factor/workflow/runtime.rb index c13f80c..4fcfc76 100644 --- a/lib/factor/workflow/runtime.rb +++ b/lib/factor/workflow/runtime.rb @@ -1,141 +1,24 @@ # encoding: UTF-8 -require 'factor/commands/base' -require 'factor/common/deep_struct' -require 'factor/common/blocker' -require 'factor/workflow/service_address' -require 'factor/workflow/exec_handler' -require 'factor/connector/runtime' -require 'factor/connector/registry' -require 'factor/connector/definition' +require 'factor/workflow/definition' module Factor module Workflow class Runtime - attr_accessor :name, :description, :credentials - def initialize(credentials, options={}) - @workflow_spec = {} - @workflows = {} - @logger = options[:logger] if options[:logger] - @credentials = credentials - @workflow_filename = options[:workflow_filename] - @unload = false + @definition = Factor::Workflow::Definition.new(credentials, options) end def load(workflow_definition) - @unload = false - instance_eval(workflow_definition) - end - - def unload - @unload = true - end - - def listen(service_ref, params = {}, &block) - address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params) - line = caller.first.split(":")[1] - id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}" - - done = false - - connector_runtime.callback do |response| - message = response[:message] - type = response[:type] - - case type - when 'trigger' - success "[#{id}] Triggered" - block.call(Factor::Common.simple_object_convert(response[:payload])) if block - when 'log' - log_callback("[#{id}] #{message}",response[:status]) - when 'fail' - message = response[:message] || 'unkonwn error' - error "[#{id}] Failed: #{message}" - exec.fail_block.call(message) if exec.fail_block - done = true - end - end - - success "[#{id}] Starting" - listener_instance = connector_runtime.start_listener(address.path, params) - success "[#{id}] Started" - - Thread.new do - Factor::Common::Blocker.block_until { done || @unload } - - success "[#{id}] Stopping" - listener_instance = connector_runtime.stop_listener - success "[#{id}] Stopped" - end - - exec + @definition.instance_eval(workflow_definition) end def run(service_ref, params = {}, &block) - address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params) - line = caller.first.split(":")[1] - id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}" - - connector_runtime.callback do |response| - message = response[:message] - type = response[:type] - - case type - when 'log' - log_callback("[#{id}] #{message}",response[:status]) - when 'fail' - error_message = response[:message] || "unknown error" - error "[#{id}] Failed: #{error_message}" - exec.fail_block.call(message) if exec.fail_block - when 'response' - success "[#{id}] Completed" - payload = response[:payload] || {} - block.call(Factor::Common.simple_object_convert(payload)) if block - end - end - - success "[#{id}] Starting" - listener_instance = connector_runtime.run(address.path, params_and_creds) - exec - end - - def success(message) - @logger.success message - end - - def info(message) - @logger.info message - end - - def warn(message) - @logger.warn message + @definition.run(service_ref, params, &block) end - def error(message) - @logger.error message - end - - private - - def initialize_connector_runtime(service_ref, params={}) - address = Factor::Workflow::ServiceAddress.new(service_ref) - service_credentials = @credentials[address.service.to_sym] || {} - exec = Factor::Workflow::ExecHandler.new(service_ref, params) - connector_class = Factor::Connector::Registry.get(address.service) - connector_runtime = Factor::Connector::Runtime.new(connector_class) - params_and_creds = Factor::Common::DeepStruct.new(params.merge(service_credentials)).to_h - - [address, connector_runtime, exec, params_and_creds] - end - - def log_callback(message,status) - case status - when 'info' then info message - when 'warn' then warn message - when 'error' then error message - when 'debug' then error message - end + def unload + @definition.stop end end end