commit/galaxy-central: 5 new changesets
5 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/e9c37e7632c6/ Changeset: e9c37e7632c6 User: jmchilton Date: 2013-12-10 00:14:39 Summary: Update LWR client through LWR changeset 295fbb09de58. The main addition here is logic for handling implicit working directory files - namely those needed for variable count outputs (should work immediately) and for setting metadata externally (requires additional changes). It also fixes bugs related to uploading empty working directory files (an empty file cannot be mmap'd) and to incorrectly cleaning up successful jobs when cleanup_job=never, as well as various readability-centric refactorings. The most interesting LWR commits this covers include: https://bitbucket.org/jmchilton/lwr/commits/1886ad1c0951605d2bc1a6e167f84f57... https://bitbucket.org/jmchilton/lwr/commits/295fbb09de582a369e72d4331708be3f... https://bitbucket.org/jmchilton/lwr/commits/45f0c818b91adfd63a9a4b3488a4a0d8... Affected #: 6 files diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -12,6 +12,7 @@ from .lwr_client import ClientManager, url_to_destination_params from .lwr_client import finish_job as lwr_finish_job from .lwr_client import submit_job as lwr_submit_job +from .lwr_client import ClientJobDescription log = logging.getLogger( __name__ ) @@ -78,12 +79,15 @@ try: client = self.get_client_from_wrapper(job_wrapper) - output_files = self.get_output_files(job_wrapper) - input_files = job_wrapper.get_input_fnames() - working_directory = job_wrapper.working_directory - tool = job_wrapper.tool - config_files = job_wrapper.extra_filenames - job_id = lwr_submit_job(client, tool, command_line, config_files, input_files, output_files, working_directory) + client_job_description = ClientJobDescription( + command_line=command_line, + output_files=self.get_output_files(job_wrapper), + input_files=job_wrapper.get_input_fnames(), + working_directory=job_wrapper.working_directory, + tool=job_wrapper.tool, + config_files=job_wrapper.extra_filenames, + ) + job_id = lwr_submit_job(client, client_job_description) log.info("lwr job submitted with job_id %s" % job_id) job_wrapper.set_job_destination( job_destination, job_id ) job_wrapper.change_state( model.Job.states.QUEUED ) diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -6,9 +6,9 @@ """ -from .stager import submit_job, finish_job +from .stager import submit_job, finish_job, ClientJobDescription from .client import OutputNotFoundException from .manager import ClientManager from .destination import url_to_destination_params -__all__ = [ClientManager, OutputNotFoundException, url_to_destination_params, finish_job, submit_job] +__all__ = [ClientManager, OutputNotFoundException, url_to_destination_params, finish_job, submit_job, ClientJobDescription] diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -50,8 +50,7 @@ return "No remote output found for path %s" % self.path -# TODO: Rename to job client. 
-class Client(object): +class JobClient(object): """ Objects of this client class perform low-level communication with a remote LWR server. @@ -162,25 +161,23 @@ raise Exception("Unknown output_type returned from LWR server %s" % output_type) return output_path - def fetch_work_dir_output(self, source, working_directory, output_path, action='transfer'): + def fetch_work_dir_output(self, name, working_directory, output_path, action='transfer'): """ Download an output dataset specified with from_work_dir from the remote server. **Parameters** - source : str + name : str Path in job's working_directory to find output in. working_directory : str Local working_directory for the job. output_path : str Full path to output dataset. """ - output = open(output_path, "wb") - name = os.path.basename(source) if action == 'transfer': - self.__raw_download_output(name, self.job_id, "work_dir", output) - elif action == 'copy': + self.__raw_download_output(name, self.job_id, "work_dir", output_path) + else: # Even if action is none - LWR has a different work_dir so this needs to be copied. lwr_path = self._output_path(name, self.job_id, 'work_dir')['path'] self._copy(lwr_path, output_path) @@ -286,13 +283,13 @@ shutil.copyfile(source, destination) -class InputCachingClient(Client): +class InputCachingJobClient(JobClient): """ Beta client that cache's staged files to prevent duplication. """ def __init__(self, destination_params, job_id, job_manager_interface, client_cacher): - super(InputCachingClient, self).__init__(destination_params, job_id, job_manager_interface) + super(InputCachingJobClient, self).__init__(destination_params, job_id, job_manager_interface) self.client_cacher = client_cacher @parseJson() diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -18,7 +18,8 @@ except ImportError: from galaxy.util import unicodify as text_type -from .client import Client, InputCachingClient +from .client import JobClient +from .client import InputCachingJobClient from .client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager @@ -51,11 +52,11 @@ if cache: log.info("Setting LWR client class to caching variant.") self.client_cacher = ClientCacher(**kwds) - self.client_class = InputCachingClient + self.client_class = InputCachingJobClient self.extra_client_kwds = {"client_cacher": self.client_cacher} else: log.info("Setting LWR client class to standard, non-caching variant.") - self.client_class = Client + self.client_class = JobClient self.extra_client_kwds = {} def get_client(self, destination_params, job_id): diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr_client/stager.py --- a/lib/galaxy/jobs/runners/lwr_client/stager.py +++ b/lib/galaxy/jobs/runners/lwr_client/stager.py @@ -1,6 +1,7 @@ from os.path import abspath, basename, join, exists from os import listdir, sep from re import findall +from re import compile from io import open from .action_mapper import FileActionMapper @@ -8,6 +9,12 @@ from logging import getLogger log = getLogger(__name__) +# All output files marked with from_work_dir attributes will copied or downloaded +# this pattern picks up attiditional files to copy back - such as those +# associated with multiple outputs and metadata configuration. 
Set to .* to just +# copy everything +COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*") + class JobInputs(object): """ @@ -157,35 +164,24 @@ **Parameters** - client : Client + client : JobClient LWR client object. - command_line : str - The local command line to execute, this will be rewritten for the remote server. - config_files : list - List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. - input_files : list - List of input files used by job. These will be transferred and references rewritten. - output_files : list - List of output_files produced by job. - tool_dir : str - Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). - working_directory : str - Local path created by Galaxy for running this job. - + client_job_description : client_job_description + Description of client view of job to stage and execute remotely. """ - def __init__(self, client, tool, command_line, config_files, input_files, output_files, working_directory): + def __init__(self, client, client_job_description, job_config): """ """ self.client = client - self.command_line = command_line - self.config_files = config_files - self.input_files = input_files - self.output_files = output_files - self.tool_id = tool.id - self.tool_version = tool.version - self.tool_dir = abspath(tool.tool_dir) - self.working_directory = working_directory + self.command_line = client_job_description.command_line + self.config_files = client_job_description.config_files + self.input_files = client_job_description.input_files + self.output_files = client_job_description.output_files + self.tool_id = client_job_description.tool.id + self.tool_version = client_job_description.tool.version + self.tool_dir = abspath(client_job_description.tool.tool_dir) + self.working_directory = client_job_description.working_directory # Setup job inputs, these will need to be rewritten before # shipping off to remote LWR server. @@ -193,7 +189,7 @@ self.transfer_tracker = TransferTracker(client, self.job_inputs) - self.__handle_setup() + self.__handle_setup(job_config) self.__initialize_referenced_tool_files() self.__upload_tool_files() self.__upload_input_files() @@ -204,8 +200,9 @@ self.__handle_rewrites() self.__upload_rewritten_config_files() - def __handle_setup(self): - job_config = self.client.setup(self.tool_id, self.tool_version) + def __handle_setup(self, job_config): + if not job_config: + job_config = self.client.setup(self.tool_id, self.tool_version) self.new_working_directory = job_config['working_directory'] self.new_outputs_directory = job_config['outputs_directory'] @@ -297,32 +294,49 @@ return self.job_inputs.rewritten_command_line -def finish_job(client, cleanup_job, job_completed_normally, working_directory, work_dir_outputs, output_files): +def finish_job(client, cleanup_job, job_completed_normally, working_directory, work_dir_outputs, output_files, working_directory_contents=[]): """ """ action_mapper = FileActionMapper(client) download_failure_exceptions = [] + downloaded_working_directory_files = [] if job_completed_normally: + # Fetch explicit working directory outputs. 
for source_file, output_file in work_dir_outputs: + name = basename(source_file) try: action = action_mapper.action(output_file, 'output') - client.fetch_work_dir_output(source_file, working_directory, output_file, action[0]) + client.fetch_work_dir_output(name, working_directory, output_file, action[0]) + downloaded_working_directory_files.append(name) except Exception as e: download_failure_exceptions.append(e) # Remove from full output_files list so don't try to download directly. output_files.remove(output_file) + + # Fetch output files. for output_file in output_files: try: action = action_mapper.action(output_file, 'output') client.fetch_output(output_file, working_directory=working_directory, action=action[0]) except Exception as e: download_failure_exceptions.append(e) + + # Fetch remaining working directory outputs of interest. + for name in working_directory_contents: + if name in downloaded_working_directory_files: + continue + if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): + output_file = join(working_directory, name) + action = action_mapper.action(output_file, 'output') + client.fetch_work_dir_output(name, working_directory, output_file, action=action[0]) + downloaded_working_directory_files.append(name) + return __clean(download_failure_exceptions, cleanup_job, client) def __clean(download_failure_exceptions, cleanup_job, client): failed = (len(download_failure_exceptions) > 0) - if not failed or cleanup_job == "always": + if (not failed and cleanup_job != "never") or cleanup_job == "always": try: client.clean() except: @@ -330,10 +344,10 @@ return failed -def submit_job(client, tool, command_line, config_files, input_files, output_files, working_directory): +def submit_job(client, client_job_description, job_config=None): """ """ - file_stager = FileStager(client, tool, command_line, config_files, input_files, output_files, working_directory) + file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_rewritten_command_line() job_id = file_stager.job_id client.launch(rebuilt_command_line) @@ -351,4 +365,33 @@ finally: input.close() -__all__ = [submit_job, finish_job] + +class ClientJobDescription(object): + """ A description of how client views job - command_line, inputs, etc.. + + **Parameters** + + command_line : str + The local command line to execute, this will be rewritten for the remote server. + config_files : list + List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. + input_files : list + List of input files used by job. These will be transferred and references rewritten. + output_files : list + List of output_files produced by job. + tool_dir : str + Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). + working_directory : str + Local path created by Galaxy for running this job. 
+ """ + + def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory): + self.tool = tool + self.command_line = command_line + self.config_files = config_files + self.input_files = input_files + self.output_files = output_files + self.working_directory = working_directory + + +__all__ = [submit_job, ClientJobDescription, finish_job] diff -r 142006369d22f60a12c4573a782bbb30c58efbb9 -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 lib/galaxy/jobs/runners/lwr_client/transport/standard.py --- a/lib/galaxy/jobs/runners/lwr_client/transport/standard.py +++ b/lib/galaxy/jobs/runners/lwr_client/transport/standard.py @@ -2,6 +2,7 @@ LWR HTTP Client layer based on Python Standard Library (urllib2) """ from __future__ import with_statement +from os.path import getsize import mmap try: from urllib2 import urlopen @@ -23,8 +24,11 @@ input = None try: if input_path: - input = open(input_path, 'rb') - data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) + if getsize(input_path): + input = open(input_path, 'rb') + data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) + else: + data = b"" response = self._url_open(request, data) finally: if input: https://bitbucket.org/galaxy/galaxy-central/commits/36d360516828/ Changeset: 36d360516828 User: jmchilton Date: 2013-12-10 00:14:39 Summary: Allow command_factory.build_command to pass in metadata kwds. Add unit tests both for the old default parameters and the ability to replace these defaults. This is all in case one wants to run the set metadata command on a remote server with different galaxy path, output paths, etc.... Affected #: 2 files diff -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 -r 36d3605168283f888bf7f415a60805de11e0bd3e lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -2,7 +2,7 @@ from os.path import abspath -def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): +def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, metadata_kwds={} ): """ Compose the sequence of commands necessary to execute a job. 
This will currently include: @@ -55,11 +55,15 @@ # Append metadata setting commands, we don't want to overwrite metadata # that was copied over in init_meta(), as per established behavior if include_metadata and job_wrapper.requires_setting_metadata: + exec_dir = metadata_kwds.get( 'exec_dir', abspath( getcwd() ) ) + tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) + dataset_files_path = metadata_kwds.get( 'dataset_files_path', job.app.model.Dataset.file_path ) + output_fnames = metadata_kwds.get( 'output_fnames', job_wrapper.get_output_fnames() ) metadata_command = job_wrapper.setup_external_metadata( - exec_dir=abspath( getcwd() ), - tmp_dir=job_wrapper.working_directory, - dataset_files_path=job.app.model.Dataset.file_path, - output_fnames=job_wrapper.get_output_fnames(), + exec_dir=exec_dir, + tmp_dir=tmp_dir, + dataset_files_path=dataset_files_path, + output_fnames=output_fnames, set_extension=False, kwds={ 'overwrite' : False } ) or '' diff -r e9c37e7632c66a555cfeabb3d1dfa30d8b621881 -r 36d3605168283f888bf7f415a60805de11e0bd3e test/unit/test_command_factory.py --- a/test/unit/test_command_factory.py +++ b/test/unit/test_command_factory.py @@ -5,13 +5,15 @@ from galaxy.util.bunch import Bunch MOCK_COMMAND_LINE = "/opt/galaxy/tools/bowtie /mnt/galaxyData/files/000/input000.dat" +TEST_METADATA_LINE = "set_metadata_and_stuff.sh" +TEST_FILES_PATH = "file_path" class TestCommandFactory(TestCase): def setUp(self): self.job_wrapper = MockJobWrapper() - self.job = Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path="file_path")))) + self.job = Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH)))) self.include_metadata = False self.include_work_dir_outputs = True @@ -40,9 +42,8 @@ def _test_set_metadata(self): self.include_metadata = True self.include_work_dir_outputs = False - metadata_line = "set_metadata_and_stuff.sh" - self.job_wrapper.metadata_line = metadata_line - expected_command = '%s; return_code=$?; cd %s; %s; sh -c "exit $return_code"' % (MOCK_COMMAND_LINE, getcwd(), metadata_line) + self.job_wrapper.metadata_line = TEST_METADATA_LINE + expected_command = '%s; return_code=$?; cd %s; %s; sh -c "exit $return_code"' % (MOCK_COMMAND_LINE, getcwd(), TEST_METADATA_LINE) self.__assert_command_is( expected_command ) def test_empty_metadata(self): @@ -55,18 +56,47 @@ # Empty metadata command do not touch command line. 
expected_command = '%s' % (MOCK_COMMAND_LINE) self.__assert_command_is( expected_command ) - - - def __assert_command_is(self, expected_command): - command = self.__command() + + def test_metadata_kwd_defaults(self): + configured_kwds = self.__set_metadata_with_kwds() + assert configured_kwds['exec_dir'] == getcwd() + assert configured_kwds['tmp_dir'] == self.job_wrapper.working_directory + assert configured_kwds['dataset_files_path'] == TEST_FILES_PATH + assert configured_kwds['output_fnames'] == ['output1'] + + def test_metadata_kwds_overrride(self): + configured_kwds = self.__set_metadata_with_kwds( + exec_dir="/path/to/remote/galaxy", + tmp_dir="/path/to/remote/staging/directory/job1", + dataset_files_path="/path/to/remote/datasets/", + output_fnames=['/path/to/remote_output1'], + ) + assert configured_kwds['exec_dir'] == "/path/to/remote/galaxy" + assert configured_kwds['tmp_dir'] == "/path/to/remote/staging/directory/job1" + assert configured_kwds['dataset_files_path'] == "/path/to/remote/datasets/" + assert configured_kwds['output_fnames'] == ['/path/to/remote_output1'] + + def __set_metadata_with_kwds(self, **kwds): + self.include_metadata = True + self.include_work_dir_outputs = False + self.job_wrapper.metadata_line = TEST_METADATA_LINE + if kwds: + self.__command(metadata_kwds=kwds) + else: + self.__command() + return self.job_wrapper.configured_external_metadata_kwds + + def __assert_command_is(self, expected_command, **command_kwds): + command = self.__command(**command_kwds) self.assertEqual(command, expected_command) - def __command(self): + def __command(self, **extra_kwds): kwds = dict( job=self.job, job_wrapper=self.job_wrapper, include_metadata=self.include_metadata, include_work_dir_outputs=self.include_work_dir_outputs, + **extra_kwds ) return build_command(**kwds) @@ -78,6 +108,7 @@ self.command_line = MOCK_COMMAND_LINE self.dependency_shell_commands = [] self.metadata_line = None + self.configured_external_metadata_kwds = None self.working_directory = "job1" def get_command_line(self): @@ -88,10 +119,11 @@ return self.metadata_line is not None def setup_external_metadata(self, *args, **kwds): + self.configured_external_metadata_kwds = kwds return self.metadata_line def get_output_fnames(self): - return [] + return ["output1"] class MockJob(object): https://bitbucket.org/galaxy/galaxy-central/commits/2be38856ea2d/ Changeset: 2be38856ea2d User: jmchilton Date: 2013-12-10 00:14:39 Summary: Eliminate non-specific exception catching in LWR runner. This is a Python anti-pattern that should probably be eliminated throughout Galaxy. 
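As a brief illustration (editorial, not part of the changeset) of why the bare form is the riskier one: a bare "except:" also traps BaseException subclasses such as KeyboardInterrupt and SystemExit, so a runner written that way can silently swallow an operator's Ctrl-C or a requested interpreter shutdown, while "except Exception:" lets them propagate. A minimal runnable sketch:

# Illustration only: KeyboardInterrupt derives from BaseException, not
# Exception, so only a bare except swallows it.
def interrupted_job():
    raise KeyboardInterrupt  # e.g. operator hits Ctrl-C while a job runs

try:
    interrupted_job()
except:  # bare except - the interrupt is silently swallowed
    print("handled as if it were an ordinary job failure")

try:
    interrupted_job()
except Exception:  # ordinary job errors would still be caught here...
    print("not reached")
except KeyboardInterrupt:  # ...while the interrupt propagates past it
    print("interrupt escaped the Exception handler")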
Affected #: 1 file diff -r 36d3605168283f888bf7f415a60805de11e0bd3e -r 2be38856ea2d50f04931f3c7343630a5211385f4 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -67,7 +67,7 @@ raise Exception('Error running file staging command: %s' % cmd) job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) - except: + except Exception: job_wrapper.fail( "failure preparing job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return @@ -91,7 +91,7 @@ log.info("lwr job submitted with job_id %s" % job_id) job_wrapper.set_job_destination( job_destination, job_id ) job_wrapper.change_state( model.Job.states.QUEUED ) - except: + except Exception: job_wrapper.fail( "failure running job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return @@ -152,7 +152,7 @@ if failed: job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True) - except: + except Exception: message = "Failed to communicate with remote job server." job_wrapper.fail( message, exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) @@ -161,7 +161,7 @@ # Finish the job try: job_wrapper.finish( stdout, stderr ) - except: + except Exception: log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) https://bitbucket.org/galaxy/galaxy-central/commits/0ed67d508ba3/ Changeset: 0ed67d508ba3 User: jmchilton Date: 2013-12-10 00:14:39 Summary: Refactor queue_job in the LWR job runner into more manageable pieces. Affected #: 1 file diff -r 2be38856ea2d50f04931f3c7343630a5211385f4 -r 0ed67d508ba30090453b793bee5283b6a87c9c6d lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -55,30 +55,14 @@ return job_state def queue_job(self, job_wrapper): - command_line = '' job_destination = job_wrapper.job_destination - try: - job_wrapper.prepare() - if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: - for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files - #log.debug( 'executing: %s' % cmd ) - if 0 != os.system(cmd): - raise Exception('Error running file staging command: %s' % cmd) - job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line - command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) - except Exception: - job_wrapper.fail( "failure preparing job", exception=True ) - log.exception("failure running job %d" % job_wrapper.job_id) - return + command_line, client = self.__prepare_job( job_wrapper, job_destination ) - # If we were able to get a command line, run the job if not command_line: - job_wrapper.finish( '', '' ) return try: - client = self.get_client_from_wrapper(job_wrapper) client_job_description = ClientJobDescription( command_line=command_line, output_files=self.get_output_files(job_wrapper), @@ -104,6 +88,37 @@ lwr_job_state.job_destination = job_destination self.monitor_job(lwr_job_state) + def __prepare_job(self, job_wrapper, job_destination): + """ Build command-line and LWR client for this job. 
""" + command_line = None + client = None + + try: + job_wrapper.prepare() + self.__prepare_input_files_locally(job_wrapper) + command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) + client = self.get_client_from_wrapper(job_wrapper) + except Exception: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + + # If we were able to get a command line, run the job + if not command_line: + job_wrapper.finish( '', '' ) + return + + return command_line, client + + def __prepare_input_files_locally(self, job_wrapper): + """Run task splitting commands locally.""" + prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None) + if prepare_input_files_cmds is not None: + for cmd in prepare_input_files_cmds: # run the commands to stage the input files + if 0 != os.system(cmd): + raise Exception('Error running file staging command: %s' % cmd) + job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line + def get_output_files(self, job_wrapper): output_fnames = job_wrapper.get_output_fnames() return [ str( o ) for o in output_fnames ] https://bitbucket.org/galaxy/galaxy-central/commits/292d68d7e6a1/ Changeset: 292d68d7e6a1 User: jmchilton Date: 2013-12-10 00:14:39 Summary: Allow setting metadata remotely with the LWR. Must be running against very new LWR server which has been configured with a GALAXY_HOME variable. The LWR job runner is reorganized to fetch the remote job config earlier and the LWR passes more information during this setup phase (namely galaxy_home - that is used when building the metadata setting command). There is a new destination parameter - use_remote_datatypes. This can be true or false - defaulting to false. When false - the local Galaxy integrated datatypes will be sent over to the remote LWR server and used in conjuction with the galaxy server configured remotely. When true - the remote LWR job manager should be configured with a 'galaxy_datatypes_config_file' parameters and this datatypes config file will be used when setting metadata externally. Since both options are subtley broken because there is no guarentee the remote and local Galaxy instances have the same datatypes available - I guess I would personally recommend continuing to set metadata on the galaxy server in conjunction with the LWR. Affected #: 3 files diff -r 0ed67d508ba30090453b793bee5283b6a87c9c6d -r 292d68d7e6a101b4a8df06a9f09aa3d16403e14e lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -2,7 +2,7 @@ from os.path import abspath -def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, metadata_kwds={} ): +def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, metadata_kwds={}, job_working_directory=None ): """ Compose the sequence of commands necessary to execute a job. This will currently include: @@ -43,7 +43,7 @@ # Append commands to copy job outputs based on from_work_dir attribute. 
if include_work_dir_outputs: - work_dir_outputs = job.get_work_dir_outputs( job_wrapper ) + work_dir_outputs = job.get_work_dir_outputs( job_wrapper, job_working_directory=job_working_directory ) if work_dir_outputs: if not captured_return_code: commands += capture_return_code_command @@ -59,12 +59,18 @@ tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) dataset_files_path = metadata_kwds.get( 'dataset_files_path', job.app.model.Dataset.file_path ) output_fnames = metadata_kwds.get( 'output_fnames', job_wrapper.get_output_fnames() ) + config_root = metadata_kwds.get( 'config_root', None ) + config_file = metadata_kwds.get( 'config_file', None ) + datatypes_config = metadata_kwds.get( 'datatypes_config', None ) metadata_command = job_wrapper.setup_external_metadata( exec_dir=exec_dir, tmp_dir=tmp_dir, dataset_files_path=dataset_files_path, output_fnames=output_fnames, set_extension=False, + config_root=config_root, + config_file=config_file, + datatypes_config=datatypes_config, kwds={ 'overwrite' : False } ) or '' metadata_command = metadata_command.strip() @@ -72,7 +78,7 @@ if not captured_return_code: commands += capture_return_code_command captured_return_code = True - commands += "; cd %s; %s" % (abspath( getcwd() ), metadata_command) + commands += "; cd %s; %s" % (exec_dir, metadata_command) if captured_return_code: commands += '; sh -c "exit $return_code"' diff -r 0ed67d508ba30090453b793bee5283b6a87c9c6d -r 292d68d7e6a101b4a8df06a9f09aa3d16403e14e lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -146,11 +146,13 @@ def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): return build_command( self, job_wrapper, include_metadata=include_metadata, include_work_dir_outputs=include_work_dir_outputs ) - def get_work_dir_outputs( self, job_wrapper ): + def get_work_dir_outputs( self, job_wrapper, job_working_directory=None ): """ Returns list of pairs (source_file, destination) describing path to work_dir output file and ultimate destination. """ + if not job_working_directory: + job_working_directory = os.path.abspath( job_wrapper.working_directory ) def in_directory( file, directory ): """ @@ -186,7 +188,7 @@ if hda_tool_output and hda_tool_output.from_work_dir: # Copy from working dir to HDA. # TODO: move instead of copy to save time? 
- source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir ) + source_file = os.path.join( job_working_directory, hda_tool_output.from_work_dir ) destination = job_wrapper.get_output_destination( output_paths[ dataset.dataset_id ] ) if in_directory( source_file, job_wrapper.working_directory ): output_pairs.append( ( source_file, destination ) ) diff -r 0ed67d508ba30090453b793bee5283b6a87c9c6d -r 292d68d7e6a101b4a8df06a9f09aa3d16403e14e lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -3,7 +3,9 @@ from galaxy import model from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner from galaxy.jobs import JobDestination +from galaxy.jobs.command_factory import build_command from galaxy.util import string_as_bool_or_none +from galaxy.util.bunch import Bunch import errno from time import sleep @@ -18,6 +20,9 @@ __all__ = [ 'LwrJobRunner' ] +NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "LWR misconfiguration - LWR client configured to set metadata remotely, but remote LWR isn't properly configured with a galaxy_home directory." +NO_REMOTE_DATATYPES_CONFIG = "LWR client is configured to use remote datatypes configuration when setting metadata externally, but LWR is not configured with this information. Defaulting to datatypes_conf.xml." + class LwrJobRunner( AsynchronousJobRunner ): """ @@ -57,7 +62,7 @@ def queue_job(self, job_wrapper): job_destination = job_wrapper.job_destination - command_line, client = self.__prepare_job( job_wrapper, job_destination ) + command_line, client, remote_job_config = self.__prepare_job( job_wrapper, job_destination ) if not command_line: return @@ -71,7 +76,7 @@ tool=job_wrapper.tool, config_files=job_wrapper.extra_filenames, ) - job_id = lwr_submit_job(client, client_job_description) + job_id = lwr_submit_job(client, client_job_description, remote_job_config) log.info("lwr job submitted with job_id %s" % job_id) job_wrapper.set_job_destination( job_destination, job_id ) job_wrapper.change_state( model.Job.states.QUEUED ) @@ -92,23 +97,33 @@ """ Build command-line and LWR client for this job. 
""" command_line = None client = None - + remote_job_config = None try: job_wrapper.prepare() self.__prepare_input_files_locally(job_wrapper) - command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) client = self.get_client_from_wrapper(job_wrapper) + tool = job_wrapper.tool + remote_job_config = client.setup(tool.id, tool.version) + remote_metadata = LwrJobRunner.__remote_metadata( client ) + remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) + metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + command_line = build_command( + self, + job_wrapper=job_wrapper, + include_metadata=remote_metadata, + include_work_dir_outputs=remote_work_dir_copy, + metadata_kwds=metadata_kwds, + job_working_directory=remote_job_config['working_directory'] + ) except Exception: job_wrapper.fail( "failure preparing job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) - return # If we were able to get a command line, run the job if not command_line: job_wrapper.finish( '', '' ) - return - return command_line, client + return command_line, client, remote_job_config def __prepare_input_files_locally(self, job_wrapper): """Run task splitting commands locally.""" @@ -149,20 +164,27 @@ run_results = client.raw_check_complete() stdout = run_results.get('stdout', '') stderr = run_results.get('stderr', '') - + working_directory_contents = run_results.get('working_directory_contents', []) # Use LWR client code to transfer/copy files back # and cleanup job if needed. completed_normally = \ job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] cleanup_job = self.app.config.cleanup_job - work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) + if not remote_work_dir_copy: + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + else: + # They have already been copied over to look like regular outputs remotely, + # no need to handle them differently here. + work_dir_outputs = [] output_files = self.get_output_files( job_wrapper ) finish_args = dict( client=client, working_directory=job_wrapper.working_directory, job_completed_normally=completed_normally, cleanup_job=cleanup_job, work_dir_outputs=work_dir_outputs, - output_files=output_files ) + output_files=output_files, + working_directory_contents=working_directory_contents ) failed = lwr_finish_job( **finish_args ) if failed: @@ -172,7 +194,8 @@ job_wrapper.fail( message, exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return - self._handle_metadata_externally( job_wrapper ) + if not LwrJobRunner.__remote_metadata( client ): + self._handle_metadata_externally( job_wrapper ) # Finish the job try: job_wrapper.finish( stdout, stderr ) @@ -244,3 +267,64 @@ job_state.old_state = True job_state.running = state == model.Job.states.RUNNING self.monitor_queue.put( job_state ) + + @staticmethod + def __remote_metadata( lwr_client ): + remote_metadata = string_as_bool_or_none( lwr_client.destination_params.get( "remote_metadata", False ) ) + return remote_metadata + + @staticmethod + def __remote_work_dir_copy( lwr_client ): + # Right now remote metadata handling assumes from_work_dir outputs + # have been copied over before it runs. So do that remotely. 
This is + # not the default though because adding it to the command line is not + # cross-platform (no cp on Windows) and its un-needed work outside + # the context of metadata settting (just as easy to download from + # either place.) + return LwrJobRunner.__remote_metadata( lwr_client ) + + @staticmethod + def __use_remote_datatypes_conf( lwr_client ): + """ When setting remote metadata, use integrated datatypes from this + Galaxy instance or use the datatypes config configured via the remote + LWR. + + Both options are broken in different ways for same reason - datatypes + may not match. One can push the local datatypes config to the remote + server - but there is no guarentee these datatypes will be defined + there. Alternatively, one can use the remote datatype config - but + there is no guarentee that it will contain all the datatypes available + to this Galaxy. + """ + use_remote_datatypes = string_as_bool_or_none( lwr_client.destination_params.get( "use_remote_datatypes", False ) ) + return use_remote_datatypes + + def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config): + metadata_kwds = {} + if remote_metadata: + remote_system_properties = remote_job_config.get("system_properties", {}) + remote_galaxy_home = remote_system_properties.get("galaxy_home", None) + if not remote_galaxy_home: + raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE) + metadata_kwds['exec_dir'] = remote_galaxy_home + outputs_directory = remote_job_config['outputs_directory'] + configs_directory = remote_job_config['configs_directory'] + outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)] + metadata_kwds['output_fnames'] = outputs + metadata_kwds['config_root'] = remote_galaxy_home + default_config_file = os.path.join(remote_galaxy_home, 'universe_wsgi.ini') + metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file) + metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None) + if LwrJobRunner.__use_remote_datatypes_conf( client ): + remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None) + if not remote_datatypes_config: + log.warn(NO_REMOTE_DATATYPES_CONFIG) + remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml') + metadata_kwds['datatypes_config'] = remote_datatypes_config + else: + integrates_datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs + # Ensure this file gets pushed out to the remote config dir. + job_wrapper.extra_filenames.append(integrates_datatypes_config) + + metadata_kwds['datatypes_config'] = os.path.join(configs_directory, os.path.basename(integrates_datatypes_config)) + return metadata_kwds Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
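For reference, a minimal sketch (editorial, not part of the commits) of the destination parameters that changeset 292d68d7e6a1 reads through lwr_client.destination_params in __remote_metadata and __use_remote_datatypes_conf; the endpoint URL and values below are hypothetical examples, and in a real deployment they would come from an LWR destination in job_conf.xml:

# Sketch only - hypothetical parameters for an LWR job destination. The
# runner parses the flags with string_as_bool_or_none, so "true"/"false"
# strings (or None) are accepted.
destination_params = {
    "url": "https://remote-lwr.example.org:8913/",  # hypothetical LWR endpoint
    "remote_metadata": "true",        # run the metadata-setting command remotely
    "use_remote_datatypes": "false",  # push the local integrated datatypes config
}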