[hg] galaxy 2425: Allow the configuration of a job walltime (currently only supported in PBS)
details:   http://www.bx.psu.edu/hg/galaxy/rev/c8a4a4ea0f2c
changeset: 2425:c8a4a4ea0f2c
user:      Nate Coraor <nate@bx.psu.edu>
date:      Fri May 29 13:19:42 2009 -0400
description:
Allow the configuration of a job walltime (currently only supported in PBS)

4 file(s) affected in this change:

lib/galaxy/config.py
lib/galaxy/jobs/runners/pbs.py
lib/galaxy/util/bunch.py
universe_wsgi.ini.sample

diffs (223 lines):

diff -r 0fc19c283f78 -r c8a4a4ea0f2c lib/galaxy/config.py
--- a/lib/galaxy/config.py Fri May 29 11:07:18 2009 -0400
+++ b/lib/galaxy/config.py Fri May 29 13:19:42 2009 -0400
@@ -55,6 +55,7 @@
         self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root )
         self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) )
         self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) )
+        self.job_walltime = kwargs.get( 'job_walltime', None )
         self.admin_users = kwargs.get( "admin_users", "" )
         self.sendmail_path = kwargs.get('sendmail_path',"/usr/sbin/sendmail")
         self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-user-join@bx.psu.edu")
@@ -123,7 +124,7 @@
         """
         admin_users = self.get( "admin_users", "" ).split( "," )
         return ( user is not None and user.email in admin_users )
-        
+
 def get_database_engine_options( kwargs ):
     """
     Allow options for the SQLAlchemy database engine to be passed by
diff -r 0fc19c283f78 -r c8a4a4ea0f2c lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py Fri May 29 11:07:18 2009 -0400
+++ b/lib/galaxy/jobs/runners/pbs.py Fri May 29 13:19:42 2009 -0400
@@ -1,8 +1,10 @@
 import os, logging, threading, time
+from datetime import timedelta
 from Queue import Queue, Empty
 
 from galaxy import model
 from galaxy.datatypes.data import nice_size
+from galaxy.util.bunch import Bunch
 
 from paste.deploy.converters import asbool
 
@@ -88,6 +90,10 @@
         # set the default server during startup
         self.default_pbs_server = None
         self.determine_pbs_server( 'pbs:///' )
+        self.job_walltime = None
+        if self.app.config.job_walltime is not None:
+            h, m, s = [ int( v ) for v in self.app.config.job_walltime.split( ':' ) ]
+            self.job_walltime = timedelta( 0, s, 0, 0, m, h )
         self.monitor_thread = threading.Thread( target=self.monitor )
         self.monitor_thread.start()
         self.work_queue = Queue()
@@ -138,8 +144,8 @@
                     self.queue_job( obj )
                 elif op == 'finish':
                     self.finish_job( obj )
-                elif op == 'fail_oversize_job':
-                    self.fail_oversize_job( obj )
+                elif op == 'fail':
+                    self.fail_job( obj )
             except:
                 log.exception( "Uncaught exception %sing job" % op )
 
@@ -297,7 +303,7 @@
         """
         new_watched = []
         # reduce pbs load by batching status queries
-        ( failures, states ) = self.check_all_jobs()
+        ( failures, statuses ) = self.check_all_jobs()
         for pbs_job_state in self.watched:
             job_id = pbs_job_state.job_id
             galaxy_job_id = pbs_job_state.job_wrapper.job_id
@@ -307,28 +313,43 @@
                 log.debug( "(%s/%s) Skipping state check because PBS server connection failed" % ( galaxy_job_id, job_id ) )
                 new_watched.append( pbs_job_state )
                 continue
-            if states.has_key( job_id ):
-                state = states[job_id]
-                if state != old_state:
-                    log.debug("(%s/%s) job state changed from %s to %s" % ( galaxy_job_id, job_id, old_state, state ) )
-                if state == "R" and not pbs_job_state.running:
+            if statuses.has_key( job_id ):
+                status = statuses[job_id]
+                if status.job_state != old_state:
+                    log.debug("(%s/%s) job state changed from %s to %s" % ( galaxy_job_id, job_id, old_state, status.job_state ) )
+                if status.job_state == "R" and not pbs_job_state.running:
                     pbs_job_state.running = True
                     pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
-                if self.app.config.output_size_limit > 0 and state == "R" and (pbs_job_state.check_count % 10) == 0:
-                    # Every 10th time a job is checked, check the size of its outputs.
-                    fail = False
-                    for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
-                        if size > self.app.config.output_size_limit:
-                            pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
-                                % nice_size( self.app.config.output_size_limit )
-                            log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
-                                % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
-                            self.work_queue.put( ( 'fail_oversize_job', pbs_job_state ) )
-                            fail = True
-                            break
-                    if fail:
-                        continue
-                pbs_job_state.old_state = state
+                if status.job_state == "R" and ( pbs_job_state.check_count % 20 ) == 0:
+                    # Every 20th time the job status is checked, do limit checks (if configured)
+                    if self.app.config.output_size_limit > 0:
+                        # Check the size of the job outputs
+                        fail = False
+                        for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
+                            if size > self.app.config.output_size_limit:
+                                pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
+                                    % nice_size( self.app.config.output_size_limit )
+                                log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
+                                    % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
+                                self.work_queue.put( ( 'fail', pbs_job_state ) )
+                                fail = True
+                                break
+                        if fail:
+                            continue
+                    if self.job_walltime is not None:
+                        # Check the job's execution time
+                        if status.get( 'resources_used', False ):
+                            # resources_used may not be in the status for new jobs
+                            h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ]
+                            time_executing = timedelta( 0, s, 0, 0, m, h )
+                            if time_executing > self.job_walltime:
+                                pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters or' \
+                                    % self.app.config.job_walltime
+                                log.warning( '(%s/%s) Dequeueing job since walltime has been reached' \
+                                    % ( galaxy_job_id, job_id ) )
+                                self.work_queue.put( ( 'fail', pbs_job_state ) )
+                                continue
+                pbs_job_state.old_state = status.job_state
                 new_watched.append( pbs_job_state )
             else:
                 try:
@@ -350,11 +371,12 @@
     def check_all_jobs( self ):
         """
         Returns a list of servers that failed to be contacted and a dict
-        of "job_id : state" pairs.
+        of "job_id : status" pairs (where status is a bunchified version
+        of the API's structure.
         """
         servers = []
         failures = []
-        states = {}
+        statuses = {}
         for pbs_job_state in self.watched:
             pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
             if pbs_server_name not in servers:
@@ -366,13 +388,27 @@
                 log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
                 failures.append( pbs_server_name )
                 continue
-            stat_attrl = pbs.new_attrl(1)
-            stat_attrl[0].name = 'job_state'
+            stat_attrl = pbs.new_attrl(2)
+            stat_attrl[0].name = pbs.ATTR_state
+            stat_attrl[1].name = pbs.ATTR_used
             jobs = pbs.pbs_statjob( c, None, stat_attrl, None )
             pbs.pbs_disconnect( c )
-            for job in jobs:
-                states[job.name] = job.attribs[0].value
-        return( ( failures, states ) )
+            statuses.update( self.convert_statjob_to_bunches( jobs ) )
+        return( ( failures, statuses ) )
+
+    def convert_statjob_to_bunches( self, statjob_out ):
+        statuses = {}
+        for job in statjob_out:
+            status = {}
+            for attrib in job.attribs:
+                if attrib.resource is None:
+                    status[ attrib.name ] = attrib.value
+                else:
+                    if attrib.name not in status:
+                        status[ attrib.name ] = Bunch()
+                    status[ attrib.name ][ attrib.resource ] = attrib.value
+            statuses[ job.name ] = Bunch( **status )
+        return statuses
 
     def check_single_job( self, pbs_server_name, job_id ):
         """
@@ -384,7 +420,7 @@
             log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
             return None
         stat_attrl = pbs.new_attrl(1)
-        stat_attrl[0].name = 'job_state'
+        stat_attrl[0].name = pbs.ATTR_state
         jobs = pbs.pbs_statjob( c, job_id, stat_attrl, None )
         pbs.pbs_disconnect( c )
         return jobs[0].attribs[0].value
@@ -417,7 +453,7 @@
         # clean up the pbs files
         self.cleanup( ( ofile, efile, job_file ) )
 
-    def fail_oversize_job( self, pbs_job_state ):
+    def fail_job( self, pbs_job_state ):
         """
         Seperated out so we can use the worker threads for it.
         """
diff -r 0fc19c283f78 -r c8a4a4ea0f2c lib/galaxy/util/bunch.py
--- a/lib/galaxy/util/bunch.py Fri May 29 11:07:18 2009 -0400
+++ b/lib/galaxy/util/bunch.py Fri May 29 13:19:42 2009 -0400
@@ -21,4 +21,7 @@
         return '%s' % self.__dict__
 
     def __nonzero__(self):
-        return bool(self.__dict__)
\ No newline at end of file
+        return bool(self.__dict__)
+
+    def __setitem__(self, k, v):
+        self.__dict__.__setitem__(k, v)
diff -r 0fc19c283f78 -r c8a4a4ea0f2c universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample Fri May 29 11:07:18 2009 -0400
+++ b/universe_wsgi.ini.sample Fri May 29 13:19:42 2009 -0400
@@ -171,6 +171,11 @@
 # Job queue cleanup interval in minutes. Currently only used by RoundRobin
 job_queue_cleanup_interval = 30
 
+# Jobs can be killed after a certain amount of execution time. Format is in
+# hh:mm:ss. Currently only implemented for PBS. Leave commented for
+# unlimited.
+#job_walltime = 10:00:00
+
 # Clustering Galaxy is not a straightforward process and requires a lot of
 # pre-configuration. See the ClusteringGalaxy Wiki before attempting to set
 # any of these options: