7 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/bed23d528080/
Changeset:   bed23d528080
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Simplify command for grabbing tool in get_work_dir_outputs.

Subtly reduces code duplication and makes runners slightly easier to unit test (fewer dependencies).

Affected #:  1 file

diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r bed23d5280808dbc2d6b56ddcfe572e9bedda97a lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -196,7 +196,7 @@
         output_pairs = []
         # Walk job's output associations to find and use from_work_dir attributes.
         job = job_wrapper.get_job()
-        job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
+        job_tool = job_wrapper.tool
         for dataset_assoc in job.output_datasets + job.output_library_datasets:
             for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
                 if isinstance( dataset, self.app.model.HistoryDatasetAssociation ):

https://bitbucket.org/galaxy/galaxy-central/commits/51c248b80e72/
Changeset:   51c248b80e72
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Unit tests for local job runner.

Tests running, capturing standard output and exit code, killing a job, setting metadata, adding Galaxy to the job's PYTHONPATH, and GALAXY_SLOTS logic (default and overriding).

Affected #:  2 files

diff -r bed23d5280808dbc2d6b56ddcfe572e9bedda97a -r 51c248b80e729ec6a3ef39e851ebdf79ef888476 test/unit/jobs/test_runner_local.py
--- /dev/null
+++ b/test/unit/jobs/test_runner_local.py
@@ -0,0 +1,155 @@
+import os
+import threading
+import time
+from unittest import TestCase
+
+from galaxy.util import bunch
+from galaxy.jobs.runners import local
+from galaxy.jobs import metrics
+from galaxy import model
+
+from tools_support import (
+    UsesApp,
+    UsesTools
+)
+
+
+class TestLocalJobRunner( TestCase, UsesApp, UsesTools ):
+
+    def setUp( self ):
+        self.setup_app()
+        self._init_tool()
+        self.app.job_metrics = metrics.JobMetrics()
+        self.job_wrapper = MockJobWrapper( self.app, self.test_directory, self.tool )
+
+    def tearDown( self ):
+        self.tear_down_app()
+
+    def test_run( self ):
+        self.job_wrapper.command_line = "echo HelloWorld"
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "HelloWorld"
+
+    def test_galaxy_lib_on_path( self ):
+        self.job_wrapper.command_line = '''python -c "import galaxy.util"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.exit_code == 0
+
+    def test_default_slots( self ):
+        self.job_wrapper.command_line = '''echo $GALAXY_SLOTS'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "1"
+
+    def test_slots_override( self ):
+        # Set local_slots in job destination to specify slots for
+        # local job runner.
+        self.job_wrapper.job_destination.params[ "local_slots" ] = 3
+        self.job_wrapper.command_line = '''echo $GALAXY_SLOTS'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "3"
+
+    def test_exit_code( self ):
+        self.job_wrapper.command_line = '''sh -c "exit 4"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.exit_code == 4
+
+    def test_metadata_gets_set( self ):
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert os.path.exists( self.job_wrapper.mock_metadata_path )
+
+    def test_stopping_job( self ):
+        self.job_wrapper.command_line = '''python -c "import time; time.sleep(15)"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+
+        def queue():
+            runner.queue_job( self.job_wrapper )
+
+        t = threading.Thread(target=queue)
+        t.start()
+        while True:
+            if self.job_wrapper.external_id:
+                break
+            time.sleep( .01 )
+        external_id = self.job_wrapper.external_id
+        mock_job = bunch.Bunch(
+            get_external_output_metadata=lambda: None,
+            get_job_runner_external_id=lambda: str(external_id),
+            get_id=lambda: 1
+        )
+        runner.stop_job( mock_job )
+        t.join(1)
+
+
+class MockJobWrapper( object ):
+
+    def __init__( self, app, test_directory, tool ):
+        working_directory = os.path.join( test_directory, "workdir" )
+        os.makedirs( working_directory )
+        self.app = app
+        self.tool = tool
+        self.state = model.Job.states.QUEUED
+        self.command_line = "echo HelloWorld"
+        self.prepare_called = False
+        self.write_version_cmd = None
+        self.dependency_shell_commands = None
+        self.working_directory = working_directory
+        self.requires_setting_metadata = True
+        self.job_destination = bunch.Bunch( id="default", params={} )
+        self.galaxy_lib_dir = os.path.abspath( "lib" )
+        self.job_id = 1
+        self.external_id = None
+        self.output_paths = [ '/tmp/output1.dat' ]
+        self.mock_metadata_path = os.path.abspath( os.path.join( test_directory, "METADATA_SET" ) )
+        self.metadata_command = "touch %s" % self.mock_metadata_path
+
+        # Cruft for setting metadata externally, axe at some point.
+        self.external_output_metadata = bunch.Bunch(
+            set_job_runner_external_pid=lambda pid, session: None
+        )
+        self.app.datatypes_registry.set_external_metadata_tool = bunch.Bunch(
+            build_dependency_shell_commands=lambda: []
+        )
+
+    def prepare( self ):
+        self.prepare_called = True
+
+    def set_job_destination( self, job_destination, external_id ):
+        self.external_id = external_id
+
+    def get_command_line( self ):
+        return self.command_line
+
+    def get_id_tag( self ):
+        return "1"
+
+    def get_state( self ):
+        return self.state
+
+    def change_state( self, state ):
+        self.state = state
+
+    def get_output_fnames( self ):
+        return []
+
+    def get_job( self ):
+        return model.Job()
+
+    def setup_external_metadata( self, **kwds ):
+        return self.metadata_command
+
+    def get_env_setup_clause( self ):
+        return ""
+
+    def has_limits( self ):
+        return False
+
+    def finish( self, stdout, stderr, exit_code ):
+        self.stdout = stdout
+        self.stderr = stderr
+        self.exit_code = exit_code

diff -r bed23d5280808dbc2d6b56ddcfe572e9bedda97a -r 51c248b80e729ec6a3ef39e851ebdf79ef888476 test/unit/tools_support.py
--- a/test/unit/tools_support.py
+++ b/test/unit/tools_support.py
@@ -58,7 +58,7 @@

 class UsesTools( object ):

-    def _init_tool( self, tool_contents ):
+    def _init_tool( self, tool_contents=SIMPLE_TOOL_CONTENTS ):
         self.tool_file = os.path.join( self.test_directory, "tool.xml" )
         self.app.config.drmaa_external_runjob_script = ""
         self.app.config.tool_secret = "testsecret"

https://bitbucket.org/galaxy/galaxy-central/commits/ff666103459d/
Changeset:   ff666103459d
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Refactor small __poll helper method out of the local job runner's queue_job method.

Affected #:  1 file

diff -r 51c248b80e729ec6a3ef39e851ebdf79ef888476 -r ff666103459de3c87595a1d25783d7a3283b4951 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -18,6 +18,8 @@

 __all__ = [ 'LocalJobRunner' ]

+DEFAULT_POOL_SLEEP_TIME = 1
+

 class LocalJobRunner( BaseJobRunner ):
     """
@@ -85,20 +87,11 @@
                                     preexec_fn=os.setpgrp )
             job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid)
             job_wrapper.change_state( model.Job.states.RUNNING )
-            job_start = datetime.datetime.now()
-            i = 0
-            # Iterate until the process exits, periodically checking its limits
-            while proc.poll() is None:
-                i += 1
-                if (i % 20) == 0:
-                    msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start)
-                    if msg is not None:
-                        job_wrapper.fail(msg)
-                        log.debug('(%s) Terminating process group' % job_id)
-                        self._terminate(proc)
-                        return
-                else:
-                    sleep(1)
+
+            terminated = self.__poll( proc, job_wrapper, job_id )
+            if terminated:
+                return
+            # Reap the process and get the exit code.
             exit_code = proc.wait()
             stdout_file.seek( 0 )
@@ -168,3 +161,19 @@
         if proc.poll() is None:
             os.killpg( proc.pid, 9 )
         return proc.wait() # reap
+
+    def __poll( self, proc, job_wrapper, job_id ):
+        job_start = datetime.datetime.now()
+        i = 0
+        # Iterate until the process exits, periodically checking its limits
+        while proc.poll() is None:
+            i += 1
+            if (i % 20) == 0:
+                msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start)
+                if msg is not None:
+                    job_wrapper.fail(msg)
+                    log.debug('(%s) Terminating process group' % job_id)
+                    self._terminate(proc)
+                    return True
+            else:
+                sleep( DEFAULT_POOL_SLEEP_TIME )

https://bitbucket.org/galaxy/galaxy-central/commits/50de2360e451/
Changeset:   50de2360e451
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Local job runner - only poll if limits set (so can be set).

Small optimization aimed at speeding up tests - unit and functional.

Affected #:  2 files

diff -r ff666103459de3c87595a1d25783d7a3283b4951 -r 50de2360e4511b91f22ed437c1d1d62826136342 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1161,6 +1161,11 @@
             return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime
         return None

+    def has_limits( self ):
+        has_output_limit = self.app.job_config.limits.output_size > 0
+        has_walltime_limit = self.app.job_config.limits.walltime_delta is not None
+        return has_output_limit or has_walltime_limit
+
     def get_command_line( self ):
         return self.command_line

diff -r ff666103459de3c87595a1d25783d7a3283b4951 -r 50de2360e4511b91f22ed437c1d1d62826136342 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -88,7 +88,7 @@
             job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid)
             job_wrapper.change_state( model.Job.states.RUNNING )

-            terminated = self.__poll( proc, job_wrapper, job_id )
+            terminated = self.__poll_if_needed( proc, job_wrapper, job_id )
             if terminated:
                 return

@@ -162,7 +162,11 @@
             os.killpg( proc.pid, 9 )
         return proc.wait() # reap

-    def __poll( self, proc, job_wrapper, job_id ):
+    def __poll_if_needed( self, proc, job_wrapper, job_id ):
+        # Only poll if needed (i.e. job limits are set)
+        if not job_wrapper.has_limits():
+            return
+
         job_start = datetime.datetime.now()
         i = 0
         # Iterate until the process exits, periodically checking its limits

https://bitbucket.org/galaxy/galaxy-central/commits/03d912919356/
Changeset:   03d912919356
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Allow embedding setting of metadata as part of local jobs.

(Like it works for all other runners except the LWR.) This is essentially scaffolding toward elimination of the extra metadata step in the local runner. Got Dan's blessing to kill this off a while ago, but I still don't quite understand why it is separated out - so doing this in two parts.

This changeset changes nothing by default, but adds an option to local runner destinations (embed_metadata_in_job) to allow them to operate in the new fashion (embed the metadata-setting step in the job itself). At some later point this option should just be removed and all local jobs should operate this way.

Use this new option to speed up the local job runner or to test out how this runner will work in the future.
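For deployers wanting to experiment, a minimal job_conf.xml sketch follows. It assumes the standard plugin/destination syntax; the destination id and the workers/slots values are illustrative, while the embed_metadata_in_job and local_slots param names come straight from these commits:

<?xml version="1.0"?>
<job_conf>
    <plugins>
        <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/>
    </plugins>
    <destinations default="local_embedded">
        <!-- Illustrative destination: embed_metadata_in_job folds the
             metadata-setting step into the job itself rather than running
             it as a separate external step afterward. -->
        <destination id="local_embedded" runner="local">
            <param id="embed_metadata_in_job">True</param>
            <!-- local_slots sets $GALAXY_SLOTS for jobs routed here. -->
            <param id="local_slots">2</param>
        </destination>
    </destinations>
</job_conf>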
Affected #:  2 files

diff -r 50de2360e4511b91f22ed437c1d1d62826136342 -r 03d912919356db7ff34d289985c8ee3ac2a67dd1 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -13,12 +13,16 @@
 from galaxy import model
 from galaxy.jobs.runners import BaseJobRunner
 from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size
+from galaxy.util import asbool

 log = logging.getLogger( __name__ )

 __all__ = [ 'LocalJobRunner' ]

 DEFAULT_POOL_SLEEP_TIME = 1
+# TODO: Set to false and just get rid of this option. It would simplify this
+# class nicely. -John
+DEFAULT_EMBED_METADATA_IN_JOB = False


 class LocalJobRunner( BaseJobRunner ):
@@ -63,7 +67,8 @@

     def queue_job( self, job_wrapper ):
         # prepare the job
-        if not self.prepare_job( job_wrapper ):
+        include_metadata = asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
+        if not self.prepare_job( job_wrapper, include_metadata=include_metadata ):
             return

         stderr = stdout = ''
@@ -105,7 +110,9 @@
             log.exception("failure running job %d" % job_wrapper.job_id)
             job_wrapper.fail( "failure running job", exception=True )
             return
-        self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
+        external_metadata = not asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
+        if external_metadata:
+            self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
         # Finish the job!
         try:
             job_wrapper.finish( stdout, stderr, exit_code )

diff -r 50de2360e4511b91f22ed437c1d1d62826136342 -r 03d912919356db7ff34d289985c8ee3ac2a67dd1 test/unit/jobs/test_runner_local.py
--- a/test/unit/jobs/test_runner_local.py
+++ b/test/unit/jobs/test_runner_local.py
@@ -63,6 +63,17 @@
         runner.queue_job( self.job_wrapper )
         assert os.path.exists( self.job_wrapper.mock_metadata_path )

+    def test_metadata_gets_set_if_embedded( self ):
+        self.job_wrapper.job_destination.params[ "embed_metadata_in_job" ] = "True"
+
+        # Kill off cruft for _handle_metadata_externally and make sure job still works...
+        self.job_wrapper.external_output_metadata = None
+        self.app.datatypes_registry.set_external_metadata_tool = None
+
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert os.path.exists( self.job_wrapper.mock_metadata_path )
+
     def test_stopping_job( self ):
         self.job_wrapper.command_line = '''python -c "import time; time.sleep(15)"'''
         runner = local.LocalJobRunner( self.app, 1 )

https://bitbucket.org/galaxy/galaxy-central/commits/e1db77201a5d/
Changeset:   e1db77201a5d
User:        jmchilton
Date:        2014-05-05 19:35:09
Summary:     Rework local job runner to create job script.

Affected #:  2 files

diff -r 03d912919356db7ff34d289985c8ee3ac2a67dd1 -r e1db77201a5d9e93cf7729c2695c34e98abf5252 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -270,7 +270,36 @@
         self.work_queue.put( ( self.finish_job, ajs ) )


-class AsynchronousJobState( object ):
+class JobState( object ):
+    """
+    Encapsulate state of jobs.
+ """ + + def set_defaults( self, files_dir ): + if self.job_wrapper is not None: + id_tag = self.job_wrapper.get_id_tag() + if files_dir is not None: + self.job_file = JobState.default_job_file( files_dir, id_tag ) + self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag ) + self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag ) + self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) + job_name = 'g%s' % id_tag + if self.job_wrapper.tool.old_id: + job_name += '_%s' % self.job_wrapper.tool.old_id + if self.job_wrapper.user: + job_name += '_%s' % self.job_wrapper.user + self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) ) + + @staticmethod + def default_job_file( files_dir, id_tag ): + return os.path.join( files_dir, 'galaxy_%s.sh' % id_tag ) + + @staticmethod + def default_exit_code_file( files_dir, id_tag ): + return os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) + + +class AsynchronousJobState( JobState ): """ Encapsulate the state of an asynchronous job, this should be subclassed as needed for various job runners to capture additional information needed @@ -297,21 +326,6 @@ self.cleanup_file_attributes = [ 'job_file', 'output_file', 'error_file', 'exit_code_file' ] - def set_defaults( self, files_dir ): - if self.job_wrapper is not None: - id_tag = self.job_wrapper.get_id_tag() - if files_dir is not None: - self.job_file = os.path.join( files_dir, 'galaxy_%s.sh' % id_tag ) - self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag ) - self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag ) - self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag ) - job_name = 'g%s' % id_tag - if self.job_wrapper.tool.old_id: - job_name += '_%s' % self.job_wrapper.tool.old_id - if self.job_wrapper.user: - job_name += '_%s' % self.job_wrapper.user - self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) ) - def cleanup( self ): for file in [ getattr( self, a ) for a in self.cleanup_file_attributes if hasattr( self, a ) ]: try: diff -r 03d912919356db7ff34d289985c8ee3ac2a67dd1 -r e1db77201a5d9e93cf7729c2695c34e98abf5252 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -11,7 +11,8 @@ from time import sleep from galaxy import model -from galaxy.jobs.runners import BaseJobRunner +from ..runners import BaseJobRunner +from ..runners import JobState from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size from galaxy.util import asbool @@ -37,12 +38,6 @@ #create a local copy of os.environ to use as env for subprocess.Popen self._environ = os.environ.copy() - # put lib into the PYTHONPATH for subprocesses - if 'PYTHONPATH' in self._environ: - self._environ['PYTHONPATH'] = '%s:%s' % ( self._environ['PYTHONPATH'], os.path.abspath( 'lib' ) ) - else: - self._environ['PYTHONPATH'] = os.path.abspath( 'lib' ) - #Set TEMP if a valid temp value is not already set if not ( 'TMPDIR' in self._environ or 'TEMP' in self._environ or 'TMP' in self._environ ): self._environ[ 'TEMP' ] = tempfile.gettempdir() @@ -58,12 +53,24 @@ ## slots would be cleaner name, but don't want deployers to see examples and think it ## is going to work with other job runners. 
         slots = job_wrapper.job_destination.params.get( "local_slots", None )
-        command_line = command_line.lstrip( " ;" )
         if slots:
-            command_line = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED; %s' % ( int( slots ), command_line )
+            slots_statement = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED;' % ( int( slots ) )
         else:
-            command_line = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS; %s' % command_line
-        return command_line
+            slots_statement = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS;'
+
+        job_id = job_wrapper.get_id_tag()
+        job_file = JobState.default_job_file( job_wrapper.working_directory, job_id )
+        exit_code_path = JobState.default_exit_code_file( job_wrapper.working_directory, job_id )
+        job_script_props = {
+            'slots_statement': slots_statement,
+            'command': command_line,
+            'exit_code_path': exit_code_path,
+            'working_directory': job_wrapper.working_directory,
+        }
+        job_file_contents = self.get_job_file( job_wrapper, **job_script_props )
+        open( job_file, 'w' ).write( job_file_contents )
+        os.chmod( job_file, 0755 )
+        return job_file, exit_code_path

     def queue_job( self, job_wrapper ):
         # prepare the job
@@ -75,8 +82,7 @@
         exit_code = 0

         # command line has been added to the wrapper by prepare_job()
-        command_line = self.__command_line( job_wrapper )
-
+        command_line, exit_code_path = self.__command_line( job_wrapper )
         job_id = job_wrapper.get_id_tag()

         try:
@@ -99,6 +105,11 @@
             # Reap the process and get the exit code.
             exit_code = proc.wait()
+            try:
+                exit_code = int( open( exit_code_path, 'r' ).read() )
+            except Exception:
+                log.warn( "Failed to read exit code from path %s" % exit_code_path )
+                pass
             stdout_file.seek( 0 )
             stderr_file.seek( 0 )
             stdout = shrink_stream_by_size( stdout_file, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True )

https://bitbucket.org/galaxy/galaxy-central/commits/5bf39d99f789/
Changeset:   5bf39d99f789
User:        jmchilton
Date:        2014-05-19 04:12:36
Summary:     Merged in jmchilton/galaxy-central-fork-1 (pull request #384)

Local Job Runner Enhancements

Affected #:  5 files

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1163,6 +1163,11 @@
             return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime
         return None

+    def has_limits( self ):
+        has_output_limit = self.app.job_config.limits.output_size > 0
+        has_walltime_limit = self.app.job_config.limits.walltime_delta is not None
+        return has_output_limit or has_walltime_limit
+
     def get_command_line( self ):
         return self.command_line

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -196,7 +196,7 @@
         output_pairs = []
         # Walk job's output associations to find and use from_work_dir attributes.
         job = job_wrapper.get_job()
-        job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
+        job_tool = job_wrapper.tool
         for dataset_assoc in job.output_datasets + job.output_library_datasets:
             for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
                 if isinstance( dataset, self.app.model.HistoryDatasetAssociation ):
@@ -269,7 +269,36 @@
         self.work_queue.put( ( self.finish_job, ajs ) )


-class AsynchronousJobState( object ):
+class JobState( object ):
+    """
+    Encapsulate state of jobs.
+    """
+
+    def set_defaults( self, files_dir ):
+        if self.job_wrapper is not None:
+            id_tag = self.job_wrapper.get_id_tag()
+            if files_dir is not None:
+                self.job_file = JobState.default_job_file( files_dir, id_tag )
+                self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag )
+                self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag )
+                self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag )
+            job_name = 'g%s' % id_tag
+            if self.job_wrapper.tool.old_id:
+                job_name += '_%s' % self.job_wrapper.tool.old_id
+            if self.job_wrapper.user:
+                job_name += '_%s' % self.job_wrapper.user
+            self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) )
+
+    @staticmethod
+    def default_job_file( files_dir, id_tag ):
+        return os.path.join( files_dir, 'galaxy_%s.sh' % id_tag )
+
+    @staticmethod
+    def default_exit_code_file( files_dir, id_tag ):
+        return os.path.join( files_dir, 'galaxy_%s.ec' % id_tag )
+
+
+class AsynchronousJobState( JobState ):
     """
     Encapsulate the state of an asynchronous job, this should be subclassed as
     needed for various job runners to capture additional information needed
@@ -296,21 +325,6 @@
         self.cleanup_file_attributes = [ 'job_file', 'output_file', 'error_file', 'exit_code_file' ]

-    def set_defaults( self, files_dir ):
-        if self.job_wrapper is not None:
-            id_tag = self.job_wrapper.get_id_tag()
-            if files_dir is not None:
-                self.job_file = os.path.join( files_dir, 'galaxy_%s.sh' % id_tag )
-                self.output_file = os.path.join( files_dir, 'galaxy_%s.o' % id_tag )
-                self.error_file = os.path.join( files_dir, 'galaxy_%s.e' % id_tag )
-                self.exit_code_file = os.path.join( files_dir, 'galaxy_%s.ec' % id_tag )
-            job_name = 'g%s' % id_tag
-            if self.job_wrapper.tool.old_id:
-                job_name += '_%s' % self.job_wrapper.tool.old_id
-            if self.job_wrapper.user:
-                job_name += '_%s' % self.job_wrapper.user
-            self.job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) )
-
     def cleanup( self ):
         for file in [ getattr( self, a ) for a in self.cleanup_file_attributes if hasattr( self, a ) ]:
             try:

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py
+++ b/lib/galaxy/jobs/runners/local.py
@@ -11,13 +11,20 @@
 from time import sleep

 from galaxy import model
-from galaxy.jobs.runners import BaseJobRunner
+from ..runners import BaseJobRunner
+from ..runners import JobState
 from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size
+from galaxy.util import asbool

 log = logging.getLogger( __name__ )

 __all__ = [ 'LocalJobRunner' ]

+DEFAULT_POOL_SLEEP_TIME = 1
+# TODO: Set to false and just get rid of this option. It would simplify this
+# class nicely. -John
+DEFAULT_EMBED_METADATA_IN_JOB = False
+

 class LocalJobRunner( BaseJobRunner ):
     """
@@ -31,12 +38,6 @@
         #create a local copy of os.environ to use as env for subprocess.Popen
         self._environ = os.environ.copy()

-        # put lib into the PYTHONPATH for subprocesses
-        if 'PYTHONPATH' in self._environ:
-            self._environ['PYTHONPATH'] = '%s:%s' % ( self._environ['PYTHONPATH'], os.path.abspath( 'lib' ) )
-        else:
-            self._environ['PYTHONPATH'] = os.path.abspath( 'lib' )
-
         #Set TEMP if a valid temp value is not already set
         if not ( 'TMPDIR' in self._environ or 'TEMP' in self._environ or 'TMP' in self._environ ):
             self._environ[ 'TEMP' ] = os.path.abspath(tempfile.gettempdir())
@@ -52,24 +53,36 @@
         ## slots would be cleaner name, but don't want deployers to see examples and think it
         ## is going to work with other job runners.
         slots = job_wrapper.job_destination.params.get( "local_slots", None )
-        command_line = command_line.lstrip( " ;" )
         if slots:
-            command_line = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED; %s' % ( int( slots ), command_line )
+            slots_statement = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED;' % ( int( slots ) )
         else:
-            command_line = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS; %s' % command_line
-        return command_line
+            slots_statement = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS;'
+
+        job_id = job_wrapper.get_id_tag()
+        job_file = JobState.default_job_file( job_wrapper.working_directory, job_id )
+        exit_code_path = JobState.default_exit_code_file( job_wrapper.working_directory, job_id )
+        job_script_props = {
+            'slots_statement': slots_statement,
+            'command': command_line,
+            'exit_code_path': exit_code_path,
+            'working_directory': job_wrapper.working_directory,
+        }
+        job_file_contents = self.get_job_file( job_wrapper, **job_script_props )
+        open( job_file, 'w' ).write( job_file_contents )
+        os.chmod( job_file, 0755 )
+        return job_file, exit_code_path

     def queue_job( self, job_wrapper ):
         # prepare the job
-        if not self.prepare_job( job_wrapper ):
+        include_metadata = asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
+        if not self.prepare_job( job_wrapper, include_metadata=include_metadata ):
             return

         stderr = stdout = ''
         exit_code = 0

         # command line has been added to the wrapper by prepare_job()
-        command_line = self.__command_line( job_wrapper )
-
+        command_line, exit_code_path = self.__command_line( job_wrapper )
         job_id = job_wrapper.get_id_tag()

         try:
@@ -85,22 +98,18 @@
                                     preexec_fn=os.setpgrp )
             job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid)
             job_wrapper.change_state( model.Job.states.RUNNING )
-            job_start = datetime.datetime.now()
-            i = 0
-            # Iterate until the process exits, periodically checking its limits
-            while proc.poll() is None:
-                i += 1
-                if (i % 20) == 0:
-                    msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start)
-                    if msg is not None:
-                        job_wrapper.fail(msg)
-                        log.debug('(%s) Terminating process group' % job_id)
-                        self._terminate(proc)
-                        return
-                else:
-                    sleep(1)
+
+            terminated = self.__poll_if_needed( proc, job_wrapper, job_id )
+            if terminated:
+                return
+            # Reap the process and get the exit code.
             exit_code = proc.wait()
+            try:
+                exit_code = int( open( exit_code_path, 'r' ).read() )
+            except Exception:
+                log.warn( "Failed to read exit code from path %s" % exit_code_path )
+                pass
             stdout_file.seek( 0 )
             stderr_file.seek( 0 )
             stdout = shrink_stream_by_size( stdout_file, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True )
@@ -112,7 +121,9 @@
             log.exception("failure running job %d" % job_wrapper.job_id)
             job_wrapper.fail( "failure running job", exception=True )
             return
-        self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
+        external_metadata = not asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB ) )
+        if external_metadata:
+            self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
         # Finish the job!
         try:
             job_wrapper.finish( stdout, stderr, exit_code )
@@ -168,3 +179,23 @@
         if proc.poll() is None:
             os.killpg( proc.pid, 9 )
         return proc.wait() # reap
+
+    def __poll_if_needed( self, proc, job_wrapper, job_id ):
+        # Only poll if needed (i.e. job limits are set)
+        if not job_wrapper.has_limits():
+            return
+
+        job_start = datetime.datetime.now()
+        i = 0
+        # Iterate until the process exits, periodically checking its limits
+        while proc.poll() is None:
+            i += 1
+            if (i % 20) == 0:
+                msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start)
+                if msg is not None:
+                    job_wrapper.fail(msg)
+                    log.debug('(%s) Terminating process group' % job_id)
+                    self._terminate(proc)
+                    return True
+            else:
+                sleep( DEFAULT_POOL_SLEEP_TIME )

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 test/unit/jobs/test_runner_local.py
--- /dev/null
+++ b/test/unit/jobs/test_runner_local.py
@@ -0,0 +1,166 @@
+import os
+import threading
+import time
+from unittest import TestCase
+
+from galaxy.util import bunch
+from galaxy.jobs.runners import local
+from galaxy.jobs import metrics
+from galaxy import model
+
+from tools_support import (
+    UsesApp,
+    UsesTools
+)
+
+
+class TestLocalJobRunner( TestCase, UsesApp, UsesTools ):
+
+    def setUp( self ):
+        self.setup_app()
+        self._init_tool()
+        self.app.job_metrics = metrics.JobMetrics()
+        self.job_wrapper = MockJobWrapper( self.app, self.test_directory, self.tool )
+
+    def tearDown( self ):
+        self.tear_down_app()
+
+    def test_run( self ):
+        self.job_wrapper.command_line = "echo HelloWorld"
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "HelloWorld"
+
+    def test_galaxy_lib_on_path( self ):
+        self.job_wrapper.command_line = '''python -c "import galaxy.util"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.exit_code == 0
+
+    def test_default_slots( self ):
+        self.job_wrapper.command_line = '''echo $GALAXY_SLOTS'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "1"
+
+    def test_slots_override( self ):
+        # Set local_slots in job destination to specify slots for
+        # local job runner.
+        self.job_wrapper.job_destination.params[ "local_slots" ] = 3
+        self.job_wrapper.command_line = '''echo $GALAXY_SLOTS'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.stdout.strip() == "3"
+
+    def test_exit_code( self ):
+        self.job_wrapper.command_line = '''sh -c "exit 4"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert self.job_wrapper.exit_code == 4
+
+    def test_metadata_gets_set( self ):
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert os.path.exists( self.job_wrapper.mock_metadata_path )
+
+    def test_metadata_gets_set_if_embedded( self ):
+        self.job_wrapper.job_destination.params[ "embed_metadata_in_job" ] = "True"
+
+        # Kill off cruft for _handle_metadata_externally and make sure job still works...
+        self.job_wrapper.external_output_metadata = None
+        self.app.datatypes_registry.set_external_metadata_tool = None
+
+        runner = local.LocalJobRunner( self.app, 1 )
+        runner.queue_job( self.job_wrapper )
+        assert os.path.exists( self.job_wrapper.mock_metadata_path )
+
+    def test_stopping_job( self ):
+        self.job_wrapper.command_line = '''python -c "import time; time.sleep(15)"'''
+        runner = local.LocalJobRunner( self.app, 1 )
+
+        def queue():
+            runner.queue_job( self.job_wrapper )
+
+        t = threading.Thread(target=queue)
+        t.start()
+        while True:
+            if self.job_wrapper.external_id:
+                break
+            time.sleep( .01 )
+        external_id = self.job_wrapper.external_id
+        mock_job = bunch.Bunch(
+            get_external_output_metadata=lambda: None,
+            get_job_runner_external_id=lambda: str(external_id),
+            get_id=lambda: 1
+        )
+        runner.stop_job( mock_job )
+        t.join(1)
+
+
+class MockJobWrapper( object ):
+
+    def __init__( self, app, test_directory, tool ):
+        working_directory = os.path.join( test_directory, "workdir" )
+        os.makedirs( working_directory )
+        self.app = app
+        self.tool = tool
+        self.state = model.Job.states.QUEUED
+        self.command_line = "echo HelloWorld"
+        self.prepare_called = False
+        self.write_version_cmd = None
+        self.dependency_shell_commands = None
+        self.working_directory = working_directory
+        self.requires_setting_metadata = True
+        self.job_destination = bunch.Bunch( id="default", params={} )
+        self.galaxy_lib_dir = os.path.abspath( "lib" )
+        self.job_id = 1
+        self.external_id = None
+        self.output_paths = [ '/tmp/output1.dat' ]
+        self.mock_metadata_path = os.path.abspath( os.path.join( test_directory, "METADATA_SET" ) )
+        self.metadata_command = "touch %s" % self.mock_metadata_path
+
+        # Cruft for setting metadata externally, axe at some point.
+        self.external_output_metadata = bunch.Bunch(
+            set_job_runner_external_pid=lambda pid, session: None
+        )
+        self.app.datatypes_registry.set_external_metadata_tool = bunch.Bunch(
+            build_dependency_shell_commands=lambda: []
+        )
+
+    def prepare( self ):
+        self.prepare_called = True
+
+    def set_job_destination( self, job_destination, external_id ):
+        self.external_id = external_id
+
+    def get_command_line( self ):
+        return self.command_line
+
+    def get_id_tag( self ):
+        return "1"
+
+    def get_state( self ):
+        return self.state
+
+    def change_state( self, state ):
+        self.state = state
+
+    def get_output_fnames( self ):
+        return []
+
+    def get_job( self ):
+        return model.Job()
+
+    def setup_external_metadata( self, **kwds ):
+        return self.metadata_command
+
+    def get_env_setup_clause( self ):
+        return ""
+
+    def has_limits( self ):
+        return False
+
+    def finish( self, stdout, stderr, exit_code ):
+        self.stdout = stdout
+        self.stderr = stderr
+        self.exit_code = exit_code

diff -r 632087d479859d0a8152267f6c21a623722bc875 -r 5bf39d99f7892bd8d9733f5efd68a8bb356c7bd9 test/unit/tools_support.py
--- a/test/unit/tools_support.py
+++ b/test/unit/tools_support.py
@@ -58,7 +58,7 @@

 class UsesTools( object ):

-    def _init_tool( self, tool_contents ):
+    def _init_tool( self, tool_contents=SIMPLE_TOOL_CONTENTS ):
         self.tool_file = os.path.join( self.test_directory, "tool.xml" )
         self.app.config.drmaa_external_runjob_script = ""
         self.app.config.tool_secret = "testsecret"

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
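For reference, the job script that the reworked __command_line() writes (galaxy_<id_tag>.sh in the job's working directory, per JobState.default_job_file) is generated by BaseJobRunner.get_job_file(), which is not shown in these diffs. A rough sketch of what such a script could contain, assembled from the job_script_props passed in (slots_statement, command, exit_code_path, working_directory); the paths and command here are illustrative:

#!/bin/sh
# Sketch only - the real contents come from get_job_file().
GALAXY_SLOTS="2"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED;  # slots_statement
cd /galaxy/database/job_working_directory/000/1    # working_directory
echo HelloWorld    # command (the tool command line)
echo $? > /galaxy/database/job_working_directory/000/1/galaxy_1.ec    # exit_code_path

queue_job() still calls proc.wait() first and then prefers the integer read back from the .ec file, keeping the proc.wait() value if the file cannot be read.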