details:   http://www.bx.psu.edu/hg/galaxy/rev/39268be26944
changeset: 3767:39268be26944
user:      Nate Coraor <nate@bx.psu.edu>
date:      Mon May 10 20:59:41 2010 -0400
description:
Add worker threads to SGE runner

diffstat:

 lib/galaxy/jobs/runners/sge.py |  66 +++++++++++++++++++++++++++++++++--------
 1 files changed, 52 insertions(+), 14 deletions(-)

diffs (148 lines):

diff -r f7cf88978f1f -r 39268be26944 lib/galaxy/jobs/runners/sge.py
--- a/lib/galaxy/jobs/runners/sge.py	Mon May 10 14:30:18 2010 -0400
+++ b/lib/galaxy/jobs/runners/sge.py	Mon May 10 20:59:41 2010 -0400
@@ -75,13 +75,20 @@
         # be modified by the monitor thread, which will move items from 'queue'
         # to 'watched' and then manage the watched jobs.
         self.watched = []
-        self.queue = Queue()
+        self.monitor_queue = Queue()
         self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner )
         self.ds = DRMAA.Session()
         self.ds.init( self.default_cell )
         self.monitor_thread = threading.Thread( target=self.monitor )
         self.monitor_thread.start()
-        log.debug( "ready" )
+        self.work_queue = Queue()
+        self.work_threads = []
+        nworkers = app.config.cluster_job_queue_workers
+        for i in range( nworkers ):
+            worker = threading.Thread( target=self.run_next )
+            worker.start()
+            self.work_threads.append( worker )
+        log.debug( "%d workers ready" % nworkers )
 
     def determine_sge_cell( self, url ):
         """Determine what SGE cell we are using"""
@@ -113,6 +120,24 @@
         except:
             return None
 
+    def run_next( self ):
+        """
+        Run the next item in the queue (a job waiting to run or finish )
+        """
+        while 1:
+            ( op, obj ) = self.work_queue.get()
+            if op is self.STOP_SIGNAL:
+                return
+            try:
+                if op == 'queue':
+                    self.queue_job( obj )
+                elif op == 'finish':
+                    self.finish_job( obj )
+                elif op == 'fail':
+                    self.fail_job( obj )
+            except:
+                log.exception( "Uncaught exception %sing job" % op )
+
     def queue_job( self, job_wrapper ):
         """Create SGE script for a job and submit it to the SGE queue"""
 
@@ -213,7 +238,7 @@
         self.ds.deleteJobTemplate( jt )
 
         # Add to our 'queue' of jobs to monitor
-        self.queue.put( sge_job_state )
+        self.monitor_queue.put( sge_job_state )
 
     def monitor( self ):
         """
@@ -224,7 +249,7 @@
             # Take any new watched jobs and put them on the monitor list
             try:
                 while 1:
-                    sge_job_state = self.queue.get_nowait()
+                    sge_job_state = self.monitor_queue.get_nowait()
                     if sge_job_state is self.STOP_SIGNAL:
                         # TODO: This is where any cleanup would occur
                         self.ds.exit()
@@ -252,13 +277,14 @@
             except DRMAA.InvalidJobError:
                 # we should only get here if an orphaned job was put into the queue at app startup
                 log.debug("(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) )
-                self.finish_job( sge_job_state )
+                self.work_queue.put( ( 'finish', sge_job_state ) )
                 continue
             except Exception, e:
                 # so we don't kill the monitor thread
                 log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
                 log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
-                sge_job_state.job_wrapper.fail( "Cluster could not complete job" )
+                sge_job_state.fail_message = "Cluster could not complete job"
+                self.work_queue.put( ( 'fail', sge_job_state ) )
                 continue
             if state != old_state:
                 log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, DRMAA_state[state] ) )
@@ -266,11 +292,11 @@
                 sge_job_state.running = True
                 sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
             if state == DRMAA.Session.DONE:
-                self.finish_job( sge_job_state )
+                self.work_queue.put( ( 'finish', sge_job_state ) )
                 continue
             if state == DRMAA.Session.FAILED:
-                sge_job_state.job_wrapper.fail( "Cluster could not complete job" )
-                sge_job_state.job_wrapper.cleanup()
+                sge_job_state.fail_message = "Cluster could not complete job"
+                self.work_queue.put( ( 'fail', sge_job_state ) )
                 continue
             sge_job_state.old_state = state
             new_watched.append( sge_job_state )
@@ -304,6 +330,14 @@
         # clean up the sge files
         self.cleanup( ( ofile, efile, job_file ) )
 
+    def fail_job( self, sge_job_state ):
+        """
+        Seperated out so we can use the worker threads for it.
+        """
+        self.stop_job( self.sa_session.query( self.app.model.Job ).get( sge_job_state.job_wrapper.job_id ) )
+        sge_job_state.job_wrapper.fail( sge_job_state.fail_message )
+        self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) )
+
     def cleanup( self, files ):
         if not asbool( self.app.config.get( 'debug', False ) ):
             for file in files:
@@ -312,12 +346,16 @@
 
     def put( self, job_wrapper ):
         """Add a job to the queue (by job identifier)"""
-        self.queue_job( job_wrapper )
-    
+        # Change to queued state before handing to worker thread so the runner won't pick it up again
+        job_wrapper.change_state( model.Job.states.QUEUED )
+        self.work_queue.put( ( 'queue', job_wrapper ) )
+
     def shutdown( self ):
        """Attempts to gracefully shut down the monitor thread"""
        log.info( "sending stop signal to worker threads" )
-        self.queue.put( self.STOP_SIGNAL )
+        self.monitor_queue.put( self.STOP_SIGNAL )
+        for i in range( len( self.work_threads ) ):
+            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
         log.info( "sge job runner stopped" )
 
     def stop_job( self, job ):
@@ -342,9 +380,9 @@
             log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
             sge_job_state.old_state = DRMAA.Session.RUNNING
             sge_job_state.running = True
-            self.queue.put( sge_job_state )
+            self.monitor_queue.put( sge_job_state )
         elif job.state == model.Job.states.QUEUED:
             log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
             sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE
             sge_job_state.running = False
-            self.queue.put( sge_job_state )
+            self.monitor_queue.put( sge_job_state )
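
For reference, the pattern this changeset introduces -- a shared queue of ( op, obj ) tuples drained by a fixed pool of worker threads, shut down by sending one sentinel per thread -- reduces to the minimal, self-contained sketch below. It is written for Python 3 (the changeset itself targets Python 2 and its Queue module), and names such as WorkerPool and handle_queue are illustrative stand-ins, not Galaxy's own classes or methods.

import logging
import threading
from queue import Queue

log = logging.getLogger(__name__)

# Sentinel object: workers return when they pull it off the queue.
STOP_SIGNAL = object()


class WorkerPool:
    """Dispatch ('queue'/'finish'/'fail', payload) work items to N threads."""

    def __init__(self, nworkers=4):
        self.work_queue = Queue()
        self.work_threads = []
        for _ in range(nworkers):
            worker = threading.Thread(target=self.run_next)
            worker.start()
            self.work_threads.append(worker)
        log.debug("%d workers ready", nworkers)

    def run_next(self):
        """Pull (op, obj) tuples off the queue and dispatch until stopped."""
        while True:
            op, obj = self.work_queue.get()
            if op is STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.handle_queue(obj)
                elif op == 'finish':
                    self.handle_finish(obj)
                elif op == 'fail':
                    self.handle_fail(obj)
            except Exception:
                # Never let one bad work item kill the worker thread.
                log.exception("Uncaught exception %sing job", op)

    # Stand-ins for queue_job/finish_job/fail_job in the real runner.
    def handle_queue(self, obj):
        log.info("queueing %s", obj)

    def handle_finish(self, obj):
        log.info("finishing %s", obj)

    def handle_fail(self, obj):
        log.info("failing %s", obj)

    def put(self, op, obj):
        self.work_queue.put((op, obj))

    def shutdown(self):
        # One sentinel per worker so every thread wakes up exactly once.
        for _ in range(len(self.work_threads)):
            self.work_queue.put((STOP_SIGNAL, None))
        for worker in self.work_threads:
            worker.join()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    pool = WorkerPool(nworkers=2)
    pool.put('queue', 'job-1')
    pool.put('finish', 'job-1')
    pool.shutdown()

Sending one STOP_SIGNAL tuple per worker on the same queue is what lets shutdown wake every thread exactly once, which is why the changeset's shutdown() puts len( self.work_threads ) sentinels on work_queue rather than a single one.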