commit/galaxy-central: smcmanus: Added code to handle task deletion properly.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/f1eb04839053/ changeset: f1eb04839053 user: smcmanus date: 2012-08-02 05:27:32 summary: Added code to handle task deletion properly. affected #: 8 files diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -308,11 +308,13 @@ #ERROR at this point means the job was deleted by an administrator. return self.fail( job.info ) - # Check the - if ( self.check_tool_output( stdout, stderr, tool_exit_code ) ): - job.state = job.states.OK - else: - job.state = job.states.ERROR + # Check the tool's stdout, stderr, and exit code for errors, but only + # if the job has not already been marked as having an error. + if job.states.ERROR != job.state: + if ( self.check_tool_output( stdout, stderr, tool_exit_code ) ): + job.state = job.states.OK + else: + job.state = job.states.ERROR if self.version_string_cmd: version_filename = self.get_version_string_path() @@ -974,6 +976,8 @@ def set_runner( self, runner_url, external_id ): task = self.get_task() self.sa_session.refresh( task ) + # DELETEME: + #log.debug( "************** Setting task %d runner name to %s" % ( task.get_id(), runner_url ) ) task.task_runner_name = runner_url task.task_runner_external_id = external_id # DBTODO Check task job_runner_stuff @@ -983,28 +987,35 @@ def finish( self, stdout, stderr, tool_exit_code=0 ): # DBTODO integrate previous finish logic. # Simple finish for tasks. Just set the flag OK. - log.debug( 'task %s for job %d ended' % (self.task_id, self.job_id) ) """ Called to indicate that the associated command has been run. Updates the output datasets based on stderr and stdout from the command, and the contents of the output files. """ + # This may have ended too soon + log.debug( 'task %s for job %d ended; exit code: %d' + % (self.task_id, self.job_id, tool_exit_code) ) # default post job setup_external_metadata self.sa_session.expunge_all() task = self.get_task() # if the job was deleted, don't finish it if task.state == task.states.DELETED: + # Job was deleted by an administrator if self.app.config.cleanup_job in ( 'always', 'onsuccess' ): self.cleanup() return elif task.state == task.states.ERROR: - # Job was deleted by an administrator self.fail( task.info ) return + + # Check what the tool returned. If the stdout or stderr matched + # regular expressions that indicate errors, then set an error. + # The same goes if the tool's exit code was in a given range. if ( self.check_tool_output( stdout, stderr, tool_exit_code ) ): task.state = task.states.OK else: task.state = task.states.ERROR + # Save stdout and stderr if len( stdout ) > 32768: log.error( "stdout for task %d is greater than 32K, only first part will be logged to database" % task.id ) diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -392,6 +392,9 @@ runner_name = "tasks" else: runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0] + # DELETEME + #log.debug( "__get_runner_name: runner_name = %s; URL: %s" + # % (runner_name, job_wrapper.get_job_runner_url()) ) return runner_name def put( self, job_wrapper ): @@ -408,17 +411,43 @@ job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' 
) def stop( self, job ): - runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) ) - try: - self.job_runners[runner_name].stop_job( job ) - except KeyError: - log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - # Job and output dataset states have already been updated, so nothing is done here. + """ + Stop the given job. The input variable job may be either a Job or a Task. + """ + # The Job and Task classes have been modified so that their accessors + # will return the appropriate value. + # Note that Jobs and Tasks have runner_names, which are distinct from + # the job_runner_name and task_runner_name. + + # DELETEME - this next block is for debug only. + log.debug( "DefaultJobDispatcher: Stopping job %d" % job.get_id() ) + if ( isinstance( job, model.Job ) ): + log.debug( "Stopping job %d:", job.get_id() ) + elif( isinstance( job, model.Task ) ): + log.debug( "Stopping job %d, task %d" + % ( job.get_job().get_id(), job.get_id() ) ) + else: + log.debug( "Unknown job to stop" ) + + # The runner name is not set until the job has started. + # If we're stopping a task, then the runner_name may be + # None, in which case it hasn't been scheduled. + if ( None != job.get_job_runner_name() ): + runner_name = (job.get_job_runner_name().split(":",1))[0] + if ( isinstance( job, model.Job ) ): + log.debug( "stopping job %d in %s runner" %( job.get_id(), runner_name ) ) + elif ( isinstance( job, model.Task ) ): + log.debug( "Stopping job %d, task %d in %s runner" + % ( job.get_job().get_id(), job.get_id(), runner_name ) ) + try: + self.job_runners[runner_name].stop_job( job ) + except KeyError: + log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + # Job and output dataset states have already been updated, so nothing is done here. def recover( self, job, job_wrapper ): runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) + log.debug( "recovering job %d in %s runner" %( job.get_id(), runner_name ) ) try: self.job_runners[runner_name].recover( job, job_wrapper ) except KeyError: diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -136,8 +136,16 @@ if op is self.STOP_SIGNAL: return try: + # If the next item is to be run, then only run it if the + # job state is "queued". Otherwise the next item was either + # cancelled or one of its siblings encountered an error. if op == 'queue': - self.queue_job( obj ) + job_state = obj.get_state() + if model.Job.states.QUEUED == job_state: + self.queue_job( obj ) + else: + log.debug( "PBS: not executing job %d in state %s" + % ( obj.get_id_tag(), job_state ) ) elif op == 'finish': self.finish_job( obj ) elif op == 'fail': @@ -330,6 +338,7 @@ which_try = 0 # By default, the exit code is 0, which typically indicates success. 
exit_code = 0 + exit_code_str = "0" while which_try < (self.app.config.retry_job_output_collection + 1): try: ofh = file(ofile, "r") @@ -386,15 +395,15 @@ """Attempts to delete a job from the DRM queue""" try: if self.external_killJob_script is None: - self.ds.control( job.job_runner_external_id, drmaa.JobControlAction.TERMINATE ) + self.ds.control( job.get_job_runner_external_id(), drmaa.JobControlAction.TERMINATE ) else: # FIXME: hardcoded path - subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( job.job_runner_external_id ), str( self.userid ) ], shell=False ) - log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.id, job.job_runner_external_id ) ) + subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( job.get_job_runner_external_id() ), str( self.userid ) ], shell=False ) + log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.get_id(), job.get_job_runner_external_id() ) ) except drmaa.InvalidJobException: - log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), job.get_job_runner_external_id() ) ) except Exception, e: - log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.id, job.job_runner_external_id, e ) ) + log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.get_id(), job.get_job_runner_external_id(), e ) ) def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" @@ -402,18 +411,18 @@ drm_job_state.ofile = "%s.drmout" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.efile = "%s.drmerr" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.ecfile = "%s.drmec" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) - drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.id) - drm_job_state.job_id = str( job.job_runner_external_id ) + drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.get_id()) + drm_job_state.job_id = str( job.get_job_runner_external_id() ) drm_job_state.runner_url = job_wrapper.get_job_runner_url() - job_wrapper.command_line = job.command_line + job_wrapper.command_line = job.get_command_line() drm_job_state.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: - log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) drm_job_state.old_state = drmaa.JobState.RUNNING drm_job_state.running = True self.monitor_queue.put( drm_job_state ) - elif job.state == model.Job.states.QUEUED: - log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) ) + elif job.get_state() == model.Job.states.QUEUED: + log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) drm_job_state.old_state = drmaa.JobState.QUEUED_ACTIVE drm_job_state.running = False self.monitor_queue.put( drm_job_state ) diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r 
f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -7,6 +7,7 @@ from galaxy import model from galaxy.datatypes.data import nice_size from galaxy.jobs.runners import BaseJobRunner +from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper import os, errno from time import sleep @@ -52,93 +53,106 @@ log.exception( "Uncaught exception running job" ) def run_job( self, job_wrapper ): - job_wrapper.set_runner( 'local:///', None ) - stderr = stdout = command_line = '' - exit_code = 0 - # Prepare the job to run - try: - job_wrapper.prepare() - command_line = self.build_command_line( job_wrapper ) - except: - log.exception("failure running job %d" % job_wrapper.job_id) - job_wrapper.fail( "failure preparing job", exception=True ) - return - # If we were able to get a command line, run the job - if command_line: + # Do not run the job if something happened to its state while it was + # enqueued. (For example, a task could have been cancelled and does not + # need to be run.) + if model.Job.states.QUEUED != job_wrapper.get_state(): + log.debug( "Local runner: job %s is in state %s and will not be run" + % ( job_wrapper.get_id_tag(), job_wrapper.get_state() ) ) + + else: + log.debug( "Local runner: starting job %s" + % job_wrapper.get_id_tag() ) + job_wrapper.set_runner( 'local:///', None ) + stderr = stdout = command_line = '' + exit_code = 0 + # Prepare the job to run try: - log.debug( 'executing: %s' % command_line ) - stdout_file = tempfile.NamedTemporaryFile( suffix='_stdout', dir=job_wrapper.working_directory ) - stderr_file = tempfile.NamedTemporaryFile( suffix='_stderr', dir=job_wrapper.working_directory ) - proc = subprocess.Popen( args = command_line, - shell = True, - cwd = job_wrapper.working_directory, - stdout = stdout_file, - stderr = stderr_file, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.set_runner( 'local:///', proc.pid ) - job_wrapper.change_state( model.Job.states.RUNNING ) - if self.app.config.output_size_limit > 0: - sleep_time = 1 - while proc.poll() is None: - for outfile, size in job_wrapper.check_output_sizes(): - if size > self.app.config.output_size_limit: - # Error the job immediately - job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \ - % nice_size( self.app.config.output_size_limit ) ) - log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \ - % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) - # Then kill it - os.killpg( proc.pid, 15 ) - sleep( 1 ) - if proc.poll() is None: - os.killpg( proc.pid, 9 ) - proc.wait() # reap - log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) ) - return - sleep( sleep_time ) - if sleep_time < 8: - # So we don't stat every second - sleep_time *= 2 - # Reap the process and get the exit code. The exit code should - # only be None if the process isn't finished, but check anyway. 
- exit_code = proc.wait() # reap - if None == exit_code: - exit_code = 0 - stdout_file.seek( 0 ) - stderr_file.seek( 0 ) - stdout = stdout_file.read( 32768 ) - stderr = stderr_file.read( 32768 ) - stdout_file.close() - stderr_file.close() - log.debug('execution finished: %s' % command_line) - except Exception, exc: - job_wrapper.fail( "failure running job", exception=True ) + job_wrapper.prepare() + command_line = self.build_command_line( job_wrapper ) + except: log.exception("failure running job %d" % job_wrapper.job_id) + job_wrapper.fail( "failure preparing job", exception=True ) return - #run the metadata setting script here - #this is terminate-able when output dataset/job is deleted - #so that long running set_meta()s can be canceled without having to reboot the server - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and self.app.config.set_metadata_externally and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - tmp_dir = job_wrapper.working_directory, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior - log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) - external_metadata_proc.wait() - log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) + # If we were able to get a command line, run the job + if command_line: + try: + log.debug( 'executing: %s' % command_line ) + stdout_file = tempfile.NamedTemporaryFile( suffix='_stdout', dir=job_wrapper.working_directory ) + stderr_file = tempfile.NamedTemporaryFile( suffix='_stderr', dir=job_wrapper.working_directory ) + proc = subprocess.Popen( args = command_line, + shell = True, + cwd = job_wrapper.working_directory, + stdout = stdout_file, + stderr = stderr_file, + env = os.environ, + preexec_fn = os.setpgrp ) + # DELETEME (debug): + log.debug( "Job %s: PID %d" + % ( job_wrapper.get_id_tag(), proc.pid ) ) + job_wrapper.set_runner( 'local:///', proc.pid ) + job_wrapper.change_state( model.Job.states.RUNNING ) + if self.app.config.output_size_limit > 0: + sleep_time = 1 + while proc.poll() is None: + for outfile, size in job_wrapper.check_output_sizes(): + if size > self.app.config.output_size_limit: + # Error the job immediately + job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \ + % nice_size( self.app.config.output_size_limit ) ) + log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \ + % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) + # Then kill it + os.killpg( proc.pid, 15 ) + sleep( 1 ) + if proc.poll() is None: + os.killpg( proc.pid, 9 ) + proc.wait() # reap + log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) ) + return + sleep( sleep_time ) + if sleep_time < 8: + # So we don't stat every second + sleep_time *= 2 + # Reap the process and get the exit code. The exit code should + # only be None if the process isn't finished, but check anyway. 
+ exit_code = proc.wait() # reap + if None == exit_code: + exit_code = 0 + stdout_file.seek( 0 ) + stderr_file.seek( 0 ) + stdout = stdout_file.read( 32768 ) + stderr = stderr_file.read( 32768 ) + stdout_file.close() + stderr_file.close() + log.debug('execution finished: %s' % command_line) + except Exception, exc: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + #run the metadata setting script here + #this is terminate-able when output dataset/job is deleted + #so that long running set_meta()s can be canceled without having to reboot the server + if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and self.app.config.set_metadata_externally and job_wrapper.output_paths: + external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), + set_extension = True, + tmp_dir = job_wrapper.working_directory, + kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior + log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) + external_metadata_proc = subprocess.Popen( args = external_metadata_script, + shell = True, + env = os.environ, + preexec_fn = os.setpgrp ) + job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) + external_metadata_proc.wait() + log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) - # Finish the job! - try: - job_wrapper.finish( stdout, stderr, exit_code ) - except: - log.exception("Job wrapper finish method failed") - job_wrapper.fail("Unable to finish job", exception=True) + # Finish the job! 
+ try: + job_wrapper.finish( stdout, stderr, exit_code ) + except: + log.exception("Job wrapper finish method failed") + job_wrapper.fail("Unable to finish job", exception=True) def put( self, job_wrapper ): """Add a job to the queue (by job identifier)""" @@ -166,29 +180,29 @@ def stop_job( self, job ): #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished - if job.external_output_metadata: - pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them + if job.get_external_output_metadata(): + pid = job.get_external_output_metadata()[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them else: - pid = job.job_runner_external_id + pid = job.get_job_runner_external_id() if pid in [ None, '' ]: - log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) + log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.get_id() ) return pid = int( pid ) if not self.check_pid( pid ): - log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) ) + log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.get_id(), pid ) ) return for sig in [ 15, 9 ]: try: os.killpg( pid, sig ) except OSError, e: - log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) ) + log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.get_id(), errno.errorcode[e.errno], sig, pid, e.strerror ) ) return # give up sleep( 2 ) if not self.check_pid( pid ): - log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.id, pid, sig ) ) + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.get_id(), pid, sig ) ) return else: - log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.id, pid ) ) + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.get_id(), pid ) ) def recover( self, job, job_wrapper ): # local jobs can't be recovered diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/runners/sge.py --- a/lib/galaxy/jobs/runners/sge.py +++ b/lib/galaxy/jobs/runners/sge.py @@ -365,28 +365,28 @@ def stop_job( self, job ): """Attempts to delete a job from the SGE queue""" try: - self.ds.control( job.job_runner_external_id, DRMAA.Session.TERMINATE ) - log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.id, job.job_runner_external_id ) ) + self.ds.control( job.get_job_runner_external_id(), DRMAA.Session.TERMINATE ) + log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.get_id(), job.get_job_runner_external_id() ) ) except DRMAA.InvalidJobError: - log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), job.get_job_runner_external_id() ) ) def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" sge_job_state = SGEJobState() - sge_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id) - sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id) - sge_job_state.job_file = 
"%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id) - sge_job_state.job_id = str( job.job_runner_external_id ) + sge_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.get_id()) + sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.get_id()) + sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.get_id()) + sge_job_state.job_id = str( job.get_job_runner_external_id() ) sge_job_state.runner_url = job_wrapper.get_job_runner_url() - job_wrapper.command_line = job.command_line + job_wrapper.command_line = job.get_command_line() sge_job_state.job_wrapper = job_wrapper - if job.state == model.Job.states.RUNNING: - log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) ) + if job.get_state() == model.Job.states.RUNNING: + log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) sge_job_state.old_state = DRMAA.Session.RUNNING sge_job_state.running = True self.monitor_queue.put( sge_job_state ) - elif job.state == model.Job.states.QUEUED: - log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) ) + elif job.get_state() == model.Job.states.QUEUED: + log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE sge_job_state.running = False self.monitor_queue.put( sge_job_state ) diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -80,30 +80,49 @@ tw = TaskWrapper(task, job_wrapper.queue) task_wrappers.append(tw) self.app.job_manager.job_handler.dispatcher.put(tw) - tasks_incomplete = False + tasks_complete = False count_complete = 0 sleep_time = 1 # sleep/loop until no more progress can be made. That is when - # all tasks are one of { OK, ERROR, DELETED } + # all tasks are one of { OK, ERROR, DELETED }. If a task completed_states = [ model.Task.states.OK, \ - model.Task.states.ERROR, \ - model.Task.states.DELETED ] - # TODO: Should we report an error (and not merge outputs) if one of the subtasks errored out? - # Should we prevent any that are pending from being started in that case? - while tasks_incomplete is False: + model.Task.states.ERROR, \ + model.Task.states.DELETED ] + + # TODO: Should we report an error (and not merge outputs) if + # one of the subtasks errored out? Should we prevent any that + # are pending from being started in that case? + # SM: I'm + # If any task has an error, then we will stop all of them + # immediately. Tasks that are in the QUEUED state will be + # moved to the DELETED state. The task's runner should + # ignore tasks that are not in the QUEUED state. + # Deleted tasks are not included right now. 
+ # + while tasks_complete is False: count_complete = 0 - tasks_incomplete = True + tasks_complete = True for tw in task_wrappers: +# # DELETEME - debug +# log.debug( "Checking task wrapper %d; tasks_complete = %s" +# % (tw.task_id, tasks_complete) ) task_state = tw.get_state() - if not task_state in completed_states: - tasks_incomplete = False + if ( model.Task.states.ERROR == task_state ): + log.debug( "Canceling job %d: Task %d returned an error" + % ( tw.job_id, tw.task_id ) ) + self.cancel_job( job_wrapper, task_wrappers ) + tasks_complete = True + break + elif not task_state in completed_states: + tasks_complete = False else: count_complete = count_complete + 1 - if tasks_incomplete is False: - # log.debug('Tasks complete: %s. Sleeping %s' % (count_complete, sleep_time)) + if tasks_complete is False: sleep( sleep_time ) if sleep_time < 8: sleep_time *= 2 + import time + job_wrapper.reclaim_ownership() # if running as the actual user, change ownership before merging. log.debug('execution finished - beginning merge: %s' % command_line) stdout, stderr = splitter.do_merge(job_wrapper, task_wrappers) @@ -135,6 +154,58 @@ log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) + + def cancel_job( self, job_wrapper, task_wrappers ): + """ + Cancel the given job. The job's state will be set to ERROR. + Any running tasks will be cancelled, and any queued/pending + tasks will be marked as DELETED so that runners know not + to run those tasks. + """ + job = job_wrapper.get_job() + job.set_state( model.Job.states.ERROR ) + + # For every task (except the one that already had an error) + # - If the task is queued, then mark it as deleted + # so that the runner will not run it later. (It would + # be great to remove stuff from a runner's queue before + # the runner picks it up, but that isn't possible in + # most APIs.) + # - If the task is running, then tell the runner + # (via the dispatcher) to cancel the task. + # - Else the task is new or waiting (which should be + # impossible) or in an error or deleted state already, + # so skip it. + # This is currently done in two loops. If a running task is + # cancelled, then a queued task could take its place before + # it's marked as deleted. + # TODO: Eliminate the chance of a race condition wrt state. + for task_wrapper in task_wrappers: + task = task_wrapper.get_task() + task_state = task.get_state() + if ( model.Task.states.QUEUED == task_state ): + log.debug( "cancel_job for job %d: Task %d is not running; setting state to DELETED" + % ( job.get_id(), task.get_id() ) ) + task_wrapper.change_state( task.states.DELETED ) + # If a task failed, then the caller will have waited a few seconds + # before recognizing the failure. In that time, a queued task could + # have been picked up by a runner but not marked as running. + # So wait a few seconds so that we can eliminate such tasks once they + # are running. 
+ sleep(5) + for task_wrapper in task_wrappers: + if ( model.Task.states.RUNNING == task_wrapper.get_state() ): + task = task_wrapper.get_task() + log.debug( "cancel_job for job %d: Stopping running task %d" + % ( job.get_id(), task.get_id() ) ) + job_wrapper.app.job_manager.job_handler.dispatcher.stop( task ) + +# DELETEME: +# else: +# log.debug( "cancel_job for job %d: Task %d is in state %s and does not need to be cancelled" +# % ( job.get_id(), task.get_id(), task_state ) ) + + def put( self, job_wrapper ): """Add a job to the queue (by job identifier)""" # Change to queued state before handing to worker thread so the runner won't pick it up again @@ -166,9 +237,11 @@ # runner because the task runner also starts all the tasks. # First, get the list of tasks from job.tasks, which uses SQL # alchemy to retrieve a job's list of tasks. - if ( len( job.tasks ) > 0 ): - for task in job.tasks: - self.stop_pid( task.task_runner_external_id, job.id ) + tasks = job.get_tasks() + if ( len( tasks ) > 0 ): + for task in tasks: + log.debug( "Killing task's job " + str(task.get_id()) ) + self.app.job_manager.job_handler.dispatcher.stop(task) # There were no subtasks, so just kill the job. We'll touch # this if the tasks runner is used but the tool does not use diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/jobs/splitters/multi.py --- a/lib/galaxy/jobs/splitters/multi.py +++ b/lib/galaxy/jobs/splitters/multi.py @@ -126,6 +126,9 @@ if output in merge_outputs: output_type = outputs[output][0].datatype output_files = [os.path.join(dir,base_output_name) for dir in task_dirs] + # Just include those files f in the output list for which the + # file f exists; some files may not exist if a task fails. + output_files = [ f for f in output_files if os.path.exists(f) ] log.debug('files %s ' % output_files) output_type.merge(output_files, output_file_name) log.debug('merge finished: %s' % output_file_name) diff -r d19fbfefb676f953ffa572f74e311c3dfb5aeaee -r f1eb04839053a883401da55737ee2d5a338fae43 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -114,6 +114,7 @@ ERROR = 'error', DELETED = 'deleted', DELETED_NEW = 'deleted_new' ) + # Please include an accessor (get/set pair) for any new columns/members. def __init__( self ): self.session_id = None self.user_id = None @@ -134,6 +135,97 @@ self.imported = False self.handler = None + # TODO: Add accessors for members defined in SQL Alchemy for the Job table and + # for the mapper defined to the Job table. + def get_session_id( self ): + return self.session_id + def get_user_id( self ): + return self.user_id + def get_tool_id( self ): + return self.tool_id + def get_tool_version( self ): + return self.tool_version + def get_command_line( self ): + return self.command_line + def get_param_filename( self ): + return self.param_filename + def get_parameters( self ): + return self.parameters + def get_input_datasets( self ): + return self.input_datasets + def get_output_datasets( self ): + return self.output_datasets + def get_input_library_datasets( self ): + return self.input_library_datasets + def get_output_library_datasets( self ): + return self.output_library_datasets + def get_state( self ): + return self.state + def get_info( self ): + return self.info + def get_job_runner_name( self ): + # This differs from the Task class in that job_runner_name is + # accessed instead of task_runner_name. Note that the field + # runner_name is not the same thing. 
+ return self.job_runner_name + def get_job_runner_external_id( self ): + # This is different from the Task just in the member accessed: + return self.job_runner_external_id + def get_post_job_actions( self ): + return self.post_job_actions + def get_imported( self ): + return self.imported + def get_handler( self ): + return self.handler + def get_params( self ): + return self.params + def get_user( self ): + # This is defined in the SQL Alchemy mapper as a relation to the User. + return self.user + def get_id( self ): + # This is defined in the SQL Alchemy's Job table (and not in the model). + return self.id + def get_tasks( self ): + # The tasks member is pert of a reference in the SQL Alchemy schema: + return self.tasks + + def set_session_id( self, session_id ): + self.session_id = session_id + def set_user_id( self, user_id ): + self.user_id = user_id + def set_tool_id( self, tool_id ): + self.tool_id = tool_id + def set_tool_version( self, tool_version ): + self.tool_version = tool_version + def set_command_line( self, command_line ): + self.command_line = command_line + def set_param_filename( self, param_filename ): + self.param_filename = param_filename + def set_parameters( self, parameters ): + self.parameters = parameters + def set_input_datasets( self, input_datasets ): + self.input_datasets = input_datasets + def set_output_datasets( self, output_datasets ): + self.output_datasets = output_datasets + def set_input_library_datasets( self, input_library_datasets ): + self.input_library_datasets = input_library_datasets + def set_output_library_datasets( self, output_library_datasets ): + self.output_library_datasets = output_library_datasets + def set_info( self, info ): + self.info = info + def set_runner_name( self, job_runner_name ): + self.job_runner_name = job_runner_name + def set_runner_external_id( self, job_runner_external_id ): + self.job_runner_external_id = job_runner_external_id + def set_post_job_actions( self, post_job_actions ): + self.post_job_actions = post_job_actions + def set_imported( self, imported ): + self.imported = imported + def set_handler( self, handler ): + self.handler = handler + def set_params( self, params ): + self.params = params + def add_parameter( self, name, value ): self.parameters.append( JobParameter( name, value ) ) def add_input_dataset( self, name, dataset ): @@ -147,6 +239,10 @@ def add_post_job_action(self, pja): self.post_job_actions.append( PostJobActionAssociation( pja, self ) ) def set_state( self, state ): + """ + This is the only set method that performs extra work. In this case, the + state is propagated down to datasets. + """ self.state = state # For historical reasons state propogates down to datasets for da in self.output_datasets: @@ -204,6 +300,7 @@ ERROR = 'error', DELETED = 'deleted' ) + # Please include an accessor (get/set pair) for any new columns/members. def __init__( self, job, working_directory, prepare_files_cmd ): self.command_line = None self.parameters = [] @@ -213,13 +310,19 @@ self.task_runner_name = None self.task_runner_external_id = None self.job = job - self.stdout = None - self.stderr = None + # SM: Using default empty strings avoids None exceptions later on. + self.stdout = "" + self.stderr = "" self.prepare_input_files_cmd = prepare_files_cmd +<<<<<<< local + + # TODO: Add accessors for members defined in SQL Alchemy for the Task table. 
+======= def set_state( self, state ): self.state = state +>>>>>>> other def get_param_values( self, app ): """ Read encoded parameter values from the database and turn back into a @@ -230,6 +333,91 @@ param_dict = tool.params_from_strings( param_dict, app ) return param_dict + def get_id( self ): + # This is defined in the SQL Alchemy schema: + return self.id + def get_command_line( self ): + return self.command_line + def get_parameters( self ): + return self.parameters + def get_state( self ): + return self.state + def get_info( self ): + return self.info + def get_working_directory( self ): + return self.working_directory + def get_task_runner_name( self ): + return self.task_runner_name + def get_task_runner_external_id( self ): + return self.task_runner_external_id + def get_job( self ): + return self.job + def get_stdout( self ): + return self.stdout + def get_stderr( self ): + return self.stderr + def get_prepare_input_files_cmd( self ): + return self.prepare_input_files_cmd + + # The following accessors are for members that are in the Job class but + # not in the Task class. So they can either refer to the parent Job + # or return None, depending on whether Tasks need to point to the parent + # (e.g., for a session) or never use the member (e.g., external output + # metdata). These can be filled in as needed. + def get_external_output_metadata( self ): + return None + def get_job_runner_name( self ): + """ + Since runners currently access Tasks the same way they access Jobs, + this method just refers to *this* instance's runner. + """ + return self.task_runner_name + def get_job_runner_external_id( self ): + """ + Runners will use the same methods to get information about the Task + class as they will about the Job class, so this method just returns + the task's external id. + """ + # TODO: Merge into get_runner_external_id. + return self.task_runner_external_id + def get_session_id( self ): + # The Job's galaxy session is equal to the Job's session, so the + # Job's session is the same as the Task's session. + return self.get_job().get_session_id() + + def set_id( self, id ): + # This is defined in the SQL Alchemy's mapper and not here. + # This should never be called. + self.id = id + def set_command_line( self, command_line ): + self.command_line = command_line + def set_parameters( self, parameters ): + self.parameters = parameters + def set_state( self, state ): + self.state = state + def set_info( self, info ): + self.info = info + def set_working_directory( self, working_directory ): + self.working_directory = working_directory + def set_task_runner_name( self, task_runner_name ): + self.task_runner_name = task_runner_name + def set_job_runner_external_id( self, task_runner_external_id ): + # This method is available for runners that do not want/need to + # differentiate between the kinds of Runnable things (Jobs and Tasks) + # that they're using. 
+ log.debug( "Task %d: Set external id to %s" + % ( self.id, task_runner_external_id ) ) + self.task_runner_external_id = task_runner_external_id + def set_task_runner_external_id( self, task_runner_external_id ): + self.task_runner_external_id = task_runner_external_id + def set_job( self, job ): + self.job = job + def set_stdout( self, stdout ): + self.stdout = stdout + def set_stderr( self, stderr ): + self.stderr = stderr + def set_prepare_input_files_cmd( self, prepare_input_files_cmd ): + self.prepare_input_files_cmd = prepare_input_files_cmd class JobParameter( object ): def __init__( self, name, value ): Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.