1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, pn_error_text, pn_message, \
23 pn_message_annotations, pn_message_body, pn_message_clear, pn_message_correlation_id, pn_message_decode, \
24 pn_message_encode, pn_message_error, pn_message_free, pn_message_get_address, pn_message_get_content_encoding, \
25 pn_message_get_content_type, pn_message_get_creation_time, pn_message_get_delivery_count, \
26 pn_message_get_expiry_time, pn_message_get_group_id, pn_message_get_group_sequence, pn_message_get_priority, \
27 pn_message_get_reply_to, pn_message_get_reply_to_group_id, pn_message_get_subject, pn_message_get_ttl, \
28 pn_message_get_user_id, pn_message_id, pn_message_instructions, pn_message_is_durable, pn_message_is_first_acquirer, \
29 pn_message_is_inferred, pn_message_properties, pn_message_set_address, pn_message_set_content_encoding, \
30 pn_message_set_content_type, pn_message_set_creation_time, pn_message_set_delivery_count, pn_message_set_durable, \
31 pn_message_set_expiry_time, pn_message_set_first_acquirer, pn_message_set_group_id, pn_message_set_group_sequence, \
32 pn_message_set_inferred, pn_message_set_priority, pn_message_set_reply_to, pn_message_set_reply_to_group_id, \
33 pn_message_set_subject, pn_message_set_ttl, pn_message_set_user_id
34
35 from . import _compat
36 from ._common import isinteger, millis2secs, secs2millis, unicode2utf8, utf82unicode
37 from ._data import Data, symbol, ulong
38 from ._endpoints import Link
39 from ._exceptions import EXCEPTIONS, MessageException
40
41
42
# Python 2/3 compatibility: when the ``unicode`` builtin does not exist,
# looking the name up raises NameError and ``unicode`` is aliased to ``str``.
try:
    unicode
except NameError:
    unicode = str
47
48
50 """The L{Message} class is a mutable holder of message content.
51
52 @ivar instructions: delivery instructions for the message
53 @type instructions: dict
54 @ivar annotations: infrastructure defined message annotations
55 @type annotations: dict
56 @ivar properties: application defined message properties
57 @type properties: dict
58 @ivar body: message body
59 @type body: bytes | unicode | dict | list | int | long | float | UUID
60 """
61
62 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
63
def __init__(self, body=None, **kwargs):
    """
    Set up the message object returned by C{pn_message()} together with
    the L{Data} wrappers for its id and correlation-id, reset the
    instructions/annotations/properties sections to None, store the body,
    and finally apply any keyword arguments as attribute assignments.

    @param body: initial value stored as the message body
    @param kwargs: Message property name/value pairs to initialise the Message
    """
    msg = pn_message()
    self._msg = msg
    self._id = Data(pn_message_id(msg))
    self._correlation_id = Data(pn_message_correlation_id(msg))
    self.instructions = self.annotations = self.properties = None
    self.body = body
    for name, value in _compat.iteritems(kwargs):
        # The looked-up value is discarded on purpose: reading the attribute
        # first makes an unknown name fail with AttributeError instead of
        # letting setattr silently create a brand new attribute.
        getattr(self, name)
        setattr(self, name, value)
78
80 if hasattr(self, "_msg"):
81 pn_message_free(self._msg)
82 del self._msg
83
85 if err < 0:
86 exc = EXCEPTIONS.get(err, MessageException)
87 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
88 else:
89 return err
90
102
122
def _post_decode(self):
    """
    Refresh the C{instructions}, C{annotations}, C{properties} and C{body}
    attributes from the matching sections of the underlying message.

    Each section is wrapped in a L{Data}; when C{next()} reports an element
    the attribute receives C{get_object()}, otherwise it is reset to None.
    """
    msg = self._msg
    # All four wrappers are created up front, then read in the same order.
    sections = [
        ("instructions", Data(pn_message_instructions(msg))),
        ("annotations", Data(pn_message_annotations(msg))),
        ("properties", Data(pn_message_properties(msg))),
        ("body", Data(pn_message_body(msg))),
    ]
    for attr, data in sections:
        setattr(self, attr, data.get_object() if data.next() else None)
145
147 """
148 Clears the contents of the L{Message}. All fields will be reset to
149 their default values.
150 """
151 pn_message_clear(self._msg)
152 self.instructions = None
153 self.annotations = None
154 self.properties = None
155 self.body = None
156
158 return pn_message_is_inferred(self._msg)
159
161 self._check(pn_message_set_inferred(self._msg, bool(value)))
162
163 inferred = property(_is_inferred, _set_inferred, doc="""
164 The inferred flag for a message indicates how the message content
165 is encoded into AMQP sections. If inferred is true then binary and
166 list values in the body of the message will be encoded as AMQP DATA
167 and AMQP SEQUENCE sections, respectively. If inferred is false,
168 then all values in the body of the message will be encoded as AMQP
169 VALUE sections regardless of their type.
170 """)
171
173 return pn_message_is_durable(self._msg)
174
176 self._check(pn_message_set_durable(self._msg, bool(value)))
177
178 durable = property(_is_durable, _set_durable,
179 doc="""
180 The durable property indicates that the message should be held durably
181 by any intermediaries taking responsibility for the message.
182 """)
183
185 return pn_message_get_priority(self._msg)
186
188 self._check(pn_message_set_priority(self._msg, value))
189
190 priority = property(_get_priority, _set_priority,
191 doc="""
192 The priority of the message.
193 """)
194
197
199 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
200
201 ttl = property(_get_ttl, _set_ttl,
202 doc="""
203 The time to live of the message measured in seconds. Expired messages
204 may be dropped.
205 """)
206
208 return pn_message_is_first_acquirer(self._msg)
209
211 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
212
213 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
214 doc="""
215 True iff the recipient is the first to acquire the message.
216 """)
217
219 return pn_message_get_delivery_count(self._msg)
220
222 self._check(pn_message_set_delivery_count(self._msg, value))
223
224 delivery_count = property(_get_delivery_count, _set_delivery_count,
225 doc="""
226 The number of delivery attempts made for this message.
227 """)
228
231
237
238 id = property(_get_id, _set_id,
239 doc="""
240 The id of the message.
241 """)
242
244 return pn_message_get_user_id(self._msg)
245
247 self._check(pn_message_set_user_id(self._msg, value))
248
249 user_id = property(_get_user_id, _set_user_id,
250 doc="""
251 The user id of the message creator.
252 """)
253
256
258 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
259
260 address = property(_get_address, _set_address,
261 doc="""
262 The address of the message.
263 """)
264
267
269 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
270
271 subject = property(_get_subject, _set_subject,
272 doc="""
273 The subject of the message.
274 """)
275
278
280 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
281
282 reply_to = property(_get_reply_to, _set_reply_to,
283 doc="""
284 The reply-to address for the message.
285 """)
286
289
295
296 correlation_id = property(_get_correlation_id, _set_correlation_id,
297 doc="""
298 The correlation-id for the message.
299 """)
300
302 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
303
def _set_content_type(self, value):
    """
    Setter used by the C{content_type} property.

    The value is passed through C{unicode2utf8} before being handed to
    C{pn_message_set_content_type}; the resulting status code is given to
    C{self._check}.

    @param value: the new content-type
    """
    msg = self._msg
    status = pn_message_set_content_type(msg, unicode2utf8(value))
    self._check(status)
306
307 content_type = property(_get_content_type, _set_content_type,
308 doc="""
309 The content-type of the message.
310 """)
311
313 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
314
def _set_content_encoding(self, value):
    """
    Setter used by the C{content_encoding} property.

    The value is passed through C{unicode2utf8} before being handed to
    C{pn_message_set_content_encoding}; the resulting status code is given
    to C{self._check}.

    @param value: the new content-encoding
    """
    msg = self._msg
    status = pn_message_set_content_encoding(msg, unicode2utf8(value))
    self._check(status)
317
318 content_encoding = property(_get_content_encoding, _set_content_encoding,
319 doc="""
320 The content-encoding of the message.
321 """)
322
324 return millis2secs(pn_message_get_expiry_time(self._msg))
325
327 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
328
329 expiry_time = property(_get_expiry_time, _set_expiry_time,
330 doc="""
331 The expiry time of the message.
332 """)
333
335 return millis2secs(pn_message_get_creation_time(self._msg))
336
338 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
339
340 creation_time = property(_get_creation_time, _set_creation_time,
341 doc="""
342 The creation time of the message.
343 """)
344
347
349 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
350
351 group_id = property(_get_group_id, _set_group_id,
352 doc="""
353 The group id of the message.
354 """)
355
357 return pn_message_get_group_sequence(self._msg)
358
360 self._check(pn_message_set_group_sequence(self._msg, value))
361
362 group_sequence = property(_get_group_sequence, _set_group_sequence,
363 doc="""
364 The sequence of the message within its group.
365 """)
366
368 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
369
371 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
372
373 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
374 doc="""
375 The group-id for any replies.
376 """)
377
379 self._pre_encode()
380 sz = 16
381 while True:
382 err, data = pn_message_encode(self._msg, sz)
383 if err == PN_OVERFLOW:
384 sz *= 2
385 continue
386 else:
387 self._check(err)
388 return data
389
391 self._check(pn_message_decode(self._msg, data))
392 self._post_decode()
393
394 - def send(self, sender, tag=None):
402
def recv(self, link):
    """
    Read the current delivery of C{link}, decode it into this message and
    hand the delivery back.

    Nothing is read and None is returned when the link is a sender, when
    the link has no current delivery, or when the current delivery is
    still partial.

    @type link: Link
    @param link: the link to receive a message from
    @return: the delivery associated with the decoded message (or None)
    """
    if link.is_sender:
        return None

    delivery = link.current
    if not delivery or delivery.partial:
        return None

    # Pull the pending bytes off the link, keep them on the delivery, and
    # move the link past this delivery.
    payload = link.recv(delivery.pending)
    delivery.encoded = payload
    link.advance()

    # The remote sender's settle mode is SND_SETTLED, so settle locally
    # too (presumably the sender keeps no state for it -- TODO confirm).
    if link.remote_snd_settle_mode == Link.SND_SETTLED:
        delivery.settle()

    self.decode(delivery.encoded)
    return delivery
427
429 props = []
430 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
431 "priority", "first_acquirer", "delivery_count", "id",
432 "correlation_id", "user_id", "group_id", "group_sequence",
433 "reply_to_group_id", "instructions", "annotations",
434 "properties", "body"):
435 value = getattr(self, attr)
436 if value: props.append("%s=%r" % (attr, value))
437 return "Message(%s)" % ", ".join(props)
438