Class MCollective::Message
In: lib/mcollective/message.rb
Parent: Object

Container for a message, its headers, agent, collective and other metadata.

Methods

Constants

VALIDTYPES = [:message, :request, :direct_request, :reply]

Attributes

agent  [RW] 
collective  [RW] 
discovered_hosts  [RW] 
expected_msgid  [R] 
filter  [RW] 
headers  [RW] 
message  [R] 
msgtime  [R] 
options  [RW] 
payload  [R] 
request  [R] 
requestid  [RW] 
ttl  [RW] 
type  [R] 
validated  [R] 

Public Class methods

payload - the message body without headers etc., just the text; message - the original message received from the middleware; options[:base64] - is the body base64 encoded?; options[:agent] - the agent the message is for/from; options[:collective] - the collective it is for/from; options[:headers] - the message headers; options[:type] - an indicator of the type of message: :message, :request, :direct_request or :reply; options[:request] - if this is a reply, this should hold the message we are replying to; options[:filter] - for requests, the filter to encode into the message; options[:options] - the normal client options hash; options[:ttl] - the maximum number of seconds this message can be valid for; options[:expected_msgid] - in the case of replies, this is the msgid it is expecting in the replies

[Source]

    # File lib/mcollective/message.rb, line 22
# Builds a message from a body, the raw middleware message and an options
# hash.
#
# payload:: the message body
# message:: the original message object as received from the middleware
# options:: hash overriding any of the defaults listed below
#
# When options[:request] is supplied the new message is treated as a reply:
# agent and collective are copied from that request and the type is forced
# to :reply regardless of options[:type].
#
# NOTE(review): the top-level :ttl default is merged but never read; @ttl is
# taken from options[:options][:ttl], falling back to Config.instance.ttl.
def initialize(payload, message, options = {})
  defaults = {
    :base64         => false,
    :agent          => nil,
    :headers        => {},
    :type           => :message,
    :request        => nil,
    :filter         => Util.empty_filter,
    :options        => {},
    :ttl            => 60,
    :expected_msgid => nil,
    :collective     => nil
  }
  options = defaults.merge(options)

  # Plain state that does not depend on the options hash.
  @payload          = payload
  @message          = message
  @requestid        = nil
  @discovered_hosts = nil

  # State copied straight out of the merged options.
  @type           = options[:type]
  @headers        = options[:headers]
  @base64         = options[:base64]
  @filter         = options[:filter]
  @expected_msgid = options[:expected_msgid]
  @options        = options[:options]

  @ttl       = @options[:ttl] || Config.instance.ttl
  @msgtime   = 0
  @validated = false

  replying_to = options[:request]

  if replying_to
    # A reply inherits its routing details from the request it answers.
    @request    = replying_to
    @agent      = replying_to.agent
    @collective = replying_to.collective
    @type       = :reply
  else
    @agent      = options[:agent]
    @collective = options[:collective]
  end

  # Normalise the body to its decoded form if it arrived base64 encoded.
  base64_decode!
end

Public Instance methods

[Source]

     # File lib/mcollective/message.rb, line 111
# Reports the base64 flag: the value of @base64 is returned unchanged, so it
# is truthy while the body is marked as base64 encoded.
def base64?
  @base64
end

[Source]

     # File lib/mcollective/message.rb, line 97
 # Decodes the payload in place when it is flagged as base64 encoded, then
 # clears the flag. Does nothing when the flag is not set, so calling it
 # repeatedly is safe.
 #
 # The message body is held in @payload (see the constructor); the previous
 # code decoded @body, an instance variable nothing else assigns, so the
 # payload was left encoded and the decoder was handed nil.
 def base64_decode!
   return unless @base64

   @payload = SSL.base64_decode(@payload)
   @base64 = false
 end

[Source]

     # File lib/mcollective/message.rb, line 104
# Base64 encodes the payload in place and sets the flag. Does nothing when
# the payload is already flagged as encoded, so it is never encoded twice.
#
# The message body is held in @payload (see the constructor); the previous
# code encoded @body, an instance variable nothing else assigns, so the
# payload itself was never encoded even though the flag was set.
def base64_encode!
  return if @base64

  @payload = SSL.base64_encode(@payload)
  @base64 = true
end

[Source]

     # File lib/mcollective/message.rb, line 187
# Generates a request id: the MD5 hex digest of the configured identity, the
# current time as a float, the agent and the collective, joined with dashes.
def create_reqid
  seed = "#{Config.instance.identity}-#{Time.now.to_f}-#{agent}-#{collective}"
  Digest::MD5.hexdigest(seed)
end

[Source]

     # File lib/mcollective/message.rb, line 131
# Decodes the message through the security plugin and refreshes this object
# from the decoded payload.
#
# Only :request and :reply messages can be decoded; any other type raises a
# RuntimeError. For requests, the decoded callerid must be accepted by the
# security plugin's valid_callerid?, otherwise a RuntimeError is raised.
#
# Afterwards each of collective, agent, filter, requestid, ttl and msgtime
# that is present in the payload overwrites the matching instance variable.
def decode!
  unless [:request, :reply].include?(type)
    raise "Cannot decode message type #{type}"
  end

  @payload = PluginManager["security_plugin"].decodemsg(self)

  if type == :request && !PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
    raise 'callerid in request is not valid, surpressing reply to potentially forged request'
  end

  [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |field|
    next unless payload.include?(field)

    instance_variable_set("@#{field}", payload[field])
  end
end

[Source]

     # File lib/mcollective/message.rb, line 115
# Encodes the payload through the security plugin, replacing @payload with
# the encoded form and setting @requestid.
#
# :reply::   requires an associated request whose callerid the security
#            plugin accepts; the request id is taken from that request.
# :request, :direct_request:: a new request id is generated with
#                             create_reqid.
#
# Any other type raises a RuntimeError, as does a reply with no request or
# with a callerid the security plugin rejects.
def encode!
  kind = type

  if kind == :reply
    raise "Cannot encode a reply message if no request has been associated with it" unless request

    caller_id = request.payload[:callerid]

    unless PluginManager["security_plugin"].valid_callerid?(caller_id)
      raise 'callerid in original request is not valid, surpressing reply to potentially forged request'
    end

    @requestid = request.payload[:requestid]
    @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, caller_id)
  elsif kind == :request || kind == :direct_request
    @requestid = create_reqid
    @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
  else
    raise "Cannot encode #{type} messages"
  end
end

In the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id, which other classes such as the security plugins can use to optimize their behavior, for example by ignoring messages not directed at us.

[Source]

    # File lib/mcollective/message.rb, line 92
# Stores the id of the previously sent message this reply is expected to
# answer. Raises a RuntimeError unless the message type is :reply.
def expected_msgid=(msgid)
  if @type != :reply
    raise "Can only store the expected msgid for reply messages"
  end

  @expected_msgid = msgid
end

publish a reply message by creating a target name and sending it

[Source]

     # File lib/mcollective/message.rb, line 169
# Hands this message to the connector plugin, allowing at most 2 seconds for
# the whole operation (Timeout.timeout raises when that is exceeded).
#
# When discovered hosts have been set and direct addressing is enabled, the
# message is switched to :direct_request if the number of hosts does not
# exceed Config.instance.direct_addressing_threshold. The message is sent
# through the connector in every case.
def publish
  Timeout.timeout(2) do
    direct_possible = @discovered_hosts && Config.instance.direct_addressing

    if direct_possible && @discovered_hosts.size <= Config.instance.direct_addressing_threshold
      @type = :direct_request
      Log.debug("Handling #{requestid} as a direct request")
    end

    PluginManager["connector_plugin"].publish(self)
  end
end

Sets the message type to one of the known types. In the case of :direct_request, the list of hosts to communicate with should have been set with discovered_hosts, else an exception will be raised. This is for extra security: we never want to accidentally send a direct request without a list of hosts, as it might result in a filterless broadcast being sent.

Additionally, you simply cannot set :direct_request if direct_addressing is not enabled. This forces a workflow that avoids the mistake of assuming direct_addressing is enabled when it is not.

[Source]

    # File lib/mcollective/message.rb, line 73
# Sets the message type after checking it.
#
# :direct_request is only accepted when Config.instance.direct_addressing is
# enabled and discovered_hosts holds a non-empty list; otherwise a
# RuntimeError is raised. Any type not listed in VALIDTYPES also raises a
# RuntimeError.
def type=(type)
  if type == :direct_request
    unless Config.instance.direct_addressing
      raise "Direct requests is not enabled using the direct_addressing config option"
    end

    if !@discovered_hosts || @discovered_hosts.empty?
      raise "Can only set type to :direct_request if discovered_hosts have been set"
    end
  end

  unless VALIDTYPES.include?(type)
    raise "Unknown message type #{type}"
  end

  @type = type
end

Perform validation against the message by checking filters and ttl

[Source]

     # File lib/mcollective/message.rb, line 146
# Validates a request message against its TTL and the local filter.
#
# Raises a RuntimeError for any type other than :request. When the message
# age (current UTC time minus msgtime) exceeds ttl, the global stats plugin's
# ttlexpired is called and MsgTTLExpired is raised. When the security plugin
# rejects the payload's filter, NotTargettedAtUs is raised. Otherwise the
# message is marked as validated.
def validate
  raise "Can only validate request messages" unless type == :request

  msg_age = Time.now.utc.to_i - msgtime

  if msg_age > ttl
    # Describe the sender as "callerid@senderid" when a callerid is present,
    # else just the senderid.
    cid = ""
    cid += payload[:callerid] + "@" if payload.include?(:callerid)
    cid += payload[:senderid]

    PluginManager["global_stats"].ttlexpired

    raise(MsgTTLExpired, "Message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
  end

  unless PluginManager["security_plugin"].validate_filter?(payload[:filter])
    raise(NotTargettedAtUs, "Received message is not targetted to us")
  end

  @validated = true
end

[Validate]