details: http://www.bx.psu.edu/hg/galaxy/rev/45067d8114eb changeset: 1621:45067d8114eb user: Nate Coraor <nate@bx.psu.edu> date: Tue Nov 11 14:48:25 2008 -0500 description: Add a thread pool for the PBS runner to use for queue/finish methods. 3 file(s) affected in this change: lib/galaxy/config.py lib/galaxy/jobs/runners/pbs.py universe_wsgi.ini.sample diffs (156 lines): diff -r bc74a32f875e -r 45067d8114eb lib/galaxy/config.py --- a/lib/galaxy/config.py Tue Nov 11 10:19:21 2008 -0500 +++ b/lib/galaxy/config.py Tue Nov 11 14:48:25 2008 -0500 @@ -40,7 +40,8 @@ self.remote_user_maildomain = kwargs.get( "remote_user_maildomain", None ) self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) - self.job_queue_workers = int( kwargs.get( "job_queue_workers", "10" ) ) + self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) + self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "5" ) ) self.job_scheduler_policy = kwargs.get("job_scheduler_policy", "FIFO") self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") ) self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root ) diff -r bc74a32f875e -r 45067d8114eb lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py Tue Nov 11 10:19:21 2008 -0500 +++ b/lib/galaxy/jobs/runners/pbs.py Tue Nov 11 14:48:25 2008 -0500 @@ -78,13 +78,20 @@ # be modified by the monitor thread, which will move items from 'queue' # to 'watched' and then manage the watched jobs. self.watched = [] - self.queue = Queue() + self.monitor_queue = Queue() # set the default server during startup self.default_pbs_server = None self.determine_pbs_server( 'pbs:///' ) self.monitor_thread = threading.Thread( target=self.monitor ) self.monitor_thread.start() - log.debug( "ready" ) + self.work_queue = Queue() + self.work_threads = [] + nworkers = app.config.cluster_job_queue_workers + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.work_threads.append( worker ) + log.debug( "%d workers ready" % nworkers ) def determine_pbs_server( self, url, rewrite = False ): """Determine what PBS server we are connecting to""" @@ -112,6 +119,22 @@ queue = None return queue + def run_next( self ): + """ + Run the next item in the queue (a job waiting to run or finish ) + """ + while 1: + ( op, obj ) = self.work_queue.get() + if op is self.STOP_SIGNAL: + return + try: + if op == 'queue': + self.queue_job( obj ) + elif op == 'finish': + self.finish_job( obj ) + except: + log.exception( "Uncaught exception %sing job" % op ) + def queue_job( self, job_wrapper ): """Create PBS script for a job and submit it to the PBS queue""" @@ -136,9 +159,6 @@ job_wrapper.cleanup() return - # Change to queued state immediately - job_wrapper.change_state( 'queued' ) - ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True ) pbs_queue_name = self.determine_pbs_queue( runner_url ) c = pbs.pbs_connect( pbs_server_name ) @@ -228,7 +248,7 @@ pbs_job_state.runner_url = runner_url # Add to our 'queue' of jobs to monitor - self.queue.put( pbs_job_state ) + self.monitor_queue.put( pbs_job_state ) def monitor( self ): """ @@ -239,7 +259,7 @@ # Take any new watched jobs and put them on the monitor list try: while 1: - pbs_job_state = self.queue.get_nowait() + pbs_job_state = self.monitor_queue.get_nowait() if pbs_job_state is self.STOP_SIGNAL: # TODO: This is where any cleanup would occur return @@ -290,7 +310,7 @@ new_watched.append( pbs_job_state ) else: log.debug("(%s/%s) job has left queue" % (galaxy_job_id, job_id) ) - self.finish_job( pbs_job_state ) + self.work_queue.put( ( 'finish', pbs_job_state ) ) # Replace the watch list with the updated version self.watched = new_watched @@ -371,12 +391,17 @@ def put( self, job_wrapper ): """Add a job to the queue (by job identifier)""" - self.queue_job( job_wrapper ) + #self.queue_job( job_wrapper ) + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( 'queued' ) + self.work_queue.put( ( 'queue', job_wrapper ) ) def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" log.info( "sending stop signal to worker threads" ) - self.queue.put( self.STOP_SIGNAL ) + self.monitor_queue.put( self.STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( self.STOP_SIGNAL, None ) ) log.info( "pbs job runner stopped" ) def get_stage_in_out( self, fnames, symlink=False ): @@ -419,9 +444,9 @@ log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) pbs_job_state.old_state = 'R' pbs_job_state.running = True - self.queue.put( pbs_job_state ) + self.monitor_queue.put( pbs_job_state ) elif job.state == model.Job.states.QUEUED: log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) pbs_job_state.old_state = 'Q' pbs_job_state.running = False - self.queue.put( pbs_job_state ) + self.monitor_queue.put( pbs_job_state ) diff -r bc74a32f875e -r 45067d8114eb universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample Tue Nov 11 10:19:21 2008 -0500 +++ b/universe_wsgi.ini.sample Tue Nov 11 14:48:25 2008 -0500 @@ -26,7 +26,7 @@ ## track_jobs_in_database = true # Number of concurrent jobs to run (local runner) -job_queue_workers = 5 +local_job_queue_workers = 5 # Job scheduling policy to be used. # module/package name and classname should be in "module:classname" format. @@ -155,6 +155,12 @@ # runner. #default_cluster_job_runner = pbs:/// +# cluster_job_queue_workers: The cluster runners have their own thread pools +# used to prepare and finish jobs (so that these operations do not block normal +# queue operation). The value here is the number of worker threads available +# to each runner. +#cluster_job_queue_workers = 5 + # The PBS options are described in detail in the Galaxy Configuration section of # the ClusteringGalaxy Wiki, and are only necessary when using file staging. #pbs_application_server =