3 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/927ebdb27fd1/ Changeset: 927ebdb27fd1 User: jmchilton Date: 2014-06-02 05:34:39 Summary: Build on 1ed8dd5 for even simpler LWR param handling. Affected #: 2 files diff -r 1ed8dd5753212286c1abd168c55708b26da5edbf -r 927ebdb27fd197edb8578bf70d7e5ec049f7abfd lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -89,26 +89,25 @@ def __init__( self, app, nworkers, **kwds ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers, runner_param_specs=LWR_PARAM_SPECS, **kwds ) - transport = self.runner_params.transport - cache = self.runner_params.cache - url = self.runner_params.url self._init_worker_threads() - client_manager_kwargs = { - 'transport_type': transport, - 'cache': string_as_bool_or_none(cache), - "url": url, - 'manager': kwds.get("manager", None), - } - for kwd in self.runner_params.keys(): - if kwd.startswith('amqp_'): - client_manager_kwargs[kwd] = self.runner_params[kwd] self.galaxy_url = self.runner_params.galaxy_url - self.client_manager = build_client_manager(**client_manager_kwargs) - if url: + self.__init_client_manager() + if self.runner_params.url: + # This is a message queue driven runner, don't monitor + # just setup required callback. self.client_manager.ensure_has_status_update_callback(self.__async_update) else: self._init_monitor_thread() + def __init_client_manager( self ): + client_manager_kwargs = {} + for kwd in 'url', 'manager', 'cache', 'transport': + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + for kwd in self.runner_params.keys(): + if kwd.startswith( 'amqp_' ): + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + self.client_manager = build_client_manager(**client_manager_kwargs) + def url_to_destination( self, url ): """Convert a legacy URL to a job destination""" return JobDestination( runner="lwr", params=url_to_destination_params( url ) ) diff -r 1ed8dd5753212286c1abd168c55708b26da5edbf -r 927ebdb27fd197edb8578bf70d7e5ec049f7abfd lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -44,7 +44,7 @@ self.job_manager_interface_args = dict(job_manager=kwds['job_manager'], file_cache=kwds['file_cache']) else: self.job_manager_interface_class = HttpLwrInterface - transport_type = kwds.get('transport_type', None) + transport_type = kwds.get('transport', None) transport = get_transport(transport_type) self.job_manager_interface_args = dict(transport=transport) cache = kwds.get('cache', None) @@ -138,7 +138,7 @@ self.interface_args = dict(object_store=kwds['object_store']) else: self.interface_class = HttpLwrInterface - transport_type = kwds.get('transport_type', None) + transport_type = kwds.get('transport', None) transport = get_transport(transport_type) self.interface_args = dict(transport=transport) self.extra_client_kwds = {} https://bitbucket.org/galaxy/galaxy-central/commits/8003903b1f06/ Changeset: 8003903b1f06 User: jmchilton Date: 2014-06-02 05:34:39 Summary: Small tweaks to LWR configuring to reduce code duplication. Refactor amqp_exchange to move logic for SSL into shared spot and rework SSL parsing to reuse similar logic used to parse destination parameters. Affected #: 2 files diff -r 927ebdb27fd197edb8578bf70d7e5ec049f7abfd -r 8003903b1f062ada839e8898b9c04d62488f100b lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -33,7 +33,13 @@ name _default_. """ - def __init__(self, url, manager_name, connect_ssl=None, timeout=DEFAULT_TIMEOUT): + def __init__( + self, + url, + manager_name, + connect_ssl=None, + timeout=DEFAULT_TIMEOUT, + ): """ """ if not kombu: @@ -54,9 +60,8 @@ while check: heartbeat_thread = None try: - # TODO: configurable heartbeat - with self.connection(self.__url, ssl=self.__connect_ssl, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: - with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']) as consumer: + with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): heartbeat_thread = self.__start_heartbeat(queue_name, connection) while check and connection.connected: try: @@ -76,7 +81,7 @@ log.debug('AMQP heartbeat thread exiting') def publish(self, name, payload): - with self.connection(self.__url, ssl=self.__connect_ssl) as connection: + with self.connection(self.__url) as connection: with pools.producers[connection].acquire() as producer: key = self.__queue_name(name) producer.publish( @@ -88,6 +93,8 @@ ) def connection(self, connection_string, **kwargs): + if "ssl" not in kwargs: + kwargs["ssl"] = self.__connect_ssl return kombu.Connection(connection_string, **kwargs) def __queue(self, name): diff -r 927ebdb27fd197edb8578bf70d7e5ec049f7abfd -r 8003903b1f062ada839e8898b9c04d62488f100b lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -64,21 +64,15 @@ def parse_amqp_connect_ssl_params(params): - ssl = None - rval = None - ssl_options = [] - if params: - ssl_options = filter(lambda x: x.startswith('amqp_connect_ssl_'), params.keys()) - if ssl_options: - ssl = __import__('ssl') - rval = {} - for option in ssl_options: - value = params.get(option) - option = option.replace('amqp_connect_ssl_', '', 1) - if option == 'cert_reqs': - value = getattr(ssl, value.upper()) - rval[option] = value - return rval + ssl_params = filter_destination_params(params, "amqp_connect_ssl_") + if not ssl_params: + return + + ssl = __import__('ssl') + if 'cert_reqs' in ssl_params: + value = ssl_params['cert_reqs'] + ssl_params['cert_reqs'] = getattr(ssl, value.upper()) + return ssl_params def filter_destination_params(destination_params, prefix): https://bitbucket.org/galaxy/galaxy-central/commits/cf31fae45bf3/ Changeset: cf31fae45bf3 User: jmchilton Date: 2014-06-02 05:34:39 Summary: Allow configuration of more AMQP parameters. In particular ampq_publish_retry can now be set to True to retry disconnected connections on publication. Hopefully, this will fix producer-related disconnect problems seen in production. Affected #: 4 files diff -r 8003903b1f062ada839e8898b9c04d62488f100b -r cf31fae45bf3a2d3244744d86edabad0c1465729 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -77,6 +77,22 @@ map=specs.to_str_or_none, default=None, ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Producer.p... + amqp_publish_retry=dict( + map=specs.to_bool, + default=False, + ), + amqp_publish_priority=dict( + map=int, + valid=lambda x: 0 <= x and x <= 9, + default=0, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchange.d... + amqp_publish_delivery_mode=dict( + map=str, + valid=specs.is_in("transient", "persistent"), + default="persistent", + ) ) diff -r 8003903b1f062ada839e8898b9c04d62488f100b -r cf31fae45bf3a2d3244744d86edabad0c1465729 lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -39,6 +39,7 @@ manager_name, connect_ssl=None, timeout=DEFAULT_TIMEOUT, + publish_kwds={}, ): """ """ @@ -49,6 +50,7 @@ self.__connect_ssl = connect_ssl self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) self.__timeout = timeout + self.__publish_kwds = publish_kwds @property def url(self): @@ -90,6 +92,7 @@ exchange=self.__exchange, declare=[self.__exchange], routing_key=key, + **self.__publish_kwds ) def connection(self, connection_string, **kwargs): diff -r 8003903b1f062ada839e8898b9c04d62488f100b -r cf31fae45bf3a2d3244744d86edabad0c1465729 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -14,6 +14,7 @@ from .transport import get_transport from .util import TransferEventManager from .util import parse_amqp_connect_ssl_params +from .util import parse_amqp_publish_kwds from .destination import url_to_destination_params from .amqp_exchange import LwrExchange @@ -81,6 +82,7 @@ exchange_kwds = dict( manager_name=self.manager_name, connect_ssl=self.connect_ssl, + publish_kwds=parse_amqp_publish_kwds(kwds) ) timeout = kwds.get('amqp_consumer_timeout', None) if timeout is not None: diff -r 8003903b1f062ada839e8898b9c04d62488f100b -r cf31fae45bf3a2d3244744d86edabad0c1465729 lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -75,6 +75,10 @@ return ssl_params +def parse_amqp_publish_kwds(params): + return filter_destination_params(params, "amqp_publish_") + + def filter_destination_params(destination_params, prefix): destination_params = destination_params or {} return dict([(key[len(prefix):], destination_params[key]) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.