details:   http://www.bx.psu.edu/hg/galaxy/rev/b4ba343d3540
changeset: 3335:b4ba343d3540
user:      Nate Coraor <nate@bx.psu.edu>
date:      Fri Feb 05 10:04:04 2010 -0500
description:
Add (most of) Assaf Gordon's patch for allowing SGE options in the runner url, and add the same support to PBS.

Please note that this should be considered a temporary implementation and will be removed in the future.

diffstat:

 lib/galaxy/jobs/__init__.py    |   9 +++++
 lib/galaxy/jobs/runners/pbs.py |  69 +++++++++++++++++++++++++----------------
 lib/galaxy/jobs/runners/sge.py |  35 +++++++++++++++++----
 3 files changed, 79 insertions(+), 34 deletions(-)

diffs (188 lines):

diff -r 51a1369fdf96 -r b4ba343d3540 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py  Fri Feb 05 09:23:19 2010 -0500
+++ b/lib/galaxy/jobs/__init__.py  Fri Feb 05 10:04:04 2010 -0500
@@ -719,6 +719,7 @@
         for outfile in [ str( o ) for o in output_paths ]:
             sizes.append( ( outfile, os.stat( outfile ).st_size ) )
         return sizes
+
     def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ):
         # extension could still be 'auto' if this is the upload tool.
         job = self.sa_session.query( model.Job ).get( self.job_id )
@@ -747,6 +748,14 @@
                                       job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ),
                                       **kwds )
 
+    @property
+    def user( self ):
+        job = self.sa_session.query( model.Job ).get( self.job_id )
+        if job.history.user is None:
+            return 'anonymous@' + job.galaxy_session.remote_addr.split()[-1]
+        else:
+            return job.history.user.email
+
 class DefaultJobDispatcher( object ):
     def __init__( self, app ):
         self.app = app

diff -r 51a1369fdf96 -r b4ba343d3540 lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py  Fri Feb 05 09:23:19 2010 -0500
+++ b/lib/galaxy/jobs/runners/pbs.py  Fri Feb 05 10:04:04 2010 -0500
@@ -124,12 +124,27 @@
 
     def determine_pbs_queue( self, url ):
         """Determine what PBS queue we are submitting to"""
-        url_split = url.split("/")
-        queue = url_split[3]
-        if queue == "":
-            # None == server's default queue
-            queue = None
-        return queue
+        try:
+            return url.split('/')[3] or None
+        except:
+            return None
+
+    def determine_pbs_options( self, url ):
+        try:
+            opts = url.split('/')[4].strip().lstrip('-').split(' -')
+            assert opts != ['']
+        except:
+            return []
+        rval = []
+        for opt in opts:
+            name, value = opt.split( None, 1 )
+            if name == 'l':
+                resource_attrs = value.split(',')
+                for j, ( res, val ) in enumerate( [ a.split('=', 1) for a in resource_attrs ] ):
+                    rval.append( dict( name = pbs.ATTR_l, value = val, resource = res ) )
+            else:
+                rval.append( dict( name = getattr( pbs, 'ATTR_' + name ), value = value ) )
+        return rval
 
     def run_next( self ):
         """
@@ -175,6 +190,7 @@
 
         ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True )
        pbs_queue_name = self.determine_pbs_queue( runner_url )
+        pbs_options = self.determine_pbs_options( runner_url )
         c = pbs.pbs_connect( pbs_server_name )
         if c <= 0:
             job_wrapper.fail( "Unable to queue job for execution. Resubmitting the job may succeed." )
@@ -185,7 +201,6 @@
 
         ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
         efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
-
         output_fnames = job_wrapper.get_output_fnames()
 
         # If an application server is set, we're staging
@@ -195,28 +210,28 @@
             output_files = [ str( o ) for o in output_fnames ]
             stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
             stageout = self.get_stage_in_out( output_files )
-            job_attrs = pbs.new_attropl(5)
-            job_attrs[0].name = pbs.ATTR_o
-            job_attrs[0].value = pbs_ofile
-            job_attrs[1].name = pbs.ATTR_e
-            job_attrs[1].value = pbs_efile
-            job_attrs[2].name = pbs.ATTR_stagein
-            job_attrs[2].value = stagein
-            job_attrs[3].name = pbs.ATTR_stageout
-            job_attrs[3].value = stageout
-            job_attrs[4].name = pbs.ATTR_N
-            job_attrs[4].value = "%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id )
-            exec_dir = os.path.abspath( job_wrapper.working_directory )
+            attrs = [
+                dict( name = pbs.ATTR_o, value = pbs_ofile ),
+                dict( name = pbs.ATTR_e, value = pbs_efile ),
+                dict( name = pbs.ATTR_stagein, value = stagein ),
+                dict( name = pbs.ATTR_stageout, value = stageout ),
+            ]
         # If not, we're using NFS
         else:
-            job_attrs = pbs.new_attropl(3)
-            job_attrs[0].name = pbs.ATTR_o
-            job_attrs[0].value = ofile
-            job_attrs[1].name = pbs.ATTR_e
-            job_attrs[1].value = efile
-            job_attrs[2].name = pbs.ATTR_N
-            job_attrs[2].value = "%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id )
-            exec_dir = os.path.abspath( job_wrapper.working_directory )
+            attrs = [
+                dict( name = pbs.ATTR_o, value = ofile ),
+                dict( name = pbs.ATTR_e, value = efile ),
+            ]
+
+        # define PBS job options
+        attrs.append( dict( name = pbs.ATTR_N, value = str( "%s_%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id, job_wrapper.user ) ) ) )
+        job_attrs = pbs.new_attropl( len( attrs ) + len( pbs_options ) )
+        for i, attr in enumerate( attrs + pbs_options ):
+            job_attrs[i].name = attr['name']
+            job_attrs[i].value = attr['value']
+            if 'resource' in attr:
+                job_attrs[i].resource = attr['resource']
+        exec_dir = os.path.abspath( job_wrapper.working_directory )
 
         # write the job script
        if self.app.config.pbs_stage_path != '':
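With this patch, field 3 of a PBS runner url selects the queue and field 4 carries qsub-style options. As a rough illustration of the parsing that determine_pbs_options performs, here is a standalone sketch; it substitutes plain strings for the pbs.ATTR_* constants so it runs without the pbs-python bindings, and the server, queue, and resource values in the example url are hypothetical:

    def parse_pbs_options( url ):
        # field 4 of the runner url holds space-separated qsub-style
        # options, e.g. "-l walltime=1:00:00,nodes=1"
        try:
            opts = url.split('/')[4].strip().lstrip('-').split(' -')
            assert opts != ['']
        except ( IndexError, AssertionError ):
            return []
        rval = []
        for opt in opts:
            name, value = opt.split( None, 1 )
            if name == 'l':
                # a "-l" resource list expands to one attribute per resource
                for res, val in [ a.split('=', 1) for a in value.split(',') ]:
                    rval.append( dict( name = 'ATTR_l', value = val, resource = res ) )
            else:
                rval.append( dict( name = 'ATTR_' + name, value = value ) )
        return rval

    # hypothetical runner url of the form pbs://<server>/<queue>/<options>
    url = "pbs://pbs.example.org/batch/-l walltime=1:00:00,nodes=1:ppn=4"
    print( parse_pbs_options( url ) )
    # -> two ATTR_l entries: (walltime, '1:00:00') and (nodes, '1:ppn=4')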
diff -r 51a1369fdf96 -r b4ba343d3540 lib/galaxy/jobs/runners/sge.py
--- a/lib/galaxy/jobs/runners/sge.py  Fri Feb 05 09:23:19 2010 -0500
+++ b/lib/galaxy/jobs/runners/sge.py  Fri Feb 05 10:04:04 2010 -0500
@@ -94,12 +94,24 @@
 
     def determine_sge_queue( self, url ):
         """Determine what SGE queue we are submitting to"""
-        url_split = url.split("/")
-        queue = url_split[3]
-        if queue == "":
-            # None == server's default queue
-            queue = None
-        return queue
+        try:
+            return url.split('/')[3] or None
+        except:
+            return None
+
+    def determine_sge_project( self, url ):
+        """Determine what SGE project we are submitting to"""
+        try:
+            return url.split('/')[4] or None
+        except:
+            return None
+
+    def determine_sge_tool_parameters( self, url ):
+        """Determine the tool's specific parameters"""
+        try:
+            return url.split('/')[5] or None
+        except:
+            return None
 
     def queue_job( self, job_wrapper ):
         """Create SGE script for a job and submit it to the SGE queue"""
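The three helpers above each slice a fixed slash-delimited field out of the runner url, so an SGE runner url takes the shape sge://<host>/<queue>/<project>/<parameters>, with an empty or missing field falling back to the scheduler's default. A minimal sketch of the shared pattern, using a hypothetical url:

    def url_field( url, n ):
        # same pattern as the determine_sge_* helpers: an empty or
        # missing field yields None, i.e. use the scheduler's default
        try:
            return url.split('/')[n] or None
        except IndexError:
            return None

    url = "sge://sge.example.org/all.q/genomics/-pe smp 4"
    print( url_field( url, 3 ) )  # all.q (queue)
    print( url_field( url, 4 ) )  # genomics (project)
    print( url_field( url, 5 ) )  # -pe smp 4 (extra tool parameters)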
@@ -132,6 +144,8 @@
             # TODO: support multiple cells
             log.warning( "(%s) Using multiple SGE cells is not supported. This job will be submitted to the default cell." % job_wrapper.job_id )
         sge_queue_name = self.determine_sge_queue( runner_url )
+        sge_project_name = self.determine_sge_project( runner_url )
+        sge_extra_params = self.determine_sge_tool_parameters( runner_url )
 
         # define job attributes
         ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.job_id)
@@ -140,8 +154,15 @@
         jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id)
         jt.outputPath = ":%s" % ofile
         jt.errorPath = ":%s" % efile
+        nativeSpec = []
         if sge_queue_name is not None:
-            jt.setNativeSpecification( "-q %s" % sge_queue_name )
+            nativeSpec.append( "-q '%s'" % sge_queue_name )
+        if sge_project_name is not None:
+            nativeSpec.append( "-P '%s'" % sge_project_name )
+        if sge_extra_params is not None:
+            nativeSpec.append( sge_extra_params )
+        if len( nativeSpec ) > 0:
+            jt.nativeSpecification = ' '.join( nativeSpec )
 
         script = sge_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
         fh = file( jt.remoteCommand, "w" )
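For completeness, the new queue_job code above joins those url fields into a single DRMAA nativeSpecification string. A minimal sketch of that assembly, reusing the hypothetical values from the SGE example url:

    def build_native_spec( queue, project, extra ):
        # mirrors the nativeSpec assembly in queue_job: quote the queue
        # and project, pass any extra parameters through verbatim
        spec = []
        if queue is not None:
            spec.append( "-q '%s'" % queue )
        if project is not None:
            spec.append( "-P '%s'" % project )
        if extra is not None:
            spec.append( extra )
        return ' '.join( spec )

    print( build_native_spec( "all.q", "genomics", "-pe smp 4" ) )
    # -> -q 'all.q' -P 'genomics' -pe smp 4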