Package proton :: Module reactor
[frames | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
class Task(Wrapper):
    """
    Python wrapper for a C task object, as returned by
    pn_reactor_schedule().
    """

    @staticmethod
    def wrap(impl):
        """Return a Task wrapping impl, or None when impl is None."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        # Task state is attached to the C object through its
        # pn_task_attachments record.
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        # Nothing to initialise for a task.
        pass

    def cancel(self):
        """Cancel the wrapped task by calling pn_task_cancel()."""
        pn_task_cancel(self._impl)
58
class Acceptor(Wrapper):
    """
    Python wrapper for a C acceptor object, as returned by
    pn_reactor_acceptor().
    """

    def __init__(self, impl):
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """
        Attach an SSL configuration to this acceptor.

        @param ssl_domain: object whose _domain attribute is handed to
        pn_acceptor_set_ssl_domain().
        """
        domain = ssl_domain._domain
        pn_acceptor_set_ssl_domain(self._impl, domain)

    def close(self):
        """Close the wrapped acceptor by calling pn_acceptor_close()."""
        pn_acceptor_close(self._impl)
69
class Reactor(Wrapper):
    """
    Python wrapper for the C pn_reactor object.

    Errors reported through on_error() are collected in self.errors and
    re-raised by process() and stop().
    """

    @staticmethod
    def wrap(impl):
        """
        Wrap a C reactor.

        @return: None if impl is None. Otherwise, if the reactor's
        attachment record holds a 'subclass' entry, an instance of that
        class; failing that, a plain Reactor.
        """
        if impl is None:
            return None
        else:
            record = pn_reactor_attachments(impl)
            attrs = pn_void2py(pn_record_get(record, PYCTX))
            if attrs and 'subclass' in attrs:
                return attrs['subclass'](impl=impl)
            else:
                return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers to add to this reactor's handler.

        @param kwargs: may contain 'impl', an existing C reactor to
        wrap. When absent, pn_reactor itself is passed to
        Wrapper.__init__ in its place.
        """
        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
        for h in handlers:
            self.handler.add(h)

    def _init(self):
        # Error infos recorded by on_error(); each one is unpacked as an
        # (exc, value, tb) triple by _check_errors().
        self.errors = []

    def on_error(self, info):
        """Record an error and yield so it can be raised by process()/stop()."""
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)

    def _set_global(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_global_handler(self._impl, impl)
        # Drop the local reference obtained from _chandler().
        pn_decref(impl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        return millis2timeout(pn_reactor_get_timeout(self._impl))

    def _set_timeout(self, secs):
        return pn_reactor_set_timeout(self._impl, timeout2millis(secs))

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        """Call pn_reactor_yield() on the wrapped reactor."""
        pn_reactor_yield(self._impl)

    def mark(self):
        """Return the result of pn_reactor_mark() on the wrapped reactor."""
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)

    def _set_handler(self, handler):
        impl = _chandler(handler, self.on_error)
        pn_reactor_set_handler(self._impl, impl)
        # Drop the local reference obtained from _chandler().
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """
        Run the reactor to completion: start it, call process() until it
        returns a false value, stop it, process once more, then clear
        both the global handler and the handler.
        """
        # NOTE(review): the specific timeout value looks arbitrary --
        # confirm before changing it.
        self.timeout = 3.14159265359
        self.start()
        while self.process(): pass
        self.stop()
        self.process()
        self.global_handler = None
        self.handler = None

    def wakeup(self):
        """
        Call pn_reactor_wakeup().

        @raise IOError: if pn_reactor_wakeup() returns a non-zero value.
        """
        n = pn_reactor_wakeup(self._impl)
        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))

    def start(self):
        """Call pn_reactor_start() on the wrapped reactor."""
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        # Print every recorded error except the last, then re-raise the
        # last one with its original traceback.
        if self.errors:
            for exc, value, tb in self.errors[:-1]:
                traceback.print_exception(exc, value, tb)
            exc, value, tb = self.errors[-1]
            _compat.raise_(exc, value, tb)

    def process(self):
        """
        Call pn_reactor_process() and raise any error recorded by
        on_error().

        @return: the result of pn_reactor_process().
        """
        result = pn_reactor_process(self._impl)
        self._check_errors()
        return result

    def stop(self):
        """Call pn_reactor_stop() and raise any error recorded by on_error()."""
        pn_reactor_stop(self._impl)
        self._check_errors()

    def schedule(self, delay, task):
        """
        Schedule a handler after a delay.

        @param delay: delay, converted with secs2millis().

        @param task: handler passed to pn_reactor_schedule().

        @return: a Task wrapping the scheduled C task.
        """
        impl = _chandler(task, self.on_error)
        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
        pn_decref(impl)
        return task

    def acceptor(self, host, port, handler=None):
        """
        Create an acceptor for the given host and port.

        @return: an Acceptor wrapping the C acceptor.

        @raise IOError: if pn_reactor_acceptor() returns a false value;
        the message contains the reactor's error text, host and port.
        """
        impl = _chandler(handler, self.on_error)
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
        pn_decref(impl)
        if aimpl:
            return Acceptor(aimpl)
        else:
            # All three values must be supplied to '%' as one tuple;
            # otherwise the formatting itself fails with TypeError and
            # the IOError is never raised.
            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))

    def connection(self, handler=None):
        """Deprecated: use connection_to_host() instead
        """
        impl = _chandler(handler, self.on_error)
        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
        if impl: pn_decref(impl)
        return result

    def connection_to_host(self, host, port, handler=None):
        """Create an outgoing Connection that will be managed by the reactor.
        The reactor's pn_iohandler will create a socket connection to the host
        once the connection is opened.
        """
        conn = self.connection(handler)
        self.set_connection_host(conn, host, port)
        return conn

    def set_connection_host(self, connection, host, port):
        """Change the address used by the connection.  The address is
        used by the reactor's iohandler to create an outgoing socket
        connection.  This must be set prior to opening the connection.
        """
        pn_reactor_set_connection_host(self._impl,
                                       connection._impl,
                                       unicode2utf8(str(host)),
                                       unicode2utf8(str(port)))

    def get_connection_address(self, connection):
        """This may be used to retrieve the remote peer address.
        @return: string containing the address in URL format or None if no
        address is available.  Use the proton.Url class to create a Url object
        from the returned value.
        """
        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
        return utf82unicode(_url)

    def selectable(self, handler=None):
        """
        Create a Selectable on this reactor.

        @param handler: optional handler; when _chandler() yields a true
        value for it, it is stored in the selectable's attachment record.
        """
        impl = _chandler(handler, self.on_error)
        result = Selectable.wrap(pn_reactor_selectable(self._impl))
        if impl:
            record = pn_selectable_attachments(result._impl)
            pn_record_set_handler(record, impl)
            pn_decref(impl)
        return result

    def update(self, sel):
        """Call pn_reactor_update() for the given selectable."""
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """
        Put an event on the reactor's collector.

        @param obj: Python object used as the event context.

        @param etype: event type; its number attribute is passed to
        pn_collector_put().
        """
        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
from proton import wrappers as _wrappers
# Register converters keyed by C type name: each casts the raw object to
# the named C type and wraps it in the corresponding Python class.
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self._closed = False
        # pipe[0] is the end handed to the selectable; pipe[1] is written
        # to by trigger() and close().
        self.pipe = os.pipe()
        self.queue = Queue.Queue()

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        marker = _compat.str2bin("!")
        os.write(self.pipe[1], marker)

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactor's event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        marker = _compat.str2bin("!")
        os.write(self.pipe[1], marker)

    def fileno(self):
        """Return the file descriptor of the pipe's first end, pipe[0]."""
        read_fd = self.pipe[0]
        return read_fd

    def on_selectable_init(self, event):
        """Point the selectable at our pipe and mark it as reading."""
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """
        Read from the pipe, push every queued event onto the reactor
        and, if close() has been called, terminate the selectable.
        """
        os.read(self.pipe[0], 512)
        while not self.queue.empty():
            pending = self.queue.get()
            event.reactor.push_event(pending.context, pending.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
286
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and/or an arbitrary subject.
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        self.subject = subject
        self.delivery = delivery
        # Each parent object is taken from its child when the child is
        # set; the explicit argument is used only as a fallback.
        self.link = self.delivery.link if self.delivery else link
        self.session = self.link.session if self.link else session
        self.connection = self.session.connection if self.session else connection

    def __repr__(self):
        candidates = (self.connection, self.session, self.link, self.delivery, self.subject)
        shown = [str(item) for item in candidates if item is not None]
        return "%s(%s)" % (self.type, ", ".join(shown))
310
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.

    The transaction is declared as soon as the object is constructed;
    outcomes of the declare and discharge requests are reported to the
    handler through handle_outcome().
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        # Accepted deliveries kept unsettled until discharge.
        self._pending = []
        self._declare = None
        self._discharge = None
        self.declare()

    def commit(self):
        """Discharge the transaction with failed=False."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction with failed=True."""
        self.discharge(True)

    def declare(self):
        """Send the declare request and remember its delivery."""
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        """Send the discharge request and remember its delivery."""
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        # Send a described value over the control link and tag the
        # resulting delivery with this transaction.
        body = Described(descriptor, value)
        ctrl_delivery = self.txn_ctrl.send(Message(body=body))
        ctrl_delivery.transaction = self
        return ctrl_delivery

    def send(self, sender, msg, tag=None):
        """Send msg on sender, marking the delivery with this transaction's id."""
        delivery = sender.send(msg, tag=tag)
        delivery.local.data = [self.id]
        # NOTE(review): 0x34 is presumably the AMQP transactional-state
        # code -- confirm against the AMQP 1.0 specification.
        delivery.update(0x34)
        return delivery

    def accept(self, delivery):
        """
        Accept a delivery under this transaction; settle it now if
        settle_before_discharge is set, otherwise hold it as pending.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        """Update a delivery with the given state; a false state is a no-op."""
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        # Release and settle every held delivery, then forget them.
        for held in self._pending:
            held.update(Delivery.RELEASED)
            held.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """
        Dispatch the outcome of the declare or discharge delivery to
        the matching on_transaction_* callback of the handler.
        """
        delivery = event.delivery
        if delivery == self._declare:
            if delivery.remote.data:
                self.id = delivery.remote.data[0]
                self.handler.on_transaction_declared(event)
            else:
                if delivery.remote_state != Delivery.REJECTED:
                    logging.warning("Unexpected outcome for declare: %s" % delivery.remote_state)
                self.handler.on_transaction_declare_failed(event)
        elif delivery == self._discharge:
            rejected = delivery.remote_state == Delivery.REJECTED
            if rejected:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending()  # make this optional?
            elif self.failed:
                self.handler.on_transaction_aborted(event)
                self._release_pending()
            else:
                self.handler.on_transaction_committed(event)
            self._clear_pending()
393
class LinkOption(object):
    """
    Abstract interface for link configuration options.
    """

    def apply(self, link):
        """
        Hook for subclasses: put the configuration logic for the given
        link here. The base implementation does nothing.
        """
        return None

    def test(self, link):
        """
        Hook for subclasses: return whether this option should be
        applied to the given link, e.g. based on some link criteria.
        The base implementation always returns True.
        """
        return True
410
411 -class AtMostOnce(LinkOption):
412 - def apply(self, link):
414
415 -class AtLeastOnce(LinkOption):
416 - def apply(self, link):
419
class SenderOption(LinkOption):
    """Link option that test() selects only for links whose is_sender is true."""

    def apply(self, sender):
        # No configuration at this level.
        pass

    def test(self, link):
        return link.is_sender
423
class ReceiverOption(LinkOption):
    """Link option that test() selects only for links whose is_receiver is true."""

    def apply(self, receiver):
        # No configuration at this level.
        pass

    def test(self, link):
        return link.is_receiver
427
428 -class DynamicNodeProperties(LinkOption):
429 - def __init__(self, props={}):
430 self.properties = {} 431 for k in props: 432 if isinstance(k, symbol): 433 self.properties[k] = props[k] 434 else: 435 self.properties[symbol(k)] = props[k]
436
437 - def apply(self, link):
442
class Filter(ReceiverOption):
    """
    Receiver option that puts a filter set on the receiver's source.
    """

    def __init__(self, filter_set=None):
        """
        @param filter_set: dict passed to source.filter.put_dict() when
        the option is applied. Defaults to a new empty dict.
        """
        # A fresh dict per instance: a literal {} default would be one
        # object shared by every Filter created without an argument.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Put the filter set on the receiver's source filter."""
        receiver.source.filter.put_dict(self.filter_set)
449
class Selector(Filter):
    """
    Configures a link with a message selector filter.
    """

    def __init__(self, value, name='selector'):
        """
        @param value: the selector value, wrapped in a Described value.

        @param name: key under which the filter is stored in the filter set.
        """
        described = Described(symbol('apache.org:selector-filter:string'), value)
        super(Selector, self).__init__({symbol(name): described})
456
457 -class DurableSubscription(ReceiverOption):
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469 477
478 -def _create_session(connection, handler=None):
479 session = connection.session() 480 session.open() 481 return session
482
483 484 -def _get_attr(target, name):
485 if hasattr(target, name): 486 return getattr(target, name) 487 else: 488 return None
489
class SessionPerConnection(object):
    """
    Session policy that creates one session lazily and then returns
    that same session on every later call.
    """

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating it on the connection if needed."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        return self._default_session
498
class GlobalOverrides(object):
    """
    Internal handler that first offers each event to the '_overrides'
    handler of the event's connection (if any), and dispatches to the
    base handler only when that did not yield a true value.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        # Returns a false value when there is no connection or the
        # connection has no '_overrides'; otherwise the dispatch result.
        conn = event.connection
        if not conn:
            return conn
        if not hasattr(conn, '_overrides'):
            return False
        return event.dispatch(conn._overrides)
514
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection, and reconnects after a disconnect when a
    reconnect strategy is set.
    """

    def __init__(self, connection):
        self.connection = connection
        # Connection target and behaviour; filled in by the caller.
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.virtual_host = None
        # SSL settings.
        self.ssl_domain = None
        self.ssl_sni = None
        # SASL settings.
        self.sasl_enabled = True
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.user = None
        self.password = None

    def _connect(self, connection, reactor):
        # Take the next address, point the reactor at it, then build and
        # bind a new transport configured with SASL/heartbeat/SSL.
        assert(reactor is not None)
        url = self.address.next()
        reactor.set_connection_host(connection, url.host, str(url.port))
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = url.host
        logging.debug("connecting to %s..." % url)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the URL take precedence over our own.
            user = url.username or self.user
            if user:
                connection.user = user
            password = url.password or self.password
            if password:
                connection.password = password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme != 'amqps':
            return
        if not self.ssl_domain:
            raise SSLUnavailable("amqps: SSL libraries not found")
        self.ssl = SSL(transport, self.ssl_domain)
        self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host

    def on_connection_local_open(self, event):
        self._connect(event.connection, event.reactor)

    def on_connection_remote_open(self, event):
        logging.debug("connected to %s" % event.connection.hostname)
        if self.reconnect:
            # Connected: start the reconnect strategy afresh.
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        conn = self.connection
        if not (conn and conn.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.debug("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection, event.reactor)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            # We are the scheduled handler; see on_timer_task().
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        self._connect(self.connection, event.reactor)

    def on_connection_remote_close(self, event):
        self.connection = None
598
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to the initial delay of 0."""
        self.delay = 0

    def next(self):
        """
        Return the current delay and advance to the next one: 0 is
        followed by 0.1, and every later delay doubles, capped at 10.
        """
        issued = self.delay
        self.delay = 0.1 if issued == 0 else min(10, 2*issued)
        return issued
617
class Urls(object):
    """
    Endless round-robin iterator over a list of Urls: after the last
    value has been returned, iteration restarts from the first.
    """

    def __init__(self, values):
        """
        @param values: iterable of values, each converted with Url().
        """
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next Url, wrapping around to the first one when the
        end of the list is reached.
        """
        try:
            return next(self.i)
        except StopIteration:
            # Exhausted: restart from the beginning of the list.
            self.i = iter(self.values)
            return next(self.i)

    # Python 3 spelling of the iterator protocol; without it __iter__
    # returns an object that iter() and next() reject.
    __next__ = next
632
class SSLConfig(object):
    """
    Holds a client-mode and a server-mode SSLDomain and applies each
    setting to both of them.
    """

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Set the same credentials on the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Set the same trusted CA database on the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
645
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
       loosely speaking is something that establishes links to or from
       another container, over which messages are transferred. This is
       an extension to the Reactor class that adds convenience methods
       for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers passed on to Reactor.__init__.

        @param kwargs: passed on to Reactor.__init__. When 'impl' is not
        given, the container defaults are initialised; 'global_handler'
        may then be given to replace the reactor's global handler as the
        base of the GlobalOverrides wrapper.
        """
        super(Container, self).__init__(*handlers, **kwargs)
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            self.allow_insecure_mechs = True
            self.allowed_mechs = None
            self.sasl_enabled = True
            self.user = None
            self.password = None
            # Reactor.wrap() looks for a 'subclass' entry when wrapping
            # a C reactor.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
        """
        Initiates the establishment of an AMQP connection. Returns an
        instance of proton.Connection.

        @param url: URL string of process to connect to

        @param urls: list of URL strings of process to try to connect to

        Only one of url or urls should be specified.

        @param address: object used directly as the connector's address
        when neither url nor urls is given.

        @param reconnect: A value of False will prevent the library
        from automatically trying to reconnect if the underlying
        socket is disconnected before the connection has been closed.
        When None (the default) a Backoff strategy is used.

        @param heartbeat: A value in milliseconds indicating the
        desired frequency of heartbeats used to test the underlying
        socket is alive.

        @param ssl_domain: SSL configuration in the form of an
        instance of proton.SSLDomain.

        @param handler: a connection scoped handler that will be
        called to process any events in the scope of this connection
        or its child links

        @param kwargs: sasl_enabled, which determines whether a sasl layer is
        used for the connection; allowed_mechs an optional list of SASL
        mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
        indicating whether insecure mechanisms, such as PLAIN over a
        non-encrypted socket, are allowed; 'virtual_host' the hostname to set
        in the Open performative used by peer to determine the correct
        back-end service for the client. If 'virtual_host' is not supplied the
        host field from the URL is used instead. Also read from kwargs:
        'user', 'password', 'sni', 'offered_capabilities',
        'desired_capabilities' and 'properties'.

        @raise ValueError: if none of url, urls or address is given.
        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())
        conn.offered_capabilities = kwargs.get('offered_capabilities')
        conn.desired_capabilities = kwargs.get('desired_capabilities')
        conn.properties = kwargs.get('properties')

        connector = Connector(conn)
        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
        connector.user = kwargs.get('user', self.user)
        connector.password = kwargs.get('password', self.password)
        connector.virtual_host = kwargs.get('virtual_host')
        if connector.virtual_host:
            # only set hostname if virtual-host is a non-empty string
            conn.hostname = connector.virtual_host
        connector.ssl_sni = kwargs.get('sni')

        conn._overrides = connector
        if url: connector.address = Urls([url])
        elif urls: connector.address = Urls(urls)
        elif address: connector.address = address
        else: raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        # use container's default client domain if none specified.  This is
        # only necessary if the URL specifies the "amqps:" scheme
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection() #todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        # Build a link name from the container id and whichever of the
        # remote/local addresses are set; fall back to a generated uuid.
        if local and remote: return "%s-%s-%s" % (container, remote, local)
        elif local: return "%s-%s" % (container, local)
        elif remote: return "%s-%s" % (container, remote)
        else: return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        # Resolve a Url, Session, Connection or other session factory
        # to a session.
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be sent. Returns an instance of proton.Sender.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the target address can be
        specified as the second argument (or as a keyword
        argument). The source address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the target is not, then the path of the URL is used as the
        target address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler is not None:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be received (aka a subscription). Returns an instance of
        proton.Receiver.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the source address can be
        specified as the second argument (or as a keyword
        argument). The target address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the source is not, then the path of the URL is used as the
        source address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler is not None:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """
        Declare a new transaction on the given context and return the
        Transaction object tracking it.

        The control sender is created on first use and cached on the
        context as '_txn_ctrl'.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Route outcomes of control deliveries back to the
                    # Transaction that sent them.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified.

        @param ssl_domain: SSL configuration for the acceptor. When not
        given and the URL scheme is 'amqps', the container's default
        server domain is used.

        @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain
        is given and the container has no SSL configuration.
        """
        url = Url(url)
        acceptor = self.acceptor(url.host, url.port)
        ssl_config = ssl_domain
        if not ssl_config and url.scheme == 'amqps':
            # use container's default server domain
            if self.ssl:
                ssl_config = self.ssl.server
            else:
                raise SSLUnavailable("amqps: SSL libraries not found")
        if ssl_config:
            acceptor.set_ssl_domain(ssl_config)
        return acceptor

    def do_work(self, timeout=None):
        """
        Set the timeout if a true value is given, then call process()
        once and return its result.
        """
        if timeout:
            self.timeout = timeout
        return self.process()
880