commit/galaxy-central: smcmanus: The task runner now chooses an exit code based on the tasks' exit codes - if the tasks are successful, then the last exit code scanned is chosen, and if a task fails then the first scanned failing task's exit code is chosen. The JobWrapper's fail method also stores stdout, stderr, and the exit code if they're available, though this will currently only applies to when a Job using the task runner fails. A null/None exit code is also now supported.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/3d07a7800f9a/ changeset: 3d07a7800f9a user: smcmanus date: 2012-10-03 00:20:12 summary: The task runner now chooses an exit code based on the tasks' exit codes - if the tasks are successful, then the last exit code scanned is chosen, and if a task fails then the first scanned failing task's exit code is chosen. The JobWrapper's fail method also stores stdout, stderr, and the exit code if they're available, though this will currently only applies to when a Job using the task runner fails. A null/None exit code is also now supported. affected #: 4 files diff -r 6d45fd1d830ee85edcdc07ba93831c316102e81a -r 3d07a7800f9af46e46c1f3ad3f0fe432949f3b51 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -204,7 +204,7 @@ self.version_string_cmd = self.tool.version_string_cmd return extra_filenames - def fail( self, message, exception=False ): + def fail( self, message, exception=False, stdout="", stderr="", exit_code=None ): """ Indicate job failure by setting state and message on all output datasets. @@ -249,6 +249,20 @@ job.state = job.states.ERROR job.command_line = self.command_line job.info = message + # TODO: Put setting the stdout, stderr, and exit code in one place + # (not duplicated with the finish method). + if ( len( stdout ) > 32768 ): + stdout = stdout[:32768] + log.info( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) + job.stdout = stdout + if ( len( stderr ) > 32768 ): + stderr = stderr[:32768] + log.info( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) + job.stderr = stderr + # Let the exit code be Null if one is not provided: + if ( exit_code != None ): + job.exit_code = exit_code + self.sa_session.add( job ) self.sa_session.flush() #Perform email action even on failure. @@ -290,7 +304,7 @@ self.sa_session.add( job ) self.sa_session.flush() - def finish( self, stdout, stderr, tool_exit_code=0 ): + def finish( self, stdout, stderr, tool_exit_code=None ): """ Called to indicate that the associated command has been run. Updates the output datasets based on stderr and stdout from the command, and @@ -300,25 +314,38 @@ self.sa_session.expunge_all() job = self.get_job() + # TODO: After failing here, consider returning from the function. try: self.reclaim_ownership() except: - self.fail( job.info ) log.exception( '(%s) Failed to change ownership of %s, failing' % ( job.id, self.working_directory ) ) + return self.fail( job.info, stdout=stdout, stderr=stderr, exit_code=tool_exit_code ) + log.debug( "############## JobWrapper.finish: %s exit code" + % ( "None" if None == tool_exit_code else str(tool_exit_code))) # if the job was deleted, don't finish it if job.state == job.states.DELETED or job.state == job.states.ERROR: - #ERROR at this point means the job was deleted by an administrator. - return self.fail( job.info ) + # ERROR at this point means the job was deleted by an administrator. + # SM: Note that, at this point, the exit code must be saved in case + # there was an error. Errors caught here could mean that the job + # was deleted by an administrator (based on old comments), but it + # could also mean that a job was broken up into tasks and one of + # the tasks failed. So + return self.fail( job.info, stderr=stderr, stdout=stdout, exit_code=tool_exit_code ) # Check the tool's stdout, stderr, and exit code for errors, but only # if the job has not already been marked as having an error. # The job's stdout and stderr will be set accordingly. + log.debug( "############## JobWrapper.finish: Post-check exit code: %s/%s" + % ( ( "None" if None == tool_exit_code else str(tool_exit_code) ), + ( "None" if None == job.exit_code else str(job.exit_code) ) ) ) if job.states.ERROR != job.state: if ( self.check_tool_output( stdout, stderr, tool_exit_code, job )): job.state = job.states.OK else: job.state = job.states.ERROR + log.debug( "############## JobWrapper.finish: Post-check exit code: %s" + % ( "None" if None == tool_exit_code else str(tool_exit_code))) if self.version_string_cmd: version_filename = self.get_version_string_path() @@ -358,7 +385,6 @@ # TODO: The context['stderr'] holds stderr's contents. An error # only really occurs if the job also has an error. So check the # job's state: - #if context['stderr']: if job.states.ERROR == job.state: dataset.blurb = "error" elif dataset.has_data(): @@ -435,12 +461,18 @@ self.sa_session.flush() # Save stdout and stderr if len( job.stdout ) > 32768: - log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) + log.info( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) job.stdout = job.stdout[:32768] if len( job.stderr ) > 32768: - log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) + log.info( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) job.stderr = job.stderr[:32768] - job.exit_code = tool_exit_code + # The exit code will be null if there is no exit code to be set. + # This is so that we don't assign an exit code, such as 0, that + # is either incorrect or has the wrong semantics. + if None != tool_exit_code: + job.exit_code = tool_exit_code + log.debug( "############## JobWrapper.finish: storing %s exit code" + % ( "None" if None == job.exit_code else str(job.exit_code))) # custom post process setup inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) @@ -513,26 +545,27 @@ # that range, then apply the error level and add a message. # If we've reached a fatal error rule, then stop. max_error_level = galaxy.tools.StdioErrorLevel.NO_ERROR - for stdio_exit_code in self.tool.stdio_exit_codes: - if ( tool_exit_code >= stdio_exit_code.range_start and - tool_exit_code <= stdio_exit_code.range_end ): - # Tack on a generic description of the code - # plus a specific code description. For example, - # this might prepend "Job 42: Warning: Out of Memory\n". - code_desc = stdio_exit_code.desc - if ( None == code_desc ): - code_desc = "" - tool_msg = ( "%s: Exit code %d: %s" % ( - galaxy.tools.StdioErrorLevel.desc( stdio_exit_code.error_level ), - tool_exit_code, - code_desc ) ) - log.info( "Job %s: %s" % (job.get_id_tag(), tool_msg) ) - stderr = tool_msg + "\n" + stderr - max_error_level = max( max_error_level, - stdio_exit_code.error_level ) - if ( max_error_level >= - galaxy.tools.StdioErrorLevel.FATAL ): - break + if tool_exit_code != None: + for stdio_exit_code in self.tool.stdio_exit_codes: + if ( tool_exit_code >= stdio_exit_code.range_start and + tool_exit_code <= stdio_exit_code.range_end ): + # Tack on a generic description of the code + # plus a specific code description. For example, + # this might prepend "Job 42: Warning (Out of Memory)\n". + code_desc = stdio_exit_code.desc + if ( None == code_desc ): + code_desc = "" + tool_msg = ( "%s: Exit code %d (%s)" % ( + galaxy.tools.StdioErrorLevel.desc( stdio_exit_code.error_level ), + tool_exit_code, + code_desc ) ) + log.info( "Job %s: %s" % (job.get_id_tag(), tool_msg) ) + stderr = tool_msg + "\n" + stderr + max_error_level = max( max_error_level, + stdio_exit_code.error_level ) + if ( max_error_level >= + galaxy.tools.StdioErrorLevel.FATAL ): + break if max_error_level < galaxy.tools.StdioErrorLevel.FATAL: # We'll examine every regex. Each regex specifies whether @@ -1013,6 +1046,11 @@ self.sa_session.refresh( task ) return task.state + def get_exit_code( self ): + task = self.get_task() + self.sa_session.refresh( task ) + return task.exit_code + def set_runner( self, runner_url, external_id ): task = self.get_task() self.sa_session.refresh( task ) @@ -1022,7 +1060,7 @@ self.sa_session.add( task ) self.sa_session.flush() - def finish( self, stdout, stderr, tool_exit_code=0 ): + def finish( self, stdout, stderr, tool_exit_code=None ): # DBTODO integrate previous finish logic. # Simple finish for tasks. Just set the flag OK. """ @@ -1032,7 +1070,8 @@ """ # This may have ended too soon log.debug( 'task %s for job %d ended; exit code: %d' - % (self.task_id, self.job_id, tool_exit_code) ) + % (self.task_id, self.job_id, + tool_exit_code if tool_exit_code != None else -256 ) ) # default post job setup_external_metadata self.sa_session.expunge_all() task = self.get_task() diff -r 6d45fd1d830ee85edcdc07ba93831c316102e81a -r 3d07a7800f9af46e46c1f3ad3f0fe432949f3b51 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -111,11 +111,8 @@ if sleep_time < 8: # So we don't stat every second sleep_time *= 2 - # Reap the process and get the exit code. The exit code should - # only be None if the process isn't finished, but check anyway. - exit_code = proc.wait() # reap - if None == exit_code: - exit_code = 0 + # Reap the process and get the exit code. + exit_code = proc.wait() stdout_file.seek( 0 ) stderr_file.seek( 0 ) stdout = stdout_file.read( 32768 ) @@ -146,7 +143,10 @@ # Finish the job! try: - job_wrapper.finish( stdout, stderr, exit_code ) + #job_wrapper.finish( stdout, stderr, exit_code ) + # DELETEME: This is for testing how null exit codes are handled: + log.debug( "############## Finishing job - None exit code" ) + job_wrapper.finish( stdout, stderr, None ) except: log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) diff -r 6d45fd1d830ee85edcdc07ba93831c316102e81a -r 3d07a7800f9af46e46c1f3ad3f0fe432949f3b51 lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -56,6 +56,16 @@ job_wrapper.fail( "failure preparing job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return + + # This is the job's exit code, which will depend on the tasks' + # exit code. The overall job's exit code will be one of two values: + # o if the job is successful, then the last task scanned will be + # used to determine the exit code. Note that this is not the same + # thing as the last task to complete, which could be added later. + # o if a task fails, then the job will fail and the failing task's + # exit code will become the job's exit code. + job_exit_code = "" + # If we were able to get a command line, run the job. ( must be passed to tasks ) if command_line: try: @@ -69,7 +79,9 @@ job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism) return tasks = splitter.do_split(job_wrapper) - # Not an option for now. Task objects don't *do* anything useful yet, but we'll want them tracked outside this thread to do anything. + # Not an option for now. Task objects don't *do* anything + # useful yet, but we'll want them tracked outside this thread + # to do anything. # if track_tasks_in_database: task_wrappers = [] for task in tasks: @@ -100,25 +112,31 @@ # Deleted tasks are not included right now. # while tasks_complete is False: + log.debug( "************ Rechecking tasks" ) count_complete = 0 tasks_complete = True for tw in task_wrappers: task_state = tw.get_state() + log.debug( "***** Checking task %d: state %s" + % (tw.task_id, task_state) ) if ( model.Task.states.ERROR == task_state ): - log.debug( "Canceling job %d: Task %d returned an error" - % ( tw.job_id, tw.task_id ) ) + job_exit_code = tw.get_exit_code() + log.debug( "Canceling job %d: Task %s returned an error (exit code %d)" + % ( tw.job_id, tw.task_id, job_exit_code ) ) self.cancel_job( job_wrapper, task_wrappers ) tasks_complete = True break elif not task_state in completed_states: tasks_complete = False else: + job_exit_code = tw.get_exit_code() count_complete = count_complete + 1 if tasks_complete is False: sleep( sleep_time ) if sleep_time < 8: sleep_time *= 2 import time + log.debug( "####################### Finished with tasks; job exit code: %d" % job_exit_code ) job_wrapper.reclaim_ownership() # if running as the actual user, change ownership before merging. log.debug('execution finished - beginning merge: %s' % command_line) @@ -146,7 +164,8 @@ # Finish the job try: - job_wrapper.finish( stdout, stderr ) + log.debug( "$$$$$$$$$$$$$$ job_exit_code before finish: %d" % job_exit_code ) + job_wrapper.finish( stdout, stderr, job_exit_code ) except: log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) diff -r 6d45fd1d830ee85edcdc07ba93831c316102e81a -r 3d07a7800f9af46e46c1f3ad3f0fe432949f3b51 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -134,7 +134,7 @@ self.post_job_actions = [] self.imported = False self.handler = None - self.exit_code = 0 + self.exit_code = None # TODO: Add accessors for members defined in SQL Alchemy for the Job table and # for the mapper defined to the Job table. @@ -317,10 +317,9 @@ self.task_runner_name = None self.task_runner_external_id = None self.job = job - # SM: Using default empty strings avoids None exceptions later on. self.stdout = "" self.stderr = "" - self.exit_code = 0 + self.exit_code = None self.prepare_input_files_cmd = prepare_files_cmd def get_param_values( self, app ): Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket