commit/galaxy-central: natefoo: Implement the "memory_limit_reached" runner state for slurm and allow resubmission on that state.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/53e734a15327/ Changeset: 53e734a15327 User: natefoo Date: 2014-10-23 03:05:07+00:00 Summary: Implement the "memory_limit_reached" runner state for slurm and allow resubmission on that state. Affected #: 4 files diff -r 74499e753e0f28b4353a27e241c1fce0f0895964 -r 53e734a1532733df32dcf45ace3dea6aa3f3604d config/job_conf.xml.sample_advanced --- a/config/job_conf.xml.sample_advanced +++ b/config/job_conf.xml.sample_advanced @@ -399,8 +399,11 @@ the terminal condition of a job will be used. The 'condition' attribute is optional, if not present, the - resubmit destination will be used for all conditions. Currently, - only the "walltime_reached" condition is implemented. + resubmit destination will be used for all conditions. The + conditions currently implemented are: + + - "walltime_reached" + - "memory_limit_reached" The 'handler' tag is optional, if not present, the job's original handler will be reused for the resubmitted job. @@ -413,6 +416,10 @@ <!-- The destination that you resubmit jobs to can be any runner type --><param id="nativeSpecification">-l h_rt=96:00:00</param></destination> + <destination id="smallmem" runner="slurm"> + <param id="nativeSpecification">--mem-per-cpu=512</param> + <resubmit condition="memory_limit_reached" destination="bigmem" /> + </destination></destinations><resources default="default"> diff -r 74499e753e0f28b4353a27e241c1fce0f0895964 -r 53e734a1532733df32dcf45ace3dea6aa3f3604d lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -337,6 +337,7 @@ """ runner_states = Bunch( WALLTIME_REACHED = 'walltime_reached', + MEMORY_LIMIT_REACHED = 'memory_limit_reached', GLOBAL_WALLTIME_REACHED = 'global_walltime_reached', OUTPUT_SIZE_LIMIT = 'output_size_limit' ) diff -r 74499e753e0f28b4353a27e241c1fce0f0895964 -r 53e734a1532733df32dcf45ace3dea6aa3f3604d lib/galaxy/jobs/runners/slurm.py --- a/lib/galaxy/jobs/runners/slurm.py +++ b/lib/galaxy/jobs/runners/slurm.py @@ -13,6 +13,8 @@ __all__ = [ 'SlurmJobRunner' ] +SLURM_MEMORY_LIMIT_EXCEEDED_MSG = 'slurmstepd: error: Exceeded job memory limit' + class SlurmJobRunner( DRMAAJobRunner ): runner_name = "SlurmRunner" @@ -62,8 +64,14 @@ except: ajs.fail_message = "This job failed due to a cluster node failure, and an attempt to resubmit the job failed." elif job_info['JobState'] == 'CANCELLED': - log.info( '(%s/%s) Job was cancelled via slurm (e.g. with scancel(1))', ajs.job_wrapper.get_id_tag(), ajs.job_id ) - ajs.fail_message = "This job failed because it was cancelled by an administrator." + # Check to see if the job was killed for exceeding memory consumption + if self.__check_memory_limit( ajs.error_file ): + log.info( '(%s/%s) Job hit memory limit', ajs.job_wrapper.get_id_tag(), ajs.job_id ) + ajs.fail_message = "This job was terminated because it used more memory than it was allocated." + ajs.runner_state = ajs.runner_states.MEMORY_LIMIT_REACHED + else: + log.info( '(%s/%s) Job was cancelled via slurm (e.g. with scancel(1))', ajs.job_wrapper.get_id_tag(), ajs.job_id ) + ajs.fail_message = "This job failed because it was cancelled by an administrator." else: log.warning( '(%s/%s) Job failed due to unknown reasons, JobState was: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, job_info['JobState'] ) ajs.fail_message = "This job failed for reasons that could not be determined." @@ -77,3 +85,31 @@ super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) # by default, finish as if the job was successful. super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) + + def __check_memory_limit( self, efile_path ): + """ + A very poor implementation of tail, but it doesn't need to be fancy + since we are only searching the last 2K + """ + try: + log.debug( 'Checking %s for exceeded memory message from slurm', efile_path ) + with open( efile_path ) as f: + pos = 2 + bof = False + while pos < 2048: + try: + f.seek(-pos, 2) + pos += 1 + except: + f.seek(-pos + 1, 2) + bof = True + + if (bof or f.read(1) == '\n') and f.readline().strip() == SLURM_MEMORY_LIMIT_EXCEEDED_MSG: + return True + + if bof: + break + except: + log.exception('Error reading end of %s:', path) + + return False diff -r 74499e753e0f28b4353a27e241c1fce0f0895964 -r 53e734a1532733df32dcf45ace3dea6aa3f3604d lib/galaxy/jobs/runners/state_handlers/resubmit.py --- a/lib/galaxy/jobs/runners/state_handlers/resubmit.py +++ b/lib/galaxy/jobs/runners/state_handlers/resubmit.py @@ -6,14 +6,24 @@ log = logging.getLogger(__name__) +MESSAGES = dict( + walltime_reached = 'it reached the walltime', + memory_limit_reached = 'it exceeded the amount of allocated memory' +) + def failure(app, job_runner, job_state): - if getattr( job_state, 'runner_state', None ) and job_state.runner_state == job_state.runner_states.WALLTIME_REACHED: + if getattr( job_state, 'runner_state', None ) and job_state.runner_state in ( job_state.runner_states.WALLTIME_REACHED, job_state.runner_states.MEMORY_LIMIT_REACHED ): # Intercept jobs that hit the walltime and have a walltime or nonspecific resubmit destination configured for resubmit in job_state.job_destination.get('resubmit'): - if resubmit.get('condition', None) and resubmit['condition'] != 'walltime_reached': + if resubmit.get('condition', None) and resubmit['condition'] != job_state.runner_state: continue # There is a resubmit defined for the destination but its condition is not for walltime_reached - log.info("(%s/%s) Job will be resubmitted to '%s' because it reached the walltime at the '%s' destination", job_state.job_wrapper.job_id, job_state.job_id, resubmit['destination'], job_state.job_wrapper.job_destination.id ) + log.info("(%s/%s) Job will be resubmitted to '%s' because %s at the '%s' destination", + job_state.job_wrapper.job_id, + job_state.job_id, + resubmit['destination'], + MESSAGES[job_state.runner_state], + job_state.job_wrapper.job_destination.id ) # fetch JobDestination for the id or tag new_destination = app.job_config.get_destination(resubmit['destination']) # Resolve dynamic if necessary Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org