6 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/432999eabbaa/
Changeset: 432999eabbaa
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   Add job_script module from LWR including GALAXY_SLOTS logic.

Use new job_script module in the DRMAA runner. This implements the
long-discussed GALAXY_SLOTS logic for PBS/TORQUE, SLURM, and Grid Engine
when coming through the DRMAA runner.

Affected #: 6 files

diff -r b681b2aeddbc52c3d953abcb431688df6fb8a005 -r 432999eabbaa97528a2bd37f1b99de86b39a924f lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -15,6 +15,7 @@
 from galaxy.jobs.command_factory import build_command
 from galaxy import model
 from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size
+from galaxy.jobs.runners.util.job_script import job_script
 
 log = logging.getLogger( __name__ )
 
@@ -218,6 +219,16 @@
         external_metadata_proc.wait()
         log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id )
 
+    def get_job_file(self, job_wrapper, **kwds):
+        options = dict(
+            galaxy_lib=job_wrapper.galaxy_lib_dir,
+            env_setup_commands=job_wrapper.get_env_setup_clause(),
+            working_directory=os.path.abspath( job_wrapper.working_directory ),
+            command=job_wrapper.runner_command_line,
+        )
+        options.update(**kwds)
+        return job_script(**options)
+
 
 class AsynchronousJobState( object ):
     """

diff -r b681b2aeddbc52c3d953abcb431688df6fb8a005 -r 432999eabbaa97528a2bd37f1b99de86b39a924f lib/galaxy/jobs/runners/drmaa.py
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -37,29 +37,10 @@
     drmaa.JobState.FAILED: 'job finished, but failed',
 }
 
-# The last four lines (following the last fi) will:
-# - setup the env
-# - move to the job wrapper's working directory
-# - execute the command
-# - take the command's exit code ($?) and write it to a file.
-drm_template = """#!/bin/sh
-GALAXY_LIB="%s"
-if [ "$GALAXY_LIB" != "None" ]; then
-    if [ -n "$PYTHONPATH" ]; then
-        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
-    else
-        PYTHONPATH="$GALAXY_LIB"
-    fi
-    export PYTHONPATH
-fi
-%s
-cd %s
-%s
-echo $? > %s
-"""
 DRMAA_jobTemplate_attributes = [ 'args', 'remoteCommand', 'outputPath', 'errorPath', 'nativeSpecification',
-                                 'jobName','email','project' ]
+                                 'jobName', 'email', 'project' ]
+
 
 class DRMAAJobRunner( AsynchronousJobRunner ):
     """
@@ -138,12 +119,7 @@
             jt.nativeSpecification = native_spec
 
         # fill in the DRM's job run template
-        script = drm_template % ( job_wrapper.galaxy_lib_dir,
-                                  job_wrapper.get_env_setup_clause(),
-                                  os.path.abspath( job_wrapper.working_directory ),
-                                  command_line,
-                                  ajs.exit_code_file )
-
+        script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file)
         try:
             fh = file( ajs.job_file, "w" )
             fh.write( script )
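
The old drm_template interpolation above collapses into a single get_job_file
call, which delegates to the job_script function added in the new util
package below. A minimal sketch of the equivalent direct call — the paths and
command here are hypothetical stand-ins for values the job wrapper provides,
and it assumes Galaxy's lib directory is on the import path:

    from galaxy.jobs.runners.util.job_script import job_script

    # Hypothetical wrapper-derived values; get_job_file derives these from
    # job_wrapper and merges in runner-supplied overrides such as
    # exit_code_path before calling job_script.
    script = job_script(
        galaxy_lib='/srv/galaxy/lib',
        env_setup_commands='',
        working_directory='/srv/galaxy/database/job_working_directory/000/1',
        command='python /srv/galaxy/tools/wrapper.py',
        exit_code_path='/srv/galaxy/database/pbs/1.ec',
    )
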
+""" +try: + from galaxy.util.bunch import Bunch +except ImportError: + from lwr.util import Bunch diff -r b681b2aeddbc52c3d953abcb431688df6fb8a005 -r 432999eabbaa97528a2bd37f1b99de86b39a924f lib/galaxy/jobs/runners/util/job_script/CLUSTER_SLOTS_STATEMENT.sh --- /dev/null +++ b/lib/galaxy/jobs/runners/util/job_script/CLUSTER_SLOTS_STATEMENT.sh @@ -0,0 +1,11 @@ +export GALAXY_SLOTS_CONFIGURED="1" +if [ -n "$SLURM_JOB_NUM_NODES" ]; then + GALAXY_SLOTS="$SLURM_JOB_NUM_NODES" +elif [ -n "$NSLOTS" ]; then + GALAXY_SLOTS="$NSLOTS" +elif [ -f "$PBS_NODEFILE" ]; then + GALAXY_SLOTS=`wc -l < $PBS_NODEFILE` +else + GALAXY_SLOTS="1" + unset GALAXY_SLOTS_CONFIGURED +fi diff -r b681b2aeddbc52c3d953abcb431688df6fb8a005 -r 432999eabbaa97528a2bd37f1b99de86b39a924f lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh --- /dev/null +++ b/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -0,0 +1,17 @@ +#!/bin/sh +$headers +$slots_statement +export GALAXY_SLOTS +GALAXY_LIB="$galaxy_lib" +if [ "$GALAXY_LIB" != "None" ]; then + if [ -n "$PYTHONPATH" ]; then + PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" + else + PYTHONPATH="$GALAXY_LIB" + fi + export PYTHONPATH +fi +$env_setup_commands +cd $working_directory +$command +echo $? > $exit_code_path diff -r b681b2aeddbc52c3d953abcb431688df6fb8a005 -r 432999eabbaa97528a2bd37f1b99de86b39a924f lib/galaxy/jobs/runners/util/job_script/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/job_script/__init__.py @@ -0,0 +1,54 @@ +from string import Template +from pkg_resources import resource_string + +DEFAULT_JOB_FILE_TEMPLATE = Template( + resource_string(__name__, 'DEFAULT_JOB_FILE_TEMPLATE.sh') +) + +SLOTS_STATEMENT_CLUSTER_DEFAULT = \ + resource_string(__name__, 'CLUSTER_SLOTS_STATEMENT.sh') + +SLOTS_STATEMENT_SINGLE = """ +GALAXY_SLOTS="1" +""" + +REQUIRED_TEMPLATE_PARAMS = ['working_directory', 'command', 'exit_code_path'] +OPTIONAL_TEMPLATE_PARAMS = { + 'galaxy_lib': None, + 'headers': '', + 'env_setup_commands': '', + 'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT, +} + + +def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): + """ + + >>> has_exception = False + >>> try: job_script() + ... except Exception as e: has_exception = True + >>> has_exception + True + >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec') + >>> '\\nuptime\\n' in script + True + >>> 'echo $? > ec' in script + True + >>> 'GALAXY_LIB="None"' in script + True + >>> script.startswith('#!/bin/sh\\n#PBS -test\\n') + False + >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', headers='#PBS -test') + >>> script.startswith('#!/bin/sh\\n#PBS -test\\n') + True + >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', slots_statement='GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"') + >>> script.find('GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"\\nexport GALAXY_SLOTS\\n') > 0 + True + """ + if any([param not in kwds for param in REQUIRED_TEMPLATE_PARAMS]): + raise Exception("Failed to create job_script, a required parameter is missing.") + template_params = OPTIONAL_TEMPLATE_PARAMS.copy() + template_params.update(**kwds) + if not isinstance(template, Template): + template = Template(template) + return template.safe_substitute(template_params) https://bitbucket.org/galaxy/galaxy-central/commits/126d5c5b9a49/ Changeset: 126d5c5b9a49 User: jmchilton Date: 2013-10-17 00:37:37 Summary: Condor job runner enhancements. 
https://bitbucket.org/galaxy/galaxy-central/commits/126d5c5b9a49/
Changeset: 126d5c5b9a49
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   Condor job runner enhancements.

Implement exit code handling and submission script parameters (a job
destination parameter <param id="submit_xxxx">yyyy</param> will be passed
through to the submit description as xxxx = yyyy). Bring in LWR helper
scripts to implement most of these and clean things up. Bring in LWR
external.py for parsing IDs coming from resource managers; this seems like
overkill for now in Galaxy because it is only used for condor, but in the
LWR it is also used for its version of the CLI runners (those changes will
be merged in down the road). Utilize the job_script module for building the
script (this is what fixes exit code handling).

TODO: GALAXY_SLOTS logic is still not implemented for condor, but should be
easier now.

Affected #: 3 files
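
Concretely, the submit_xxxx passthrough operates on the destination's params
dict; the submission_params helper introduced below strips the prefix and
ignores everything else. A small sketch (the parameter names here are
hypothetical examples):

    from galaxy.jobs.runners.util.condor import submission_params

    params = submission_params(
        submit_requirements='OpSys == "LINUX"',  # -> requirements = OpSys == "LINUX"
        priority='10',                           # no submit_ prefix, so dropped
    )
    assert params == {'requirements': 'OpSys == "LINUX"'}
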
diff -r 432999eabbaa97528a2bd37f1b99de86b39a924f -r 126d5c5b9a4938909b518cb176dc47890e1c13e5 lib/galaxy/jobs/runners/condor.py
--- a/lib/galaxy/jobs/runners/condor.py
+++ b/lib/galaxy/jobs/runners/condor.py
@@ -3,40 +3,18 @@
 """
 
 import os
-import re
-import sys
-import time
 import logging
-import subprocess
 
 from galaxy import model
 from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
-
-from galaxy.util import asbool
+from galaxy.jobs.runners.util.condor import submission_params, build_submit_description
+from galaxy.jobs.runners.util.condor import condor_submit, condor_stop
+from galaxy.jobs.runners.util.condor import summarize_condor_log
 
 log = logging.getLogger( __name__ )
 
 __all__ = [ 'CondorJobRunner' ]
 
-drm_template = """#!/bin/sh
-GALAXY_LIB="%s"
-if [ "$GALAXY_LIB" != "None" ]; then
-    if [ -n "$PYTHONPATH" ]; then
-        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
-    else
-        PYTHONPATH="$GALAXY_LIB"
-    fi
-    export PYTHONPATH
-fi
-cd %s
-%s
-"""
-
-default_query_classad = dict(
-    universe = 'vanilla',
-    getenv = 'true',
-    notification = 'NEVER',
-)
 
 class CondorJobState( AsynchronousJobState ):
     def __init__( self, **kwargs ):
@@ -49,21 +27,22 @@
         self.user_log = None
         self.user_log_size = 0
 
+
 class CondorJobRunner( AsynchronousJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
     """
     runner_name = "CondorRunner"
+
     def __init__( self, app, nworkers ):
         """Initialize this job runner and start the monitor thread"""
         super( CondorJobRunner, self ).__init__( app, nworkers )
         self._init_monitor_thread()
         self._init_worker_threads()
 
-    # superclass url_to_destination is fine - condor runner does not take params
-
     def queue_job( self, job_wrapper ):
         """Create job script and submit it to the DRM"""
+
         # prepare the job
         if not self.prepare_job( job_wrapper, include_metadata=True ):
             return
@@ -78,25 +57,30 @@
         galaxy_id_tag = job_wrapper.get_id_tag()
 
         # get destination params
-        query_params = default_query_classad.copy()
-        query_params.update( job_destination.params )
+        query_params = submission_params(**job_destination.params)
 
         # define job attributes
-        cjs = CondorJobState( files_dir=self.app.config.cluster_files_directory, job_wrapper=job_wrapper )
-        cjs.user_log = os.path.join( self.app.config.cluster_files_directory, 'galaxy_%s.condor.log' % galaxy_id_tag )
+        cjs = CondorJobState(
+            files_dir=self.app.config.cluster_files_directory,
+            job_wrapper=job_wrapper
+        )
+
+        cluster_directory = self.app.config.cluster_files_directory
+        cjs.user_log = os.path.join( cluster_directory, 'galaxy_%s.condor.log' % galaxy_id_tag )
         cjs.register_cleanup_file_attribute( 'user_log' )
-        submit_file = os.path.join( self.app.config.cluster_files_directory, 'galaxy_%s.condor.desc' % galaxy_id_tag )
+        submit_file = os.path.join( cluster_directory, 'galaxy_%s.condor.desc' % galaxy_id_tag )
         executable = cjs.job_file
-        submit_desc = [ ]
-        for k, v in query_params.items():
-            submit_desc.append( '%s = %s' % ( k, v ) )
-        submit_desc.append( 'executable = ' + executable )
-        submit_desc.append( 'output = ' + cjs.output_file )
-        submit_desc.append( 'error = ' + cjs.error_file )
-        submit_desc.append( 'log = ' + cjs.user_log )
-        submit_desc.append( 'queue' )
-        script = drm_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
+        build_submit_params = dict(
+            executable=executable,
+            output=cjs.output_file,
+            error=cjs.error_file,
+            user_log=cjs.user_log,
+            query_params=query_params,
+        )
+
+        submit_file_contents = build_submit_description(**build_submit_params)
+        script = self.get_job_file( job_wrapper, exit_code_path=cjs.exit_code_file )
         try:
             fh = file( executable, "w" )
             fh.write( script )
@@ -108,10 +92,7 @@
             return
 
         try:
-            fh = file( submit_file, 'w' )
-            for line in submit_desc:
-                fh.write( line + '\n' )
-            fh.close()
+            open(submit_file, "w").write(submit_file_contents)
         except:
             if self.app.config.cleanup_job == "always":
                 cjs.cleanup()
@@ -132,23 +113,9 @@
         log.debug( "(%s) submitting file %s" % ( galaxy_id_tag, executable ) )
         log.debug( "(%s) command is: %s" % ( galaxy_id_tag, command_line ) )
 
-        s_out = ''
-        external_job_id = None
-        try:
-            submit = subprocess.Popen( ( 'condor_submit', submit_file ), stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
-            s_out, s_err = submit.communicate()
-            if submit.returncode == 0:
-                match = re.search( 'submitted to cluster (\\d+).', s_out )
-                if match is None:
-                    s_out = 'Failed to find job id from condor_submit'
-                else:
-                    external_job_id = match.group( 1 )
-        except Exception, e:
-            # TODO Add extra except for OSError?
-            s_out = str(e)
-
+        external_job_id, message = condor_submit(submit_file)
         if external_job_id is None:
-            log.debug( "condor_submit failed for job %s: %s" % (job_wrapper.get_id_tag(), s_out) )
+            log.debug( "condor_submit failed for job %s: %s" % (job_wrapper.get_id_tag(), message) )
             if self.app.config.cleanup_job == "always":
                 os.unlink( submit_file )
                 cjs.cleanup()
@@ -177,29 +144,17 @@
         new_watched = []
         for cjs in self.watched:
             job_id = cjs.job_id
-            log_job_id = job_id.zfill(3)
             galaxy_id_tag = cjs.job_wrapper.get_id_tag()
-            job_running = False
-            job_complete = False
-            job_failed = False
             try:
                 if os.stat( cjs.user_log ).st_size == cjs.user_log_size:
                     new_watched.append( cjs )
                     continue
-                with open(cjs.user_log, 'r') as fh:
-                    for line in fh:
-                        if '001 (' + log_job_id + '.' in line:
-                            job_running = True
-                        if '004 (' + log_job_id + '.' in line:
-                            job_running = False
-                        if '007 (' + log_job_id + '.' in line:
-                            job_running = False
-                        if '005 (' + log_job_id + '.' in line:
-                            job_complete = True
-                        if '009 (' + log_job_id + '.' in line:
-                            job_failed = True
-                    cjs.user_log_size = fh.tell()
-            except Exception, e:
+                s1, s4, s7, s5, s9, log_size = summarize_condor_log(cjs.user_log, job_id)
+                job_running = s1 and not (s4 or s7)
+                job_complete = s5
+                job_failed = s9
+                cjs.user_log_size = log_size
+            except Exception:
                 # so we don't kill the monitor thread
                 log.exception( "(%s/%s) Unable to check job status" % ( galaxy_id_tag, job_id ) )
                 log.warning( "(%s/%s) job will now be errored" % ( galaxy_id_tag, job_id ) )
@@ -230,13 +185,10 @@
 
     def stop_job( self, job ):
         """Attempts to delete a job from the DRM queue"""
-        try:
-            subprocess.check_call( ( 'condor_rm', job.job_runner_external_id ) )
-            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.id, job.job_runner_external_id ) )
-        except subprocess.CalledProcessError:
-            log.debug( "(%s/%s) User killed running job, but condor_rm failed" % ( job.id, job.job_runner_external_id ) )
-        except Exception, e:
-            log.debug( "(%s/%s) User killed running job, but error encountered removing from Condor queue: %s" % ( job.id, job.job_runner_external_id, e ) )
+        external_id = job.job_runner_external_id
+        failure_message = condor_stop(external_id)
+        if failure_message:
+            log.debug( "(%s/%s) Failed to stop condor job: %s" % ( job.id, external_id, failure_message ) )
 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
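
The s1/s4/s5/s7/s9 flags summarize which numbered events have appeared in the
HTCondor user log for this job (summarize_condor_log itself is defined in the
next file). The event numbers correspond to the standard condor user-log event
types, roughly:

    # Condor user log event codes consulted above; meanings per the standard
    # condor user log format:
    CONDOR_LOG_EVENTS = {
        '001': 'job executing',     # s1: job has started running
        '004': 'job evicted',       # s4: no longer running on its machine
        '005': 'job terminated',    # s5: job completed
        '007': 'shadow exception',  # s7: execution interrupted
        '009': 'job aborted',       # s9: removed, e.g. via condor_rm
    }
    # Hence: job_running = s1 and not (s4 or s7), job_complete = s5,
    # and job_failed = s9.
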
diff -r 432999eabbaa97528a2bd37f1b99de86b39a924f -r 126d5c5b9a4938909b518cb176dc47890e1c13e5 lib/galaxy/jobs/runners/util/condor/__init__.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/util/condor/__init__.py
@@ -0,0 +1,114 @@
+"""
+Condor helper utilities.
+"""
+from subprocess import Popen, PIPE, STDOUT, check_call, CalledProcessError
+from ..external import parse_external_id
+
+DEFAULT_QUERY_CLASSAD = dict(
+    universe='vanilla',
+    getenv='true',
+    notification='NEVER',
+)
+
+PROBLEM_RUNNING_CONDOR_SUBMIT = \
+    "Problem encountered while running condor_submit."
+PROBLEM_PARSING_EXTERNAL_ID = \
+    "Failed to find job id from condor_submit"
+
+SUBMIT_PARAM_PREFIX = "submit_"
+
+
+def submission_params(**kwds):
+    submission_params = {}
+    for key, value in kwds.iteritems():
+        key = key.lower()
+        if key.startswith(SUBMIT_PARAM_PREFIX):
+            condor_key = key[len(SUBMIT_PARAM_PREFIX):]
+            submission_params[condor_key] = value
+    return submission_params
+
+
+def build_submit_description(executable, output, error, user_log, query_params):
+    """
+    Build up the contents of a condor submit description file.
+
+    >>> submit_args = dict(executable='/path/to/script', output='o', error='e', user_log='ul')
+    >>> submit_args['query_params'] = dict()
+    >>> default_description = build_submit_description(**submit_args)
+    >>> assert 'executable = /path/to/script' in default_description
+    >>> assert 'output = o' in default_description
+    >>> assert 'error = e' in default_description
+    >>> assert 'queue' in default_description
+    >>> assert 'universe = vanilla' in default_description
+    >>> assert 'universe = standard' not in default_description
+    >>> submit_args['query_params'] = dict(universe='standard')
+    >>> std_description = build_submit_description(**submit_args)
+    >>> assert 'universe = vanilla' not in std_description
+    >>> assert 'universe = standard' in std_description
+    """
+    all_query_params = DEFAULT_QUERY_CLASSAD.copy()
+    all_query_params.update(query_params)
+
+    submit_description = []
+    for key, value in all_query_params.items():
+        submit_description.append('%s = %s' % (key, value))
+    submit_description.append('executable = ' + executable)
+    submit_description.append('output = ' + output)
+    submit_description.append('error = ' + error)
+    submit_description.append('log = ' + user_log)
+    submit_description.append('queue')
+    return '\n'.join(submit_description)
+
+
+def condor_submit(submit_file):
+    """
+    Submit a condor job described by the given file. Parse an external id for
+    the submission or return None and a reason for the failure.
+    """
+    external_id = None
+    try:
+        submit = Popen(('condor_submit', submit_file), stdout=PIPE, stderr=STDOUT)
+        message, _ = submit.communicate()
+        if submit.returncode == 0:
+            external_id = parse_external_id(message, type='condor')
+        else:
+            message = PROBLEM_PARSING_EXTERNAL_ID
+    except Exception as e:
+        message = str(e)
+    return external_id, message
+
+
+def condor_stop(external_id):
+    """
+    Stop running condor job and return a failure_message if this
+    fails.
+    """
+    failure_message = None
+    try:
+        check_call(('condor_rm', external_id))
+    except CalledProcessError:
+        failure_message = "condor_rm failed"
+    except Exception as e:
+        failure_message = "error encountered calling condor_rm: %s" % e
+    return failure_message
+
+
+def summarize_condor_log(log_file, external_id):
+    """
+    Flag which numbered events appear in the condor user log for the given
+    external id and return the log size.
+    """
+    log_job_id = external_id.zfill(3)
+    s1 = s4 = s7 = s5 = s9 = False
+    with open(log_file, 'r') as log_handle:
+        for line in log_handle:
+            if '001 (' + log_job_id + '.' in line:
+                s1 = True
+            if '004 (' + log_job_id + '.' in line:
+                s4 = True
+            if '007 (' + log_job_id + '.' in line:
+                s7 = True
+            if '005 (' + log_job_id + '.' in line:
+                s5 = True
+            if '009 (' + log_job_id + '.' in line:
+                s9 = True
        file_size = log_handle.tell()
+    return s1, s4, s7, s5, s9, file_size
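
A hypothetical end-to-end use of these helpers, mirroring what
CondorJobRunner.queue_job does above (all paths are invented for
illustration, and actually running it requires the condor CLI tools):

    from galaxy.jobs.runners.util.condor import (
        build_submit_description,
        condor_stop,
        condor_submit,
        submission_params,
    )

    query_params = submission_params(submit_request_memory='8192')
    description = build_submit_description(
        executable='/cluster/files/galaxy_1.sh',
        output='/cluster/files/galaxy_1.o',
        error='/cluster/files/galaxy_1.e',
        user_log='/cluster/files/galaxy_1.condor.log',
        query_params=query_params,
    )
    open('/cluster/files/galaxy_1.condor.desc', 'w').write(description)
    external_id, message = condor_submit('/cluster/files/galaxy_1.condor.desc')
    if external_id is None:
        raise Exception(message)
    # ... later, if the user stops the job:
    failure_message = condor_stop(external_id)
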
diff -r 432999eabbaa97528a2bd37f1b99de86b39a924f -r 126d5c5b9a4938909b518cb176dc47890e1c13e5 lib/galaxy/jobs/runners/util/external.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/util/external.py
@@ -0,0 +1,37 @@
+from re import search
+
+EXTERNAL_ID_TYPE_ANY = None
+
+EXTERNAL_ID_PATTERNS = [
+    ('condor', r'submitted to cluster (\d+)\.'),
+    ('slurm', r'Submitted batch job (\w+)'),
+    ('torque', r'(.+)'),  # Default 'pattern' assumed by Galaxy code circa August 2013.
+]
+
+
+def parse_external_id(output, type=EXTERNAL_ID_TYPE_ANY):
+    """
+    Attempt to parse the output of job submission commands for an external id.
+
+    >>> parse_external_id("12345.pbsmanager")
+    '12345.pbsmanager'
+    >>> parse_external_id('Submitted batch job 185')
+    '185'
+    >>> parse_external_id('Submitted batch job 185', type='torque')
+    'Submitted batch job 185'
+    >>> parse_external_id('submitted to cluster 125.')
+    '125'
+    >>> parse_external_id('submitted to cluster 125.', type='slurm')
+    >>>
+    """
+    external_id = None
+    for pattern_type, pattern in EXTERNAL_ID_PATTERNS:
+        if type != EXTERNAL_ID_TYPE_ANY and type != pattern_type:
+            continue
+
+        match = search(pattern, output)
+        if match:
+            external_id = match.group(1)
+            break
+
+    return external_id

https://bitbucket.org/galaxy/galaxy-central/commits/93caaf7f9bdd/
Changeset: 93caaf7f9bdd
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   Implement GALAXY_SLOTS logic for condor runner.

Just pass through submit_request_cpus as GALAXY_SLOTS if set in job_conf.xml.

Affected #: 1 file

diff -r 126d5c5b9a4938909b518cb176dc47890e1c13e5 -r 93caaf7f9bdd6429a250921fa080bfe71028f8c0 lib/galaxy/jobs/runners/condor.py
--- a/lib/galaxy/jobs/runners/condor.py
+++ b/lib/galaxy/jobs/runners/condor.py
@@ -58,6 +58,11 @@
 
         # get destination params
         query_params = submission_params(**job_destination.params)
+        galaxy_slots = query_params.get('request_cpus', None)
+        if galaxy_slots:
+            galaxy_slots_statement = 'GALAXY_SLOTS="%s"; export GALAXY_SLOTS_CONFIGURED="1"' % galaxy_slots
+        else:
+            galaxy_slots_statement = 'GALAXY_SLOTS="1"'
 
         # define job attributes
         cjs = CondorJobState(
@@ -80,7 +85,11 @@
         )
 
         submit_file_contents = build_submit_description(**build_submit_params)
-        script = self.get_job_file( job_wrapper, exit_code_path=cjs.exit_code_file )
+        script = self.get_job_file(
+            job_wrapper,
+            exit_code_path=cjs.exit_code_file,
+            slots_statement=galaxy_slots_statement,
+        )
         try:
             fh = file( executable, "w" )
             fh.write( script )
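
So with a hypothetical condor destination in job_conf.xml carrying
<param id="submit_request_cpus">4</param>, the value flows as follows:

    from galaxy.jobs.runners.util.condor import submission_params

    # Hypothetical destination params as they arrive in queue_job:
    params = {'submit_request_cpus': '4'}
    query_params = submission_params(**params)
    assert query_params == {'request_cpus': '4'}
    # request_cpus = 4 still reaches the submit description, and the
    # generated job script's slots statement becomes:
    #     GALAXY_SLOTS="4"; export GALAXY_SLOTS_CONFIGURED="1"
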
https://bitbucket.org/galaxy/galaxy-central/commits/5715b876f91b/
Changeset: 5715b876f91b
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   PBS job runner improvements (including GALAXY_SLOTS).

Rework to use get_job_file; this enables the GALAXY_SLOTS logic for the PBS
runner. Bring in staging fixes found in pull request #194 from CSIRO
(stephen_mcmahon_). Small PEP-8 tweaks.

Affected #: 1 file

diff -r 93caaf7f9bdd6429a250921fa080bfe71028f8c0 -r 5715b876f91bd34d179dc9d48e7ad1a9e6baf285 lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py
+++ b/lib/galaxy/jobs/runners/pbs.py
@@ -1,9 +1,10 @@
-import os, logging, threading, time, traceback
+import os
+import logging
+import time
+import traceback
 from datetime import timedelta
-from Queue import Queue, Empty
 
 from galaxy import model
-from galaxy.datatypes.data import nice_size
 from galaxy.util.bunch import Bunch
 from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size
 from galaxy.jobs import JobDestination
@@ -35,42 +36,14 @@
 
 # The last two lines execute the command and then retrieve the command's
 # exit code ($?) and write it to a file.
-pbs_template = """#!/bin/sh
-GALAXY_LIB="%s"
-if [ "$GALAXY_LIB" != "None" ]; then
-    if [ -n "$PYTHONPATH" ]; then
-        export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
-    else
-        export PYTHONPATH="$GALAXY_LIB"
-    fi
-fi
-%s
-cd %s
-%s
-echo $? > %s
-"""
-
-# The last two lines execute the command and then retrieve the command's
-# exit code ($?) and write it to a file.
-pbs_symlink_template = """#!/bin/sh
-GALAXY_LIB="%s"
-if [ "$GALAXY_LIB" != "None" ]; then
-    if [ -n "$PYTHONPATH" ]; then
-        export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
-    else
-        export PYTHONPATH="$GALAXY_LIB"
-    fi
-fi
+pbs_symlink_template = """
 for dataset in %s; do
     dir=`dirname $dataset`
     file=`basename $dataset`
     [ ! -d $dir ] && mkdir -p $dir
     [ ! -e $dataset ] && ln -s %s/$file $dataset
 done
-%s
-cd %s
-%s
-echo $? > %s
+mkdir -p %s
 """
 
 PBS_ARGMAP = {
@@ -109,6 +82,7 @@
     -8: "exec() of user command failed",
 }
 
+
 class PBSJobRunner( AsynchronousJobRunner ):
     """
     Job runner backed by a finite pool of worker threads. FIFO scheduling
@@ -273,6 +247,7 @@
         pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
         pbs_efile = self.app.config.pbs_application_server + ':' + efile
         output_files = [ str( o ) for o in output_fnames ]
+        output_files.append(ecfile)
         stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
         stageout = self.get_stage_in_out( output_files )
         attrs = [
@@ -300,20 +275,20 @@
 
         # write the job script
         if self.app.config.pbs_stage_path != '':
-            script = pbs_symlink_template % ( job_wrapper.galaxy_lib_dir,
-                                              " ".join( job_wrapper.get_input_fnames() + output_files ),
-                                              self.app.config.pbs_stage_path,
-                                              job_wrapper.get_env_setup_clause(),
-                                              exec_dir,
-                                              command_line,
-                                              ecfile )
+            # touch the ecfile so that it gets staged
+            with file(ecfile, 'a'):
+                os.utime(ecfile, None)
+
+            stage_commands = pbs_symlink_template % (
+                " ".join( job_wrapper.get_input_fnames() + output_files ),
+                self.app.config.pbs_stage_path,
+                exec_dir,
+            )
         else:
-            script = pbs_template % ( job_wrapper.galaxy_lib_dir,
-                                      job_wrapper.get_env_setup_clause(),
-                                      exec_dir,
-                                      command_line,
-                                      ecfile )
+            stage_commands = ''
+
+        env_setup_commands = '%s\n%s' % (stage_commands, job_wrapper.get_env_setup_clause())
+        script = self.get_job_file(job_wrapper, exit_code_path=ecfile, env_setup_commands=env_setup_commands)
         job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
         fh = file(job_file, "w")
         fh.write(script)

https://bitbucket.org/galaxy/galaxy-central/commits/84784515c048/
Changeset: 84784515c048
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   Add GALAXY_SLOTS logic to local runner.

Affected #: 1 file

diff -r 5715b876f91bd34d179dc9d48e7ad1a9e6baf285 -r 84784515c0481cfddbfc05e983efced55972e9a0 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -42,6 +42,21 @@
         super( LocalJobRunner, self ).__init__( app, nworkers )
         self._init_worker_threads()
 
+    def __command_line( self, job_wrapper ):
+        """
+        Return the tool command line wrapped with GALAXY_SLOTS exports.
+        """
+        command_line = job_wrapper.runner_command_line
+
+        ## slots would be a cleaner name, but we don't want deployers to see
+        ## examples and think it is going to work with other job runners.
+        slots = job_wrapper.job_destination.params.get( "local_slots", None )
+        command_line = command_line.lstrip( " ;" )
+        if slots:
+            command_line = 'export GALAXY_SLOTS="%d"; export GALAXY_SLOTS_CONFIGURED="1"; %s' % ( int( slots ), command_line )
+        else:
+            command_line = 'export GALAXY_SLOTS="1"; %s' % command_line
+        return command_line
+
     def queue_job( self, job_wrapper ):
         # prepare the job
         if not self.prepare_job( job_wrapper ):
@@ -51,7 +66,7 @@
         exit_code = 0
 
         # command line has been added to the wrapper by prepare_job()
-        command_line = job_wrapper.runner_command_line
+        command_line = self.__command_line( job_wrapper )
 
         job_id = job_wrapper.get_id_tag()
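
A sketch of the wrapping __command_line performs, with a hypothetical
destination carrying <param id="local_slots">2</param> and a hypothetical
tool command:

    slots = 2
    tool_command = 'my_tool --threads $GALAXY_SLOTS'
    # Same format string used by __command_line above:
    wrapped = 'export GALAXY_SLOTS="%d"; export GALAXY_SLOTS_CONFIGURED="1"; %s' % (slots, tool_command)
    assert wrapped == ('export GALAXY_SLOTS="2"; export GALAXY_SLOTS_CONFIGURED="1"; '
                       'my_tool --threads $GALAXY_SLOTS')
    # Without local_slots, the command is instead prefixed with
    # export GALAXY_SLOTS="1" and GALAXY_SLOTS_CONFIGURED stays unset.
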
https://bitbucket.org/galaxy/galaxy-central/commits/3825c8d49364/
Changeset: 3825c8d49364
User:      jmchilton
Date:      2013-10-17 00:37:37
Summary:   Improved fix for tools producing command lines with trailing semi-colons.

Affected #: 1 file

diff -r 84784515c0481cfddbfc05e983efced55972e9a0 -r 3825c8d493649a72c6fc65bdd06b2362f0bf5930 lib/galaxy/jobs/command_factory.py
--- a/lib/galaxy/jobs/command_factory.py
+++ b/lib/galaxy/jobs/command_factory.py
@@ -21,7 +21,7 @@
 
     # Remove trailing semi-colon so we can start hacking up this command.
    # TODO: Refactor to compose a list and join with ';', would be more clean.
-    commands = commands.rstrip(";")
+    commands = commands.rstrip("; ")
 
     # Prepend version string
     if job_wrapper.version_string_cmd:

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this
email.