commit/galaxy-central: 2 new changesets
2 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/changeset/ce3500d71226/
changeset:  ce3500d71226
user:       JaimeFrey
date:       2012-06-28 21:03:46
summary:    Add Condor task runner module, which runs tasks in a local Condor pool.
affected #: 1 file

diff -r 063a1d690724b3485f41c620f664f11c7390ceef -r ce3500d712269f72f1c75a8b618c74d9f030fe82 lib/galaxy/jobs/runners/condor.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/condor.py
@@ -0,0 +1,387 @@
+import os, sys, logging, threading, time, subprocess, re
+from Queue import Queue, Empty
+
+from galaxy import model
+from galaxy.jobs.runners import BaseJobRunner
+
+from paste.deploy.converters import asbool
+
+import pkg_resources
+
+if sys.version_info[:2] == ( 2, 4 ):
+    pkg_resources.require( "ctypes" )
+
+log = logging.getLogger( __name__ )
+
+__all__ = [ 'CondorJobRunner' ]
+
+drm_template = """#!/bin/sh
+GALAXY_LIB="%s"
+if [ "$GALAXY_LIB" != "None" ]; then
+    if [ -n "$PYTHONPATH" ]; then
+        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
+    else
+        PYTHONPATH="$GALAXY_LIB"
+    fi
+    export PYTHONPATH
+fi
+cd %s
+%s
+"""
+
+class CondorJobState( object ):
+    def __init__( self ):
+        """
+        Encapsulates state related to a job that is being run via the DRM and
+        that we need to monitor.
+        """
+        self.job_wrapper = None
+        self.job_id = None
+        self.running = False
+        self.failed = False
+        self.job_file = None
+        self.ofile = None
+        self.efile = None
+        self.user_log = None
+        self.user_log_size = 0
+        self.runner_url = None
+
+class CondorJobRunner( BaseJobRunner ):
+    """
+    Job runner backed by a finite pool of worker threads. FIFO scheduling.
+    """
+    STOP_SIGNAL = object()
+    def __init__( self, app ):
+        """Initialize this job runner and start the monitor thread"""
+        self.app = app
+        self.sa_session = app.model.context
+        # 'watched' and 'queue' are both used to keep track of jobs to watch.
+        # 'queue' is used to add new watched jobs, and can be called from
+        # any thread (usually by the 'queue_job' method). 'watched' must only
+        # be modified by the monitor thread, which will move items from 'queue'
+        # to 'watched' and then manage the watched jobs.
+        self.watched = []
+        self.monitor_queue = Queue()
+        self.monitor_thread = threading.Thread( target=self.monitor )
+        self.monitor_thread.start()
+        self.work_queue = Queue()
+        self.work_threads = []
+        nworkers = app.config.cluster_job_queue_workers
+        for i in range( nworkers ):
+            worker = threading.Thread( target=self.run_next )
+            worker.start()
+            self.work_threads.append( worker )
+        log.debug( "%d workers ready" % nworkers )
+
+    def get_native_spec( self, url ):
+        """Get any native DRM arguments specified by the site configuration"""
+        try:
+            return url.split('/')[2] or None
+        except:
+            return None
+
+    def run_next( self ):
+        """
+        Run the next item in the queue (a job waiting to run or finish)
+        """
+        while 1:
+            ( op, obj ) = self.work_queue.get()
+            if op is self.STOP_SIGNAL:
+                return
+            try:
+                if op == 'queue':
+                    self.queue_job( obj )
+                elif op == 'finish':
+                    self.finish_job( obj )
+                elif op == 'fail':
+                    self.fail_job( obj )
+            except:
+                log.exception( "Uncaught exception %sing job" % op )
+                if op == 'queue':
+                    obj.fail( "Uncaught exception queueing job", exception=True )
+
+    def queue_job( self, job_wrapper ):
+        """Create job script and submit it to the DRM"""
+
+        try:
+            job_wrapper.prepare()
+            command_line = self.build_command_line( job_wrapper, include_metadata=True )
+        except:
+            job_wrapper.fail( "failure preparing job", exception=True )
+            log.exception("failure running job %s" % job_wrapper.get_id_tag())
+            return
+
+        runner_url = job_wrapper.get_job_runner()
+
+        # This is silly, why would we queue a job with no command line?
+        if not command_line:
+            job_wrapper.finish( '', '' )
+            return
+
+        if job_wrapper.get_state() == model.Job.states.DELETED:
+            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
+            job_wrapper.cleanup()
+            return
+
+        # Change to queued state immediately
+        job_wrapper.change_state( model.Job.states.QUEUED )
+
+        # define job attributes
+        ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+        efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+        user_log = "%s/%s.condor.log" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+        executable = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+        submit_desc = [ ]
+        submit_desc.append( 'universe = vanilla' )
+        submit_desc.append( 'getenv = true' )
+        submit_desc.append( 'executable = ' + executable )
+        submit_desc.append( 'output = ' + ofile )
+        submit_desc.append( 'error = ' + efile )
+        submit_desc.append( 'log = ' + user_log )
+        submit_desc.append( 'notification = NEVER' )
+        submit_desc.append( 'queue' )
+        submit_file = "%s/%s.condor.desc" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+
+        script = drm_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
+        try:
+            fh = file( executable, "w" )
+            fh.write( script )
+            fh.close()
+            os.chmod( executable, 0750 )
+        except:
+            job_wrapper.fail( "failure preparing job script", exception=True )
+            log.exception("failure running job %s" % job_wrapper.get_id_tag())
+            return
+
+        try:
+            fh = file( submit_file, 'w' )
+            for line in submit_desc:
+                fh.write( line + '\n' )
+            fh.close()
+        except:
+            job_wrapper.fail( "failure writing submit file", exception=True )
+            log.exception("failure running job %s" % job_wrapper.get_id_tag())
+            self.cleanup( ( executable, submit_file ) )
+            return
+
+        # job was deleted while we were preparing it
+        if job_wrapper.get_state() == model.Job.states.DELETED:
+            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
+            self.cleanup( ( executable, submit_file ) )
+            job_wrapper.cleanup()
+            return
+
+        # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
+        galaxy_id_tag = job_wrapper.get_id_tag()
+
+        log.debug("(%s) submitting file %s" % ( galaxy_id_tag, executable ) )
+        log.debug("(%s) command is: %s" % ( galaxy_id_tag, command_line ) )
+        s_out = ''
+        job_id = None
+        try:
+            submit = subprocess.Popen( ( 'condor_submit', submit_file ), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
+            s_out, s_err = submit.communicate()
+            if submit.returncode == 0:
+                match = re.search( 'submitted to cluster (\\d+).', s_out )
+                if match is None:
+                    s_out = 'Failed to find job id from condor_submit'
+                else:
+                    job_id = match.group( 1 )
+        except Exception, e:
+            # TODO Add extra except for OSError?
+            s_out = str(e)
+
+        if job_id is None:
+            log.debug( "condor_submit failed for job %s: %s" % (job_wrapper.get_id_tag(), s_out) )
+            self.cleanup( ( submit_file, executable ) )
+            job_wrapper.fail( "condor_submit failed", exception=True )
+            return
+
+        log.info("(%s) queued as %s" % ( galaxy_id_tag, job_id ) )
+
+        self.cleanup( ( submit_file, ) )
+
+        # store runner information for tracking if Galaxy restarts
+        job_wrapper.set_runner( runner_url, job_id )
+
+        # Store DRM related state information for job
+        drm_job_state = CondorJobState()
+        drm_job_state.job_wrapper = job_wrapper
+        drm_job_state.job_id = job_id
+        drm_job_state.ofile = ofile
+        drm_job_state.efile = efile
+        drm_job_state.job_file = executable
+        drm_job_state.user_log = user_log
+        drm_job_state.running = False
+        drm_job_state.runner_url = runner_url
+
+        # Add to our 'queue' of jobs to monitor
+        self.monitor_queue.put( drm_job_state )
+
+    def monitor( self ):
+        """
+        Watches jobs currently in the Condor queue and deals with state changes
+        (queued to running) and job completion
+        """
+        while 1:
+            # Take any new watched jobs and put them on the monitor list
+            try:
+                while 1:
+                    drm_job_state = self.monitor_queue.get_nowait()
+                    if drm_job_state is self.STOP_SIGNAL:
+                        # TODO: This is where any cleanup would occur
+                        # Do we need any cleanup here?
+                        return
+                    self.watched.append( drm_job_state )
+            except Empty:
+                pass
+            # Iterate over the list of watched jobs and check state
+            self.check_watched_items()
+            # Sleep a bit before the next state check
+            time.sleep( 1 )
+
+    def check_watched_items( self ):
+        """
+        Called by the monitor thread to look at each watched job and deal
+        with state changes.
+        """
+        new_watched = []
+        for drm_job_state in self.watched:
+            job_id = drm_job_state.job_id
+            log_job_id = job_id.zfill(3)
+            galaxy_job_id = drm_job_state.job_wrapper.job_id
+            job_running = False
+            job_complete = False
+            job_failed = False
+            try:
+                if os.stat( drm_job_state.user_log ).st_size == drm_job_state.user_log_size:
+                    new_watched.append( drm_job_state )
+                    continue
+                with open(drm_job_state.user_log, 'r') as fh:
+                    for line in fh:
+                        if '001 (' + log_job_id + '.' in line:
+                            job_running = True
+                        if '004 (' + log_job_id + '.' in line:
+                            job_running = False
+                        if '007 (' + log_job_id + '.' in line:
+                            job_running = False
+                        if '005 (' + log_job_id + '.' in line:
+                            job_complete = True
+                        if '009 (' + log_job_id + '.' in line:
+                            job_failed = True
+                    drm_job_state.user_log_size = fh.tell()
+            except Exception, e:
+                # so we don't kill the monitor thread
+                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
+                log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
+                drm_job_state.fail_message = "Cluster could not complete job"
+                self.work_queue.put( ( 'fail', drm_job_state ) )
+                continue
+            if job_running and not drm_job_state.running:
+                log.debug("(%s/%s) job is now running" % ( galaxy_job_id, job_id ) )
+                drm_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
+            if not job_running and drm_job_state.running:
+                log.debug("(%s/%s) job has stopped running" % ( galaxy_job_id, job_id ) )
+                # Will switching from RUNNING to QUEUED confuse Galaxy?
+                #drm_job_state.job_wrapper.change_state( model.Job.states.QUEUED )
+            if job_complete:
+                log.debug("(%s/%s) job has completed" % ( galaxy_job_id, job_id ) )
+                self.work_queue.put( ( 'finish', drm_job_state ) )
+                continue
+            if job_failed:
+                log.debug("(%s/%s) job failed" % ( galaxy_job_id, job_id ) )
+                drm_job_state.failed = True
+                self.work_queue.put( ( 'finish', drm_job_state ) )
+                continue
+            drm_job_state.running = job_running
+            new_watched.append( drm_job_state )
+        # Replace the watch list with the updated version
+        self.watched = new_watched
+
+    def finish_job( self, drm_job_state ):
+        """
+        Get the output/error for a finished job, pass to `job_wrapper.finish`
+        and cleanup all the DRM temporary files.
+        """
+        ofile = drm_job_state.ofile
+        efile = drm_job_state.efile
+        job_file = drm_job_state.job_file
+        # collect the output
+        try:
+            ofh = file(ofile, "r")
+            efh = file(efile, "r")
+            stdout = ofh.read( 32768 )
+            stderr = efh.read( 32768 )
+        except:
+            stdout = ''
+            stderr = 'Job output not returned from cluster'
+            log.debug(stderr)
+
+        try:
+            drm_job_state.job_wrapper.finish( stdout, stderr )
+        except:
+            log.exception("Job wrapper finish method failed")
+
+        # clean up the drm files
+        self.cleanup( ( ofile, efile, job_file, drm_job_state.user_log ) )
+
+    def fail_job( self, drm_job_state ):
+        """
+        Separated out so we can use the worker threads for it.
+        """
+        self.stop_job( self.sa_session.query( self.app.model.Job ).get( drm_job_state.job_wrapper.job_id ) )
+        drm_job_state.job_wrapper.fail( drm_job_state.fail_message )
+        self.cleanup( ( drm_job_state.ofile, drm_job_state.efile, drm_job_state.job_file, drm_job_state.user_log ) )
+
+    def cleanup( self, files ):
+        if not asbool( self.app.config.get( 'debug', False ) ):
+            for file in files:
+                if os.access( file, os.R_OK ):
+                    os.unlink( file )
+
+    def put( self, job_wrapper ):
+        """Add a job to the queue (by job identifier)"""
+        # Change to queued state before handing to worker thread so the runner won't pick it up again
+        job_wrapper.change_state( model.Job.states.QUEUED )
+        self.work_queue.put( ( 'queue', job_wrapper ) )
+
+    def shutdown( self ):
+        """Attempts to gracefully shut down the monitor and worker threads"""
+        log.info( "sending stop signal to worker threads" )
+        self.monitor_queue.put( self.STOP_SIGNAL )
+        for i in range( len( self.work_threads ) ):
+            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
+        log.info( "condor job runner stopped" )
+
+    def stop_job( self, job ):
+        """Attempts to delete a job from the DRM queue"""
+        try:
+            subprocess.check_call( ( 'condor_rm', job.job_runner_external_id ) )
+            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.id, job.job_runner_external_id ) )
+        except subprocess.CalledProcessError:
+            log.debug( "(%s/%s) User killed running job, but condor_rm failed" % ( job.id, job.job_runner_external_id ) )
+        except Exception, e:
+            log.debug( "(%s/%s) User killed running job, but error encountered removing from Condor queue: %s" % ( job.id, job.job_runner_external_id, e ) )
+
+    def recover( self, job, job_wrapper ):
+        """Recovers jobs stuck in the queued/running state when Galaxy started"""
+        # TODO Check if we need any changes here
+        drm_job_state = CondorJobState()
+        drm_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id)
+        drm_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id)
+        drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.id)
+        drm_job_state.job_id = str( job.job_runner_external_id )
+        drm_job_state.runner_url = job_wrapper.get_job_runner()
+        job_wrapper.command_line = job.command_line
+        drm_job_state.job_wrapper = job_wrapper
+        drm_job_state.user_log = "%s/%s.condor.log" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
+        if job.state == model.Job.states.RUNNING:
+            log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
+            drm_job_state.running = True
+            self.monitor_queue.put( drm_job_state )
+        elif job.state == model.Job.states.QUEUED:
+            log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
+            drm_job_state.running = False
+            self.monitor_queue.put( drm_job_state )

https://bitbucket.org/galaxy/galaxy-central/changeset/594b69416e8c/
changeset:  594b69416e8c
user:       natefoo
date:       2012-07-26 19:11:05
summary:    Merged in JaimeFrey/galaxy-condor (pull request #51)
affected #: 1 file
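For reference, queue_job() above writes one Condor submit description per job, next to the job script under cluster_files_directory. A minimal sketch of the generated file, assuming an illustrative job id of 123 and an illustrative /galaxy/database/condor directory (neither appears in the changeset):

    universe = vanilla
    getenv = true
    executable = /galaxy/database/condor/galaxy_123.sh
    output = /galaxy/database/condor/123.o
    error = /galaxy/database/condor/123.e
    log = /galaxy/database/condor/123.condor.log
    notification = NEVER
    queue

condor_submit is then run on this file, the cluster number is scraped from its "submitted to cluster N." output line, and that number becomes the job's external id. Presumably the runner itself is selected through a condor:// job runner URL in the Galaxy configuration, but that wiring is outside this diff.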
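The monitor thread decides a job's state by scanning its Condor user log for numeric event codes. The sketch below restates that classification with the standard HTCondor user log event numbers spelled out; the classify_user_log helper and the sample log lines are illustrative and not part of the changeset:

    # HTCondor user log event codes checked by check_watched_items():
    #   001 = job executing, 004 = job evicted, 005 = job terminated,
    #   007 = shadow exception, 009 = job aborted (e.g. via condor_rm)
    def classify_user_log( lines, cluster_id ):
        """Return (running, complete, failed) flags for one Condor cluster id."""
        tag = cluster_id.zfill(3)   # the runner pads the id the same way, with job_id.zfill(3)
        running = complete = failed = False
        for line in lines:
            if '001 (' + tag + '.' in line:   # job started executing
                running = True
            if '004 (' + tag + '.' in line:   # job evicted from its machine
                running = False
            if '007 (' + tag + '.' in line:   # shadow exception; job goes back to idle
                running = False
            if '005 (' + tag + '.' in line:   # job terminated
                complete = True
            if '009 (' + tag + '.' in line:   # job aborted, e.g. removed with condor_rm
                failed = True
        return running, complete, failed

    # Illustrative usage:
    # classify_user_log( [ '001 (042.000.000) 06/28 21:05:01 Job executing on host: ...',
    #                      '005 (042.000.000) 06/28 21:07:13 Job terminated.' ], '42' )
    # -> (True, True, False)

Because check_watched_items() only re-parses the log when its size changes (tracked with fh.tell()) and reads it from the beginning each time, the flags always reflect the full event history for that cluster id.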
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--
This is a commit notification from bitbucket.org. You are receiving this because
you have the service enabled, addressing the recipient of this email.