Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
class Task(Wrapper):
    """
    Wrapper around a C reactor task object.
    """

    @staticmethod
    def wrap(impl):
        """
        Return a Task for the given C object, or None if impl is None.
        """
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        # Nothing to initialise for a task.
        pass

    def cancel(self):
        """
        Cancel this task by calling pn_task_cancel on the wrapped object.
        """
        pn_task_cancel(self._impl)
58
class Acceptor(Wrapper):
    """
    Wrapper around a C acceptor object as returned by Reactor.acceptor().
    """

    def __init__(self, impl):
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """
        Hand the C domain held by ssl_domain (its _domain attribute) to
        the wrapped acceptor.
        """
        domain_impl = ssl_domain._domain
        pn_acceptor_set_ssl_domain(self._impl, domain_impl)

    def close(self):
        """
        Close the wrapped acceptor.
        """
        pn_acceptor_close(self._impl)
69
class Reactor(Wrapper):
    """
    Python wrapper around a C reactor object. Exposes the reactor's
    handlers and timeout as properties and offers helpers to schedule
    tasks and to create acceptors, connections and selectables.
    """

    @staticmethod
    def wrap(impl):
        """
        Wrap an existing C reactor. Returns None if impl is None. If the
        reactor's attachment record holds a 'subclass' entry, that class
        is instantiated; otherwise a plain Reactor is returned.
        """
        if impl is None:
            return None
        record = pn_reactor_attachments(impl)
        attrs = pn_void2py(pn_record_get(record, PYCTX))
        if attrs and 'subclass' in attrs:
            cls = attrs['subclass']
        else:
            cls = Reactor
        return cls(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers added to this reactor's handler.

        @param kwargs: an optional 'impl' entry is passed to Wrapper in
        place of the default pn_reactor.
        """
        impl = kwargs.get("impl", pn_reactor)
        Wrapper.__init__(self, impl, pn_reactor_attachments)
        for handler in handlers:
            self.handler.add(handler)

    def _init(self):
        # Exception info recorded by on_error, re-raised by _check_errors.
        self.errors = []

    def on_error(self, info):
        """
        Record the given error info and yield the reactor.
        """
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        impl = pn_reactor_get_global_handler(self._impl)
        return WrappedHandler.wrap(impl, self.on_error)

    def _set_global(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_global_handler(self._impl, impl)
        # Drop the reference obtained from _chandler.
        pn_decref(impl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        millis = pn_reactor_get_timeout(self._impl)
        return millis2timeout(millis)

    def _set_timeout(self, secs):
        millis = timeout2millis(secs)
        return pn_reactor_set_timeout(self._impl, millis)

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        pn_reactor_yield(self._impl)

    def mark(self):
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        impl = pn_reactor_get_handler(self._impl)
        return WrappedHandler.wrap(impl, self.on_error)

    def _set_handler(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_handler(self._impl, impl)
        # Drop the reference obtained from _chandler.
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """
        Set the timeout, start the reactor, call process() until it
        returns a false value, then stop the reactor.
        """
        # NOTE(review): the reason for this particular timeout value is
        # not visible in this file.
        self.timeout = 3.14159265359
        self.start()
        while self.process():
            pass
        self.stop()

    def wakeup(self):
        """
        Wake the reactor up; raises IOError carrying the reactor's IO
        error text if the C call returns a non-zero value.
        """
        if pn_reactor_wakeup(self._impl):
            io = pn_reactor_io(self._impl)
            raise IOError(pn_error_text(pn_io_error(io)))

    def start(self):
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        # Print every recorded error except the last, then re-raise the
        # last one.
        if not self.errors:
            return
        for exc, value, tb in self.errors[:-1]:
            traceback.print_exception(exc, value, tb)
        exc, value, tb = self.errors[-1]
        _compat.raise_(exc, value, tb)

    def process(self):
        """
        Run one round of reactor processing and return the C call's
        result; any recorded handler error is raised first.
        """
        outcome = pn_reactor_process(self._impl)
        self._check_errors()
        return outcome

    def stop(self):
        """
        Stop the reactor, raise any recorded error, then clear both the
        global handler and the handler.
        """
        pn_reactor_stop(self._impl)
        self._check_errors()
        self.global_handler = None
        self.handler = None

    def schedule(self, delay, task):
        """
        Schedule the given handler to run after delay (converted with
        secs2millis). Returns the resulting Task.
        """
        impl = _chandler(task, self.on_error)
        millis = secs2millis(delay)
        scheduled = Task.wrap(pn_reactor_schedule(self._impl, millis, impl))
        pn_decref(impl)
        return scheduled

    def acceptor(self, host, port, handler=None):
        """
        Create an acceptor for host and port. Returns an Acceptor, or
        raises IOError with the reactor's IO error text if the C call
        yields no acceptor.
        """
        impl = _chandler(handler, self.on_error)
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
        pn_decref(impl)
        if not aimpl:
            raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
        return Acceptor(aimpl)

    def connection(self, handler=None):
        """
        Create a connection on this reactor with the given handler and
        return it wrapped as a Connection.
        """
        impl = _chandler(handler, self.on_error)
        conn = Connection.wrap(pn_reactor_connection(self._impl, impl))
        pn_decref(impl)
        return conn

    def selectable(self, handler=None):
        """
        Create a selectable on this reactor; if a handler is supplied it
        is set on the selectable's attachment record.
        """
        impl = _chandler(handler, self.on_error)
        sel = Selectable.wrap(pn_reactor_selectable(self._impl))
        if impl:
            pn_record_set_handler(pn_selectable_attachments(sel._impl), impl)
            pn_decref(impl)
        return sel

    def update(self, sel):
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """
        Put an event of the given type, with obj as its context, onto
        the reactor's collector.
        """
        collector = pn_reactor_collector(self._impl)
        pn_collector_put(collector, PN_PYREF, pn_py2void(obj), etype.number)
# Register factories under the 'pn_reactor' and 'pn_task' keys of
# proton's wrappers table; each casts the raw object to the C type and
# wraps it in the matching Python class.
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda raw: Reactor.wrap(pn_cast_pn_reactor(raw))
_wrappers["pn_task"] = lambda raw: Task.wrap(pn_cast_pn_task(raw))
class EventInjector(object):
    """
    Lets a thread other than the reactor's event thread trigger events
    that are then handled on the event thread. Pass an instance to
    Reactor.selectable() to activate it, and call close() once it is no
    longer needed so that the event loop is able to end.
    """

    def __init__(self):
        self.queue = Queue.Queue()
        # (read fd, write fd); a byte written to the pipe makes the
        # read end readable so on_selectable_readable gets called.
        self.pipe = os.pipe()
        self._closed = False

    def trigger(self, event):
        """
        Queue the given event for dispatch on the event thread of the
        reactor this EventInjector was added to.
        """
        self.queue.put(event)
        write_fd = self.pipe[1]
        os.write(write_fd, _compat.str2bin("!"))

    def close(self):
        """
        Ask for this EventInjector to be closed. Events already queued
        are still dispatched on the reactor's event thread; afterwards
        the selectable is terminated.
        """
        self._closed = True
        write_fd = self.pipe[1]
        os.write(write_fd, _compat.str2bin("!"))

    def fileno(self):
        """
        Return the read end of the pipe.
        """
        return self.pipe[0]

    def on_selectable_init(self, event):
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        # Drain the wake-up bytes, then forward every queued event.
        os.read(self.pipe[0], 512)
        reactor = event.reactor
        while not self.queue.empty():
            pending = self.queue.get()
            reactor.push_event(pending.context, pending.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
255
class ApplicationEvent(EventBase):
    """
    An event defined by the application. It may be tied to an engine
    object (connection, session, link or delivery) and/or to an
    arbitrary subject.
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        # A more specific object overrides its parents: a delivery
        # determines the link, a link the session, a session the
        # connection.
        if delivery:
            link = delivery.link
        if link:
            session = link.session
        if session:
            connection = session.connection
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        self.subject = subject

    def __repr__(self):
        described = []
        for obj in (self.connection, self.session, self.link, self.delivery, self.subject):
            if obj is not None:
                described.append(str(obj))
        return "%s(%s)" % (self.type, ", ".join(described))
279
class Transaction(object):
    """
    Tracks the state of a single AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        """
        @param txn_ctrl: sender used for the declare and discharge
        control messages.

        @param handler: receives the on_transaction_* callbacks.

        @param settle_before_discharge: when true, accepted deliveries
        are settled immediately rather than kept pending until the
        discharge outcome arrives.
        """
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        self._pending = []
        # The declare is sent as soon as the object is constructed.
        self.declare()

    def commit(self):
        self.discharge(False)

    def abort(self):
        self.discharge(True)

    def declare(self):
        descriptor = symbol(u'amqp:declare:list')
        self._declare = self._send_ctrl(descriptor, [None])

    def discharge(self, failed):
        self.failed = failed
        descriptor = symbol(u'amqp:discharge:list')
        self._discharge = self._send_ctrl(descriptor, [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        # Send a described control message and tag the delivery with
        # this transaction so that its outcome can be routed back.
        message = Message(body=Described(descriptor, value))
        delivery = self.txn_ctrl.send(message)
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """
        Send msg on sender as part of this transaction and return the
        delivery.
        """
        delivery = sender.send(msg, tag=tag)
        delivery.local.data = [self.id]
        # 0x34: presumably the AMQP transactional-state code -- confirm.
        delivery.update(0x34)
        return delivery

    def accept(self, delivery):
        """
        Accept the delivery under this transaction, then either settle
        it or remember it as pending, depending on
        settle_before_discharge.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        if state:
            outcome = Described(ulong(state), [])
            delivery.local.data = [self.id, outcome]
            delivery.update(0x34)

    def _release_pending(self):
        for delivery in self._pending:
            delivery.update(Delivery.RELEASED)
            delivery.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """
        Dispatch the outcome of the declare or discharge delivery to the
        matching on_transaction_* callback of the handler.
        """
        delivery = event.delivery
        if delivery == self._declare:
            self._handle_declare_outcome(event, delivery)
        elif delivery == self._discharge:
            self._handle_discharge_outcome(event, delivery)

    def _handle_declare_outcome(self, event, delivery):
        # Remote data present means the declare succeeded; its first
        # element becomes the transaction id.
        if delivery.remote.data:
            self.id = delivery.remote.data[0]
            self.handler.on_transaction_declared(event)
            return
        if delivery.remote_state != Delivery.REJECTED:
            logging.warning("Unexpected outcome for declare: %s" % delivery.remote_state)
        self.handler.on_transaction_declare_failed(event)

    def _handle_discharge_outcome(self, event, delivery):
        # A rejected discharge is reported only for an attempted commit.
        if delivery.remote_state == Delivery.REJECTED:
            if not self.failed:
                self.handler.on_transaction_commit_failed(event)
                self._release_pending()  # make this optional?
        elif self.failed:
            self.handler.on_transaction_aborted(event)
            self._release_pending()
        else:
            self.handler.on_transaction_committed(event)
        self._clear_pending()
362
class LinkOption(object):
    """
    Abstract base for options that configure a link.
    """

    def apply(self, link):
        """
        Configure the given link. Does nothing here; subclasses put
        their configuration logic in this method.
        """
        return None

    def test(self, link):
        """
        Report whether the option should be applied to the given link.
        Always true here; subclasses can override it to apply the
        option selectively, e.g. based on some link criteria.
        """
        return True
379
class AtMostOnce(LinkOption):
    """
    Sets the link's sender settle mode to Link.SND_SETTLED.
    """

    def apply(self, link):
        mode = Link.SND_SETTLED
        link.snd_settle_mode = mode
383
class AtLeastOnce(LinkOption):
    """
    Sets the link's sender settle mode to Link.SND_UNSETTLED and its
    receiver settle mode to Link.RCV_FIRST.
    """

    def apply(self, link):
        snd_mode, rcv_mode = Link.SND_UNSETTLED, Link.RCV_FIRST
        link.snd_settle_mode = snd_mode
        link.rcv_settle_mode = rcv_mode
388
class SenderOption(LinkOption):
    """
    Base for options whose test() only passes for sender links.
    """

    def apply(self, sender):
        pass

    def test(self, link):
        return link.is_sender
392
class ReceiverOption(LinkOption):
    """
    Base for options whose test() only passes for receiver links.
    """

    def apply(self, receiver):
        pass

    def test(self, link):
        return link.is_receiver
396
class DynamicNodeProperties(LinkOption):
    """
    Link option that puts a dictionary of properties on the link's
    source (for a receiver) or target (for a sender).
    """

    def __init__(self, props=None):
        """
        @param props: mapping of property names to values. Keys that
        are not already symbols are converted to symbol. Omitting it
        (or passing None) gives an empty property set.
        """
        # None instead of a shared mutable {} default.
        if props is None:
            props = {}
        self.properties = {}
        for key in props:
            name = key if isinstance(key, symbol) else symbol(key)
            self.properties[name] = props[key]

    def apply(self, link):
        """
        Put the properties on the source of a receiver link, or on the
        target of any other link.
        """
        terminus = link.source if link.is_receiver else link.target
        terminus.properties.put_dict(self.properties)
411
class Filter(ReceiverOption):
    """
    Receiver option that puts a filter set on the receiver's source.
    """

    def __init__(self, filter_set=None):
        """
        @param filter_set: dictionary describing the filters. Omitting
        it (or passing None) gives this instance its own empty
        dictionary.
        """
        # A literal {} default would be one dict shared by every Filter
        # built without arguments, so changing one instance's
        # filter_set would leak into all the others.
        if filter_set is None:
            filter_set = {}
        self.filter_set = filter_set

    def apply(self, receiver):
        """
        Put the filter set on the source of the given receiver.
        """
        receiver.source.filter.put_dict(self.filter_set)
418
class Selector(Filter):
    """
    Link option that sets a message selector filter on a link.
    """

    def __init__(self, value, name='selector'):
        """
        @param value: the selector expression.

        @param name: key under which the filter is stored in the
        filter set.
        """
        selector = Described(symbol('apache.org:selector-filter:string'), value)
        filter_set = {symbol(name): selector}
        super(Selector, self).__init__(filter_set)
425
class DurableSubscription(ReceiverOption):
    """
    Sets the receiver's source durability to Terminus.DELIVERIES and
    its expiry policy to Terminus.EXPIRE_NEVER.
    """

    def apply(self, receiver):
        source = receiver.source
        source.durability = Terminus.DELIVERIES
        source.expiry_policy = Terminus.EXPIRE_NEVER
430
class Move(ReceiverOption):
    """
    Sets the receiver's source distribution mode to
    Terminus.DIST_MODE_MOVE.
    """

    def apply(self, receiver):
        source = receiver.source
        source.distribution_mode = Terminus.DIST_MODE_MOVE
434
class Copy(ReceiverOption):
    """
    Sets the receiver's source distribution mode to
    Terminus.DIST_MODE_COPY.
    """

    def apply(self, receiver):
        source = receiver.source
        source.distribution_mode = Terminus.DIST_MODE_COPY
438 446
447 -def _create_session(connection, handler=None):
448 session = connection.session() 449 session.open() 450 return session
451
452 453 -def _get_attr(target, name):
454 if hasattr(target, name): 455 return getattr(target, name) 456 else: 457 return None
458
class SessionPerConnection(object):
    """
    Session policy that creates one session lazily and hands out that
    same session on every subsequent request.
    """

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """
        Return the cached session, creating it on the given connection
        (with this object as its context) if there is none yet.
        """
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        self._default_session.context = self
        return self._default_session

    def on_session_remote_close(self, event):
        """
        Close the event's connection and forget the cached session.
        """
        event.connection.close()
        self._default_session = None
472
class GlobalOverrides(object):
    """
    Internal handler that offers each unhandled event to the
    '_overrides' handler of the event's connection first, and
    dispatches it to the wrapped base handler only when no override
    took it.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        # The result is truthy only when the connection exists, has an
        # _overrides handler, and dispatching to it returned a true
        # value.
        connection = event.connection
        if not connection:
            return connection
        if not hasattr(connection, '_overrides'):
            return False
        return event.dispatch(connection._overrides)
488
class Connector(Handler):
    """
    Internal handler that sets up the transport (and hence triggers the
    socket connect) for an opened connection, and that reconnects
    according to the configured reconnect strategy.
    """

    def __init__(self, connection):
        self.connection = connection
        # Everything below is filled in by whoever creates the connector.
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None

    def _connect(self, connection):
        """
        Take the next URL from self.address and bind a freshly
        configured Transport to the connection. Raises SSLUnavailable
        for an 'amqps' URL when no SSL domain is set.
        """
        url = self.address.next()
        # IoHandler uses the hostname to determine where to try to connect to
        connection.hostname = "%s:%s" % (url.host, url.port)
        logging.info("connecting to %s..." % connection.hostname)

        transport = Transport()
        if self.sasl_enabled:
            self._configure_sasl(transport.sasl(), connection, url)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme != 'amqps':
            return
        if not self.ssl_domain:
            raise SSLUnavailable("amqps: SSL libraries not found")
        self.ssl = SSL(transport, self.ssl_domain)
        self.ssl.peer_hostname = url.host

    def _configure_sasl(self, sasl, connection, url):
        # Credentials in the URL take precedence over those set on the
        # connector itself.
        sasl.allow_insecure_mechs = self.allow_insecure_mechs
        user = url.username or self.user
        if user:
            connection.user = user
        password = url.password or self.password
        if password:
            connection.password = password
        if self.allowed_mechs:
            sasl.allowed_mechs(self.allowed_mechs)

    def on_connection_local_open(self, event):
        self._connect(event.connection)

    def on_connection_remote_open(self, event):
        logging.info("connected to %s" % event.connection.hostname)
        if not self.reconnect:
            return
        # Connected: restart the reconnect strategy.
        self.reconnect.reset()
        self.transport = None

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        If the connection is still locally active, either reconnect
        (immediately or after the delay given by the reconnect
        strategy) or, with no strategy configured, drop the connection.
        """
        if not (self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.info("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            # on_timer_task performs the actual reconnect.
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        self._connect(self.connection)

    def on_connection_remote_close(self, event):
        self.connection = None
567
class Backoff(object):
    """
    A reconnect strategy in which the delay between retries grows with
    each attempt, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """
        Go back to the initial delay of zero.
        """
        self.delay = 0

    def next(self):
        """
        Return the delay to use now and compute the following one:
        0.1 after a zero delay, otherwise double, capped at 10.
        """
        current = self.delay
        self.delay = 0.1 if current == 0 else min(10, 2*current)
        return current
586
class Urls(object):
    """
    Endless round-robin iterator over a fixed list of URLs: once the
    last one has been returned it starts again from the first.
    """

    def __init__(self, values):
        """
        @param values: iterable of values, each converted with Url().
        """
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next URL, wrapping around at the end of the list.
        With an empty list StopIteration propagates to the caller.
        """
        try:
            return next(self.i)
        except StopIteration:
            # Exhausted: restart from the beginning.
            self.i = iter(self.values)
            return next(self.i)

    # Python 3 looks up __next__; without this alias the object returned
    # by __iter__ is not an iterator there, and iter()/next()/for fail
    # with a TypeError.
    __next__ = next
601
class SSLConfig(object):
    """
    Holds one client-mode and one server-mode SSLDomain and applies
    every setting to both of them.
    """

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """
        Set the same credentials on the client and the server domain.
        """
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """
        Set the same trusted CA database on the client and the server
        domain.
        """
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
614
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
       loosely speaking is something that establishes links to or from
       another container, over which messages are transferred. This is
       an extension to the Reactor class that adds convenience methods
       for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers passed on to Reactor.

        @param kwargs: 'impl' wraps an existing reactor, in which case
        none of the container defaults below are set; 'global_handler'
        replaces the reactor's global handler as the base that
        GlobalOverrides falls back to.
        """
        super(Container, self).__init__(*handlers, **kwargs)
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            self.allow_insecure_mechs = True
            self.allowed_mechs = None
            self.sasl_enabled = True
            self.user = None
            self.password = None
            # Stored via Wrapper.__setattr__ under 'subclass', the key
            # Reactor.wrap() consults to pick the class to instantiate.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
        """
        Initiates the establishment of an AMQP connection. Returns an
        instance of proton.Connection.

        @param url: URL string of process to connect to

        @param urls: list of URL strings of process to try to connect to

        @param address: an object whose next() method returns the URL
        to use for each connection attempt

        One of url, urls or address must be specified; otherwise
        ValueError is raised. If several are given, url takes
        precedence over urls, and urls over address.

        @param reconnect: A value of False will prevent the library
        from automatically trying to reconnect if the underlying
        socket is disconnected before the connection has been closed.
        If left as None, a Backoff strategy is used.

        @param heartbeat: A value in milliseconds indicating the
        desired frequency of heartbeats used to test the underlying
        socket is alive. NOTE(review): the value is assigned unchanged
        to Transport.idle_timeout; confirm the unit against that
        property.

        @param ssl_domain: SSL configuration in the form of an
        instance of proton.SSLDomain.

        @param handler: a connection scoped handler that will be
        called to process any events in the scope of this connection
        or its child links

        @param kwargs: sasl_enabled, which determines whether a sasl
        layer is used for the connection; allowed_mechs an optional
        list of SASL mechanisms to allow if sasl is enabled;
        allow_insecure_mechs a flag indicating whether insecure
        mechanisms, such as PLAIN over a non-encrypted socket, are
        allowed; user and password, the credentials to use. These
        options can also be set at container scope.

        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())

        connector = Connector(conn)
        # Per-call options fall back to the container-wide settings.
        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
        connector.user = kwargs.get('user', self.user)
        connector.password = kwargs.get('password', self.password)
        conn._overrides = connector
        if url:
            connector.address = Urls([url])
        elif urls:
            connector.address = Urls(urls)
        elif address:
            connector.address = address
        else:
            raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        # use container's default client domain if none specified. This is
        # only necessary if the URL specifies the "amqps:" scheme
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection()  # todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        """
        Build a link name from the container id plus whichever of the
        remote and local addresses are given; with neither, a generated
        UUID is appended instead.
        """
        if local and remote:
            # The original omitted this 'return', so the method
            # produced None whenever both addresses were supplied.
            return "%s-%s-%s" % (container, remote, local)
        elif local:
            return "%s-%s" % (container, local)
        elif remote:
            return "%s-%s" % (container, remote)
        else:
            return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        """
        Resolve context to a session: a Url is connected to first, a
        Session is returned as is, a Connection goes through its
        session policy if it has one, and anything else is asked for
        a session via its session() method.
        """
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be sent. Returns an instance of proton.Sender.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the target address can be
        specified as the second argument (or as a keyword
        argument). The source address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the target is not, then the path of the URL is used as the
        target address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler is not None:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be received (aka a subscription). Returns an instance of
        proton.Receiver.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the source address can be
        specified as the second argument (or as a keyword
        argument). The target address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the source is not, then the path of the URL is used as the
        source address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        If dynamic is true the source is marked as dynamic.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler is not None:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """
        Declare a new transaction and return the Transaction tracking
        it. The 'txn-ctrl' sender to the coordinator is created on
        first use and cached on context as _txn_ctrl.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Route outcomes of control deliveries back to the
                    # transaction that sent them.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified. Returns the acceptor.

        For an 'amqps' URL with no ssl_domain given, the container's
        default server domain is used; SSLUnavailable is raised if the
        container has no SSL configuration.
        """
        url = Url(url)
        acceptor = self.acceptor(url.host, url.port)
        ssl_config = ssl_domain
        if not ssl_config and url.scheme == 'amqps':
            # use container's default server domain
            if self.ssl:
                ssl_config = self.ssl.server
            else:
                raise SSLUnavailable("amqps: SSL libraries not found")
        if ssl_config:
            acceptor.set_ssl_domain(ssl_config)
        return acceptor

    def do_work(self, timeout=None):
        """
        Set the reactor timeout if one is given, then run one round of
        process() and return its result.
        """
        if timeout:
            self.timeout = timeout
        return self.process()
838