1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/bb365c782b64/ changeset: bb365c782b64 user: natefoo date: 2012-10-24 17:34:16 summary: Implement job_walltime for the local runner. affected #: 3 files diff -r a5db4601ddbe117b5eb393f5d92982e50a866c7d -r bb365c782b64c37b90333d964f55cba0acd401da lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -5,6 +5,7 @@ import sys, os, tempfile import logging, logging.config import ConfigParser +from datetime import timedelta from galaxy.util import string_as_bool, listify, parse_xml from galaxy import eggs @@ -99,6 +100,10 @@ self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) ) self.retry_job_output_collection = int( kwargs.get( 'retry_job_output_collection', 0 ) ) self.job_walltime = kwargs.get( 'job_walltime', None ) + self.job_walltime_delta = None + if self.job_walltime is not None: + h, m, s = [ int( v ) for v in self.job_walltime.split( ':' ) ] + self.job_walltime_delta = timedelta( 0, s, 0, 0, m, h ) self.admin_users = kwargs.get( "admin_users", "" ) self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-announce-join@bx.psu.edu") self.error_email_to = kwargs.get( 'error_email_to', None ) diff -r a5db4601ddbe117b5eb393f5d92982e50a866c7d -r bb365c782b64c37b90333d964f55cba0acd401da lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -1,6 +1,7 @@ import logging import subprocess import tempfile +import datetime from Queue import Queue import threading @@ -89,28 +90,36 @@ preexec_fn = os.setpgrp ) job_wrapper.set_runner( 'local:///', proc.pid ) job_wrapper.change_state( model.Job.states.RUNNING ) - if self.app.config.output_size_limit > 0: - sleep_time = 1 - while proc.poll() is None: + sleep_time = 1 + job_start = datetime.datetime.now() + while proc.poll() is None: + if self.app.config.output_size_limit > 0: for outfile, size in job_wrapper.check_output_sizes(): if size > self.app.config.output_size_limit: # Error the job immediately - job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \ + job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters' \ % nice_size( self.app.config.output_size_limit ) ) log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \ % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) # Then kill it - os.killpg( proc.pid, 15 ) - sleep( 1 ) - if proc.poll() is None: - os.killpg( proc.pid, 9 ) - proc.wait() # reap + self._terminate( proc ) log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) ) return sleep( sleep_time ) - if sleep_time < 8: - # So we don't stat every second - sleep_time *= 2 + if self.app.config.job_walltime_delta is not None: + time_executing = datetime.datetime.now() - job_start + if time_executing > self.app.config.job_walltime_delta: + # Error the job immediately + job_wrapper.fail( 'Job ran longer than maximum allowed execution time (%s), please try different job parameters' \ + % self.app.config.job_walltime ) + log.warning( 'Terminating job %s since walltime has been reached' % job_wrapper.job_id ) + # Then kill it + self._terminate( proc ) + log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) ) + return + if sleep_time < 8: + # So we don't stat every second + sleep_time *= 2 # Reap the process and get the exit code. exit_code = proc.wait() stdout_file.seek( 0 ) @@ -202,3 +211,9 @@ # local jobs can't be recovered job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." ) + def _terminate( self, proc ): + os.killpg( proc.pid, 15 ) + sleep( 1 ) + if proc.poll() is None: + os.killpg( proc.pid, 9 ) + return proc.wait() # reap diff -r a5db4601ddbe117b5eb393f5d92982e50a866c7d -r bb365c782b64c37b90333d964f55cba0acd401da lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -128,10 +128,6 @@ # set the default server during startup self.default_pbs_server = None self.determine_pbs_server( 'pbs:///' ) - self.job_walltime = None - if self.app.config.job_walltime is not None: - h, m, s = [ int( v ) for v in self.app.config.job_walltime.split( ':' ) ] - self.job_walltime = timedelta( 0, s, 0, 0, m, h ) self.monitor_thread = threading.Thread( target=self.monitor ) self.monitor_thread.start() self.work_queue = Queue() @@ -422,7 +418,7 @@ fail = False for outfile, size in pbs_job_state.job_wrapper.check_output_sizes(): if size > self.app.config.output_size_limit: - pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \ + pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters' \ % nice_size( self.app.config.output_size_limit ) log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \ % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) @@ -432,14 +428,14 @@ break if fail: continue - if self.job_walltime is not None: + if self.app.config.job_walltime_delta is not None: # Check the job's execution time if status.get( 'resources_used', False ): # resources_used may not be in the status for new jobs h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ] time_executing = timedelta( 0, s, 0, 0, m, h ) - if time_executing > self.job_walltime: - pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters or' \ + if time_executing > self.app.config.job_walltime_delta: + pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters' \ % self.app.config.job_walltime log.warning( '(%s/%s) Dequeueing job since walltime has been reached' \ % ( galaxy_job_id, job_id ) ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.