2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/e3502f3a5dec/ Changeset: e3502f3a5dec User: jmchilton Date: 2014-06-02 07:13:55 Summary: Restore ability to disable LWR-MQ consumer timeout. Mis-translated this parameter when converting it to use runner spec. Affected #: 2 files diff -r cf31fae45bf3a2d3244744d86edabad0c1465729 -r e3502f3a5dec0e5cb11a60bd6473618d23f98b64 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -58,7 +58,7 @@ default=None, ), amqp_consumer_timeout=dict( - map=specs.to_float_or_none, + map=lambda val: None if val == "None" else float(val), default=None, ), amqp_connect_ssl_ca_certs=dict( diff -r cf31fae45bf3a2d3244744d86edabad0c1465729 -r e3502f3a5dec0e5cb11a60bd6473618d23f98b64 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -84,8 +84,8 @@ connect_ssl=self.connect_ssl, publish_kwds=parse_amqp_publish_kwds(kwds) ) - timeout = kwds.get('amqp_consumer_timeout', None) - if timeout is not None: + timeout = kwds.get('amqp_consumer_timeout', False) + if timeout is not False: exchange_kwds['timeout'] = timeout self.exchange = LwrExchange(self.url, **exchange_kwds) self.status_cache = {} https://bitbucket.org/galaxy/galaxy-central/commits/667ba7198f66/ Changeset: 667ba7198f66 User: jmchilton Date: 2014-06-02 07:13:55 Summary: Move logic for creating LwrExchange out of MessageQueueClientManager... So can reuse server side when connecting server components to AMQP. Affected #: 3 files diff -r e3502f3a5dec0e5cb11a60bd6473618d23f98b64 -r 667ba7198f66c0ae9ae0eadbb9ce569a1af05199 lib/galaxy/jobs/runners/lwr_client/amqp_exchange_factory.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange_factory.py @@ -0,0 +1,32 @@ +from .amqp_exchange import LwrExchange +from .util import filter_destination_params + + +def get_exchange(url, manager_name, params): + connect_ssl = parse_amqp_connect_ssl_params(params) + exchange_kwds = dict( + manager_name=manager_name, + connect_ssl=connect_ssl, + publish_kwds=parse_amqp_publish_kwds(params) + ) + timeout = params.get('amqp_consumer_timeout', False) + if timeout is not False: + exchange_kwds['timeout'] = timeout + exchange = LwrExchange(url, **exchange_kwds) + return exchange + + +def parse_amqp_connect_ssl_params(params): + ssl_params = filter_destination_params(params, "amqp_connect_ssl_") + if not ssl_params: + return + + ssl = __import__('ssl') + if 'cert_reqs' in ssl_params: + value = ssl_params['cert_reqs'] + ssl_params['cert_reqs'] = getattr(ssl, value.upper()) + return ssl_params + + +def parse_amqp_publish_kwds(params): + return filter_destination_params(params, "amqp_publish_") diff -r e3502f3a5dec0e5cb11a60bd6473618d23f98b64 -r 667ba7198f66c0ae9ae0eadbb9ce569a1af05199 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -13,10 +13,8 @@ from .object_client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager -from .util import parse_amqp_connect_ssl_params -from .util import parse_amqp_publish_kwds from .destination import url_to_destination_params -from .amqp_exchange import LwrExchange +from .amqp_exchange_factory import get_exchange from logging import getLogger @@ -78,16 +76,7 @@ def __init__(self, **kwds): self.url = kwds.get('url') self.manager_name = kwds.get("manager", None) or "_default_" - self.connect_ssl = parse_amqp_connect_ssl_params(kwds) - exchange_kwds = dict( - manager_name=self.manager_name, - connect_ssl=self.connect_ssl, - publish_kwds=parse_amqp_publish_kwds(kwds) - ) - timeout = kwds.get('amqp_consumer_timeout', False) - if timeout is not False: - exchange_kwds['timeout'] = timeout - self.exchange = LwrExchange(self.url, **exchange_kwds) + self.exchange = get_exchange(self.url, self.manager_name, kwds) self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None diff -r e3502f3a5dec0e5cb11a60bd6473618d23f98b64 -r 667ba7198f66c0ae9ae0eadbb9ce569a1af05199 lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -63,22 +63,6 @@ return contents -def parse_amqp_connect_ssl_params(params): - ssl_params = filter_destination_params(params, "amqp_connect_ssl_") - if not ssl_params: - return - - ssl = __import__('ssl') - if 'cert_reqs' in ssl_params: - value = ssl_params['cert_reqs'] - ssl_params['cert_reqs'] = getattr(ssl, value.upper()) - return ssl_params - - -def parse_amqp_publish_kwds(params): - return filter_destination_params(params, "amqp_publish_") - - def filter_destination_params(destination_params, prefix): destination_params = destination_params or {} return dict([(key[len(prefix):], destination_params[key]) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.