Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 20 from __future__ import absolute_import 21 22 import collections 23 import time 24 import threading 25 26 from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout 27 from ._delivery import Delivery 28 from ._endpoints import Endpoint, Link 29 from ._events import Handler 30 from ._url import Url 31 32 from ._reactor import Container 33 from ._handlers import MessagingHandler, IncomingMessageHandler67 76 8038 self.connection = connection 39 self.link = link 40 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 41 msg="Opening link %s" % link.name) 42 self._checkClosed()4345 try: 46 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 47 timeout=timeout, 48 msg="Opening link %s" % self.link.name) 49 except Timeout as e: 50 pass 51 self._checkClosed()5254 if self.link.state & Endpoint.REMOTE_CLOSED: 55 self.link.close() 56 if not self.connection.closing: 57 raise LinkDetached(self.link)5860 self.link.close() 61 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 62 msg="Closing link %s" % self.link.name)63 64 # Access to other link attributes.10484 super(BlockingSender, self).__init__(connection, sender) 85 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 86 # this may be followed by a detach, which may contain an error condition, so wait a little... 87 self._waitForClose() 88 # ...but close ourselves if peer does not 89 self.link.close() 90 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)9193 delivery = self.link.send(msg) 94 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, 95 timeout=timeout) 96 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 97 delivery.settle() 98 bad = error_states 99 if bad is None: 100 bad = [Delivery.REJECTED, Delivery.RELEASED] 101 if delivery.remote_state in bad: 102 raise SendException(delivery.remote_state) 103 return delivery142108 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 109 self.connection = connection 110 self.incoming = collections.deque([]) 111 self.unsettled = collections.deque([])112114 self.incoming.append((event.message, event.delivery)) 115 self.connection.container.yield_() # Wake up the wait() loop to handle the message.116118 if event.link.state & Endpoint.LOCAL_ACTIVE: 119 event.link.close() 120 if not self.connection.closing: 121 raise LinkDetached(event.link)122 126 127 @property 130132 message, delivery = self.incoming.popleft() 133 if not delivery.settled: 134 self.unsettled.append(delivery) 135 return message136190146 super(BlockingReceiver, self).__init__(connection, receiver) 147 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 148 # this may be followed by a detach, which may contain an error condition, so wait a little... 149 self._waitForClose() 150 # ...but close ourselves if peer does not 151 self.link.close() 152 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 153 if credit: receiver.flow(credit) 154 self.fetcher = fetcher 155 self.container = connection.container156158 self.fetcher = None 159 # The next line causes a core dump if the Proton-C reactor finalizes 160 # first. The self.container reference prevents out of order reactor 161 # finalization. It may not be set if exception in BlockingLink.__init__ 162 if hasattr(self, "container"): 163 self.link.handler = None # implicit call to reactor164166 if not self.fetcher: 167 raise Exception("Can't call receive on this receiver as a handler was provided") 168 if not self.link.credit: 169 self.link.flow(1) 170 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, 171 timeout=timeout) 172 return self.fetcher.pop()173 176 179 185206194 self.link = link 195 if link.is_sender: 196 txt = "sender %s to %s closed" % (link.name, link.target.address) 197 else: 198 txt = "receiver %s from %s closed" % (link.name, link.source.address) 199 if link.remote_condition: 200 txt += " due to: %s" % link.remote_condition 201 self.condition = link.remote_condition.name 202 else: 203 txt += " by peer" 204 self.condition = None 205 super(LinkDetached, self).__init__(txt)219210 self.connection = connection 211 txt = "Connection %s closed" % connection.hostname 212 if connection.remote_condition: 213 txt += " due to: %s" % connection.remote_condition 214 self.condition = connection.remote_condition.name 215 else: 216 txt += " by peer" 217 self.condition = None 218 super(ConnectionClosed, self).__init__(txt)222 """ 223 A synchronous style connection wrapper. 224 225 This object's implementation uses OS resources. To ensure they 226 are released when the object is no longer in use, make sure that 227 object operations are enclosed in a try block and that close() is 228 always executed on exit. 229 """ 230344231 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):232 self.disconnected = False 233 self.timeout = timeout or 60 234 self.container = container or Container() 235 self.container.timeout = self.timeout 236 self.container.start() 237 self.url = Url(url).defaults() 238 self.conn = None 239 self.closing = False 240 failed = True 241 try: 242 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, 243 heartbeat=heartbeat, **kwargs) 244 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 245 msg="Opening connection") 246 failed = False 247 finally: 248 if failed and self.conn: 249 self.close()250252 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, 253 options=options))254255 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):256 prefetch = credit 257 if handler: 258 fetcher = None 259 if prefetch is None: 260 prefetch = 1 261 else: 262 fetcher = Fetcher(self, credit) 263 return BlockingReceiver( 264 self, 265 self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, 266 options=options), fetcher, credit=prefetch)267269 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 270 if self.closing: 271 return 272 self.closing = True 273 self.container.errors = [] 274 try: 275 if self.conn: 276 self.conn.close() 277 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 278 msg="Closing connection") 279 finally: 280 self.conn.free() 281 # Nothing left to block on. Allow reactor to clean up. 282 self.run() 283 self.conn = None 284 self.container.global_handler = None # break circular ref: container to cadapter.on_error 285 self.container.stop_events() 286 self.container = None287 290292 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 293 while self.container.process(): pass 294 self.container.stop() 295 self.container.process()296298 """Call process until condition() is true""" 299 if timeout is False: 300 timeout = self.timeout 301 if timeout is None: 302 while not condition() and not self.disconnected: 303 self.container.process() 304 else: 305 container_timeout = self.container.timeout 306 self.container.timeout = timeout 307 try: 308 deadline = time.time() + timeout 309 while not condition() and not self.disconnected: 310 self.container.process() 311 if deadline < time.time(): 312 txt = "Connection %s timed out" % self.url 313 if msg: txt += ": " + msg 314 raise Timeout(txt) 315 finally: 316 self.container.timeout = container_timeout 317 if self.disconnected or self._is_closed(): 318 self.container.stop() 319 self.conn.handler = None # break cyclical reference 320 if self.disconnected and not self._is_closed(): 321 raise ConnectionException( 322 "Connection %s disconnected: %s" % (self.url, self.disconnected))323325 if event.link.state & Endpoint.LOCAL_ACTIVE: 326 event.link.close() 327 if not self.closing: 328 raise LinkDetached(event.link)329331 if event.connection.state & Endpoint.LOCAL_ACTIVE: 332 event.connection.close() 333 if not self.closing: 334 raise ConnectionClosed(event.connection)335337 self.on_transport_closed(event)338340 self.on_transport_closed(event)341359348 """Thread-safe atomic counter. Start at start, increment by step.""" 349 self.count, self.step = start, step 350 self.lock = threading.Lock()351353 """Get the next value""" 354 self.lock.acquire() 355 self.count += self.step; 356 result = self.count 357 self.lock.release() 358 return result362 """ 363 Implementation of the synchronous request-response (aka RPC) pattern. 364 @ivar address: Address for all requests, may be None. 365 @ivar connection: Connection for requests and responses. 366 """ 367 368 correlation_id = AtomicCount() 369410 411 @property371 """ 372 Send requests and receive responses. A single instance can send many requests 373 to the same or different addresses. 374 375 @param connection: A L{BlockingConnection} 376 @param address: Address for all requests. 377 If not specified, each request must have the address property set. 378 Successive messages may have different addresses. 379 """ 380 super(SyncRequestResponse, self).__init__() 381 self.connection = connection 382 self.address = address 383 self.sender = self.connection.create_sender(self.address) 384 # dynamic=true generates a unique address dynamically for this receiver. 385 # credit=1 because we want to receive 1 response message initially. 386 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 387 self.response = None388390 """ 391 Send a request message, wait for and return the response message. 392 393 @param request: A L{proton.Message}. If L{self.address} is not set the 394 L{self.address} must be set and will be used. 395 """ 396 if not self.address and not request.address: 397 raise ValueError("Request message has no address: %s" % request) 398 request.reply_to = self.reply_to 399 request.correlation_id = correlation_id = str(self.correlation_id.next()) 400 self.sender.send(request) 401 402 def wakeup(): 403 return self.response and (self.response.correlation_id == correlation_id)404 405 self.connection.wait(wakeup, msg="Waiting for response") 406 response = self.response 407 self.response = None # Ready for next response. 408 self.receiver.flow(1) # Set up credit for the next response. 409 return response413 """Return the dynamic address of our receiver.""" 414 return self.receiver.remote_source.address415417 """Called when we receive a message for our receiver.""" 418 self.response = event.message 419 self.connection.container.yield_() # Wake up the wait() loop to handle the message.420
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Wed Oct 30 18:00:19 2019 | http://epydoc.sourceforge.net |