commit/galaxy-central: natefoo: Fix the output_size and runtime job limits. Heads-up to PBS runner users: I did not test this change with the PBS runner.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/21babc3112aa/ Changeset: 21babc3112aa User: natefoo Date: 2014-06-19 19:22:26 Summary: Fix the output_size and runtime job limits. Heads up to pbs runner users: I did not test this change with the PBS runner. Affected #: 6 files diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -412,5 +412,12 @@ will be terminated by Galaxy. --><limit type="walltime">24:00:00</limit> + <!-- output_size: + Size that any defined tool output can grow to before the job + will be terminated. This does not include temporary files + created by the job. Format is flexible, e.g.: + '10GB' = '10g' = '10240 Mb' = '10737418240' + --> + <limit type="output_size">10GB</limit></limits></job_conf> diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -22,7 +22,7 @@ from galaxy.exceptions import ObjectInvalid, ObjectNotFound from galaxy.jobs.actions.post import ActionBox from galaxy.jobs.mapper import JobRunnerMapper -from galaxy.jobs.runners import BaseJobRunner +from galaxy.jobs.runners import BaseJobRunner, JobState from galaxy.util.bunch import Bunch from galaxy.util.expressions import ExpressionContext from galaxy.util.json import from_json_string @@ -230,7 +230,7 @@ types = dict(registered_user_concurrent_jobs=int, anonymous_user_concurrent_jobs=int, walltime=str, - output_size=int) + output_size=util.size_to_bytes) self.limits = Bunch(registered_user_concurrent_jobs=None, anonymous_user_concurrent_jobs=None, @@ -1270,13 +1270,13 @@ def check_limits(self, runtime=None): if self.app.job_config.limits.output_size > 0: for outfile, size in self.get_output_sizes(): - if size > self.app.config.output_size_limit: - log.warning( '(%s) Job 
output %s is over the output size limit' % ( self.get_id_tag(), os.path.basename( outfile ) ) ) - return 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( self.app.job_config.limits.output_size ) + if size > self.app.job_config.limits.output_size: + log.warning( '(%s) Job output size %s has exceeded the global output size limit', self.get_id_tag(), os.path.basename( outfile ) ) + return JobState.runner_states.OUTPUT_SIZE_LIMIT, 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( self.app.job_config.limits.output_size ) if self.app.job_config.limits.walltime_delta is not None and runtime is not None: if runtime > self.app.job_config.limits.walltime_delta: - log.warning( '(%s) Job has reached walltime, it will be terminated' % ( self.get_id_tag() ) ) - return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime + log.warning( '(%s) Job runtime %s has exceeded the global walltime, it will be terminated', self.get_id_tag(), runtime ) + return JobState.runner_states.GLOBAL_WALLTIME_REACHED, 'Job ran longer than the maximum allowed execution time (runtime: %s, limit: %s), please try different inputs or parameters' % ( str(runtime).split('.')[0], self.app.job_config.limits.walltime ) return None def has_limits( self ): diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -6,6 +6,7 @@ import time import string import logging +import datetime import threading import subprocess @@ -335,7 +336,9 @@ Encapsulate state of jobs. 
""" runner_states = Bunch( - WALLTIME_REACHED = 'walltime_reached' + WALLTIME_REACHED = 'walltime_reached', + GLOBAL_WALLTIME_REACHED = 'global_walltime_reached', + OUTPUT_SIZE_LIMIT = 'output_size_limit' ) def __init__( self ): self.runner_state_handled = False @@ -374,8 +377,9 @@ def __init__( self, files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None ): super( AsynchronousJobState, self ).__init__() self.old_state = None - self.running = False + self._running = False self.check_count = 0 + self.start_time = None self.job_wrapper = job_wrapper # job_id is the DRM's job id, not the Galaxy job id @@ -392,6 +396,33 @@ self.cleanup_file_attributes = [ 'job_file', 'output_file', 'error_file', 'exit_code_file' ] + @property + def running( self ): + return self._running + + @running.setter + def running( self, is_running ): + self._running = is_running + # This will be invalid for job recovery + if self.start_time is None: + self.start_time = datetime.datetime.now() + + def check_limits( self, runtime=None ): + limit_state = None + if self.job_wrapper.has_limits(): + self.check_count += 1 + if self.running and (self.check_count % 20 == 0): + if runtime is None: + runtime = datetime.datetime.now() - (self.start_time or datetime.datetime.now()) + self.check_count = 0 + limit_state = self.job_wrapper.check_limits( runtime=runtime ) + if limit_state is not None: + # Set up the job for failure, but the runner will do the actual work + self.runner_state, self.fail_message = limit_state + self.stop_job = True + return True + return False + def cleanup( self ): for file in [ getattr( self, a ) for a in self.cleanup_file_attributes if hasattr( self, a ) ]: try: diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -273,6 +273,9 
@@ if state in ( drmaa.JobState.FAILED, drmaa.JobState.DONE ): self._complete_terminal_job( ajs, drmaa_state = state ) continue + if ajs.check_limits(): + self.work_queue.put( ( self.fail_job, ajs ) ) + continue ajs.old_state = state new_watched.append( ajs ) # Replace the watch list with the updated version diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -191,9 +191,9 @@ while proc.poll() is None: i += 1 if (i % 20) == 0: - msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start) - if msg is not None: - job_wrapper.fail(msg) + limit_state = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start) + if limit_state is not None: + job_wrapper.fail(limit_state[1]) log.debug('(%s) Terminating process group' % job_id) self._terminate(proc) return True diff -r 576be32f8aabc6b3124d32ea47c29b2d95b9a544 -r 21babc3112aa28761425367d3a5abfbbd8ce80e9 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -400,18 +400,11 @@ if status.job_state == "R" and not pbs_job_state.running: pbs_job_state.running = True pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING ) - if status.job_state == "R" and ( pbs_job_state.check_count % 20 ) == 0: - # Every 20th time the job status is checked, do limit checks (if configured) - # Get the job's runtime - runtime = None - if status.get( 'resources_used', False ): - # resources_used may not be in the status for new jobs - h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ] - runtime = timedelta( 0, s, 0, 0, m, h ) - msg = pbs_job_state.job_wrapper.check_limits(runtime) - if msg is not None: - pbs_job_state.fail_message = msg - pbs_job_state.stop_job = True + if status.job_state == "R" and status.get( 'resources_used', False ): + # resources_used may not be in the status 
for new jobs + h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ] + runtime = timedelta( 0, s, 0, 0, m, h ) + if pbs_job_state.check_limits( runtime=runtime ): self.work_queue.put( ( self.fail_job, pbs_job_state ) ) continue elif status.job_state == "C": Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org