2 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/3544960e8928/
Changeset: 3544960e8928
User:      jmchilton
Date:      2013-05-28 04:40:36
Summary:   Improved exception and error handling conditions for the LWR job runner. PEP-8 fixes.
Affected #: 2 files

diff -r 31714646a7b441f34a43748065278a1b08940c3c -r 3544960e892875f29ad9d64c8e99141bc4da4052 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -14,6 +14,7 @@

 __all__ = [ 'LwrJobRunner' ]

+
 class LwrJobRunner( AsynchronousJobRunner ):
     """
     LWR Job Runner
@@ -49,18 +50,17 @@
         return job_state

     def queue_job(self, job_wrapper):
-        stderr = stdout = command_line = ''
-
+        command_line = ''
         job_destination = job_wrapper.job_destination

         try:
             job_wrapper.prepare()
             if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None:
-                for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files
+                for cmd in job_wrapper.prepare_input_files_cmds:  # run the commands to stage the input files
                     #log.debug( 'executing: %s' % cmd )
                     if 0 != os.system(cmd):
                         raise Exception('Error running file staging command: %s' % cmd)
-                job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line
+                job_wrapper.prepare_input_files_cmds = None  # prevent them from being used in-line
             command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
@@ -73,7 +73,6 @@
             return

         try:
-            #log.debug( 'executing: %s' % command_line )
             client = self.get_client_from_wrapper(job_wrapper)
             output_files = self.get_output_files(job_wrapper)
             input_files = job_wrapper.get_input_fnames()
@@ -85,8 +84,7 @@
             client.launch( rebuilt_command_line )
             job_wrapper.set_job_destination( job_destination, job_id )
             job_wrapper.change_state( model.Job.states.QUEUED )
-
-        except Exception, exc:
+        except:
             job_wrapper.fail( "failure running job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
             return
@@ -124,23 +122,36 @@
             client = self.get_client_from_state(job_state)

             run_results = client.raw_check_complete()
-            log.debug('run_results %s' % run_results )
             stdout = run_results['stdout']
             stderr = run_results['stderr']

+            download_failure_exceptions = []
             if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
                 work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
                 output_files = self.get_output_files(job_wrapper)
                 for source_file, output_file in work_dir_outputs:
-                    client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
+                    try:
+                        client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
+                    except Exception, e:
+                        download_failure_exceptions.append(e)
                     # Remove from full output_files list so don't try to download directly.
                     output_files.remove(output_file)
                 for output_file in output_files:
-                    client.download_output(output_file, working_directory=job_wrapper.working_directory)
-            client.clean()
+                    try:
+                        client.download_output(output_file, working_directory=job_wrapper.working_directory)
+                    except Exception, e:
+                        download_failure_exceptions.append(e)
+            if download_failure_exceptions or self.app.config.cleanup_job == "always":
+                try:
+                    client.clean()
+                except:
+                    log.warn("Failed to cleanup remote LWR job")
+            if download_failure_exceptions:
+                job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True)
             log.debug('execution finished: %s' % command_line)
-        except Exception, exc:
-            job_wrapper.fail( "failure running job", exception=True )
+        except:
+            message = "Failed to communicate with remote job server."
+            job_wrapper.fail( message, exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
             return
         self._handle_metadata_externally( job_wrapper )
@@ -166,14 +177,14 @@
             if e.errno == errno.ESRCH:
                 log.debug( "check_pid(): PID %d is dead" % pid )
             else:
-                log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" %( errno.errorcode[e.errno], pid, e.strerror ) )
+                log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) )
             return False

     def stop_job( self, job ):
         #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
         job_ext_output_metadata = job.get_external_output_metadata()
         if job_ext_output_metadata:
-            pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+            pid = job_ext_output_metadata[0].job_runner_external_pid  # every JobExternalOutputMetadata has a pid set, we just need to take from one of them
             if pid in [ None, '' ]:
                 log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
                 return
@@ -189,10 +200,10 @@
                     return  # give up
                 sleep( 2 )
                 if not self.check_pid( pid ):
-                    log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.id, pid, sig ) )
+                    log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) )
                     return
             else:
-                log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.id, pid ) )
+                log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) )
         else:
             # Remote kill
             lwr_url = job.job_runner_name
@@ -201,7 +212,6 @@
             client = self.get_client(job.destination_params, job_id)
             client.kill()

-
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
         job_state = AsynchronousJobState()

diff -r 31714646a7b441f34a43748065278a1b08940c3c -r 3544960e892875f29ad9d64c8e99141bc4da4052 lib/galaxy/jobs/runners/lwr_client/client.py
--- a/lib/galaxy/jobs/runners/lwr_client/client.py
+++ b/lib/galaxy/jobs/runners/lwr_client/client.py
@@ -19,6 +19,15 @@
         return replacement


+class OutputNotFoundException(Exception):
+
+    def __init__(self, path):
+        self.path = path
+
+    def __str__(self):
+        return "No remote output found for path %s" % self.path
+
+
 class Client(object):
     """
     Objects of this client class perform low-level communication with a remote LWR server.
@@ -163,7 +172,7 @@
         elif output_type == "task":
             output_path = os.path.join(working_directory, name)
         else:
-            raise Exception("No remote output found for dataset with path %s" % path)
+            raise OutputNotFoundException(path)
         self.__raw_download_output(name, self.job_id, output_type, output_path)

     def __raw_download_output(self, name, job_id, output_type, output_path):


https://bitbucket.org/galaxy/galaxy-central/commits/cac2f16e960c/
Changeset: cac2f16e960c
User:      dannon
Date:      2013-06-16 20:09:39
Summary:   Merged in jmchilton/galaxy-central-lwr (pull request #173)

           Improved exception and error handling conditions for the LWR job runner. PEP-8 fixes.
Affected #: 2 files
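For readers skimming the diff above, the core of the runner change is that output downloads no longer abort on the first failure. The function below is a hypothetical, self-contained sketch of that pattern, not Galaxy's actual method: client, job_wrapper and the download calls mirror names from the diff, cleanup_job stands in for self.app.config.cleanup_job, and the sketch uses the "except ... as" spelling where the diff uses the older Python 2 comma form.

def finish_lwr_outputs(client, job_wrapper, work_dir_outputs, output_files, cleanup_job="always"):
    # Collect failures instead of letting the first bad download abort the whole step.
    download_failure_exceptions = []
    for source_file, output_file in work_dir_outputs:
        try:
            client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
        except Exception as e:
            download_failure_exceptions.append(e)
        # Already handled as a working-directory output; don't fetch it again below.
        if output_file in output_files:
            output_files.remove(output_file)
    for output_file in output_files:
        try:
            client.download_output(output_file, working_directory=job_wrapper.working_directory)
        except Exception as e:
            download_failure_exceptions.append(e)
    # Clean the remote staging directory if anything went wrong, or if the
    # instance is configured to always clean up finished jobs.
    if download_failure_exceptions or cleanup_job == "always":
        try:
            client.clean()
        except Exception:
            pass  # the real runner logs a warning and carries on
    # Fail the job exactly once, after every output has been attempted.
    if download_failure_exceptions:
        job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True)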
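The client-side half of the change swaps a generic Exception for a dedicated OutputNotFoundException. The class below is copied from the diff; the small caller after it is a hypothetical illustration (fetch_output is not part of the LWR client) of why a typed exception helps: code driving download_output can tell a genuinely missing remote output apart from other download failures.

class OutputNotFoundException(Exception):
    # Carries the path so callers and log messages can say which output was missing.
    def __init__(self, path):
        self.path = path

    def __str__(self):
        return "No remote output found for path %s" % self.path


def fetch_output(client, path, working_directory):
    # Hypothetical caller: treat "the remote job produced no output here"
    # differently from transport or server errors.
    try:
        client.download_output(path, working_directory=working_directory)
    except OutputNotFoundException as e:
        print("Remote job produced no output at %s" % e.path)
        raise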
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.