commit/galaxy-central: jmchilton: Update to latest Pulsar client.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/9207ecb43d81/ Changeset: 9207ecb43d81 User: jmchilton Date: 2014-09-16 22:29:31 Summary: Update to latest Pulsar client. Updates to Pulsar changeset f8d832fe4896f3fa9e27cc66a2aad7e446b99c2a - see Pulsar (https://github.com/galaxyproject/pulsar) for individual changes. Affected #: 6 files diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/__init__.py --- a/lib/pulsar/client/__init__.py +++ b/lib/pulsar/client/__init__.py @@ -34,7 +34,7 @@ test_tool = pulsar://http://remotehost:8913 Remember this must be added after the ``[galaxy:tool_runners]`` header -in the ``universe.ini`` file. +in the ``galaxy.ini`` file. """ diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/amqp_exchange.py --- a/lib/pulsar/client/amqp_exchange.py +++ b/lib/pulsar/client/amqp_exchange.py @@ -85,18 +85,27 @@ if heartbeat_thread: heartbeat_thread.join() sleep(DEFAULT_RECONNECT_CONSUMER_WAIT) + except BaseException: + log.exception("Problem consuming queue, consumer quitting in problematic fashion!") + raise def heartbeat(self, connection): log.debug('AMQP heartbeat thread alive') - while connection.connected: - connection.heartbeat_check() - sleep(DEFAULT_HEARTBEAT_WAIT) + try: + while connection.connected: + connection.heartbeat_check() + sleep(DEFAULT_HEARTBEAT_WAIT) + except BaseException: + log.exception("Problem with heartbeat, leaving heartbeat method in problematic state!") + raise log.debug('AMQP heartbeat thread exiting') def publish(self, name, payload): + key = self.__queue_name(name) + log.debug("Begin publishing to key %s" % key) with self.connection(self.__url) as connection: with pools.producers[connection].acquire() as producer: - key = self.__queue_name(name) + log.debug("Have producer for publishing to key %s" % key) producer.publish( payload, serializer='json', @@ -105,6 +114,7 @@ routing_key=key, **self.__publish_kwds ) + log.debug("Published to key %s" % key) def __publish_errback(self, exc, interval): log.error("Connection error while publishing: %r", exc, exc_info=1) diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/transport/__init__.py --- a/lib/pulsar/client/transport/__init__.py +++ b/lib/pulsar/client/transport/__init__.py @@ -23,9 +23,16 @@ transport_type = 'curl' return transport_type -# TODO: Provide urllib implementation if these unavailable, -# also explore a requests+poster option. -from .curl import get_file -from .curl import post_file +from .curl import curl_available +from .requests import requests_multipart_post_available +if curl_available: + from .curl import get_file + from .curl import post_file +elif requests_multipart_post_available: + from .requests import get_file + from .requests import post_file +else: + from .poster import get_file + from .poster import post_file __all__ = [get_transport, get_file, post_file] diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/transport/curl.py --- a/lib/pulsar/client/transport/curl.py +++ b/lib/pulsar/client/transport/curl.py @@ -2,10 +2,11 @@ from cStringIO import StringIO except ImportError: from io import StringIO +curl_available = True try: from pycurl import Curl except ImportError: - pass + curl_available = False from os.path import getsize diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/transport/poster.py --- /dev/null +++ b/lib/pulsar/client/transport/poster.py @@ -0,0 +1,56 @@ +from __future__ import absolute_import +try: + from urllib2 import urlopen +except ImportError: + from urllib.request import urlopen +try: + from urllib2 import Request +except ImportError: + from urllib.request import Request +try: + from galaxy import eggs + eggs.require("poster") +except ImportError: + pass + +try: + import poster +except ImportError: + poster = None + +POSTER_UNAVAILABLE_MESSAGE = "Pulsar configured to use poster module - but it is unavailable. Please install poster." + +import logging +log = logging.getLogger(__name__) + + +if poster is not None: + poster.streaminghttp.register_openers() + + +def post_file(url, path): + __ensure_poster() + try: + datagen, headers = poster.encode.multipart_encode({"file": open(path, "rb")}) + request = Request(url, datagen, headers) + return urlopen(request).read() + except: + log.exception("problem") + raise + + +def get_file(url, path): + __ensure_poster() + request = Request(url=url) + response = urlopen(request) + with open(path, 'wb') as output: + while True: + buffer = response.read(1024) + if not buffer: + break + output.write(buffer) + + +def __ensure_poster(): + if poster is None: + raise ImportError(POSTER_UNAVAILABLE_MESSAGE) diff -r d682014dc25a03118fd16065cd5900863c9b95f9 -r 9207ecb43d81853da7a58cd5634683dc87991c4f lib/pulsar/client/transport/requests.py --- /dev/null +++ b/lib/pulsar/client/transport/requests.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import +try: + from galaxy import eggs + eggs.require("requets") +except ImportError: + pass + +try: + import requests +except ImportError: + requests = None +requests_multipart_post_available = False +try: + import requests_toolbelt + requests_multipart_post_available = True +except ImportError: + requests_toolbelt = None + + +REQUESTS_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests module - but it is unavailable. Please install requests." +REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests_toolbelt module - but it is unavailable. Please install requests_toolbelt." + +import logging +log = logging.getLogger(__name__) + + +def post_file(url, path): + if requests_toolbelt is None: + raise ImportError(REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE) + + __ensure_requests() + m = requests_toolbelt.MultipartEncoder( + fields={'file': ('filename', open(path, 'rb'))} + ) + requests.post(url, data=m, headers={'Content-Type': m.content_type}) + + +def get_file(url, path): + __ensure_requests() + r = requests.get(url, stream=True) + with open(path, 'wb') as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + f.write(chunk) + f.flush() + + +def __ensure_requests(): + if requests is None: + raise ImportError(REQUESTS_UNAVAILABLE_MESSAGE) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org