details: http://www.bx.psu.edu/hg/galaxy/rev/e26741c8c642 changeset: 1731:e26741c8c642 user: Nate Coraor <nate@bx.psu.edu> date: Tue Feb 03 14:15:09 2009 -0500 description: Stop jobs if any of their output datasets grow larger than a defined limit 5 file(s) affected in this change: lib/galaxy/config.py lib/galaxy/jobs/__init__.py lib/galaxy/jobs/runners/local.py lib/galaxy/jobs/runners/pbs.py lib/galaxy/tools/__init__.py diffs (309 lines): diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/config.py --- a/lib/galaxy/config.py Tue Feb 03 14:14:02 2009 -0500 +++ b/lib/galaxy/config.py Tue Feb 03 14:15:09 2009 -0500 @@ -47,6 +47,7 @@ self.cluster_files_directory = os.path.abspath( kwargs.get( "cluster_files_directory", "database/pbs" ) ) self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root ) self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) ) + self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) ) self.admin_pass = kwargs.get('admin_pass',"galaxy") self.sendmail_path = kwargs.get('sendmail_path',"/usr/sbin/sendmail") self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-user-join@bx.psu.edu") diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py Tue Feb 03 14:14:02 2009 -0500 +++ b/lib/galaxy/jobs/__init__.py Tue Feb 03 14:15:09 2009 -0500 @@ -305,6 +305,7 @@ # and job recovery fail. 
self.working_directory = \ os.path.join( self.app.config.job_working_directory, str( self.job_id ) ) + self.output_paths = None def get_param_dict( self ): """ @@ -342,7 +343,7 @@ incoming['userId'] = userId incoming['userEmail'] = userEmail # Build params, done before hook so hook can use - param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.working_directory ) + param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames() ) # Certain tools require tasks to be completed prior to job execution # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ). if self.tool.tool_type is not None: @@ -371,7 +372,7 @@ self.param_dict = param_dict self.extra_filenames = extra_filenames return extra_filenames - + def fail( self, message, exception=False ): """ Indicate job failure by setting state and message on all output @@ -385,14 +386,14 @@ # Do this first in case we generate a traceback below if exception: job.traceback = traceback.format_exc() - for dataset_assoc in job.output_datasets: - if self.app.config.outputs_to_working_directory: - false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) ) + if self.app.config.outputs_to_working_directory: + for dataset_path in self.get_output_fnames(): try: - shutil.move( false_path, dataset_assoc.dataset.file_name ) - log.debug( "fail(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) ) + shutil.move( dataset_path.false_path, dataset_path.real_path ) + log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) except ( IOError, OSError ), e: log.error( "fail(): Missing output file in working directory: %s" % e ) + for dataset_assoc in job.output_datasets: dataset = dataset_assoc.dataset dataset.refresh() dataset.state = dataset.states.ERROR @@ -452,15 +453,15 @@ job.state = "error" else: job.state = 'ok' + if 
self.app.config.outputs_to_working_directory: + for dataset_path in self.get_output_fnames(): + try: + shutil.move( dataset_path.false_path, dataset_path.real_path ) + log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) + except ( IOError, OSError ): + self.fail( "Job %s's output dataset(s) could not be read" % job.id ) + return for dataset_assoc in job.output_datasets: - if self.app.config.outputs_to_working_directory: - false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) ) - try: - shutil.move( false_path, dataset_assoc.dataset.file_name ) - log.debug( "finish(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) ) - except ( IOError, OSError ): - self.fail( "The job's output dataset(s) could not be read" ) - return for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running dataset.blurb = 'done' dataset.peek = 'no peek' @@ -519,7 +520,7 @@ for fname in self.extra_filenames: os.remove( fname ) if self.working_directory is not None: - os.rmdir( self.working_directory ) + shutil.rmtree( self.working_directory ) except: log.exception( "Unable to cleanup job %d" % self.job_id ) @@ -543,9 +544,36 @@ return filenames def get_output_fnames( self ): + if self.output_paths is not None: + return self.output_paths + + class DatasetPath( object ): + def __init__( self, real_path, false_path = None ): + self.real_path = real_path + self.false_path = false_path + def __str__( self ): + if false_path is None: + return self.real_path + else: + return self.false_path + job = model.Job.get( self.job_id ) - return [ da.dataset.file_name for da in job.output_datasets ] + if self.app.config.outputs_to_working_directory: + self.output_paths = [] + for name, data in [ ( da.name, da.dataset ) for da in job.output_datasets ]: + false_path = os.path.abspath( 
os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) ) + self.output_paths.append( DatasetPath( data.file_name, false_path ) ) + else: + self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ] + return self.output_paths + def check_output_sizes( self ): + sizes = [] + output_paths = self.get_output_fnames() + for outfile in [ str( o ) for o in output_paths ]: + sizes.append( ( outfile, os.stat( outfile ).st_size ) ) + return sizes + class DefaultJobDispatcher( object ): def __init__( self, app ): self.app = app diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py Tue Feb 03 14:14:02 2009 -0500 +++ b/lib/galaxy/jobs/runners/local.py Tue Feb 03 14:15:09 2009 -0500 @@ -4,6 +4,7 @@ import threading from galaxy import model +from galaxy.datatypes.data import nice_size import os, errno from time import sleep @@ -68,9 +69,31 @@ env = env, preexec_fn = os.setpgrp ) job_wrapper.set_runner( 'local:///', proc.pid ) + if self.app.config.output_size_limit > 0: + sleep_time = 1 + while proc.poll() is None: + for outfile, size in job_wrapper.check_output_sizes(): + if size > self.app.config.output_size_limit: + # Error the job immediately + job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \ + % nice_size( self.app.config.output_size_limit ) ) + log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \ + % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) + # Then kill it + os.killpg( proc.pid, 15 ) + sleep( 1 ) + if proc.poll() is None: + os.killpg( proc.pid, 9 ) + proc.wait() # reap + log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) ) + return + sleep( sleep_time ) + if sleep_time < 8: + # So we don't stat every second + sleep_time *= 2 stdout = proc.stdout.read() stderr = proc.stderr.read() - proc.wait() + 
proc.wait() # reap log.debug('execution finished: %s' % command_line) except Exception, exc: job_wrapper.fail( "failure running job", exception=True ) diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py Tue Feb 03 14:14:02 2009 -0500 +++ b/lib/galaxy/jobs/runners/pbs.py Tue Feb 03 14:15:09 2009 -0500 @@ -2,6 +2,8 @@ from Queue import Queue, Empty from galaxy import model +from galaxy.datatypes.data import nice_size + from paste.deploy.converters import asbool import pkg_resources @@ -60,6 +62,7 @@ self.ofile = None self.efile = None self.runner_url = None + self.check_count = 0 class PBSJobRunner( object ): """ @@ -71,6 +74,8 @@ # Check if PBS was importable, fail if not if pbs is None: raise Exception( "PBSJobRunner requires pbs-python which was not found" ) + if app.config.pbs_application_server and app.config.outputs_to_working_directory: + raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" ) self.app = app # 'watched' and 'queue' are both used to keep track of jobs to watch. 
# 'queue' is used to add new watched jobs, and can be called from @@ -132,6 +137,8 @@ self.queue_job( obj ) elif op == 'finish': self.finish_job( obj ) + elif op == 'fail_oversize_job': + self.fail_oversize_job( obj ) except: log.exception( "Uncaught exception %sing job" % op ) @@ -173,8 +180,9 @@ if self.app.config.pbs_application_server: pbs_ofile = self.app.config.pbs_application_server + ':' + ofile pbs_efile = self.app.config.pbs_application_server + ':' + efile - stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + job_wrapper.get_output_fnames(), symlink=True ) - stageout = self.get_stage_in_out( job_wrapper.get_output_fnames() ) + output_files = [ str( o ) for o in job_wrapper.get_output_fnames() ] + stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True ) + stageout = self.get_stage_in_out( output_files ) job_attrs = pbs.new_attropl(5) job_attrs[0].name = pbs.ATTR_o job_attrs[0].value = pbs_ofile @@ -298,6 +306,20 @@ if state == "R" and not pbs_job_state.running: pbs_job_state.running = True pbs_job_state.job_wrapper.change_state( "running" ) + if self.app.config.output_size_limit > 0 and state == "R" and (pbs_job_state.check_count % 10) == 0: + # Every 10th time a job is checked, check the size of its outputs. 
+ fail = False + for outfile, size in pbs_job_state.job_wrapper.check_output_sizes(): + if size > self.app.config.output_size_limit: + pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \ + % nice_size( self.app.config.output_size_limit ) + log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \ + % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) ) + self.work_queue.put( ( 'fail_oversize_job', pbs_job_state ) ) + fail = True + break + if fail: + continue pbs_job_state.old_state = state new_watched.append( pbs_job_state ) else: @@ -329,6 +351,7 @@ pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url ) if pbs_server_name not in servers: servers.append( pbs_server_name ) + pbs_job_state.check_count += 1 for pbs_server_name in servers: c = pbs.pbs_connect( pbs_server_name ) if c <= 0: @@ -386,6 +409,14 @@ # clean up the pbs files self.cleanup( ( ofile, efile, job_file ) ) + def fail_oversize_job( self, pbs_job_state ): + """ + Separated out so we can use the worker threads for it.
+ """ + self.stop_job( self.app.model.Job.get( pbs_job_state.job_wrapper.job_id ) ) + pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message ) + self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) ) + def cleanup( self, files ): if not asbool( self.app.config.get( 'debug', False ) ): for file in files: @@ -431,7 +462,7 @@ return pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' ) pbs.pbs_disconnect( c ) - log.debug( "(%s/%s) Removed from PBS queue at user's request" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) ) def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py Tue Feb 03 14:14:02 2009 -0500 +++ b/lib/galaxy/tools/__init__.py Tue Feb 03 14:15:09 2009 -0500 @@ -1045,7 +1045,7 @@ input.validate( value, None ) input_values[ input.name ] = value - def build_param_dict( self, incoming, input_datasets, output_datasets, working_directory = None ): + def build_param_dict( self, incoming, input_datasets, output_datasets, output_paths ): """ Build the dictionary of parameters for substituting into the command line. Each value is wrapped in a `InputValueWrapper`, which allows @@ -1098,10 +1098,14 @@ param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child ) for name, data in output_datasets.items(): # Write outputs to the working directory (for security purposes) if desired. 
- if self.app.config.outputs_to_working_directory and working_directory is not None: - false_path = os.path.abspath( os.path.join( working_directory, "galaxy_dataset_%d.dat" % data.dataset.id ) ) - param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path ) - open( false_path, 'w' ).close() + if self.app.config.outputs_to_working_directory: + try: + false_path = [ dp.false_path for dp in output_paths if dp.real_path == data.file_name ][0] + param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path ) + open( false_path, 'w' ).close() + except IndexError: + log.warning( "Unable to determine alternate path for writing job outputs, outputs will be written to their real paths" ) + param_dict[name] = DatasetFilenameWrapper( data ) else: param_dict[name] = DatasetFilenameWrapper( data ) # Provide access to a path to store additional files