# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User James Taylor <james@jamestaylor.org>
# Date 1288983797 14400
# Node ID 1e8d7c6ad88b074b33b2029f3baa15c2eb929e70
# Parent 32e5efb7a7d5606a4dc1c8808fe4888ade2892e8
Add support for tool dependency injection to all cluster runners

--- a/lib/galaxy/tools/deps/__init__.py
+++ b/lib/galaxy/tools/deps/__init__.py
@@ -51,7 +51,7 @@ class DependencyManager( object ):
         if os.path.exists( script ):
             return script, path, version
         else:
-            return None, None
+            return None, None, None

     def _find_dep_default( self, name ):
         version = None
@@ -64,6 +64,6 @@ class DependencyManager( object ):
         if os.path.exists( script ):
             return script, real_path, real_version
         else:
-            return None, None
+            return None, None, None
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -0,0 +1,32 @@
+import os, os.path
+
+class BaseJobRunner( object ):
+
+    def build_command_line( self, job_wrapper, include_metadata=False ):
+        """
+        Compose the sequence of commands necessary to execute a job. This will
+        currently include:
+            - environment settings corresponding to any requirement tags
+            - command line taken from job wrapper
+            - commands to set metadata (if include_metadata is True)
+        """
+        commands = job_wrapper.get_command_line()
+        # All job runners currently handle an empty command line, although
+        # this case should never occur
+        if not commands:
+            return None
+        # Prepend dependency injection
+        if job_wrapper.dependency_shell_commands:
+            commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] )
+        # Append metadata setting commands; we don't want to overwrite metadata
+        # that was copied over in init_meta(), as per established behavior
+        if include_metadata and self.app.config.set_metadata_externally:
+            commands += "; cd %s; " % os.path.abspath( os.getcwd() )
+            commands += job_wrapper.setup_external_metadata(
+                exec_dir = os.path.abspath( os.getcwd() ),
+                tmp_dir = self.app.config.new_file_path,
+                dataset_files_path = self.app.model.Dataset.file_path,
+                output_fnames = job_wrapper.get_output_fnames(),
+                set_extension = False,
+                kwds = { 'overwrite' : False } )
+        return commands
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -5,13 +5,14 @@ import threading

 from galaxy import model
 from galaxy.datatypes.data import nice_size
+from galaxy.jobs.runners import BaseJobRunner

 import os, errno
 from time import sleep

 log = logging.getLogger( __name__ )

-class LocalJobRunner( object ):
+class LocalJobRunner( BaseJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
     """
@@ -53,9 +54,7 @@ class LocalJobRunner( object ):
         # Prepare the job to run
         try:
             job_wrapper.prepare()
-            command_line = job_wrapper.get_command_line()
-            if job_wrapper.dependency_shell_commands:
-                command_line = "; ".join( job_wrapper.dependency_shell_commands + [ command_line ] )
+            command_line = self.build_command_line( job_wrapper )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
--- a/lib/galaxy/jobs/runners/sge.py
+++ b/lib/galaxy/jobs/runners/sge.py
@@ -2,6 +2,8 @@ import os, logging, threading, time
 from Queue import Queue, Empty

 from galaxy import model
+from galaxy.jobs.runners import BaseJobRunner
+
 from paste.deploy.converters import asbool

 import pkg_resources
@@ -58,7 +60,7 @@ class SGEJobState( object ):
         self.efile = None
         self.runner_url = None

-class SGEJobRunner( object ):
+class SGEJobRunner( BaseJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
     """
@@ -144,7 +146,7 @@ class SGEJobRunner( object ):

         try:
             job_wrapper.prepare()
-            command_line = job_wrapper.get_command_line()
+            command_line = self.build_command_line( job_wrapper, include_metadata = True )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
@@ -191,14 +193,7 @@ class SGEJobRunner( object ):
             jt.nativeSpecification = ' '.join(nativeSpec)

         script = sge_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
-        if self.app.config.set_metadata_externally:
-            script += "cd %s\n" % os.path.abspath( os.getcwd() )
-            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
-                                                                    tmp_dir = self.app.config.new_file_path,
-                                                                    dataset_files_path = self.app.model.Dataset.file_path,
-                                                                    output_fnames = job_wrapper.get_output_fnames(),
-                                                                    set_extension = False,
-                                                                    kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
+
         fh = file( jt.remoteCommand, "w" )
         fh.write( script )
         fh.close()
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -2,6 +2,8 @@ import os, logging, threading, time
 from Queue import Queue, Empty

 from galaxy import model
+from galaxy.jobs.runners import BaseJobRunner
+
 from paste.deploy.converters import asbool

 import pkg_resources
@@ -58,7 +60,7 @@ class DRMAAJobState( object ):
         self.efile = None
         self.runner_url = None

-class DRMAAJobRunner( object ):
+class DRMAAJobRunner( BaseJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
     """
@@ -120,7 +122,7 @@ class DRMAAJobRunner( object ):

         try:
             job_wrapper.prepare()
-            command_line = job_wrapper.get_command_line()
+            command_line = self.build_command_line( job_wrapper, include_metadata=True )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
@@ -154,14 +156,6 @@ class DRMAAJobRunner( object ):
         jt.nativeSpecification = native_spec

         script = drm_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
-        if self.app.config.set_metadata_externally:
-            script += "cd %s\n" % os.path.abspath( os.getcwd() )
-            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
-                                                                    tmp_dir = self.app.config.new_file_path,
-                                                                    dataset_files_path = self.app.model.Dataset.file_path,
-                                                                    output_fnames = job_wrapper.get_output_fnames(),
-                                                                    set_extension = False,
-                                                                    kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
         fh = file( jt.remoteCommand, "w" )
         fh.write( script )
         fh.close()
--- a/lib/galaxy/jobs/runners/pbs.py
+++ b/lib/galaxy/jobs/runners/pbs.py
@@ -5,6 +5,7 @@ from Queue import Queue, Empty
 from galaxy import model
 from galaxy.datatypes.data import nice_size
 from galaxy.util.bunch import Bunch
+from galaxy.jobs.runners import BaseJobRunner

 from paste.deploy.converters import asbool

@@ -80,7 +81,7 @@ class PBSJobState( object ):
         self.check_count = 0
         self.stop_job = False

-class PBSJobRunner( object ):
+class PBSJobRunner( BaseJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
     """
@@ -183,7 +184,7 @@ class PBSJobRunner( object ):

         try:
             job_wrapper.prepare()
-            command_line = job_wrapper.get_command_line()
+            command_line = self.build_command_line( job_wrapper, include_metadata=not( self.app.config.pbs_stage_path ) )
         except:
             job_wrapper.fail( "failure preparing job", exception=True )
             log.exception("failure running job %d" % job_wrapper.job_id)
@@ -253,14 +254,6 @@ class PBSJobRunner( object ):
             script = pbs_symlink_template % (job_wrapper.galaxy_lib_dir, " ".join(job_wrapper.get_input_fnames() + output_files), self.app.config.pbs_stage_path, exec_dir, command_line)
         else:
             script = pbs_template % ( job_wrapper.galaxy_lib_dir, exec_dir, command_line )
-        if self.app.config.set_metadata_externally:
-            script += "cd %s\n" % os.path.abspath( os.getcwd() )
-            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
-                                                                    tmp_dir = self.app.config.new_file_path,
-                                                                    dataset_files_path = self.app.model.Dataset.file_path,
-                                                                    output_fnames = output_fnames,
-                                                                    set_extension = False,
-                                                                    kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
         job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
         fh = file(job_file, "w")
         fh.write(script)
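
Editor's note: the sketch below is commentary, not part of the changeset. It mimics the string composition that the new BaseJobRunner.build_command_line() performs, using made-up stand-in values (the dependency env script, tool command, and metadata command are all hypothetical) to show the shape of the command line the refactored runners now receive.

# Editorial sketch only -- not part of the patch. Reproduces the composition
# logic of BaseJobRunner.build_command_line() with hypothetical inputs; in
# Galaxy, setup_external_metadata() generates the real metadata command.
import os

# Hypothetical stand-ins for what a JobWrapper might supply
dependency_shell_commands = [ ". /galaxy/tool-deps/samtools/0.1.9/env.sh" ]
command_line = "samtools flagstat input.bam > output.txt"
metadata_command = "python ./scripts/set_metadata.py ..."  # placeholder

commands = command_line
# Prepend dependency injection, as build_command_line does
if dependency_shell_commands:
    commands = "; ".join( dependency_shell_commands + [ commands ] )
# Append the external metadata command when include_metadata is requested
include_metadata = True
if include_metadata:
    commands += "; cd %s; " % os.path.abspath( os.getcwd() )
    commands += metadata_command

print commands
# e.g.: . /galaxy/tool-deps/samtools/0.1.9/env.sh; samtools flagstat
#       input.bam > output.txt; cd /home/galaxy; python ./scripts/set_metadata.py ...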