5 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/0d81156abc40/ Changeset: 0d81156abc40 User: jmchilton Date: 2014-05-16 07:57:31 Summary: Helper method for creating JobWrappers in JobHandler. Affected #: 1 file diff -r b94bb858d29b39f5d8127414bd703ee8e4905281 -r 0d81156abc40773643f5f8d72af521fe5d1b5d9d lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -84,6 +84,13 @@ self.monitor_thread.start() log.info( "job handler queue started" ) + def job_wrapper( self, job ): + return JobWrapper( job, self ) + + def job_wrapper_for_id( self, id ): + job = self.sa_session.query( model.Job ) + return self.job_wrapper( job ) + def __check_jobs_at_startup( self ): """ Checks all jobs that are in the 'new', 'queued' or 'running' state in @@ -110,7 +117,7 @@ for job in jobs_at_startup: if job.tool_id not in self.app.toolbox.tools_by_id: log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) - JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) + self.job_wrapper( job ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) elif job.job_runner_name is not None and job.job_runner_external_id is None: # This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner. 
log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id ) @@ -122,7 +129,7 @@ elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None: # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist # TODO: test me extensively - job_wrapper = JobWrapper( job, self ) + job_wrapper = self.job_wrapper( job ) job_destination = self.dispatcher.url_to_destination(job.job_runner_name) if job_destination.id is None: job_destination.id = 'legacy_url' @@ -138,7 +145,7 @@ self.queue.put( ( job.id, job.tool_id ) ) else: # Already dispatched and running - job_wrapper = JobWrapper( job, self ) + job_wrapper = self.job_wrapper( job ) job_wrapper.job_runner_mapper.cached_job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params) self.dispatcher.recover( job, job_wrapper ) if self.sa_session.dirty: @@ -296,21 +303,21 @@ continue # don't run jobs for which the input dataset was deleted if idata.deleted: - self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) return JOB_INPUT_DELETED # an error in the input data causes us to bail immediately elif idata.state == idata.states.ERROR: - self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s is in error state" % ( idata.hid ) ) + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state == idata.states.FAILED_METADATA: - self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) + 
self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): # need to requeue return JOB_WAIT # Create the job wrapper so that the destination can be set if job.id not in self.job_wrappers: - self.job_wrappers[job.id] = JobWrapper(job, self) + self.job_wrappers[job.id] = self.job_wrapper( job ) # Cause the job_destination to be set and cached by the mapper try: self.job_wrappers[job.id].job_destination https://bitbucket.org/galaxy/galaxy-central/commits/6b565d3fc2a7/ Changeset: 6b565d3fc2a7 User: jmchilton Date: 2014-05-16 07:57:31 Summary: PEP-8 fixes for JobManager. Affected #: 1 file diff -r 0d81156abc40773643f5f8d72af521fe5d1b5d9d -r 6b565d3fc2a7f4f0db9f7a911e762e139c7fbb67 lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -3,20 +3,12 @@ """ import logging -import os -import random -import threading -import time -from Queue import Empty, Queue - -from galaxy import model -from galaxy.util.sleeper import Sleeper -from galaxy.jobs import handler, JobWrapper, NoopQueue -from galaxy.util.json import from_json_string +from galaxy.jobs import handler, NoopQueue log = logging.getLogger( __name__ ) + class JobManager( object ): """ Highest level interface to job management. 
@@ -35,14 +27,18 @@ self.job_handler = NoopHandler() self.job_queue = self.job_stop_queue = NoopQueue() self.job_handler.start() + def shutdown( self ): self.job_handler.shutdown() + class NoopHandler( object ): def __init__( self, *args, **kwargs ): self.job_queue = NoopQueue() self.job_stop_queue = NoopQueue() + def start( self ): pass + def shutdown( self, *args ): pass https://bitbucket.org/galaxy/galaxy-central/commits/7c668716a412/ Changeset: 7c668716a412 User: jmchilton Date: 2014-05-16 07:57:31 Summary: Handle LWR failed state in asynchronous MQ updates. Affected #: 1 file diff -r 6b565d3fc2a7f4f0db9f7a911e762e139c7fbb67 -r 7c668716a4125c583c0000d11a4aae9347dc8606 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -98,6 +98,9 @@ if lwr_status == "complete": self.mark_as_finished(job_state) return None + if lwr_status == "failed": + self.fail_job(job_state) + return None if lwr_status == "running" and not job_state.running: job_state.running = True job_state.job_wrapper.change_state( model.Job.states.RUNNING ) https://bitbucket.org/galaxy/galaxy-central/commits/21a568dac530/ Changeset: 21a568dac530 User: jmchilton Date: 2014-05-16 07:57:31 Summary: Correct 'abstract' method definition in AsynchronousJobRunner. Affected #: 1 file diff -r 7c668716a4125c583c0000d11a4aae9347dc8606 -r 21a568dac530aefb54e49238da2fac0934cb9308 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -400,7 +400,7 @@ self.watched = new_watched # Subclasses should implement this unless they override check_watched_items all together. - def check_watched_item(self): + def check_watched_item(self, job_state): raise NotImplementedError() def finish_job( self, job_state ): https://bitbucket.org/galaxy/galaxy-central/commits/7e95105c4660/ Changeset: 7e95105c4660 User: jmchilton Date: 2014-05-16 07:57:31 Summary: Simplified, truly asynchronous LWR MQ status updates.
In addition to eliminating some egregious hacks, this reworked functionality should allow any LWR handler process to consume update messages for any job (so LWR jobs don't have to be bound to a particular handler). Affected #: 2 files diff -r 21a568dac530aefb54e49238da2fac0934cb9308 -r 7e95105c466051432e5c47ff977826a19758b867 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -87,9 +87,9 @@ def job_wrapper( self, job ): return JobWrapper( job, self ) - def job_wrapper_for_id( self, id ): - job = self.sa_session.query( model.Job ) - return self.job_wrapper( job ) + def job_pair_for_id( self, id ): + job = self.sa_session.query( model.Job ).get( id ) + return job, self.job_wrapper( job ) def __check_jobs_at_startup( self ): """ diff -r 21a568dac530aefb54e49238da2fac0934cb9308 -r 7e95105c466051432e5c47ff977826a19758b867 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -43,8 +43,6 @@ def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL, **kwds ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) - self.async_status_updates = dict() - self._init_monitor_thread() self._init_worker_threads() amqp_connect_ssl_args = {} amqp_consumer_timeout = False @@ -65,6 +63,10 @@ client_manager_kwargs['amqp_consumer_timeout'] = float(kwds['amqp_consumer_timeout']) self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) + if url: + self.client_manager.ensure_has_status_update_callback(self.__async_update) + else: + self._init_monitor_thread() def url_to_destination( self, url ): """Convert a legacy URL to a job destination""" @@ -73,18 +75,6 @@ def check_watched_item(self, job_state): try: client = self.get_client_from_state(job_state) - - if hasattr(self.client_manager, 'ensure_has_status_update_callback'): - # Message queue implementation. 
- - # TODO: Very hacky now, refactor after Dannon merges in his - # message queue work, runners need the ability to disable - # check_watched_item like this and instead a callback needs to - # be issued post job recovery allowing a message queue - # consumer to be setup. - self.client_manager.ensure_has_status_update_callback(self.__async_update) - return job_state - status = client.get_status() except Exception: # An orphaned job was put into the queue at app startup, so remote server went down @@ -108,26 +98,9 @@ def __async_update( self, full_status ): job_id = full_status[ "job_id" ] - job_state = self.__find_watched_job( job_id ) - if not job_state: - # Probably finished too quickly, sleep and try again. - # Kind of a hack, why does monitor queue need to no wait - # get and sleep instead of doing a busy wait that would - # respond immediately. - sleep( 2 ) - job_state = self.__find_watched_job( job_id ) - if not job_state: - log.warn( "Failed to find job corresponding to status %s in %s" % ( full_status, self.watched ) ) - else: - self.__update_job_state_for_lwr_status(job_state, full_status["status"]) - - def __find_watched_job( self, job_id ): - found_job = None - for async_job_state in self.watched: - if str( async_job_state.job_id ) == job_id: - found_job = async_job_state - break - return found_job + job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id ) + job_state = self.__job_state( job, job_wrapper ) + self.__update_job_state_for_lwr_status(job_state, full_status["status"]) def queue_job(self, job_wrapper): job_destination = job_wrapper.job_destination @@ -358,12 +331,8 @@ def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" - job_state = AsynchronousJobState() - job_state.job_id = str( job.get_job_runner_external_id() ) - job_state.runner_url = job_wrapper.get_job_runner_url() - job_state.job_destination = job_wrapper.job_destination + job_state = 
self.__job_state( job, job_wrapper ) job_wrapper.command_line = job.get_command_line() - job_state.job_wrapper = job_wrapper state = job.get_state() if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) @@ -375,6 +344,14 @@ super( LwrJobRunner, self ).shutdown() self.client_manager.shutdown() + def __job_state( self, job, job_wrapper ): + job_state = AsynchronousJobState() + job_state.job_id = str( job.get_job_runner_external_id() ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination + job_state.job_wrapper = job_wrapper + return job_state + def __client_outputs( self, client, job_wrapper ): work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) output_files = self.get_output_files( job_wrapper ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.