commit/galaxy-central: jmchilton: LWR client updates - mostly MQ improvements.
1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/35cc93303b8e/
Changeset: 35cc93303b8e
User: jmchilton
Date: 2014-06-19 19:36:15
Summary: LWR client updates - mostly MQ improvements.

Updated LWR runner with spec parsing for new connection "ensure" parameters.
Affected #: 4 files

diff -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 -r 35cc93303b8ec387cae5a84a646440ff0607921e lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -92,7 +92,23 @@
         map=str,
         valid=specs.is_in("transient", "persistent"),
         default="persistent",
-    )
+    ),
+    amqp_publish_retry_max_retries=dict(
+        map=int,
+        default=None,
+    ),
+    amqp_publish_retry_interval_start=dict(
+        map=int,
+        default=None,
+    ),
+    amqp_publish_retry_interval_step=dict(
+        map=int,
+        default=None,
+    ),
+    amqp_publish_retry_interval_max=dict(
+        map=int,
+        default=None,
+    ),
 )


diff -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 -r 35cc93303b8ec387cae5a84a646440ff0607921e lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
--- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
+++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
@@ -19,6 +19,9 @@
 DEFAULT_TIMEOUT = 0.2
 DEFAULT_HEARTBEAT = 580

+DEFAULT_RECONNECT_CONSUMER_WAIT = 1
+DEFAULT_HEARTBEAT_WAIT = 1
+

 class LwrExchange(object):
     """ Utility for publishing and consuming structured LWR queues using kombu.
@@ -51,6 +54,8 @@
         self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
         self.__timeout = timeout
         self.__publish_kwds = publish_kwds
+        if "retry_policy" in self.__publish_kwds:
+            self.__publish_kwds["retry_policy"]["errback"] = self.__publish_errback

     @property
     def url(self):
@@ -75,12 +80,13 @@
                 log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
                 if heartbeat_thread:
                     heartbeat_thread.join()
+                sleep(DEFAULT_RECONNECT_CONSUMER_WAIT)

     def heartbeat(self, connection):
         log.debug('AMQP heartbeat thread alive')
         while connection.connected:
             connection.heartbeat_check()
-            sleep(1)
+            sleep(DEFAULT_HEARTBEAT_WAIT)
         log.debug('AMQP heartbeat thread exiting')

     def publish(self, name, payload):
@@ -96,6 +102,10 @@
                 **self.__publish_kwds
             )

+    def __publish_errback(self, exc, interval):
+        log.error("Connection error while publishing: %r", exc, exc_info=1)
+        log.info("Retrying in %s seconds", interval)
+
     def connection(self, connection_string, **kwargs):
         if "ssl" not in kwargs:
             kwargs["ssl"] = self.__connect_ssl
diff -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 -r 35cc93303b8ec387cae5a84a646440ff0607921e lib/galaxy/jobs/runners/lwr_client/amqp_exchange_factory.py
--- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange_factory.py
+++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange_factory.py
@@ -29,4 +29,13 @@


 def parse_amqp_publish_kwds(params):
-    return filter_destination_params(params, "amqp_publish_")
+    all_publish_params = filter_destination_params(params, "amqp_publish_")
+    retry_policy_params = {}
+    for key in all_publish_params.keys():
+        if key.startswith("retry_"):
+            value = all_publish_params[key]
+            retry_policy_params[key[len("retry_"):]] = value
+            del all_publish_params[key]
+    if retry_policy_params:
+        all_publish_params["retry_policy"] = retry_policy_params
+    return all_publish_params
diff -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 -r 35cc93303b8ec387cae5a84a646440ff0607921e lib/galaxy/jobs/runners/lwr_client/staging/up.py
--- 
a/lib/galaxy/jobs/runners/lwr_client/staging/up.py
+++ b/lib/galaxy/jobs/runners/lwr_client/staging/up.py
@@ -183,7 +183,7 @@
         working_directory_files = listdir(self.working_directory) if exists(self.working_directory) else []
         for working_directory_file in working_directory_files:
             path = join(self.working_directory, working_directory_file)
-            self.transfer_tracker.handle_transfer(path, 'workdir')
+            self.transfer_tracker.handle_transfer(path, path_type.WORKDIR)

     def __initialize_version_file_rename(self):
         version_file = self.version_file
@@ -217,7 +217,7 @@

     def __upload_rewritten_config_files(self):
         for config_file, new_config_contents in self.job_inputs.config_files.items():
-            self.transfer_tracker.handle_transfer(config_file, type='config', contents=new_config_contents)
+            self.transfer_tracker.handle_transfer(config_file, type=path_type.CONFIG, contents=new_config_contents)

     def get_command_line(self):
         """

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
participants (1)
-
commits-noreply@bitbucket.org