details:   http://www.bx.psu.edu/hg/galaxy/rev/1dc78a13070c
changeset: 1645:1dc78a13070c
user:      James Taylor <james@jamestaylor.org>
date:      Tue Dec 02 13:49:29 2008 -0500
description:
imported patch job-queue-sessions

1 file(s) affected in this change:

lib/galaxy/jobs/__init__.py

diffs (229 lines):

diff -r ab46f22b4624 -r 1dc78a13070c lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py	Tue Dec 02 11:41:25 2008 -0500
+++ b/lib/galaxy/jobs/__init__.py	Tue Dec 02 13:49:29 2008 -0500
@@ -63,9 +63,8 @@
     def __init__( self, app, dispatcher ):
         """Start the job manager"""
         self.app = app
-        # Should we use IPC to communicate (needed if forking)
+        # Should we read jobs from the database, or use an in-memory queue
         self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
-        # Check if any special scheduling policy should be used. If not, default is FIFO.
         sched_policy = app.config.get('job_scheduler_policy', 'FIFO')
         # Parse the scheduler policy string. The policy class implements a special queue.
@@ -104,13 +103,13 @@
         self.sleeper = Sleeper()
         self.running = True
         self.dispatcher = dispatcher
-        self.monitor_thread = threading.Thread( target=self.monitor )
+        self.monitor_thread = threading.Thread( target=self.__monitor )
         self.monitor_thread.start()
         log.info( "job manager started" )
         if app.config.get_bool( 'enable_job_recovery', True ):
-            self.check_jobs_at_startup()
+            self.__check_jobs_at_startup()
 
-    def check_jobs_at_startup( self ):
+    def __check_jobs_at_startup( self ):
         """
         Checks all jobs that are in the 'running' or 'queued' state in the
         database and requeues or cleans up as necessary.  Only run as the
@@ -125,10 +124,10 @@
         for job in model.Job.filter( (model.Job.c.state == model.Job.states.RUNNING) | (model.Job.c.state == model.Job.states.QUEUED) ).all():
             if job.job_runner_name is not None:
                 # why are we passing the queue to the wrapper?
-                job_wrapper = JobWrapper( job.id, self.app.toolbox.tools_by_id[ job.tool_id ], self )
+                job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self )
                 self.dispatcher.recover( job, job_wrapper )
 
-    def monitor( self ):
+    def __monitor( self ):
         """
         Continually iterate the waiting jobs, checking if each is ready to run
         and dispatching if so.
@@ -137,13 +136,13 @@
         time.sleep( 10 )
         while self.running:
             try:
-                self.monitor_step()
+                self.__monitor_step()
             except:
                 log.exception( "Exception in monitor_step" )
             # Sleep
             self.sleeper.sleep( 1 )
 
-    def monitor_step( self ):
+    def __monitor_step( self ):
         """
         Called repeatedly by `monitor` to process waiting jobs. Gets any new
         jobs (either from the database or from its own queue), then iterates
@@ -153,12 +152,13 @@
         it is marked as having errors and removed from the queue. Otherwise,
         the job is dispatched.
""" + # Get an orm session + session = mapping.Session() # Pull all new jobs from the queue at once new_jobs = [] if self.track_jobs_in_database: - model = self.app.model - for j in model.Job.filter( model.Job.c.state==model.Job.states.NEW ).all(): - job = JobWrapper( j.id, self.app.toolbox.tools_by_id[ j.tool_id ], self ) + for j in session.query( model.Job ).filter( model.Job.c.state == model.Job.states.NEW ).all(): + job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self ) new_jobs.append( job ) else: try: @@ -169,18 +169,26 @@ # Unpack the message job_id, tool_id = message # Create a job wrapper from it - job = JobWrapper( job_id, self.app.toolbox.tools_by_id[ tool_id ], self ) + job_entity = session.query( model.Job ).get( job_id ) + job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self ) # Append to watch queue new_jobs.append( job ) except Empty: - pass + pass # Iterate over new and waiting jobs and look for any that are # ready to run new_waiting = [] for job in ( new_jobs + self.waiting ): try: + # Clear the session for each job so we get fresh states for + # job and all datasets + session.clear() + # Get the real job entity corresponding to the wrapper (if we + # are tracking in the database this is probably cached in + # the session from the origianl query above) + job_entity = session.query( model.Job ).get( job.job_id ) # Check the job's dependencies, requeue if they're not done - job_state = job.check_if_ready_to_run() + job_state = self.__check_if_ready_to_run( job, job_entity ) if job_state == JOB_WAIT: if not self.track_jobs_in_database: new_waiting.append( job ) @@ -195,7 +203,7 @@ if self.use_policy : self.squeue.put( job ) log.debug( "job %d put in policy queue" % job.job_id ) - else : # or dispatch the job directly + else: # or dispatch the job directly self.dispatcher.put( job ) log.debug( "job %d dispatched" % job.job_id) elif job_state == JOB_DELETED: @@ -225,6 +233,43 @@ except Exception, e: # if something else breaks while dispatching job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) ) log.exception( "failure running job %d" % sjob.job_id ) + # Done with the session + mapping.Session.remove() + + def __check_if_ready_to_run( self, job_wrapper, job ): + """ + Check if a job is ready to run by verifying that each of its input + datasets is ready (specifically in the OK state). If any input dataset + has an error, fail the job and return JOB_INPUT_ERROR. If any input + dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all + input datasets are in OK state, return JOB_READY indicating that the + job can be dispatched. Otherwise, return JOB_WAIT indicating that input + datasets are still being prepared. 
+ """ + if self.app.memory_usage: + # Keep track of memory usage + m0 = self.app.memory_usage.memory() + for dataset_assoc in job.input_datasets: + idata = dataset_assoc.dataset + if not idata: + continue + # don't run jobs for which the input dataset was deleted + if idata.deleted: + job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + return JOB_INPUT_DELETED + # an error in the input data causes us to bail immediately + elif idata.state == idata.states.ERROR: + job_wrapper.fail( "input data %d (file: %s) is in an error state" % ( idata.hid, idata.file_name ) ) + return JOB_INPUT_ERROR + elif idata.state != idata.states.OK: + # need to requeue + return JOB_WAIT + if self.app.memory_usage: + m1 = self.app.memory_usage.memory( m0, pretty=True ) + log.info("End of check_if_ready_to_run for job id %d, memory used increased by %s" % ( job.id, m1 ) ) + if job.state == model.Job.states.DELETED: + return JOB_DELETED + return JOB_READY def put( self, job_id, tool ): """Add a job to the queue (by job identifier)""" @@ -251,8 +296,10 @@ Wraps a 'model.Job' with convience methods for running processes and state management. """ - def __init__(self, job_id, tool, queue ): - self.job_id = job_id + def __init__(self, job, tool, queue ): + self.job_id = job.id + # This is immutable, we cache it for the scheduling policy to use if needed + self.session_id = job.session_id self.tool = tool self.queue = queue self.app = queue.app @@ -401,45 +448,6 @@ job.job_runner_name = runner_url job.job_runner_external_id = external_id job.flush() - - def check_if_ready_to_run( self ): - """ - Check if a job is ready to run by verifying that each of its input - datasets is ready (specifically in the OK state). If any input dataset - has an error, fail the job and return JOB_INPUT_ERROR. If any input - dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all - input datasets are in OK state, return JOB_READY indicating that the - job can be dispatched. Otherwise, return JOB_WAIT indicating that input - datasets are still being prepared. - """ - if self.app.memory_usage: - # Keep track of memory usage - m0 = self.app.memory_usage.memory() - job = model.Job.get( self.job_id ) - job.refresh() - for dataset_assoc in job.input_datasets: - idata = dataset_assoc.dataset - if not idata: - continue - idata.refresh() - idata.dataset.refresh() #we need to refresh the base Dataset, since that is where 'state' is stored - # don't run jobs for which the input dataset was deleted - if idata.deleted: - self.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_DELETED - # an error in the input data causes us to bail immediately - elif idata.state == idata.states.ERROR: - self.fail( "input data %d (file: %s) is in an error state" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_ERROR - elif idata.state != idata.states.OK: - # need to requeue - return JOB_WAIT - if self.app.memory_usage: - m1 = self.app.memory_usage.memory( m0, pretty=True ) - log.info("End of check_if_ready_to_run for job id %d, memory used increased by %s" % ( job.id, m1 ) ) - if job.state == model.Job.states.DELETED: - return JOB_DELETED - return JOB_READY def finish( self, stdout, stderr ): """ @@ -538,8 +546,7 @@ return self.command_line def get_session_id( self ): - job = model.Job.get( self.job_id ) - return job.session_id + return self.session_id def get_input_fnames( self ): job = model.Job.get( self.job_id )