Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, socket, time, threading 20 21 from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 22 from proton import ProtonException, Timeout, Url 23 from proton.reactor import Container 24 from proton.handlers import MessagingHandler, IncomingMessageHandler55 62 6529 self.connection = connection 30 self.link = link 31 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 32 msg="Opening link %s" % link.name) 33 self._checkClosed()3436 try: 37 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 38 timeout=timeout, 39 msg="Opening link %s" % self.link.name) 40 except Timeout as e: pass 41 self._checkClosed()4244 if self.link.state & Endpoint.REMOTE_CLOSED: 45 self.link.close() 46 raise LinkDetached(self.link)4749 self.link.close() 50 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 51 msg="Closing link %s" % self.link.name)52 53 # Access to other link attributes.8768 super(BlockingSender, self).__init__(connection, sender) 69 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 70 #this may be followed by a detach, which may contain an error condition, so wait a little... 71 self._waitForClose() 72 #...but close ourselves if peer does not 73 self.link.close() 74 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7577 delivery = self.link.send(msg) 78 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) 79 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 80 delivery.settle() 81 bad = error_states 82 if bad is None: 83 bad = [Delivery.REJECTED, Delivery.RELEASED] 84 if delivery.remote_state in bad: 85 raise SendException(delivery.remote_state) 86 return delivery12290 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 91 self.connection = connection 92 self.incoming = collections.deque([]) 93 self.unsettled = collections.deque([])9496 self.incoming.append((event.message, event.delivery)) 97 self.connection.container.yield_() # Wake up the wait() loop to handle the message.98100 if event.link.state & Endpoint.LOCAL_ACTIVE: 101 event.link.close() 102 raise LinkDetached(event.link)103 106 107 @property109 return len(self.incoming)110112 message, delivery = self.incoming.popleft() 113 if not delivery.settled: 114 self.unsettled.append(delivery) 115 return message116169126 super(BlockingReceiver, self).__init__(connection, receiver) 127 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 128 #this may be followed by a detach, which may contain an error condition, so wait a little... 129 self._waitForClose() 130 #...but close ourselves if peer does not 131 self.link.close() 132 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 133 if credit: receiver.flow(credit) 134 self.fetcher = fetcher 135 self.container = connection.container136138 self.fetcher = None 139 # The next line causes a core dump if the Proton-C reactor finalizes 140 # first. The self.container reference prevents reactor finalization 141 # until after it is set to None. 142 self.link.handler = None 143 self.container = None144146 if not self.fetcher: 147 raise Exception("Can't call receive on this receiver as a handler was provided") 148 if not self.link.credit: 149 self.link.flow(1) 150 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 151 return self.fetcher.pop()152 155 158 164185173 self.link = link 174 if link.is_sender: 175 txt = "sender %s to %s closed" % (link.name, link.target.address) 176 else: 177 txt = "receiver %s from %s closed" % (link.name, link.source.address) 178 if link.remote_condition: 179 txt += " due to: %s" % link.remote_condition 180 self.condition = link.remote_condition.name 181 else: 182 txt += " by peer" 183 self.condition = None 184 super(LinkDetached, self).__init__(txt)198189 self.connection = connection 190 txt = "Connection %s closed" % connection.hostname 191 if connection.remote_condition: 192 txt += " due to: %s" % connection.remote_condition 193 self.condition = connection.remote_condition.name 194 else: 195 txt += " by peer" 196 self.condition = None 197 super(ConnectionClosed, self).__init__(txt)201 """ 202 A synchronous style connection wrapper. 203 """299204 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):205 self.disconnected = False 206 self.timeout = timeout or 60 207 self.container = container or Container() 208 self.container.timeout = self.timeout 209 self.container.start() 210 self.url = Url(url).defaults() 211 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) 212 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 213 msg="Opening connection")214216 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))217218 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):219 prefetch = credit 220 if handler: 221 fetcher = None 222 if prefetch is None: 223 prefetch = 1 224 else: 225 fetcher = Fetcher(self, credit) 226 return BlockingReceiver( 227 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)228230 if not self.conn: 231 return 232 self.conn.close() 233 try: 234 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 235 msg="Closing connection") 236 finally: 237 self.conn.free() 238 # For cleanup, reactor needs to process PN_CONNECTION_FINAL 239 # and all events with embedded contexts must be drained. 240 self.run() # will not block any more 241 self.conn = None 242 self.container.global_handler = None # break circular ref: container to cadapter.on_error 243 self.container = None244 247249 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 250 while self.container.process(): pass 251 self.container.stop() 252 self.container.process()253255 """Call process until condition() is true""" 256 if timeout is False: 257 timeout = self.timeout 258 if timeout is None: 259 while not condition() and not self.disconnected: 260 self.container.process() 261 else: 262 container_timeout = self.container.timeout 263 self.container.timeout = timeout 264 try: 265 deadline = time.time() + timeout 266 while not condition() and not self.disconnected: 267 self.container.process() 268 if deadline < time.time(): 269 txt = "Connection %s timed out" % self.url 270 if msg: txt += ": " + msg 271 raise Timeout(txt) 272 finally: 273 self.container.timeout = container_timeout 274 if self.disconnected or self._is_closed(): 275 self.container.stop() 276 self.conn.handler = None # break cyclical reference 277 if self.disconnected and not self._is_closed(): 278 raise ConnectionException( 279 "Connection %s disconnected: %s" % (self.url, self.disconnected))280282 if event.link.state & Endpoint.LOCAL_ACTIVE: 283 event.link.close() 284 raise LinkDetached(event.link)285287 if event.connection.state & Endpoint.LOCAL_ACTIVE: 288 event.connection.close() 289 raise ConnectionClosed(event.connection)290292 self.on_transport_closed(event)293295 self.on_transport_closed(event)296313302 """Thread-safe atomic counter. Start at start, increment by step.""" 303 self.count, self.step = start, step 304 self.lock = threading.Lock()305307 """Get the next value""" 308 self.lock.acquire() 309 self.count += self.step; 310 result = self.count 311 self.lock.release() 312 return result315 """ 316 Implementation of the synchronous request-responce (aka RPC) pattern. 317 @ivar address: Address for all requests, may be None. 318 @ivar connection: Connection for requests and responses. 319 """ 320 321 correlation_id = AtomicCount() 322361 362 @property324 """ 325 Send requests and receive responses. A single instance can send many requests 326 to the same or different addresses. 327 328 @param connection: A L{BlockingConnection} 329 @param address: Address for all requests. 330 If not specified, each request must have the address property set. 331 Sucessive messages may have different addresses. 332 """ 333 super(SyncRequestResponse, self).__init__() 334 self.connection = connection 335 self.address = address 336 self.sender = self.connection.create_sender(self.address) 337 # dynamic=true generates a unique address dynamically for this receiver. 338 # credit=1 because we want to receive 1 response message initially. 339 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 340 self.response = None341343 """ 344 Send a request message, wait for and return the response message. 345 346 @param request: A L{proton.Message}. If L{self.address} is not set the 347 L{self.address} must be set and will be used. 348 """ 349 if not self.address and not request.address: 350 raise ValueError("Request message has no address: %s" % request) 351 request.reply_to = self.reply_to 352 request.correlation_id = correlation_id = self.correlation_id.next() 353 self.sender.send(request) 354 def wakeup(): 355 return self.response and (self.response.correlation_id == correlation_id)356 self.connection.wait(wakeup, msg="Waiting for response") 357 response = self.response 358 self.response = None # Ready for next response. 359 self.receiver.flow(1) # Set up credit for the next response. 360 return response364 """Return the dynamic address of our receiver.""" 365 return self.receiver.remote_source.address366368 """Called when we receive a message for our receiver.""" 369 self.response = event.message 370 self.connection.container.yield_() # Wake up the wait() loop to handle the message.371
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Jul 20 18:23:17 2017 | http://epydoc.sourceforge.net |