Skip to content

Commit

Permalink
little refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
skierkowski committed Apr 11, 2015
1 parent 58df074 commit 9a7e086
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
30 changes: 18 additions & 12 deletions lib/factor/commands/run_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@ def run(args, options)
params = JSON.parse(args[1] || '{}')
rescue => ex
logger.error "'#{args[1]}' can't be parsed as JSON"
exit
end

if params
EM.run do
runtime.run(args[0],params) do |response_info|
data = response_info.is_a?(Array) ? response_info.map {|i| i.marshal_dump} : response_info.marshal_dump
JSON.pretty_generate(data).split("\n").each do |line|
logger.info line
end
EM.stop
end.on_fail do
EM.stop
done = false

begin
runtime.run(args[0],params) do |response_info|
data = response_info.is_a?(Array) ? response_info.map {|i| i.marshal_dump} : response_info.marshal_dump
JSON.pretty_generate(data).split("\n").each do |line|
logger.info line
end
done = true
end.on_fail do
done = true
end

logger.info 'Good bye!'
rescue => ex
logger.error ex.message
done = true
end

Factor::Common::Blocker.block_until { done }

logger.info 'Good bye!'
end
end
end
Expand Down
18 changes: 8 additions & 10 deletions lib/factor/workflow/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ def load(workflow_definition)

def listen(service_ref, params = {}, &block)
address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params)
line = caller.first.split(":")[1]
file_id = @workflow_filename ? "#{@workflow_filename}:#{line}" : "line:#{line}"
id = "#{service_ref}(#{file_id})"
line = caller.first.split(":")[1]
id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}"

connector_runtime.callback = proc do |response|
connector_runtime.callback do |response|
message = response[:message]
type = response[:type]

Expand All @@ -40,7 +39,7 @@ def listen(service_ref, params = {}, &block)
success "[#{id}] Triggered"
block.call(Factor::Common.simple_object_convert(response[:payload])) if block
when 'log'
log_callback(" [#{id}] #{message}",response[:status])
log_callback("[#{id}] #{message}",response[:status])
when 'fail'
message = response[:message] || 'unkonwn error'
error "[#{id}] Failed: #{message}"
Expand All @@ -56,17 +55,16 @@ def listen(service_ref, params = {}, &block)

def run(service_ref, params = {}, &block)
address, connector_runtime, exec, params_and_creds = initialize_connector_runtime(service_ref,params)
line = caller.first.split(":")[1]
file_id = @workflow_filename ? "#{@workflow_filename}:#{line}" : "line:#{line}"
id = "#{service_ref}(#{file_id})"
line = caller.first.split(":")[1]
id = @workflow_filename ? "#{service_ref}(#{@workflow_filename}:#{line})" : "#{service_ref}"

connector_runtime.callback = Proc.new do |response|
connector_runtime.callback do |response|
message = response[:message]
type = response[:type]

case type
when 'log'
log_callback(" [#{id}] #{message}",response[:status])
log_callback("[#{id}] #{message}",response[:status])
when 'fail'
error_message = response[:message] || "unknown error"
error "[#{id}] Failed: #{error_message}"
Expand Down

0 comments on commit 9a7e086

Please sign in to comment.