Package proton :: Module handlers
[frames] | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, re, socket, time, types 
 20   
 21  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 22  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 23  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 24  from select import select 
25 26 27 -class OutgoingMessageHandler(Handler):
28 """ 29 A utility for simpler and more intuitive handling of delivery 30 events related to outgoing i.e. sent messages. 31 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Stores the configuration of the handler.

    ``auto_settle`` is consulted by ``on_delivery`` to decide whether
    a delivery is settled locally; ``delegate`` is the object the
    ``on_*`` callbacks hand their events to (``None`` for no delegate).
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
35 41
42 - def on_delivery(self, event):
43 dlv = event.delivery 44 if dlv.link.is_sender and dlv.updated: 45 if dlv.remote_state == Delivery.ACCEPTED: 46 self.on_accepted(event) 47 elif dlv.remote_state == Delivery.REJECTED: 48 self.on_rejected(event) 49 elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED: 50 self.on_released(event) 51 if dlv.settled: 52 self.on_settled(event) 53 if self.auto_settle: 54 dlv.settle()
55
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred.

    When a delegate is set, the event is handed to ``dispatch``
    together with the delegate and the name ``'on_sendable'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_sendable', event)
63
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.

    When a delegate is set, the event is handed to ``dispatch``
    together with the delegate and the name ``'on_accepted'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_accepted', event)
70
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.

    When a delegate is set, the event is handed to ``dispatch``
    together with the delegate and the name ``'on_rejected'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_rejected', event)
77
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification.

    When a delegate is set, the event is handed to ``dispatch``
    together with the delegate and the name ``'on_released'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_released', event)
86
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted.

    When a delegate is set, the event is handed to ``dispatch``
    together with the delegate and the name ``'on_settled'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_settled', event)
95
def recv_msg(delivery):
    """
    Reads the pending bytes of ``delivery`` from its link, decodes
    them into a new ``Message``, advances the link and returns the
    message.
    """
    message = Message()
    link = delivery.link
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    link.advance()
    return message
101
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
107
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
113
114 -class Acking(object):
def accept(self, delivery):
    """
    Accepts a received message: settles ``delivery`` with the
    ``Delivery.ACCEPTED`` state.
    """
    outcome = Delivery.ACCEPTED
    self.settle(delivery, outcome)
120
def reject(self, delivery):
    """
    Rejects a received message that is considered invalid or
    unprocessable: settles ``delivery`` with the
    ``Delivery.REJECTED`` state.
    """
    outcome = Delivery.REJECTED
    self.settle(delivery, outcome)
127
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver.

    The ``delivered`` parameter indicates whether this should be
    considered a delivery attempt (and the delivery count updated)
    or not: a true value settles with ``Delivery.MODIFIED``, a false
    value with ``Delivery.RELEASED``.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
139
140 - def settle(self, delivery, state=None):
144
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        Stores the delegate that receives the ``on_message`` and
        ``on_settled`` callbacks, and whether fully received
        deliveries are accepted and settled automatically.
        """
        self.delegate = delegate
        self.auto_accept = auto_accept

    def on_delivery(self, event):
        """
        Handles a delivery event on a receiver link.

        A readable, complete delivery is decoded into
        ``event.message`` and passed to ``on_message``; raising
        ``Reject`` or ``Release`` from there settles the delivery as
        REJECTED or MODIFIED respectively. An updated, settled
        delivery triggers ``on_settled`` instead.
        """
        dlv = event.delivery
        if not dlv.link.is_receiver:
            return
        if dlv.readable and not dlv.partial:
            event.message = recv_msg(dlv)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # The link is already closed locally: do not hand the
                # message to the application, release it if allowed.
                if self.auto_accept:
                    dlv.update(Delivery.RELEASED)
                    dlv.settle()
            else:
                try:
                    self.on_message(event)
                    if self.auto_accept:
                        dlv.update(Delivery.ACCEPTED)
                        dlv.settle()
                except Reject:
                    dlv.update(Delivery.REJECTED)
                    dlv.settle()
                except Release:
                    dlv.update(Delivery.MODIFIED)
                    dlv.settle()
        elif dlv.updated and dlv.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used,
        also obtainable via a property on the event.
        """
        # Identity test: a delegate may define its own comparison operators.
        if self.delegate is not None:
            dispatch(self.delegate, 'on_message', event)

    def on_settled(self, event):
        """
        Called for an updated delivery that is settled; hands the
        event to ``dispatch`` for the delegate, if one is set.
        """
        if self.delegate is not None:
            dispatch(self.delegate, 'on_settled', event)
193
194 -class EndpointStateHandler(Handler):
195 """ 196 A utility that exposes 'endpoint' events i.e. the open/close for 197 links, sessions and connections in a more intuitive manner. A 198 XXX_opened method will be called when both local and remote peers 199 have opened the link, session or connection. This can be used to 200 confirm a locally initiated action for example. A XXX_opening 201 method will be called when the remote peer has requested an open 202 that was not initiated locally. By default this will simply open 203 locally, which then triggers the XXX_opened call. The same applies 204 to close. 205 """ 206
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Stores the configuration of the handler.

    ``peer_close_is_error`` is consulted by the ``on_*_closing``
    callbacks when no delegate is set; ``delegate`` is the object the
    callbacks hand their events to (``None`` for no delegate).
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
@classmethod
def is_local_open(cls, endpoint):
    """
    Returns ``endpoint.state`` masked with ``Endpoint.LOCAL_ACTIVE``;
    the result is truthy when that flag is part of the state.
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_ACTIVE
@classmethod
def is_local_uninitialised(cls, endpoint):
    """
    Returns ``endpoint.state`` masked with ``Endpoint.LOCAL_UNINIT``;
    the result is truthy when that flag is part of the state.
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_UNINIT
@classmethod
def is_local_closed(cls, endpoint):
    """
    Returns ``endpoint.state`` masked with ``Endpoint.LOCAL_CLOSED``;
    the result is truthy when that flag is part of the state.
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_CLOSED
@classmethod
def is_remote_open(cls, endpoint):
    """
    Returns ``endpoint.state`` masked with ``Endpoint.REMOTE_ACTIVE``;
    the result is truthy when that flag is part of the state.
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_ACTIVE
@classmethod
def is_remote_closed(cls, endpoint):
    """
    Returns ``endpoint.state`` masked with ``Endpoint.REMOTE_CLOSED``;
    the result is truthy when that flag is part of the state.
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_CLOSED
@classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Logs why ``endpoint`` was closed.

    The description of the remote condition is logged when there is
    one; otherwise, if the endpoint is still open locally but closed
    remotely, a "<endpoint_type> closed by peer" message is logged.
    Nothing is logged in any other case.
    """
    condition = endpoint.remote_condition
    if condition:
        logging.error(condition.description)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        logging.error("%s closed by peer" % endpoint_type)
237 246
def on_session_remote_close(self, event):
    """
    Reacts to the peer closing a session: reports an error if a
    remote condition is present, a completed close if the session is
    already closed locally, or a peer-initiated close otherwise, and
    then closes the session locally.
    """
    session = event.session
    if session.remote_condition:
        notify = self.on_session_error
    elif self.is_local_closed(session):
        notify = self.on_session_closed
    else:
        notify = self.on_session_closing
    notify(event)
    # NOTE(review): the listing lost its indentation; the local close is
    # assumed to run for every branch - confirm against the original file.
    session.close()
255
def on_connection_remote_close(self, event):
    """
    Reacts to the peer closing the connection: reports an error if a
    remote condition is present, a completed close if the connection
    is already closed locally, or a peer-initiated close otherwise,
    and then closes the connection locally.
    """
    connection = event.connection
    if connection.remote_condition:
        notify = self.on_connection_error
    elif self.is_local_closed(connection):
        notify = self.on_connection_closed
    else:
        notify = self.on_connection_closing
    notify(event)
    # NOTE(review): the listing lost its indentation; the local close is
    # assumed to run for every branch - confirm against the original file.
    connection.close()
264
def on_connection_local_open(self, event):
    """
    Calls ``on_connection_opened`` if the connection that was just
    opened locally is already open on the remote side.
    """
    if not self.is_remote_open(event.connection):
        return
    self.on_connection_opened(event)
268
def on_connection_remote_open(self, event):
    """
    Reacts to the peer opening the connection: calls
    ``on_connection_opened`` if it is already open locally; if it is
    still uninitialised locally, calls ``on_connection_opening`` and
    then opens it.
    """
    connection = event.connection
    if self.is_local_open(connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(connection):
        self.on_connection_opening(event)
        # NOTE(review): assumed to belong to this branch only (the
        # listing lost its indentation) - confirm against the original.
        connection.open()
275
def on_session_local_open(self, event):
    """
    Calls ``on_session_opened`` if the session that was just opened
    locally is already open on the remote side.
    """
    if not self.is_remote_open(event.session):
        return
    self.on_session_opened(event)
279
def on_session_remote_open(self, event):
    """
    Reacts to the peer opening a session: calls ``on_session_opened``
    if it is already open locally; if it is still uninitialised
    locally, calls ``on_session_opening`` and then opens it.
    """
    session = event.session
    if self.is_local_open(session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(session):
        self.on_session_opening(event)
        # NOTE(review): assumed to belong to this branch only (the
        # listing lost its indentation) - confirm against the original.
        session.open()
286 290 297
def on_connection_opened(self, event):
    """
    Called once the connection is open on both sides; hands the
    event to ``dispatch`` for the delegate, if one is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_opened', event)
301
def on_session_opened(self, event):
    """
    Called once the session is open on both sides; hands the event
    to ``dispatch`` for the delegate, if one is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_opened', event)
305 309
def on_connection_opening(self, event):
    """
    Called when the peer opened a connection that is not yet
    initialised locally; hands the event to ``dispatch`` for the
    delegate, if one is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_opening', event)
313
def on_session_opening(self, event):
    """
    Called when the peer opened a session that is not yet
    initialised locally; hands the event to ``dispatch`` for the
    delegate, if one is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_opening', event)
317 321
def on_connection_error(self, event):
    """
    Called when the peer closed the connection with an error
    condition. The event is handed to ``dispatch`` for the delegate
    when one is set; otherwise the error is logged through
    ``print_error``.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_error', event)
    else:
        # ``print_error`` is the reporter this class defines; the
        # ``log_error`` name used before is not defined in the visible
        # code and would raise AttributeError.
        self.print_error(event.connection, "connection")
327
def on_session_error(self, event):
    """
    Called when the peer closed a session with an error condition.
    The event is handed to ``dispatch`` for the delegate when one is
    set; otherwise the error is logged through ``print_error`` and
    the connection is closed.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_error', event)
    else:
        # ``print_error`` is the reporter this class defines; the
        # ``log_error`` name used before is not defined in the visible
        # code and would raise AttributeError.
        self.print_error(event.session, "session")
        # NOTE(review): assumed to belong to the no-delegate branch (the
        # listing lost its indentation) - confirm against the original.
        event.connection.close()
334 341
def on_connection_closed(self, event):
    """
    Called when the peer closed a connection that was already closed
    locally; hands the event to ``dispatch`` for the delegate, if one
    is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_closed', event)
345
def on_session_closed(self, event):
    """
    Called when the peer closed a session that was already closed
    locally; hands the event to ``dispatch`` for the delegate, if one
    is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_closed', event)
349 353
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    The event is handed to ``dispatch`` for the delegate when one is
    set; without a delegate it is treated as an error if
    ``peer_close_is_error`` is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_closing', event)
    elif self.peer_close_is_error:
        self.on_connection_error(event)
359
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of a session. The
    event is handed to ``dispatch`` for the delegate when one is set;
    without a delegate it is treated as an error if
    ``peer_close_is_error`` is set.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_closing', event)
    elif self.peer_close_is_error:
        self.on_session_error(event)
365 371
def on_transport_tail_closed(self, event):
    """
    Treats the closing of the transport tail like a closed
    transport: the event is passed on to ``on_transport_closed``.
    """
    handle_closed = self.on_transport_closed
    handle_closed(event)
374
def on_transport_closed(self, event):
    """
    Reports a disconnect: if a delegate is set and the event has a
    connection that is still open locally, the event is handed to
    ``dispatch`` under the name ``'on_disconnected'``.
    """
    # Identity test: a delegate may define its own comparison operators.
    if self.delegate is None:
        return
    connection = event.connection
    if connection and self.is_local_open(connection):
        dispatch(self.delegate, 'on_disconnected', event)
378
379 -class MessagingHandler(Handler, Acking):
380 """ 381 A general purpose handler that makes the proton-c events somewhat 382 simpler to deal with and/or avoids repetitive tasks for common use 383 cases. 384 """
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Builds the list of child handlers, each using this object as its
    delegate: a ``CFlowController`` (only when ``prefetch`` is
    truthy), an ``EndpointStateHandler``, an
    ``IncomingMessageHandler`` and an ``OutgoingMessageHandler``.

    Also initialises ``fatal_conditions``, the transport condition
    names for which ``on_transport_error`` closes the connection.
    """
    self.handlers = children = []
    if prefetch:
        children.append(CFlowController(prefetch))
    children.append(EndpointStateHandler(peer_close_is_error, self))
    children.append(IncomingMessageHandler(auto_accept, self))
    children.append(OutgoingMessageHandler(auto_settle, self))
    self.fatal_conditions = ["amqp:unauthorized-access"]
393
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    The transport condition (name, description and, when present,
    its info) is logged; if the condition name is listed in
    ``fatal_conditions`` the connection is closed. Without a
    condition a generic message is logged.
    """
    condition = event.transport.condition
    if not condition:
        logging.error("Unspecified transport error")
        return
    if condition.info:
        # Three values need three placeholders; the former two-placeholder
        # format raised TypeError whenever the condition carried info.
        logging.error("%s: %s: %s", condition.name, condition.description, condition.info)
    else:
        logging.error("%s: %s", condition.name, condition.description)
    if condition.name in self.fatal_conditions:
        event.connection.close()
409
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error
    condition; logs it via ``EndpointStateHandler.print_error``.
    """
    failed = event.connection
    EndpointStateHandler.print_error(failed, "connection")
415
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition;
    logs it via ``EndpointStateHandler.print_error`` and then closes
    the connection.
    """
    failed = event.session
    EndpointStateHandler.print_error(failed, "session")
    connection = event.connection
    connection.close()
422 429
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    If the reactor has a ``subclass`` attribute, the reactor is also
    attached to the event under the lower-cased name of that
    subclass. ``on_start`` is then called with the event.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = reactor.subclass.__name__.lower()
        setattr(event, alias, event.reactor)
    self.on_start(event)
437
def on_start(self, event):
    """
    Called when the event loop starts (invoked from
    ``on_reactor_init``). Does nothing by default.
    """
    return None
def on_connection_closed(self, event):
    """
    Called when the connection is closed. Does nothing by default.
    """
    return None
def on_session_closed(self, event):
    """
    Called when the session is closed. Does nothing by default.
    """
    return None
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    Does nothing by default.
    """
    return None
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.
    Does nothing by default.
    """
    return None
def on_disconnected(self, event):
    """
    Called when the socket is disconnected. Does nothing by default.
    """
    return None
478
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred. Does nothing by default.
    """
    return None
485
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Does nothing by default.
    """
    return None
491
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Does nothing by default.
    """
    return None
497
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification. Does nothing by
    default.
    """
    return None
505
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted. Does nothing by default.
    """
    return None
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of
    referring to this message in further actions (e.g. if
    explicitly accepting it), the ``delivery`` should be used, also
    obtainable via a property on the event. Does nothing by default.
    """
    return None
522
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction. Every
    callback is a no-op here and is meant to be overridden.
    """

    def on_transaction_declared(self, event):
        """Callback for a declared transaction; does nothing by default."""
        return None

    def on_transaction_committed(self, event):
        """Callback for a committed transaction; does nothing by default."""
        return None

    def on_transaction_aborted(self, event):
        """Callback for an aborted transaction; does nothing by default."""
        return None

    def on_transaction_declare_failed(self, event):
        """Callback for a failed declare; does nothing by default."""
        return None

    def on_transaction_commit_failed(self, event):
        """Callback for a failed commit; does nothing by default."""
        return None
542
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Passes all settings on to the parent initialiser; the only
        difference is that ``auto_accept`` defaults to ``False`` here.
        """
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accepts ``delivery`` through ``transaction`` when a (truthy)
        transaction is given, and through the inherited ``accept``
        otherwise.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
557 558 from proton import WrappedHandler 559 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
class CFlowController(WrappedHandler):
    """
    Wraps the handler created by ``pn_flowcontroller`` for the given
    ``window``.
    """

    def __init__(self, window=1024):
        def make_impl():
            # Invoked by WrappedHandler whenever it needs the wrapped object.
            return pn_flowcontroller(window)

        WrappedHandler.__init__(self, make_impl)
565
class CHandshaker(WrappedHandler):
    """
    Wraps the handler created by ``pn_handshaker``.
    """

    def __init__(self):
        factory = pn_handshaker
        WrappedHandler.__init__(self, factory)
570
class IOHandler(WrappedHandler):
    """
    Wraps the handler created by ``pn_iohandler``.
    """

    def __init__(self):
        factory = pn_iohandler
        WrappedHandler.__init__(self, factory)
575
class PythonIO:
    """
    A handler that keeps track of the reactor's selectables and
    waits on them with ``select`` whenever the reactor is quiesced.
    Events it does not handle itself are passed to an ``IOHandler``.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Passes any event not handled here on to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Starts tracking the selectable carried by the event."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do: selectable state is read afresh on every pass."""
        pass

    def on_selectable_final(self, event):
        """Stops tracking a terminal selectable and releases it."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Waits for I/O readiness or the earliest deadline, then
        notifies the readable, writable and expired selectables and
        yields back to the reactor.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        readers = []
        writers = []
        earliest = None
        for selectable in self.selectables:
            if selectable.reading:
                readers.append(selectable)
            if selectable.writing:
                writers.append(selectable)
            if selectable.deadline:
                if earliest is None:
                    earliest = selectable.deadline
                else:
                    earliest = min(selectable.deadline, earliest)

        # Wait until the earliest deadline, but never longer than the
        # reactor's own timeout and never for a negative duration.
        if earliest is None:
            wait = reactor.timeout
        else:
            wait = earliest - time.time()
            if wait < 0:
                wait = 0
            wait = min(wait, reactor.timeout)
        ready_to_read, ready_to_write, _ = select(readers, writers, [], wait)

        reactor.mark()

        now = time.time()

        for selectable in ready_to_read:
            selectable.readable()
        for selectable in ready_to_write:
            selectable.writable()
        for selectable in self.selectables:
            if selectable.deadline and now > selectable.deadline:
                selectable.expired()

        reactor.yield_()
638