galaxy-dist commit 2389c323a1e6: Fix for compatibility with the refactored base job runner build_command_line.
# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Dannon Baker <dannonbaker@me.com>
# Date 1289568484 18000
# Node ID 2389c323a1e6a50de6d48d5431d16d830ccef021
# Parent ca23ea683d26df004e666ede82950c712e3ac637

Fix for compatibility with the refactored base job runner build_command_line.

The DRMAA runner now uses get_id_tag() on the *Wrapper classes instead of job_id directly when creating the .sh, .o, and .e files, as well as in its debug logging. This fixes the issue where many tasks for the same job are submitted at the same time and these files overwrite one another or collide during submission.

--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -295,6 +295,10 @@ class JobWrapper( object ):
     def get_job( self ):
         return self.sa_session.query( model.Job ).get( self.job_id )
 
+    def get_id_tag(self):
+        # For compatibility with drmaa, which uses job_id right now, and TaskWrapper
+        return str(self.job_id)
+
     def get_param_dict( self ):
         """
         Restore the dictionary of parameters from the database.
@@ -788,6 +792,10 @@ class TaskWrapper(JobWrapper):
     def get_task( self ):
         return self.sa_session.query(model.Task).get(self.task_id)
 
+    def get_id_tag(self):
+        # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly
+        return "%s_%s" % (self.job_id, self.task_id)
+
     def get_param_dict( self ):
         """
         Restore the dictionary of parameters from the database.
@@ -964,7 +972,7 @@ class TaskWrapper(JobWrapper):
     def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None,
                                  config_root = None, datatypes_config = None, set_extension = True, **kwds ):
         # There is no metadata setting for tasks. This is handled after the merge, at the job level.
-        pass
+        return ""
 
     @property
     def user( self ):
--- a/lib/galaxy/jobs/runners/tasks.py
+++ b/lib/galaxy/jobs/runners/tasks.py
@@ -117,8 +117,12 @@ class TaskedJobRunner( object ):
                 basic.merge(working_directory, output_filename)
             log.debug('execution finished: %s' % command_line)
             for tw in task_wrappers:
-                stdout += tw.get_task().stdout
-                stderr += tw.get_task().stderr
+                # Prevent repetitive output, e.g. "Sequence File Aligned" x20
+                # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
+                if stdout.strip() != tw.get_task().stdout.strip():
+                    stdout += tw.get_task().stdout
+                if stderr.strip() != tw.get_task().stderr.strip():
+                    stderr += tw.get_task().stderr
         except Exception:
             job_wrapper.fail( "failure running job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -125,7 +125,7 @@ class DRMAAJobRunner( BaseJobRunner ):
            command_line = self.build_command_line( job_wrapper, include_metadata=True )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
-            log.exception("failure running job %d" % job_wrapper.job_id)
+            log.exception("failure running job %d" % job_wrapper.get_id_tag())
             return
 
         runner_url = job_wrapper.tool.job_runner
@@ -137,7 +137,7 @@ class DRMAAJobRunner( BaseJobRunner ):
 
         # Check for deletion before we change state
         if job_wrapper.get_state() == model.Job.states.DELETED:
-            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
+            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
             job_wrapper.cleanup()
             return
 
@@ -145,10 +145,10 @@ class DRMAAJobRunner( BaseJobRunner ):
         job_wrapper.change_state( model.Job.states.QUEUED )
 
         # define job attributes
-        ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.job_id)
-        efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id)
+        ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.get_id_tag())
+        efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.get_id_tag())
         jt = self.ds.createJobTemplate()
-        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id)
+        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.get_id_tag())
         jt.outputPath = ":%s" % ofile
         jt.errorPath = ":%s" % efile
         native_spec = self.get_native_spec( runner_url )
@@ -163,14 +163,16 @@ class DRMAAJobRunner( BaseJobRunner ):
 
         # job was deleted while we were preparing it
         if job_wrapper.get_state() == model.Job.states.DELETED:
-            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
+            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
             self.cleanup( ( ofile, efile, jt.remoteCommand ) )
             job_wrapper.cleanup()
             return
 
-        galaxy_job_id = job_wrapper.job_id
-        log.debug("(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) )
-        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
+        # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
+        galaxy_id_tag = job_wrapper.get_id_tag()
+
+        log.debug("(%s) submitting file %s" % ( galaxy_id_tag, jt.remoteCommand ) )
+        log.debug("(%s) command is: %s" % ( galaxy_id_tag, command_line ) )
         # runJob will raise if there's a submit problem
         job_id = self.ds.runJob(jt)
         log.info("(%s) queued as %s" % ( galaxy_job_id, job_id ) )
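
For readers not familiar with the wrapper classes touched above, the sketch below is a minimal, self-contained illustration (not part of the patch) of the naming scheme get_id_tag() introduces: a plain job is tagged with its job id alone, while each task of a split job is tagged "<job_id>_<task_id>", so the per-submission .sh/.o/.e files stop colliding. The SimpleJobWrapper, SimpleTaskWrapper, and work_files names here are hypothetical stand-ins, not Galaxy's actual JobWrapper/TaskWrapper API.

    import os

    class SimpleJobWrapper(object):
        # Hypothetical stand-in for Galaxy's JobWrapper.
        def __init__(self, job_id):
            self.job_id = job_id

        def get_id_tag(self):
            # A plain job is tagged with its job id alone.
            return str(self.job_id)

    class SimpleTaskWrapper(SimpleJobWrapper):
        # Hypothetical stand-in for Galaxy's TaskWrapper.
        def __init__(self, job_id, task_id):
            SimpleJobWrapper.__init__(self, job_id)
            self.task_id = task_id

        def get_id_tag(self):
            # Each task of a split job gets a "<job>_<task>" tag, so twenty tasks
            # of job 42 no longer all try to write the same 42.sh / 42.o / 42.e.
            return "%s_%s" % (self.job_id, self.task_id)

    def work_files(wrapper, work_dir="database/pbs"):
        # Mirrors the drmaa.py hunk: script, stdout, and stderr paths all derive
        # from the wrapper's id tag instead of the bare job_id.
        tag = wrapper.get_id_tag()
        return (os.path.join(work_dir, "galaxy_%s.sh" % tag),
                os.path.join(work_dir, "%s.o" % tag),
                os.path.join(work_dir, "%s.e" % tag))

    if __name__ == "__main__":
        print(work_files(SimpleJobWrapper(42)))          # galaxy_42.sh / 42.o / 42.e
        for t in range(3):
            print(work_files(SimpleTaskWrapper(42, t)))  # 42_0.*, 42_1.*, 42_2.*

Run directly, it prints galaxy_42.sh/42.o/42.e for the job itself and 42_0.*, 42_1.*, 42_2.* for its tasks, which is why the drmaa.py hunks switch ofile, efile, and jt.remoteCommand over to get_id_tag().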