commit/galaxy-central: jmchilton: Update LWR client through LWR changeset db9ea0e6d6a7.
1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/80d7d65df2f4/
Changeset:   80d7d65df2f4
User:        jmchilton
Date:        2014-06-02 02:34:04
Summary:     Update LWR client through LWR changeset db9ea0e6d6a7.
Affected #:  1 file

diff -r 2ba39a38f8fe89654f07b0993db88fd94256c375 -r 80d7d65df2f4986f70dce9cd9ee5caa615e16847 lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
--- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
+++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
@@ -6,6 +6,7 @@
 
 import socket
 import logging
+import threading
 from time import sleep
 
 log = logging.getLogger(__name__)
@@ -16,6 +17,7 @@
 DEFAULT_EXCHANGE_TYPE = "direct"
 # Set timeout to periodically give up looking and check if polling should end.
 DEFAULT_TIMEOUT = 0.2
+DEFAULT_HEARTBEAT = 580
 
 
 class LwrExchange(object):
@@ -50,17 +52,28 @@
         queue = self.__queue(queue_name)
         log.debug("Consuming queue '%s'", queue)
         while check:
+            heartbeat_thread = None
             try:
-                with self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection:
-                    with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']):
+                # TODO: configurable heartbeat
+                with self.connection(self.__url, ssl=self.__connect_ssl, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
+                    with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']) as consumer:
+                        heartbeat_thread = self.__start_heartbeat(queue_name, connection)
                         while check and connection.connected:
                             try:
                                 connection.drain_events(timeout=self.__timeout)
                             except socket.timeout:
                                 pass
-            except socket.error, exc:
-                log.warning('Got socket.error, will retry: %s', exc)
-                sleep(1)
+            except (IOError, socket.error), exc:
+                # In testing, errno is None
+                log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
+                heartbeat_thread.join()
+
+    def heartbeat(self, connection):
+        log.debug('AMQP heartbeat thread alive')
+        while connection.connected:
+            connection.heartbeat_check()
+            sleep(1)
+        log.debug('AMQP heartbeat thread exiting')
 
     def publish(self, name, payload):
         with self.connection(self.__url, ssl=self.__connect_ssl) as connection:
@@ -93,3 +106,9 @@
         else:
             key_prefix = "lwr_%s_" % self.__manager_name
         return key_prefix
+
+    def __start_heartbeat(self, queue_name, connection):
+        thread_name = "consume-heartbeat-%s" % (self.__queue_name(queue_name))
+        thread = threading.Thread(name=thread_name, target=self.heartbeat, args=(connection,))
+        thread.start()
+        return thread

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
participants (1)
-
commits-noreply@bitbucket.org