1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/22ec0ac1ebcd/ Changeset: 22ec0ac1ebcd User: natefoo Date: 2014-05-14 23:34:15 Summary: Add a config option for controlling the AMQP consumer timeout, catch socket.error and attempt to recover from it. Affected #: 4 files diff -r a24a928c3b8c4e49bad89fb95496c6b1f78c23bc -r 22ec0ac1ebcd1cc5ae5fda4ec70723043e8d5c5f job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -51,6 +51,12 @@ <!-- <param id="amqp_connect_ssl_keyfile">/path/to/key.pem</param> --><!-- <param id="amqp_connect_ssl_certfile">/path/to/cert.pem</param> --><!-- <param id="amqp_connect_ssl_cert_reqs">cert_required</param> --> + <!-- By default, the AMQP consumer uses a nonblocking connection with + a 0.2 second timeout. In testing, this works fine for + unencrypted AMQP connections, but with SSL it will cause the + client to reconnect to the server after each timeout. Set to a + higher value (in seconds) (or `None` to use blocking connections). 
--> + <!-- <param id="amqp_consumer_timeout">None</param> --></plugin><plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /><plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> diff -r a24a928c3b8c4e49bad89fb95496c6b1f78c23bc -r 22ec0ac1ebcd1cc5ae5fda4ec70723043e8d5c5f lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -47,10 +47,16 @@ self._init_monitor_thread() self._init_worker_threads() amqp_connect_ssl_args = {} + amqp_consumer_timeout = False for kwd in kwds.keys(): if kwd.startswith('amqp_connect_ssl_'): amqp_connect_ssl_args[kwd] = kwds[kwd] client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url, 'amqp_connect_ssl_args': amqp_connect_ssl_args or None} + if 'amqp_consumer_timeout' in kwds: + if kwds['amqp_consumer_timeout'] == 'None': + client_manager_kwargs['amqp_consumer_timeout'] = None + else: + client_manager_kwargs['amqp_consumer_timeout'] = float(kwds['amqp_consumer_timeout']) self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) diff -r a24a928c3b8c4e49bad89fb95496c6b1f78c23bc -r 22ec0ac1ebcd1cc5ae5fda4ec70723043e8d5c5f lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -6,6 +6,7 @@ import socket import logging +from time import sleep log = logging.getLogger(__name__) @@ -47,13 +48,18 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): queue = self.__queue(queue_name) - with self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection: - with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): - while check: - try: - connection.drain_events(timeout=self.__timeout) - except socket.timeout: - pass + while check: + try: + with 
self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): + while check and connection.connected: + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass + except socket.error, exc: + log.warning('Got socket.error, will retry: %s', exc) + sleep(1) def publish(self, name, payload): with self.connection(self.__url, ssl=self.__connect_ssl) as connection: diff -r a24a928c3b8c4e49bad89fb95496c6b1f78c23bc -r 22ec0ac1ebcd1cc5ae5fda4ec70723043e8d5c5f lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -78,7 +78,11 @@ self.url = kwds.get('url') self.manager_name = kwds.get("manager", "_default_") self.connect_ssl = parse_amqp_connect_ssl_params(kwds.get('amqp_connect_ssl_args', None)) - self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl) + timeout = kwds.get('amqp_consumer_timeout', False) + if timeout is False: + self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl) + else: + self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl, timeout=timeout) self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because the commit notification service is enabled for this repository and this email address is listed as a recipient.