1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/75b13d642f94/ Changeset: 75b13d642f94 User: jmchilton Date: 2014-02-18 04:26:49 Summary: Allow LWR runner to publish/consume messages via message queue. To use LWR via message queue, ensure each such destination defines "default_file_action" as "remote_copy" and the LWR base staging directory pointed to by the "jobs_directory" parameter. Finally add a "url" argument to LWR job runner plugin in job_conf.xml specifing the amqp URL to target (by default this will target the default ("_default_") LWR job manager - this can be overridden by specifing a "manager" attribute the plugin element in job_conf.xml). Some of the implementation details here are bit hacky now to localize changes to LWR, once Dannon has merged his work - a new job runner base class or option should be implemented to account for this truely asynchronous job running implementation. (Comments inline about this.) This updates the LWR client through LWR changeset 59ad1ea03448. Affected #: 3 files diff -r 7d649885e3259f3359e8c3816315c60e958c1518 -r 75b13d642f94b9ff6ec7c3c3713ad1177b243bd3 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -35,12 +35,13 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None ): + def __init__( self, app, nworkers, transport=None, cache=None, url=None ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) + self.async_status_updates = dict() self._init_monitor_thread() self._init_worker_threads() - client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache)} + client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url} self.client_manager = build_client_manager(**client_manager_kwargs) def url_to_destination( self, url ): @@ -50,6 +51,18 @@ def check_watched_item(self, job_state): try: client = self.get_client_from_state(job_state) + + if not hasattr(client, 'get_status'): + # Message queue implementation. + + # TODO: Very hacky now, refactor after Dannon merges in his + # message queue work, runners need the ability to disable + # check_watched_item like this and instead a callback needs to + # be issued post job recovery allowing a message queue + # consumer to be setup. + self.client_manager.ensure_has_job_completes_callback(self.__async_complete) + return job_state + status = client.get_status() except Exception: # An orphaned job was put into the queue at app startup, so remote server went down @@ -64,6 +77,29 @@ job_state.job_wrapper.change_state( model.Job.states.RUNNING ) return job_state + def __async_complete( self, final_status ): + job_id = final_status[ "job_id" ] + job_state = self.__find_watched_job( job_id ) + if not job_state: + # Probably finished too quickly, sleep and try again. + # Kind of a hack, why does monitor queue need to no wait + # get and sleep instead of doing a busy wait that would + # respond immediately. + sleep( 2 ) + job_state = self.__find_watched_job( job_id ) + if not job_state: + log.warn( "Failed to find job corresponding to final status %s in %s" % ( final_status, self.watched ) ) + else: + self.mark_as_finished( job_state ) + + def __find_watched_job( self, job_id ): + found_job = None + for async_job_state in self.watched: + if str( async_job_state.job_id ) == job_id: + found_job = async_job_state + break + return found_job + def queue_job(self, job_wrapper): job_destination = job_wrapper.job_destination @@ -184,15 +220,15 @@ return self.get_client( job_destination_params, job_id ) def get_client( self, job_destination_params, job_id ): - return self.client_manager.get_client( job_destination_params, job_id ) + return self.client_manager.get_client( job_destination_params, str( job_id ) ) def finish_job( self, job_state ): stderr = stdout = '' job_wrapper = job_state.job_wrapper try: client = self.get_client_from_state(job_state) + run_results = client.final_status() - run_results = client.raw_check_complete() stdout = run_results.get('stdout', '') stderr = run_results.get('stderr', '') exit_code = run_results.get('returncode', None) @@ -291,6 +327,10 @@ job_state.running = state == model.Job.states.RUNNING self.monitor_queue.put( job_state ) + def shutdown( self ): + super( LwrJobRunner, self ).shutdown() + self.client_manager.shutdown() + def __client_outputs( self, client, job_wrapper ): remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) if not remote_work_dir_copy: diff -r 7d649885e3259f3359e8c3816315c60e958c1518 -r 75b13d642f94b9ff6ec7c3c3713ad1177b243bd3 lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -8,6 +8,9 @@ from .setup_handler import build as build_setup_handler from .job_directory import RemoteJobDirectory +import logging +log = logging.getLogger(__name__) + CACHE_WAIT_SECONDS = 3 MAX_RETRY_COUNT = 5 RETRY_SLEEP_TIME = 0.1 @@ -272,6 +275,11 @@ launch_params["setup_params"] = dumps(setup_params) return self._raw_execute("launch", launch_params) + def final_status(self): + """ Return a dictionary summarizing final state of job. + """ + return self.raw_check_complete() + def kill(self): """ Cancel remote job, either removing from the queue or killing it. @@ -314,11 +322,11 @@ check_complete_response = self.raw_check_complete() # Older LWR instances won't set status so use 'complete', at some # point drop backward compatibility. - complete = self.check_complete(check_complete_response) - old_status = "complete" if complete else "running" - status = check_complete_response.get("status", old_status) + status = check_complete_response.get("status", None) # Bug in certains older LWR instances returned literal "status". - if status not in ["complete", "running", "queued"]: + if status in ["status", None]: + complete = self.check_complete(check_complete_response) + old_status = "complete" if complete else "running" status = old_status return status @@ -344,12 +352,12 @@ class MessageJobClient(BaseJobClient): - def __init__(self, destination_params, job_id, exchange): + def __init__(self, destination_params, job_id, client_manager): super(MessageJobClient, self).__init__(destination_params, job_id) if not self.job_directory: error_message = "Message-queue based LWR client requires destination define a remote job_directory to stage files into." raise Exception(error_message) - self.exchange = exchange + self.client_manager = client_manager def launch(self, command_line, requirements=[], remote_staging=[], job_config=None): """ @@ -368,7 +376,19 @@ # before queueing. setup_params = _setup_params_from_job_config(job_config) launch_params["setup_params"] = setup_params - return self.exchange.publish("setup", launch_params) + return self.client_manager.exchange.publish("setup", launch_params) + + def clean(self): + del self.client_manager.final_status_cache[self.job_id] + + def final_status(self): + final_status = self.client_manager.final_status_cache.get(self.job_id, None) + if final_status is None: + raise Exception("final_status() called before a final status was properly cached with cilent manager.") + return final_status + + def kill(self): + log.warn("Kill not yet implemented with message queue driven LWR jobs.") class InputCachingJobClient(JobClient): diff -r 7d649885e3259f3359e8c3816315c60e958c1518 -r 75b13d642f94b9ff6ec7c3c3713ad1177b243bd3 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -1,9 +1,9 @@ from abc import ABCMeta, abstractmethod +import threading try: from Queue import Queue except ImportError: from queue import Queue -from threading import Thread from os import getenv try: from urllib import urlencode @@ -87,25 +87,35 @@ self.url = kwds.get('url') self.manager_name = kwds.get("manager", "_default_") self.exchange = LwrExchange(self.url, self.manager_name) - self.thread = None + self.final_status_cache = {} + self.callback_lock = threading.Lock() + self.callback_thread = None self.active = True - def listen_for_job_completes(self, callback): - # TODO: Call this from LWR runner. - def callback_wrapper(body, message): - callback(body) - message.ack() + def ensure_has_job_completes_callback(self, callback): + with self.callback_lock: + if self.callback_thread is not None: + return - def run(): - self.exchange.consume("complete", callback_wrapper, check=self) + def callback_wrapper(body, message): + try: + if "job_id" in body: + self.final_status_cache[body["job_id"]] = body + callback(body) + except Exception: + log.exception("Failure processing job status update message.") + message.ack() - t = Thread( - name="lwr_client_%s_complete_callback" % self.manager_name, - target=run - ) - t.daemon = False # Lets not interrupt processing of this. - t.start() - self.thread = t + def run(): + self.exchange.consume("complete", callback_wrapper, check=self) + + thread = threading.Thread( + name="lwr_client_%s_complete_callback" % self.manager_name, + target=run + ) + thread.daemon = False # Lets not interrupt processing of this. + thread.start() + self.callback_thread = thread def shutdown(self): self.active = False @@ -115,7 +125,7 @@ def get_client(self, destination_params, job_id): destination_params = _parse_destination_params(destination_params) - return MessageJobClient(destination_params, job_id, self.exchange) + return MessageJobClient(destination_params, job_id, self) class ObjectStoreClientManager(object): @@ -259,7 +269,7 @@ self.num_transfer_threads = num_transfer_threads self.transfer_queue = Queue() for i in range(num_transfer_threads): - t = Thread(target=self._transfer_worker) + t = threading.Thread(target=self._transfer_worker) t.daemon = True t.start() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.