1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/1ed8dd575321/ Changeset: 1ed8dd575321 User: jmchilton Date: 2014-06-02 03:46:45 Summary: Rework LWR runner parameters to use @natefoo's spec mechanism. Add new module - galaxy.util.specs - with utilities for things I predict would be common spec types. This simplifies some things in terms of defaults and empty values that means other simplifications outside the LWR job runner constructor as well (e.g. for ssl_ parameters). Affected #: 3 files diff -r 80d7d65df2f4986f70dce9cd9ee5caa615e16847 -r 1ed8dd5753212286c1abd168c55708b26da5edbf lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -8,6 +8,7 @@ from galaxy.tools.deps import dependencies from galaxy.util import string_as_bool_or_none from galaxy.util.bunch import Bunch +from galaxy.util import specs import errno from time import sleep @@ -34,6 +35,50 @@ # url_for from web threads. https://gist.github.com/jmchilton/9098762 DEFAULT_GALAXY_URL = "http://localhost:8080" +LWR_PARAM_SPECS = dict( + transport=dict( + map=specs.to_str_or_none, + valid=specs.is_in("urllib", "curl", None), + default=None + ), + cache=dict( + map=specs.to_bool_or_none, + default=None, + ), + url=dict( + map=specs.to_str_or_none, + default=None, + ), + galaxy_url=dict( + map=specs.to_str_or_none, + default=DEFAULT_GALAXY_URL, + ), + manager=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_consumer_timeout=dict( + map=specs.to_float_or_none, + default=None, + ), + amqp_connect_ssl_ca_certs=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_keyfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_certfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_cert_reqs=dict( + map=specs.to_str_or_none, + default=None, + ), +) + class LwrJobRunner( AsynchronousJobRunner ): """ @@ -41,28 +86,23 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL, **kwds ): + def __init__( self, app, nworkers, **kwds ): """Start the job runner """ - super( LwrJobRunner, self ).__init__( app, nworkers ) + super( LwrJobRunner, self ).__init__( app, nworkers, runner_param_specs=LWR_PARAM_SPECS, **kwds ) + transport = self.runner_params.transport + cache = self.runner_params.cache + url = self.runner_params.url self._init_worker_threads() - amqp_connect_ssl_args = {} - amqp_consumer_timeout = False - for kwd in kwds.keys(): - if kwd.startswith('amqp_connect_ssl_'): - amqp_connect_ssl_args[kwd] = kwds[kwd] client_manager_kwargs = { 'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url, - 'amqp_connect_ssl_args': amqp_connect_ssl_args or None, 'manager': kwds.get("manager", None), } - if 'amqp_consumer_timeout' in kwds: - if kwds['amqp_consumer_timeout'] == 'None': - client_manager_kwargs['amqp_consumer_timeout'] = None - else: - client_manager_kwargs['amqp_consumer_timeout'] = float(kwds['amqp_consumer_timeout']) - self.galaxy_url = galaxy_url + for kwd in self.runner_params.keys(): + if kwd.startswith('amqp_'): + client_manager_kwargs[kwd] = self.runner_params[kwd] + self.galaxy_url = self.runner_params.galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) if url: self.client_manager.ensure_has_status_update_callback(self.__async_update) diff -r 80d7d65df2f4986f70dce9cd9ee5caa615e16847 -r 1ed8dd5753212286c1abd168c55708b26da5edbf lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -77,12 +77,15 @@ def __init__(self, **kwds): self.url = kwds.get('url') self.manager_name = kwds.get("manager", None) or "_default_" - self.connect_ssl = parse_amqp_connect_ssl_params(kwds.get('amqp_connect_ssl_args', None)) - timeout = kwds.get('amqp_consumer_timeout', False) - if timeout is False: - self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl) - else: - self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl, timeout=timeout) + self.connect_ssl = parse_amqp_connect_ssl_params(kwds) + exchange_kwds = dict( + manager_name=self.manager_name, + connect_ssl=self.connect_ssl, + ) + timeout = kwds.get('amqp_consumer_timeout', None) + if timeout is not None: + exchange_kwds['timeout'] = timeout + self.exchange = LwrExchange(self.url, **exchange_kwds) self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None diff -r 80d7d65df2f4986f70dce9cd9ee5caa615e16847 -r 1ed8dd5753212286c1abd168c55708b26da5edbf lib/galaxy/util/specs.py --- /dev/null +++ b/lib/galaxy/util/specs.py @@ -0,0 +1,33 @@ +import functools +import operator + +from galaxy import util + + +# Utility methods for specifing maps. +def to_str_or_none( value ): + if value is None: + return None + else: + return str( value ) + + +def to_bool_or_none( value ): + return util.string_as_bool_or_none( value ) + + +def to_bool( value ): + return util.asbool( value ) + + +def to_float_or_none( value ): + if value is None: + return None + else: + return float( value ) + + +# Utility methods for specifing valid... +def is_in( *args ): + return functools.partial( operator.contains, args ) + Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.