Class MCollective::Client
In: lib/mcollective/client.rb
Parent: Object

Helpers for writing clients that can talk to agents, do discovery and so forth

Methods

Attributes

options  [RW] 
stats  [RW] 

Public Class methods

[Source]

    # File lib/mcollective/client.rb, line 6
 6:     def initialize(configfile)
 7:       @config = Config.instance
 8:       @config.loadconfig(configfile) unless @config.configured
 9: 
10:       @connection = PluginManager["connector_plugin"]
11:       @security = PluginManager["security_plugin"]
12: 
13:       @security.initiated_by = :client
14:       @options = nil
15:       @subscriptions = {}
16: 
17:       @connection.connect
18:     end

Public Instance methods

Returns the collective named in the options, or the configured main collective when the options do not specify one

[Source]

    # File lib/mcollective/client.rb, line 22
22:     def collective
23:       if @options[:collective].nil?
24:         @config.main_collective
25:       else
26:         @options[:collective]
27:       end
28:     end

Disconnects cleanly from the middleware

[Source]

    # File lib/mcollective/client.rb, line 31
31:     def disconnect
32:       Log.debug("Disconnecting from the middleware")
33:       @connection.disconnect
34:     end

Performs a discovery of the nodes matching the passed filter and returns an array of nodes

An integer limit can be supplied; the discovery is then cancelled as soon as the requested number of hosts has replied

[Source]

     # File lib/mcollective/client.rb, line 113
113:     def discover(filter, timeout, limit=0)
114:       raise "Limit has to be an integer" unless limit.is_a?(Fixnum)
115: 
116:       begin
117:         hosts = []
118:         Timeout.timeout(timeout) do
119:           reqid = sendreq("ping", "discovery", filter)
120:           Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")
121: 
122:           loop do
123:             reply = receive(reqid)
124:             Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
125:             hosts << reply.payload[:senderid]
126: 
127:             return hosts if limit > 0 && hosts.size == limit
128:           end
129:         end
130:       rescue Timeout::Error => e
131:       rescue Exception => e
132:         raise
133:       ensure
134:         unsubscribe("discovery", :reply)
135:       end
136: 
137:       hosts.sort
138:     end

Performs a discovery and then sends a request, running the passed block for each response

   times = discovered_req("status", "mcollectived", options) {|resp|
      pp resp
   }

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 200
200:     def discovered_req(body, agent, options=false)
201:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
202: 
203:       options = @options unless options
204: 
205:       STDOUT.sync = true
206: 
207:       print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")
208: 
209:       begin
210:         discovered_hosts = discover(options[:filter], options[:disctimeout])
211:         discovered = discovered_hosts.size
212:         hosts_responded = []
213:         hosts_not_responded = discovered_hosts
214: 
215:         stat[:discoverytime] = Time.now.to_f - stat[:starttime]
216: 
217:         puts("#{discovered}\n\n")
218:       rescue Interrupt
219:         puts("Discovery interrupted.")
220:         exit!
221:       end
222: 
223:       raise("No matching clients found") if discovered == 0
224: 
225:       begin
226:         Timeout.timeout(options[:timeout]) do
227:           reqid = sendreq(body, agent, options[:filter])
228: 
229:           (1..discovered).each do |c|
230:             resp = receive(reqid)
231: 
232:             hosts_responded << resp.payload[:senderid]
233:             hosts_not_responded.delete(resp.payload[:senderid]) if hosts_not_responded.include?(resp.payload[:senderid])
234: 
235:             yield(resp.payload)
236:           end
237:         end
238:       rescue Interrupt => e
239:       rescue Timeout::Error => e
240:       end
241: 
242:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
243:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
244:       stat[:responses] = hosts_responded.size
245:       stat[:responsesfrom] = hosts_responded
246:       stat[:noresponsefrom] = hosts_not_responded
247:       stat[:discovered] = discovered
248: 
249:       @stats = stat
250:       return stat
251:     end

Prints out the stats returned from req and discovered_req in a nice way

[Source]

     # File lib/mcollective/client.rb, line 254
254:     def display_stats(stats, options=false, caption="stomp call summary")
255:       options = @options unless options
256: 
257:       if options[:verbose]
258:         puts("\n---- #{caption} ----")
259: 
260:         if stats[:discovered]
261:           puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
262:         else
263:           puts("           Nodes: #{stats[:responses]}")
264:         end
265: 
266:         printf("      Start Time: %s\n", Time.at(stats[:starttime]))
267:         printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
268:         printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
269:         printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
270: 
271:       else
272:         if stats[:discovered]
273:           printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
274:         else
275:           printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
276:         end
277:       end
278: 
279:       if stats[:noresponsefrom].size > 0
280:         puts("\nNo response from:\n")
281: 
282:         stats[:noresponsefrom].each do |c|
283:           puts if c % 4 == 1
284:           printf("%30s", c)
285:         end
286: 
287:         puts
288:       end
289:     end

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you've previously sent a request with that ID and now only want replies that match it; in that case the connection ignores all messages not directed at it and keeps waiting until it finds a matching message.

[Source]

     # File lib/mcollective/client.rb, line 83
 83:     def receive(requestid = nil)
 84:       reply = nil
 85: 
 86:       begin
 87:         reply = @connection.receive
 88:         reply.type = :reply
 89:         reply.expected_msgid = requestid
 90: 
 91:         reply.decode!
 92: 
 93:         reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
 94: 
 95:         raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
 96:       rescue SecurityValidationFailed => e
 97:         Log.warn("Ignoring a message that did not pass security validations")
 98:         retry
 99:       rescue MsgDoesNotMatchRequestID => e
100:         Log.debug("Ignoring a message for some other client")
101:         retry
102:       end
103: 
104:       reply
105:     end

Send a request, performs the passed block for each response

times = req("status", "mcollectived", options) {|resp|

  pp resp

}

It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 148
148:     def req(body, agent=nil, options=false, waitfor=0)
149:       if body.is_a?(Message)
150:         agent = body.agent
151:         options = body.options
152:         waitfor = body.discovered_hosts.size || 0
153:       end
154: 
155:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
156: 
157:       options = @options unless options
158: 
159:       STDOUT.sync = true
160: 
161:       hosts_responded = 0
162: 
163:       begin
164:         Timeout.timeout(options[:timeout]) do
165:           reqid = sendreq(body, agent, options[:filter])
166: 
167:           loop do
168:             resp = receive(reqid)
169: 
170:             hosts_responded += 1
171: 
172:             yield(resp.payload)
173: 
174:             break if (waitfor != 0 && hosts_responded >= waitfor)
175:           end
176:         end
177:       rescue Interrupt => e
178:       rescue Timeout::Error => e
179:       ensure
180:         unsubscribe(agent, :reply)
181:       end
182: 
183:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
184:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
185:       stat[:responses] = hosts_responded
186:       stat[:noresponsefrom] = []
187: 
188:       @stats = stat
189:       return stat
190:     end

Sends a request and returns the generated request id; it doesn't wait for responses and doesn't execute any passed-in code blocks for responses

[Source]

    # File lib/mcollective/client.rb, line 38
38:     def sendreq(msg, agent, filter = {})
39:       if msg.is_a?(Message)
40:         request = msg
41:         agent = request.agent
42:       else
43:         ttl = @options[:ttl] || @config.ttl
44:         request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
45:       end
46: 
47:       request.encode!
48: 
49:       Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
50: 
51:       subscribe(agent, :reply)
52: 
53:       request.publish
54: 
55:       request.requestid
56:     end

[Source]

    # File lib/mcollective/client.rb, line 58
58:     def subscribe(agent, type)
59:       unless @subscriptions.include?(agent)
60:         subscription = Util.make_subscriptions(agent, type, collective)
61:         Log.debug("Subscribing to #{type} target for agent #{agent}")
62: 
63:         Util.subscribe(subscription)
64:         @subscriptions[agent] = 1
65:       end
66:     end

[Source]

    # File lib/mcollective/client.rb, line 68
68:     def unsubscribe(agent, type)
69:       if @subscriptions.include?(agent)
70:         subscription = Util.make_subscriptions(agent, type, collective)
71:         Log.debug("Unsubscribing #{type} target for #{agent}")
72: 
73:         Util.unsubscribe(subscription)
74:         @subscriptions.delete(agent)
75:       end
76:     end

[Validate]