Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, socket, time, threading 20 21 from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 22 from proton import ProtonException, Timeout, Url 23 from proton.reactor import Container 24 from proton.handlers import MessagingHandler, IncomingMessageHandler55 62 6529 self.connection = connection 30 self.link = link 31 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 32 msg="Opening link %s" % link.name) 33 self._checkClosed()3436 try: 37 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 38 timeout=timeout, 39 msg="Opening link %s" % self.link.name) 40 except Timeout as e: pass 41 self._checkClosed()4244 if self.link.state & Endpoint.REMOTE_CLOSED: 45 self.link.close() 46 raise LinkDetached(self.link)4749 self.link.close() 50 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 51 msg="Closing link %s" % self.link.name)52 53 # Access to other link attributes.8768 super(BlockingSender, self).__init__(connection, sender) 69 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 70 #this may be followed by a detach, which may contain an error condition, so wait a little... 71 self._waitForClose() 72 #...but close ourselves if peer does not 73 self.link.close() 74 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7577 delivery = self.link.send(msg) 78 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) 79 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 80 delivery.settle() 81 bad = error_states 82 if bad is None: 83 bad = [Delivery.REJECTED, Delivery.RELEASED] 84 if delivery.remote_state in bad: 85 raise SendException(delivery.remote_state) 86 return delivery12290 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 91 self.connection = connection 92 self.incoming = collections.deque([]) 93 self.unsettled = collections.deque([])9496 self.incoming.append((event.message, event.delivery)) 97 self.connection.container.yield_() # Wake up the wait() loop to handle the message.98100 if event.link.state & Endpoint.LOCAL_ACTIVE: 101 event.link.close() 102 raise LinkDetached(event.link)103 106 107 @property109 return len(self.incoming)110112 message, delivery = self.incoming.popleft() 113 if not delivery.settled: 114 self.unsettled.append(delivery) 115 return message116164126 super(BlockingReceiver, self).__init__(connection, receiver) 127 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 128 #this may be followed by a detach, which may contain an error condition, so wait a little... 129 self._waitForClose() 130 #...but close ourselves if peer does not 131 self.link.close() 132 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 133 if credit: receiver.flow(credit) 134 self.fetcher = fetcher135 139141 if not self.fetcher: 142 raise Exception("Can't call receive on this receiver as a handler was provided") 143 if not self.link.credit: 144 self.link.flow(1) 145 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 146 return self.fetcher.pop()147 150 153 159180168 self.link = link 169 if link.is_sender: 170 txt = "sender %s to %s closed" % (link.name, link.target.address) 171 else: 172 txt = "receiver %s from %s closed" % (link.name, link.source.address) 173 if link.remote_condition: 174 txt += " due to: %s" % link.remote_condition 175 self.condition = link.remote_condition.name 176 else: 177 txt += " by peer" 178 self.condition = None 179 super(LinkDetached, self).__init__(txt)193184 self.connection = connection 185 txt = "Connection %s closed" % connection.hostname 186 if connection.remote_condition: 187 txt += " due to: %s" % connection.remote_condition 188 self.condition = connection.remote_condition.name 189 else: 190 txt += " by peer" 191 self.condition = None 192 super(ConnectionClosed, self).__init__(txt)196 """ 197 A synchronous style connection wrapper. 198 """284199 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):200 self.disconnected = False 201 self.timeout = timeout or 60 202 self.container = container or Container() 203 self.container.timeout = self.timeout 204 self.container.start() 205 self.url = Url(url).defaults() 206 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) 207 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 208 msg="Opening connection")209211 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))212213 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):214 prefetch = credit 215 if handler: 216 fetcher = None 217 if prefetch is None: 218 prefetch = 1 219 else: 220 fetcher = Fetcher(self, credit) 221 return BlockingReceiver( 222 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)223225 self.conn.close() 226 try: 227 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 228 msg="Closing connection") 229 finally: 230 self.conn = None 231 self.container = None232 235237 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 238 while self.container.process(): pass239241 """Call process until condition() is true""" 242 if timeout is False: 243 timeout = self.timeout 244 if timeout is None: 245 while not condition() and not self.disconnected: 246 self.container.process() 247 else: 248 container_timeout = self.container.timeout 249 self.container.timeout = timeout 250 try: 251 deadline = time.time() + timeout 252 while not condition() and not self.disconnected: 253 self.container.process() 254 if deadline < time.time(): 255 txt = "Connection %s timed out" % self.url 256 if msg: txt += ": " + msg 257 raise Timeout(txt) 258 finally: 259 self.container.timeout = container_timeout 260 if self.disconnected or self._is_closed(): 261 self.container.stop() 262 self.conn.handler = None # break cyclical reference 263 if self.disconnected and not self._is_closed(): 264 raise ConnectionException("Connection %s disconnected" % self.url)265267 if event.link.state & Endpoint.LOCAL_ACTIVE: 268 event.link.close() 269 raise LinkDetached(event.link)270272 if event.connection.state & Endpoint.LOCAL_ACTIVE: 273 event.connection.close() 274 raise ConnectionClosed(event.connection)275277 self.on_transport_closed(event)278280 self.on_transport_closed(event)281298287 """Thread-safe atomic counter. Start at start, increment by step.""" 288 self.count, self.step = start, step 289 self.lock = threading.Lock()290292 """Get the next value""" 293 self.lock.acquire() 294 self.count += self.step; 295 result = self.count 296 self.lock.release() 297 return result300 """ 301 Implementation of the synchronous request-responce (aka RPC) pattern. 302 @ivar address: Address for all requests, may be None. 303 @ivar connection: Connection for requests and responses. 304 """ 305 306 correlation_id = AtomicCount() 307346 347 @property309 """ 310 Send requests and receive responses. A single instance can send many requests 311 to the same or different addresses. 312 313 @param connection: A L{BlockingConnection} 314 @param address: Address for all requests. 315 If not specified, each request must have the address property set. 316 Sucessive messages may have different addresses. 317 """ 318 super(SyncRequestResponse, self).__init__() 319 self.connection = connection 320 self.address = address 321 self.sender = self.connection.create_sender(self.address) 322 # dynamic=true generates a unique address dynamically for this receiver. 323 # credit=1 because we want to receive 1 response message initially. 324 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 325 self.response = None326328 """ 329 Send a request message, wait for and return the response message. 330 331 @param request: A L{proton.Message}. If L{self.address} is not set the 332 L{self.address} must be set and will be used. 333 """ 334 if not self.address and not request.address: 335 raise ValueError("Request message has no address: %s" % request) 336 request.reply_to = self.reply_to 337 request.correlation_id = correlation_id = self.correlation_id.next() 338 self.sender.send(request) 339 def wakeup(): 340 return self.response and (self.response.correlation_id == correlation_id)341 self.connection.wait(wakeup, msg="Waiting for response") 342 response = self.response 343 self.response = None # Ready for next response. 344 self.receiver.flow(1) # Set up credit for the next response. 345 return response349 """Return the dynamic address of our receiver.""" 350 return self.receiver.remote_source.address351353 """Called when we receive a message for our receiver.""" 354 self.response = event.message 355 self.connection.container.yield_() # Wake up the wait() loop to handle the message.356
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Wed Mar 7 15:15:41 2018 | http://epydoc.sourceforge.net |