# HG changeset patch -- Bitbucket.org # Project galaxy-dist # URL http://bitbucket.org/galaxy/galaxy-dist/overview # User Nate Coraor <nate@bx.psu.edu> # Date 1279818642 14400 # Node ID db1ae0bc8995cb30deef73ce9d14ef175c512f90 # Parent 79d38503db4d860f9c9d62560c156fb0a2122b1b Attempt to reduce costly queries in the job runner, especially when tracking jobs in the database. --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -18,7 +18,7 @@ from Queue import Queue, Empty log = logging.getLogger( __name__ ) # States for running a job. These are NOT the same as data states -JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted' +JOB_WAIT, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted' # This file, if created in the job's working directory, will be used for # setting advanced metadata properties on the job and its associated outputs. @@ -106,14 +106,14 @@ class JobQueue( object ): for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ): if job.tool_id not in self.app.toolbox.tools_by_id: log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' ) + JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. 
Please contact your Galaxy administrator, or' ) else: log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id ) self.queue.put( ( job.id, job.tool_id ) ) for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ): if job.tool_id not in self.app.toolbox.tools_by_id: log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' ) + JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' ) elif job.job_runner_name is None: log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id ) if self.track_jobs_in_database: @@ -121,7 +121,7 @@ class JobQueue( object ): else: self.queue.put( ( job.id, job.tool_id ) ) else: - job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self ) + job_wrapper = JobWrapper( job, self ) self.dispatcher.recover( job, job_wrapper ) if self.sa_session.dirty: self.sa_session.flush() @@ -154,11 +154,12 @@ class JobQueue( object ): # Pull all new jobs from the queue at once new_jobs = [] if self.track_jobs_in_database: - for j in self.sa_session.query( model.Job ) \ + # Clear the session so we get fresh states for job and all datasets + self.sa_session.expunge_all() + # Fetch all new jobs + new_jobs = self.sa_session.query( model.Job ) \ .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \ - .filter( model.Job.state == model.Job.states.NEW ): - job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self ) - new_jobs.append( job ) + .filter( model.Job.state == model.Job.states.NEW ).all() else: try: while 1: @@ -168,8 +169,7 @@ class JobQueue( object ): # Unpack the message job_id, tool_id = message # 
Create a job wrapper from it - job_entity = self.sa_session.query( model.Job ).get( job_id ) - job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self ) + job = self.sa_session.query( model.Job ).get( job_id ) # Append to watch queue new_jobs.append( job ) except Empty: @@ -179,52 +179,43 @@ class JobQueue( object ): new_waiting = [] for job in ( new_jobs + self.waiting ): try: - # Clear the session for each job so we get fresh states for - # job and all datasets - self.sa_session.expunge_all() - # Get the real job entity corresponding to the wrapper (if we - # are tracking in the database this is probably cached in - # the session from the origianl query above) - job_entity = self.sa_session.query( model.Job ).get( job.job_id ) + # Since we don't expunge when not tracking jobs in the + # database, refresh the job here so it's not stale. + if not self.track_jobs_in_database: + self.sa_session.refresh( job ) # Check the job's dependencies, requeue if they're not done - job_state = self.__check_if_ready_to_run( job, job_entity ) - if job_state == JOB_WAIT: + job_state = self.__check_if_ready_to_run( job ) + if job_state == JOB_WAIT: if not self.track_jobs_in_database: new_waiting.append( job ) - elif job_state == JOB_ERROR: - log.info( "job %d ended with an error" % job.job_id ) elif job_state == JOB_INPUT_ERROR: - log.info( "job %d unable to run: one or more inputs in error state" % job.job_id ) + log.info( "job %d unable to run: one or more inputs in error state" % job.id ) elif job_state == JOB_INPUT_DELETED: - log.info( "job %d unable to run: one or more inputs deleted" % job.job_id ) + log.info( "job %d unable to run: one or more inputs deleted" % job.id ) elif job_state == JOB_READY: if self.job_lock: - log.info("Job dispatch attempted for %s, but prevented by administrative lock." % job.job_id) + log.info( "Job dispatch attempted for %s, but prevented by administrative lock." 
% job.id ) if not self.track_jobs_in_database: new_waiting.append( job ) else: - self.dispatcher.put( job ) - log.debug( "job %d dispatched" % job.job_id) + self.dispatcher.put( JobWrapper( job, self ) ) + log.info( "job %d dispatched" % job.id ) elif job_state == JOB_DELETED: - msg = "job %d deleted by user while still queued" % job.job_id - job.info = msg - log.debug( msg ) + log.info( "job %d deleted by user while still queued" % job.id ) elif job_state == JOB_ADMIN_DELETED: - job.fail( job_entity.info ) - log.info( "job %d deleted by admin while still queued" % job.job_id ) + log.info( "job %d deleted by admin while still queued" % job.id ) else: - msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id ) - job.info = msg - log.error( msg ) + log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) ) + if not self.track_jobs_in_database: + new_waiting.append( job ) except Exception, e: - job.info = "failure running job %d: %s" % ( job.job_id, str( e ) ) - log.exception( "failure running job %d" % job.job_id ) + log.exception( "failure running job %d" % job.id ) # Update the waiting list self.waiting = new_waiting # Done with the session self.sa_session.remove() - def __check_if_ready_to_run( self, job_wrapper, job ): + def __check_if_ready_to_run( self, job ): """ Check if a job is ready to run by verifying that each of its input datasets is ready (specifically in the OK state).
If any input dataset @@ -244,11 +235,11 @@ class JobQueue( object ): continue # don't run jobs for which the input dataset was deleted if idata.deleted: - job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) return JOB_INPUT_DELETED # an error in the input data causes us to bail immediately elif idata.state == idata.states.ERROR: - job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) ) + JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): # need to requeue @@ -280,11 +271,11 @@ class JobWrapper( object ): Wraps a 'model.Job' with convience methods for running processes and state management. """ - def __init__(self, job, tool, queue ): + def __init__( self, job, queue ): self.job_id = job.id self.session_id = job.session_id self.user_id = job.user_id - self.tool = tool + self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None ) self.queue = queue self.app = queue.app self.sa_session = self.app.model.context --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -5,7 +5,6 @@ from galaxy.tools.parameters.grouping im from galaxy.util.template import fill_template from galaxy.util.none_like import NoneDataset from galaxy.web import url_for -from galaxy.jobs import JOB_OK import galaxy.tools from types import * @@ -353,7 +352,7 @@ class DefaultToolAction( object ): assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config." 
redirect_url += "&GALAXY_URL=%s" % GALAXY_URL # Job should not be queued, so set state to ok - job.state = JOB_OK + job.state = trans.app.model.Job.states.OK job.info = "Redirected to: %s" % redirect_url trans.sa_session.add( job ) trans.sa_session.flush()