1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/5bf39d99f789/
Changeset:   5bf39d99f789
User:        jmchilton
Date:        2014-05-19 04:12:36
Summary:     Merged in jmchilton/galaxy-central-fork-1 (pull request #384)

Local Job Runner Enhancements

Affected #:  5 files

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1163,6 +1163,11 @@
             return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime
         return None

+    def has_limits( self ):
+        has_output_limit = self.app.job_config.limits.output_size > 0
+        has_walltime_limit = self.app.job_config.limits.walltime_delta is not None
+        return has_output_limit or has_walltime_limit
+
     def get_command_line( self ):
         return self.command_line

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -196,7 +196,7 @@
         output_pairs = []
         # Walk job's output associations to find and use from_work_dir attributes.
         job = job_wrapper.get_job()
-        job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
+        job_tool = job_wrapper.tool
         for dataset_assoc in job.output_datasets + job.output_library_datasets:
             for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
                 if isinstance( dataset, self.app.model.HistoryDatasetAssociation ):
@@ -269,7 +269,36 @@
                 self.work_queue.put( ( self.finish_job, ajs ) )


-class AsynchronousJobState( object ):
+class JobState( object ):
+    """
+    Encapsulate state of jobs.
+ """ + + def set_defaults( self, files_dir ): + if self.job_wrapper is not None: + id_tag = self.job_wrapper.get_id_tag() + if files_dir is not None: + self.job_file = JobState.default_job_file( files_dir, id_tag ) + self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag ) + self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag ) + self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) + job_name = 'g%s' % id_tag + if self.job_wrapper.tool.old_id: + job_name += '_%s' % self.job_wrapper.tool.old_id + if self.job_wrapper.user: + job_name += '_%s' % self.job_wrapper.user + self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) ) + + @staticmethod + def default_job_file( files_dir, id_tag ): + return os.path.join( files_dir, 'galaxy_%s.sh' % id_tag ) + + @staticmethod + def default_exit_code_file( files_dir, id_tag ): + return os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) + + +class AsynchronousJobState( JobState ): """ Encapsulate the state of an asynchronous job, this should be subclassed as needed for various job runners to capture additional information needed @@ -296,21 +325,6 @@ self.cleanup_file_attributes = [ 'job_file', 'output_file', 'error_file', 'exit_code_file' ] - def set_defaults( self, files_dir ): - if self.job_wrapper is not None: - id_tag = self.job_wrapper.get_id_tag() - if files_dir is not None: - self.job_file = os.path.join( files_dir, 'galaxy_%s.sh' % id_tag ) - self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag ) - self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag ) - self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) - job_name = 'g%s' % id_tag - if self.job_wrapper.tool.old_id: - job_name += '_%s' % self.job_wrapper.tool.old_id - if self.job_wrapper.user: - job_name += '_%s' % self.job_wrapper.user - self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) ) - def cleanup( self ): for file in [ getattr( self, a ) for a in self.cleanup_file_attributes if hasattr( self, a ) ]: try: diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -11,13 +11,20 @@ from time import sleep from galaxy import model -from galaxy.jobs.runners import BaseJobRunner +from ..runners import BaseJobRunner +from ..runners import JobState from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size +from galaxy.util import asbool log = logging.getLogger( __name__ ) __all__ = [ 'LocalJobRunner' ] +DEFAULT_POOL_SLEEP_TIME = 1 +# TODO: Set to false and just get rid of this option. It would simplify this +# class nicely. 
-John +DEFAULT_EMBED_METADATA_IN_JOB = False + class LocalJobRunner( BaseJobRunner ): """ @@ -31,12 +38,6 @@ #create a local copy of os.environ to use as env for subprocess.Popen self._environ = os.environ.copy() - # put lib into the PYTHONPATH for subprocesses - if 'PYTHONPATH' in self._environ: - self._environ['PYTHONPATH'] = '%s:%s' % ( self._environ['PYTHONPATH'], os.path.abspath( 'lib' ) ) - else: - self._environ['PYTHONPATH'] = os.path.abspath( 'lib' ) - #Set TEMP if a valid temp value is not already set if not ( 'TMPDIR' in self._environ or 'TEMP' in self._environ or 'TMP' in self._environ ): self._environ[ 'TEMP' ] = os.path.abspath(tempfile.gettempdir()) @@ -52,24 +53,36 @@ ## slots would be cleaner name, but don't want deployers to see examples and think it ## is going to work with other job runners. slots = job_wrapper.job_destination.params.get( "local_slots", None ) - command_line = command_line.lstrip( " ;" ) if slots: - command_line = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED; %s' % ( int( slots ), command_line ) + slots_statement = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED;' % ( int( slots ) ) else: - command_line = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS; %s' % command_line - return command_line + slots_statement = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS;' + + job_id = job_wrapper.get_id_tag() + job_file = JobState.default_job_file( job_wrapper.working_directory, job_id ) + exit_code_path = JobState.default_exit_code_file( job_wrapper.working_directory, job_id ) + job_script_props = { + 'slots_statement': slots_statement, + 'command': command_line, + 'exit_code_path': exit_code_path, + 'working_directory': job_wrapper.working_directory, + } + job_file_contents = self.get_job_file( job_wrapper, **job_script_props ) + open( job_file, 'w' ).write( job_file_contents ) + os.chmod( job_file, 0755 ) + return job_file, exit_code_path def queue_job( self, job_wrapper ): # prepare the job - if not self.prepare_job( job_wrapper ): + include_metadata = asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) ) + if not self.prepare_job( job_wrapper, include_metadata=include_metadata ): return stderr = stdout = '' exit_code = 0 # command line has been added to the wrapper by prepare_job() - command_line = self.__command_line( job_wrapper ) - + command_line, exit_code_path = self.__command_line( job_wrapper ) job_id = job_wrapper.get_id_tag() try: @@ -85,22 +98,18 @@ preexec_fn=os.setpgrp ) job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid) job_wrapper.change_state( model.Job.states.RUNNING ) - job_start = datetime.datetime.now() - i = 0 - # Iterate until the process exits, periodically checking its limits - while proc.poll() is None: - i += 1 - if (i % 20) == 0: - msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start) - if msg is not None: - job_wrapper.fail(msg) - log.debug('(%s) Terminating process group' % job_id) - self._terminate(proc) - return - else: - sleep(1) + + terminated = self.__poll_if_needed( proc, job_wrapper, job_id ) + if terminated: + return + # Reap the process and get the exit code. 
             exit_code = proc.wait()
+            try:
+                exit_code = int( open( exit_code_path, 'r' ).read() )
+            except Exception:
+                log.warn( "Failed to read exit code from path %s" % exit_code_path )
+                pass
             stdout_file.seek( 0 )
             stderr_file.seek( 0 )
             stdout = shrink_stream_by_size( stdout_file, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True )
@@ -112,7 +121,9 @@
             log.exception("failure running job %d" % job_wrapper.job_id)
             job_wrapper.fail( "failure running job", exception=True )
             return
-        self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
+        external_metadata = not asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
+        if external_metadata:
+            self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
         # Finish the job!
         try:
             job_wrapper.finish( stdout, stderr, exit_code )
@@ -168,3 +179,23 @@
         if proc.poll() is None:
             os.killpg( proc.pid, 9 )
         return proc.wait()  # reap
+
+    def __poll_if_needed( self, proc, job_wrapper, job_id ):
+        # Only poll if needed (i.e. job limits are set)
+        if not job_wrapper.has_limits():
+            return
+
+        job_start = datetime.datetime.now()
+        i = 0
+        # Iterate until the process exits, periodically checking its limits
+        while proc.poll() is None:
+            i += 1
+            if (i % 20) == 0:
+                msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start)
+                if msg is not None:
+                    job_wrapper.fail(msg)
+                    log.debug('(%s) Terminating process group' % job_id)
+                    self._terminate(proc)
+                    return True
+            else:
+                sleep( DEFAULT_POOL_SLEEP_TIME )

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 test/unit/jobs/test_runner_local.py
--- /dev/null
+++ b/test/unit/jobs/test_runner_local.py
@@ -0,0 +1,166 @@
+import os
+import threading
+import time
+from unittest import TestCase
+
+from galaxy.util import bunch
+from galaxy.jobs.runners import local
+from galaxy.jobs import metrics
+from galaxy import model
+
+from tools_support import (
+    UsesApp,
+    UsesTools
+)
+
+
+class TestLocalJobRunner( TestCase, UsesApp, UsesTools ):
+
+    def setUp( self ):
+        self.setup_app()
+        self._init_tool()
+        self.app.job_metrics = metrics.JobMetrics()
+        self.job_wrapper = MockJobWrapper( self.app, self.test_directory, self.tool )
+
+    def tearDown( self ):
+        self.tear_down_app()
+
+    def test_run( self ):
+        self.job_wrapper.command_line = "echo HelloWorld"
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "HelloWorld"
+
+    def test_galaxy_lib_on_path( self ):
+        self.job_wrapper.command_line = '''python -c "import galaxy.util"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.exit_code == 0
+
+    def test_default_slots( self ):
+        self.job_wrapper.command_line = '''echo $GALAXY_SLOTS'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "1"
+
+    def test_slots_override( self ):
+        # Set local_slots in job destination to specify slots for
+        # local job runner.
+ self.job_wrapper.job_destination.params[ "local_slots" ] = 3 + self.job_wrapper.command_line = '''echo $GALAXY_SLOTS''' + runner = local.LocalJobRunner( self.app, 1 ) + runner.queue_job( self.job_wrapper ) + assert self.job_wrapper.stdout.strip() == "3" + + def test_exit_code( self ): + self.job_wrapper.command_line = '''sh -c "exit 4"''' + runner = local.LocalJobRunner( self.app, 1 ) + runner.queue_job( self.job_wrapper ) + assert self.job_wrapper.exit_code == 4 + + def test_metadata_gets_set( self ): + runner = local.LocalJobRunner( self.app, 1 ) + runner.queue_job( self.job_wrapper ) + assert os.path.exists( self.job_wrapper.mock_metadata_path ) + + def test_metadata_gets_set_if_embedded( self ): + self.job_wrapper.job_destination.params[ "embed_metadata_in_job" ] = "True" + + # Kill off cruft for _handle_metadata_externally and make sure job stil works... + self.job_wrapper.external_output_metadata = None + self.app.datatypes_registry.set_external_metadata_tool = None + + runner = local.LocalJobRunner( self.app, 1 ) + runner.queue_job( self.job_wrapper ) + assert os.path.exists( self.job_wrapper.mock_metadata_path ) + + def test_stopping_job( self ): + self.job_wrapper.command_line = '''python -c "import time; time.sleep(15)"''' + runner = local.LocalJobRunner( self.app, 1 ) + + def queue(): + runner.queue_job( self.job_wrapper ) + + t = threading.Thread(target=queue) + t.start() + while True: + if self.job_wrapper.external_id: + break + time.sleep( .01 ) + external_id = self.job_wrapper.external_id + mock_job = bunch.Bunch( + get_external_output_metadata=lambda: None, + get_job_runner_external_id=lambda: str(external_id), + get_id=lambda: 1 + ) + runner.stop_job( mock_job ) + t.join(1) + + +class MockJobWrapper( object ): + + def __init__( self, app, test_directory, tool ): + working_directory = os.path.join( test_directory, "workdir" ) + os.makedirs( working_directory ) + self.app = app + self.tool = tool + self.state = model.Job.states.QUEUED + self.command_line = "echo HelloWorld" + self.prepare_called = False + self.write_version_cmd = None + self.dependency_shell_commands = None + self.working_directory = working_directory + self.requires_setting_metadata = True + self.job_destination = bunch.Bunch( id="default", params={} ) + self.galaxy_lib_dir = os.path.abspath( "lib" ) + self.job_id = 1 + self.external_id = None + self.output_paths = [ '/tmp/output1.dat' ] + self.mock_metadata_path = os.path.abspath( os.path.join( test_directory, "METADATA_SET" ) ) + self.metadata_command = "touch %s" % self.mock_metadata_path + + # Cruft for setting metadata externally, axe at some point. 
+        self.external_output_metadata = bunch.Bunch(
+            set_job_runner_external_pid=lambda pid, session: None
+        )
+        self.app.datatypes_registry.set_external_metadata_tool = bunch.Bunch(
+            build_dependency_shell_commands=lambda: []
+        )
+
+    def prepare( self ):
+        self.prepare_called = True
+
+    def set_job_destination( self, job_destination, external_id ):
+        self.external_id = external_id
+
+    def get_command_line( self ):
+        return self.command_line
+
+    def get_id_tag( self ):
+        return "1"
+
+    def get_state( self ):
+        return self.state
+
+    def change_state( self, state ):
+        self.state = state
+
+    def get_output_fnames( self ):
+        return []
+
+    def get_job( self ):
+        return model.Job()
+
+    def setup_external_metadata( self, **kwds ):
+        return self.metadata_command
+
+    def get_env_setup_clause( self ):
+        return ""
+
+    def has_limits( self ):
+        return False
+
+    def finish( self, stdout, stderr, exit_code ):
+        self.stdout = stdout
+        self.stderr = stderr
+        self.exit_code = exit_code

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 test/unit/tools_support.py
--- a/test/unit/tools_support.py
+++ b/test/unit/tools_support.py
@@ -58,7 +58,7 @@

 class UsesTools( object ):

-    def _init_tool( self, tool_contents ):
+    def _init_tool( self, tool_contents=SIMPLE_TOOL_CONTENTS ):
         self.tool_file = os.path.join( self.test_directory, "tool.xml" )
         self.app.config.drmaa_external_runjob_script = ""
         self.app.config.tool_secret = "testsecret"

Repository URL: https://bitbucket.org/galaxy/galaxy-central/
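
The central change in lib/galaxy/jobs/runners/local.py is that the runner no longer execs the tool command line directly: __command_line() now materializes a small shell script in the job's working directory and returns it together with the path of an exit-code file. A minimal sketch of that pattern follows; the helper name and script template here are assumptions for illustration, since the real contents come from BaseJobRunner.get_job_file(), which this diff does not show.

```python
# Hypothetical sketch of the job-script pattern this changeset adopts: write an
# executable shell script that exports GALAXY_SLOTS, runs the tool command, and
# records the command's exit code in a "galaxy_<id>.ec" file that queue_job()
# reads back after proc.wait(). Illustrative only; not the real template.
import os
import stat


def write_job_script( job_file, slots_statement, command, working_directory, exit_code_path ):
    contents = "#!/bin/sh\n%s\ncd %s\n%s\necho $? > %s\n" % (
        slots_statement,      # e.g. 'GALAXY_SLOTS="2"; export GALAXY_SLOTS;'
        working_directory,
        command,
        exit_code_path,
    )
    with open( job_file, 'w' ) as fh:
        fh.write( contents )
    # Mark the script executable, since the runner execs it directly.
    os.chmod( job_file, os.stat( job_file ).st_mode | stat.S_IXUSR )
```

Capturing the exit code in a file rather than trusting the script's overall return status presumably matters once embed_metadata_in_job is enabled: any metadata commands embedded after the tool command would otherwise determine the status seen by proc.wait().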
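Both new knobs are plain job-destination parameters, read with job_wrapper.job_destination.params.get(...). The sketch below mirrors the Bunch-based destination stand-in used by the new unit tests; the destination id and values are invented for illustration, and in a real deployment they would normally be configured on a local destination in job_conf.xml instead.

```python
# Illustrative only: a destination carrying the two params this changeset
# consumes, built the same way MockJobWrapper builds its job_destination.
from galaxy.util import bunch

job_destination = bunch.Bunch(
    id="local_dev",  # hypothetical destination id
    params={
        "local_slots": 2,                 # exported to the job as GALAXY_SLOTS
        "embed_metadata_in_job": "True",  # set metadata inside the job script
    },
)
```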
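The practical effect of the new JobWrapper.has_limits() check plus __poll_if_needed() is that a local job with no output-size or walltime limit configured now blocks in a single wait() call instead of waking every second to poll. A simplified standalone restatement, assuming callables for the limit hooks and compressing the runner's check-every-20th-iteration logic into a check on every sleep:

```python
# Simplified restatement of the polling gate above; not the runner's code.
import time


def wait_with_optional_limit_checks( proc, has_limits, check_limits, sleep_time=1 ):
    if not has_limits():
        # No limits configured: one blocking wait, no polling overhead.
        return proc.wait()
    # Limits configured: poll the process, checking limits periodically.
    while proc.poll() is None:
        msg = check_limits()
        if msg is not None:
            # The real runner fails the job and kills the whole process group.
            proc.kill()
            raise RuntimeError( msg )
        time.sleep( sleep_time )
    return proc.returncode
```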