forked from choria-legacy/marionette-collective
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagents.rb
140 lines (112 loc) · 4.89 KB
/
agents.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
module MCollective
# A collection of agents, loads them, reloads them and dispatches messages to them.
# It uses the PluginManager to store, load and manage instances of plugins.
class Agents
def initialize
@config = Config.instance
raise ("Configuration has not been loaded, can't load agents") unless @config.configured
@@agents = {}
loadagents
end
# Loads all agents from disk
def loadagents
Log.debug("Reloading all agents from disk")
# We're loading all agents so just nuke all the old agents and unsubscribe
connector = PluginManager["connector_plugin"]
@@agents.each_key do |agent|
PluginManager.delete "#{agent}_agent"
connector.unsubscribe(Util.make_target(agent, :command))
end
@@agents = {}
@config.libdir.each do |libdir|
agentdir = "#{libdir}/mcollective/agent"
raise("Cannot find agents directory: '#{agentdir}'") unless File.directory?(agentdir)
Dir.new(agentdir).grep(/\.rb$/).each do |agent|
agentname = File.basename(agent, ".rb")
loadagent(agentname) unless PluginManager.include?("#{agentname}_agent")
end
end
end
# Loads a specified agent from disk if available
def loadagent(agentname)
agentfile = findagentfile(agentname)
return false unless agentfile
classname = "MCollective::Agent::#{agentname.capitalize}"
PluginManager.delete("#{agentname}_agent")
begin
PluginManager.loadclass(classname)
PluginManager << {:type => "#{agentname}_agent", :class => classname}
PluginManager["connector_plugin"].subscribe(Util.make_target(agentname, :command)) unless @@agents.include?(agentname)
@@agents[agentname] = {:file => agentfile}
return true
rescue Exception => e
Log.error("Loading agent #{agentname} failed: #{e}")
PluginManager.delete("#{agentname}_agent")
end
end
# searches the libdirs for agents
def findagentfile(agentname)
@config.libdir.each do |libdir|
agentfile = "#{libdir}/mcollective/agent/#{agentname}.rb"
if File.exist?(agentfile)
Log.debug("Found #{agentname} at #{agentfile}")
return agentfile
end
end
return false
end
# Determines if we have an agent with a certain name
def include?(agentname)
PluginManager.include?("#{agentname}_agent")
end
# Sends a message to a specific agent
def send(agentname, msg, connection)
raise("No such agent") unless include?(agentname)
PluginManager["#{agentname}_agent"].handlemsg(msg, connection)
end
# Returns the help for an agent after first trying to get
# rid of some indentation infront
def help(agentname)
raise("No such agent") unless include?(agentname)
body = PluginManager["#{agentname}_agent"].help.split("\n")
if body.first =~ /^(\s+)\S/
indent = $1
body = body.map {|b| b.gsub(/^#{indent}/, "")}
end
body.join("\n")
end
# Determine the max amount of time a specific agent should be running
def timeout(agentname)
raise("No such agent") unless include?(agentname)
PluginManager["#{agentname}_agent"].timeout
end
# Dispatches a message to an agent, accepts a block that will get run if there are
# any replies to process from the agent
def dispatch(msg, target, connection)
Log.debug("Dispatching a message to agent #{target}")
Thread.new do
begin
Timeout::timeout(timeout(target)) do
replies = send(target, msg, connection)
# Agents can decide if they wish to reply or not,
# returning nil will mean nothing goes back to the
# requestor
unless replies == nil
yield(replies)
end
end
rescue Timeout::Error => e
Log.warn("Timeout while handling message for #{target}")
rescue Exception => e
Log.error("Execution of #{target} failed: #{e}")
Log.error(e.backtrace.join("\n\t\t"))
end
end
end
# Get a list of agents that we have
def self.agentlist
@@agents.keys
end
end
end
# vi:tabstop=4:expandtab:ai