commit/galaxy-central: 10 new changesets
10 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/4ebd2f3313b2/ Changeset: 4ebd2f3313b2 User: jmchilton Date: 2013-12-11 17:26:14 Summary: Reduce extra-module dependencies in galaxy.tools.deps. Eliminate older elementtree dependency. Eliminate dependency on galaxy.util:string_as_bool. Only remaining dependency is galaxy.util.submodules. PEP-8 fix up galaxy.util.submodules so it can be copied directly into the LWR. This allowed adding the whole galaxy.tools.deps module and submodules into LWR for remote dependency resolution. Affected #: 5 files diff -r 98694eecdf76e7d69c77a0743d2b65fcb1ae3c12 -r 4ebd2f3313b21ecac0d327055b452911551ec279 lib/galaxy/tools/deps/__init__.py --- a/lib/galaxy/tools/deps/__init__.py +++ b/lib/galaxy/tools/deps/__init__.py @@ -7,7 +7,7 @@ import logging log = logging.getLogger( __name__ ) -from galaxy.util import parse_xml +from xml.etree import ElementTree from .resolvers import INDETERMINATE_DEPENDENCY from .resolvers.galaxy_packages import GalaxyPackageDependencyResolver @@ -69,8 +69,8 @@ def __build_dependency_resolvers( self, conf_file ): if not conf_file or not os.path.exists( conf_file ): return self.__default_dependency_resolvers() - tree = parse_xml( conf_file ) - return self.__parse_resolver_conf_xml( tree ) + root = ElementTree.parse( conf_file ).getroot() + return self.__parse_resolver_conf_xml( root ) def __default_dependency_resolvers( self ): return [ @@ -78,14 +78,14 @@ GalaxyPackageDependencyResolver(self), ] - def __parse_resolver_conf_xml(self, tree): + def __parse_resolver_conf_xml(self, root): """ - :param tree: Object representing the root ``<dependency_resolvers>`` object in the file. - :type tree: ``xml.etree.ElementTree.Element`` + :param root: Object representing the root ``<dependency_resolvers>`` object in the file. 
+ :type root: ``xml.etree.ElementTree.Element`` """ resolvers = [] - resolvers_element = tree.getroot() + resolvers_element = root for resolver_element in resolvers_element.getchildren(): resolver_type = resolver_element.tag resolver_kwds = dict(resolver_element.items()) diff -r 98694eecdf76e7d69c77a0743d2b65fcb1ae3c12 -r 4ebd2f3313b21ecac0d327055b452911551ec279 lib/galaxy/tools/deps/requirements.py --- a/lib/galaxy/tools/deps/requirements.py +++ b/lib/galaxy/tools/deps/requirements.py @@ -19,8 +19,7 @@ def parse_requirements_from_xml( xml_root ): """ - >>> from galaxy.util import parse_xml - >>> from elementtree import ElementTree + >>> from xml.etree import ElementTree >>> def load_requirements( contents ): ... contents_document = '''<tool><requirements>%s</requirements></tool>''' ... root = ElementTree.fromstring( contents_document % contents ) @@ -43,7 +42,7 @@ requirements_elem = xml_root.find( "requirements" ) requirement_elems = [] - if requirements_elem: + if requirements_elem is not None: requirement_elems = requirements_elem.findall( 'requirement' ) requirements = [] diff -r 98694eecdf76e7d69c77a0743d2b65fcb1ae3c12 -r 4ebd2f3313b21ecac0d327055b452911551ec279 lib/galaxy/tools/deps/resolvers/galaxy_packages.py --- a/lib/galaxy/tools/deps/resolvers/galaxy_packages.py +++ b/lib/galaxy/tools/deps/resolvers/galaxy_packages.py @@ -1,7 +1,6 @@ from os.path import join, islink, realpath, basename, exists, abspath from ..resolvers import DependencyResolver, INDETERMINATE_DEPENDENCY, Dependency -from galaxy.util import string_as_bool import logging log = logging.getLogger( __name__ ) @@ -16,7 +15,7 @@ ## the tool shed so allow a fallback version of the Galaxy package ## resolver that will just grab 'default' version of exact version ## unavailable. 
- self.versionless = string_as_bool(kwds.get('versionless', "false")) + self.versionless = str(kwds.get('versionless', "false")).lower() == "true" self.base_path = abspath( kwds.get('base_path', dependency_manager.default_base_path) ) def resolve( self, name, version, type, **kwds ): diff -r 98694eecdf76e7d69c77a0743d2b65fcb1ae3c12 -r 4ebd2f3313b21ecac0d327055b452911551ec279 lib/galaxy/tools/deps/resolvers/modules.py --- a/lib/galaxy/tools/deps/resolvers/modules.py +++ b/lib/galaxy/tools/deps/resolvers/modules.py @@ -12,7 +12,6 @@ from subprocess import Popen, PIPE from ..resolvers import DependencyResolver, INDETERMINATE_DEPENDENCY, Dependency -from galaxy.util import string_as_bool import logging log = logging.getLogger( __name__ ) @@ -28,9 +27,9 @@ resolver_type = "modules" def __init__(self, dependency_manager, **kwds): - self.versionless = string_as_bool(kwds.get('versionless', 'false')) + self.versionless = _string_as_bool(kwds.get('versionless', 'false')) find_by = kwds.get('find_by', 'avail') - prefetch = string_as_bool(kwds.get('prefetch', DEFAULT_MODULE_PREFETCH)) + prefetch = _string_as_bool(kwds.get('prefetch', DEFAULT_MODULE_PREFETCH)) self.modulecmd = kwds.get('modulecmd', DEFAULT_MODULECMD_PATH) if find_by == 'directory': modulepath = kwds.get('modulepath', self.__default_modulespath()) @@ -150,4 +149,8 @@ command = 'eval `%s sh load %s`' % (self.module_dependency_resolver.modulecmd, module_to_load) return command + +def _string_as_bool( value ): + return str( value ).lower() == "true" + __all__ = [ModuleDependencyResolver] diff -r 98694eecdf76e7d69c77a0743d2b65fcb1ae3c12 -r 4ebd2f3313b21ecac0d327055b452911551ec279 lib/galaxy/util/submodules.py --- a/lib/galaxy/util/submodules.py +++ b/lib/galaxy/util/submodules.py @@ -1,30 +1,31 @@ from os import listdir + import logging -log = logging.getLogger( __name__ ) +log = logging.getLogger(__name__) -def submodules( module ): - unsorted_submodule_names = __submodule_names( module ) - submodule_names = 
sorted( unsorted_submodule_names, reverse=True ) +def submodules(module): + unsorted_submodule_names = __submodule_names(module) + submodule_names = sorted(unsorted_submodule_names, reverse=True) submodules = [] for submodule_name in submodule_names: - full_submodule = "%s.%s" % ( module.__name__, submodule_name ) + full_submodule = "%s.%s" % (module.__name__, submodule_name) try: - __import__( full_submodule ) - submodule = getattr( module, submodule_name ) - submodules.append( submodule ) + __import__(full_submodule) + submodule = getattr(module, submodule_name) + submodules.append(submodule) except BaseException, exception: - exception_str = str( exception ) - message = "%s dynamic module could not be loaded: %s" % ( full_submodule, exception_str ) - log.debug( message ) + exception_str = str(exception) + message = "%s dynamic module could not be loaded: %s" % (full_submodule, exception_str) + log.debug(message) return submodules -def __submodule_names( module ): - module_dir = module.__path__[ 0 ] +def __submodule_names(module): + module_dir = module.__path__[0] names = [] - for fname in listdir( module_dir ): - if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ): - submodule_name = fname[ :-len( ".py" ) ] - names.append( submodule_name ) + for fname in listdir(module_dir): + if not(fname.startswith("_")) and fname.endswith(".py"): + submodule_name = fname[:-len(".py")] + names.append(submodule_name) return names https://bitbucket.org/galaxy/galaxy-central/commits/34694461de2c/ Changeset: 34694461de2c User: jmchilton Date: 2013-12-11 17:26:15 Summary: Refactor command_factory:build_command to reduce parameter count. Grouping all parameters to tweak the building of this command line for remote servers (read LWR) - since I have at least one more to add. 
Affected #: 3 files diff -r 4ebd2f3313b21ecac0d327055b452911551ec279 -r 34694461de2c2863066aa24c7c0cee65837f29e5 lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -2,7 +2,7 @@ from os.path import abspath -def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, metadata_kwds={}, job_working_directory=None ): +def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, remote_command_params={} ): """ Compose the sequence of commands necessary to execute a job. This will currently include: @@ -43,7 +43,10 @@ # Append commands to copy job outputs based on from_work_dir attribute. if include_work_dir_outputs: - work_dir_outputs = job.get_work_dir_outputs( job_wrapper, job_working_directory=job_working_directory ) + work_dir_outputs_kwds = {} + if 'working_directory' in remote_command_params: + work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] + work_dir_outputs = job.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) if work_dir_outputs: if not captured_return_code: commands += capture_return_code_command @@ -55,6 +58,7 @@ # Append metadata setting commands, we don't want to overwrite metadata # that was copied over in init_meta(), as per established behavior if include_metadata and job_wrapper.requires_setting_metadata: + metadata_kwds = remote_command_params.get('metadata_kwds', {}) exec_dir = metadata_kwds.get( 'exec_dir', abspath( getcwd() ) ) tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) dataset_files_path = metadata_kwds.get( 'dataset_files_path', job.app.model.Dataset.file_path ) diff -r 4ebd2f3313b21ecac0d327055b452911551ec279 -r 34694461de2c2863066aa24c7c0cee65837f29e5 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -107,13 +107,16 @@ remote_metadata = LwrJobRunner.__remote_metadata( client 
) remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + remote_command_params = dict( + working_directory=remote_job_config['working_directory'], + metadata_kwds=metadata_kwds, + ) command_line = build_command( self, job_wrapper=job_wrapper, include_metadata=remote_metadata, include_work_dir_outputs=remote_work_dir_copy, - metadata_kwds=metadata_kwds, - job_working_directory=remote_job_config['working_directory'] + remote_command_params=remote_command_params, ) except Exception: job_wrapper.fail( "failure preparing job", exception=True ) diff -r 4ebd2f3313b21ecac0d327055b452911551ec279 -r 34694461de2c2863066aa24c7c0cee65837f29e5 test/unit/test_command_factory.py --- a/test/unit/test_command_factory.py +++ b/test/unit/test_command_factory.py @@ -81,7 +81,7 @@ self.include_work_dir_outputs = False self.job_wrapper.metadata_line = TEST_METADATA_LINE if kwds: - self.__command(metadata_kwds=kwds) + self.__command(remote_command_params=dict(metadata_kwds=kwds)) else: self.__command() return self.job_wrapper.configured_external_metadata_kwds https://bitbucket.org/galaxy/galaxy-central/commits/e94ac0f171cb/ Changeset: e94ac0f171cb User: jmchilton Date: 2013-12-11 17:26:15 Summary: Allow new dependency_resolution destination for LWR runner. It can be 'none', 'local', or 'remote'. At this point this allows one to disable command dependency injection if set to 'none' or 'remote'. Actually implementing logic for 'remote' resolution will follow. In subsequent changes - 'remote' will make backward-incompatible API calls to the LWR - this is why 'none' is also provided as an option - it will be a backward-compatible way to disable dependency resolution for an LWR destination. 
Affected #: 3 files diff -r 34694461de2c2863066aa24c7c0cee65837f29e5 -r e94ac0f171cb91325747434d0dd62ea6e9ea9187 lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -31,8 +31,9 @@ if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: commands = "; ".join( job_wrapper.prepare_input_files_cmds + [ commands ] ) + local_dependency_resolution = "dependency_resolution" not in remote_command_params or (remote_command_params["dependency_resolution"] == "local") # Prepend dependency injection - if job_wrapper.dependency_shell_commands: + if job_wrapper.dependency_shell_commands and local_dependency_resolution: commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] ) # Coping work dir outputs or setting metadata will mask return code of diff -r 34694461de2c2863066aa24c7c0cee65837f29e5 -r e94ac0f171cb91325747434d0dd62ea6e9ea9187 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -106,10 +106,12 @@ remote_job_config = client.setup(tool.id, tool.version) remote_metadata = LwrJobRunner.__remote_metadata( client ) remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) + dependency_resolution = LwrJobRunner.__dependency_resolution( client ) metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) remote_command_params = dict( working_directory=remote_job_config['working_directory'], metadata_kwds=metadata_kwds, + dependency_resolution=dependency_resolution, ) command_line = build_command( self, @@ -272,6 +274,13 @@ self.monitor_queue.put( job_state ) @staticmethod + def __dependency_resolution( lwr_client ): + dependency_resolution = lwr_client.destination_params.get( "dependency_resolution", "local" ) + if dependency_resolution not in ["none", "local", "remote"]: + raise Exception("Unknown dependency_resolution value 
encountered %s" % dependency_resolution) + return dependency_resolution + + @staticmethod def __remote_metadata( lwr_client ): remote_metadata = string_as_bool_or_none( lwr_client.destination_params.get( "remote_metadata", False ) ) return remote_metadata diff -r 34694461de2c2863066aa24c7c0cee65837f29e5 -r e94ac0f171cb91325747434d0dd62ea6e9ea9187 test/unit/test_command_factory.py --- a/test/unit/test_command_factory.py +++ b/test/unit/test_command_factory.py @@ -27,6 +27,19 @@ self.job_wrapper.dependency_shell_commands = dep_commands self.__assert_command_is( "%s; %s" % (dep_commands[0], MOCK_COMMAND_LINE) ) + def test_remote_dependency_resolution(self): + self.include_work_dir_outputs = False + dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"] + self.job_wrapper.dependency_shell_commands = dep_commands + self.__assert_command_is(MOCK_COMMAND_LINE, remote_command_params=dict(dependency_resolution="remote")) + + def test_explicit_local_dependency_resolution(self): + self.include_work_dir_outputs = False + dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"] + self.job_wrapper.dependency_shell_commands = dep_commands + self.__assert_command_is("%s; %s" % (dep_commands[0], MOCK_COMMAND_LINE), + remote_command_params=dict(dependency_resolution="local")) + def test_set_metadata_skipped_if_unneeded(self): self.include_metadata = True self.include_work_dir_outputs = False https://bitbucket.org/galaxy/galaxy-central/commits/dc4e5f648da2/ Changeset: dc4e5f648da2 User: jmchilton Date: 2013-12-11 17:26:15 Summary: command_factory cleanup - fix very poorly named input parameter. 
Affected #: 2 files diff -r e94ac0f171cb91325747434d0dd62ea6e9ea9187 -r dc4e5f648da215f3a9da27fe5155d4dd2ebfbff0 lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -2,7 +2,7 @@ from os.path import abspath -def build_command( job, job_wrapper, include_metadata=False, include_work_dir_outputs=True, remote_command_params={} ): +def build_command( runner, job_wrapper, include_metadata=False, include_work_dir_outputs=True, remote_command_params={} ): """ Compose the sequence of commands necessary to execute a job. This will currently include: @@ -47,7 +47,7 @@ work_dir_outputs_kwds = {} if 'working_directory' in remote_command_params: work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] - work_dir_outputs = job.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) + work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) if work_dir_outputs: if not captured_return_code: commands += capture_return_code_command @@ -62,7 +62,7 @@ metadata_kwds = remote_command_params.get('metadata_kwds', {}) exec_dir = metadata_kwds.get( 'exec_dir', abspath( getcwd() ) ) tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) - dataset_files_path = metadata_kwds.get( 'dataset_files_path', job.app.model.Dataset.file_path ) + dataset_files_path = metadata_kwds.get( 'dataset_files_path', runner.app.model.Dataset.file_path ) output_fnames = metadata_kwds.get( 'output_fnames', job_wrapper.get_output_fnames() ) config_root = metadata_kwds.get( 'config_root', None ) config_file = metadata_kwds.get( 'config_file', None ) diff -r e94ac0f171cb91325747434d0dd62ea6e9ea9187 -r dc4e5f648da215f3a9da27fe5155d4dd2ebfbff0 test/unit/test_command_factory.py --- a/test/unit/test_command_factory.py +++ b/test/unit/test_command_factory.py @@ -13,7 +13,7 @@ def setUp(self): self.job_wrapper = MockJobWrapper() - self.job = 
Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH)))) + self.runner = Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH)))) self.include_metadata = False self.include_work_dir_outputs = True @@ -105,7 +105,7 @@ def __command(self, **extra_kwds): kwds = dict( - job=self.job, + runner=self.runner, job_wrapper=self.job_wrapper, include_metadata=self.include_metadata, include_work_dir_outputs=self.include_work_dir_outputs, @@ -137,7 +137,3 @@ def get_output_fnames(self): return ["output1"] - - -class MockJob(object): - app = Bunch() https://bitbucket.org/galaxy/galaxy-central/commits/11888fd788bf/ Changeset: 11888fd788bf User: jmchilton Date: 2013-12-11 17:26:15 Summary: Add more unit tests for command_factory. In particular for task splitting commands and copying work dir outputs. Affected #: 1 file diff -r dc4e5f648da215f3a9da27fe5155d4dd2ebfbff0 -r 11888fd788bf4364acef861f545c4e27af05d6a8 test/unit/test_command_factory.py --- a/test/unit/test_command_factory.py +++ b/test/unit/test_command_factory.py @@ -13,7 +13,13 @@ def setUp(self): self.job_wrapper = MockJobWrapper() - self.runner = Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH)))) + self.workdir_outputs = [] + + def workdir_outputs(job_wrapper, **kwds): + assert job_wrapper == self.job_wrapper + return self.workdir_outputs + + self.runner = Bunch(app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH))), get_work_dir_outputs=workdir_outputs) self.include_metadata = False self.include_work_dir_outputs = True @@ -40,6 +46,16 @@ self.__assert_command_is("%s; %s" % (dep_commands[0], MOCK_COMMAND_LINE), remote_command_params=dict(dependency_resolution="local")) + def test_task_prepare_inputs(self): + self.include_work_dir_outputs = False + self.job_wrapper.prepare_input_files_cmds = ["/opt/split1", "/opt/split2"] + self.__assert_command_is( "/opt/split1; /opt/split2; %s" % MOCK_COMMAND_LINE ) + + def test_workdir_outputs(self): + 
self.include_work_dir_outputs = True + self.workdir_outputs = [("foo", "bar")] + self.__assert_command_is( '%s; return_code=$?; if [ -f foo ] ; then cp foo bar ; fi; sh -c "exit $return_code"' % MOCK_COMMAND_LINE ) + def test_set_metadata_skipped_if_unneeded(self): self.include_metadata = True self.include_work_dir_outputs = False @@ -123,6 +139,7 @@ self.metadata_line = None self.configured_external_metadata_kwds = None self.working_directory = "job1" + self.prepare_input_files_cmds = None def get_command_line(self): return self.command_line https://bitbucket.org/galaxy/galaxy-central/commits/ef3e337d1fc7/ Changeset: ef3e337d1fc7 User: jmchilton Date: 2013-12-11 17:26:15 Summary: command_factory cleanup - introduce abstraction for more readable command construction logic. Affected #: 1 file diff -r 11888fd788bf4364acef861f545c4e27af05d6a8 -r ef3e337d1fc7d4ee61bcf91e4ca50446eb087d6a lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -13,34 +13,25 @@ - commands to set metadata (if include_metadata is True) """ - commands = job_wrapper.get_command_line() + commands_builder = CommandsBuilder(job_wrapper.get_command_line()) # All job runners currently handle this case which should never occur - if not commands: + if not commands_builder.commands: return None - # Remove trailing semi-colon so we can start hacking up this command. - # TODO: Refactor to compose a list and join with ';', would be more clean. 
- commands = commands.rstrip("; ") - # Prepend version string if job_wrapper.version_string_cmd: - commands = "%s &> %s; " % ( job_wrapper.version_string_cmd, job_wrapper.get_version_string_path() ) + commands + version_command = "%s &> %s" % ( job_wrapper.version_string_cmd, job_wrapper.get_version_string_path() ) + commands_builder.prepend_command(version_command) # prepend getting input files (if defined) if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: - commands = "; ".join( job_wrapper.prepare_input_files_cmds + [ commands ] ) + commands_builder.prepend_commands(job_wrapper.prepare_input_files_cmds) local_dependency_resolution = "dependency_resolution" not in remote_command_params or (remote_command_params["dependency_resolution"] == "local") # Prepend dependency injection if job_wrapper.dependency_shell_commands and local_dependency_resolution: - commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] ) - - # Coping work dir outputs or setting metadata will mask return code of - # tool command. If these are used capture the return code and ensure - # the last thing that happens is an exit with return code. - capture_return_code_command = "; return_code=$?" - captured_return_code = False + commands_builder.prepend_commands(job_wrapper.dependency_shell_commands) # Append commands to copy job outputs based on from_work_dir attribute. 
if include_work_dir_outputs: @@ -49,12 +40,9 @@ work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) if work_dir_outputs: - if not captured_return_code: - commands += capture_return_code_command - captured_return_code = True - - commands += "; " + "; ".join( [ "if [ -f %s ] ; then cp %s %s ; fi" % - ( source_file, source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] ) + commands_builder.capture_return_code() + commands_builder.append_commands([ "if [ -f %s ] ; then cp %s %s ; fi" % + ( source_file, source_file, destination ) for ( source_file, destination ) in work_dir_outputs ]) # Append metadata setting commands, we don't want to overwrite metadata # that was copied over in init_meta(), as per established behavior @@ -80,12 +68,44 @@ ) or '' metadata_command = metadata_command.strip() if metadata_command: - if not captured_return_code: - commands += capture_return_code_command - captured_return_code = True - commands += "; cd %s; %s" % (exec_dir, metadata_command) + commands_builder.capture_return_code() + commands_builder.append_command("cd %s; %s" % (exec_dir, metadata_command)) - if captured_return_code: - commands += '; sh -c "exit $return_code"' + return commands_builder.build() - return commands + +class CommandsBuilder(object): + + def __init__(self, initial_command): + # Remove trailing semi-colon so we can start hacking up this command. + # TODO: Refactor to compose a list and join with ';', would be more clean. + commands = initial_command.rstrip("; ") + self.commands = commands + + # Coping work dir outputs or setting metadata will mask return code of + # tool command. If these are used capture the return code and ensure + # the last thing that happens is an exit with return code. 
+ self.return_code_captured = False + + def prepend_command(self, command): + self.commands = "%s; %s" % (command, self.commands) + return self + + def prepend_commands(self, commands): + return self.prepend_command("; ".join(commands)) + + def append_command(self, command): + self.commands = "%s; %s" % (self.commands, command) + + def append_commands(self, commands): + self.append_command("; ".join(commands)) + + def capture_return_code(self): + if not self.return_code_captured: + self.return_code_captured = True + self.append_command("return_code=$?") + + def build(self): + if self.return_code_captured: + self.append_command('sh -c "exit $return_code"') + return self.commands https://bitbucket.org/galaxy/galaxy-central/commits/188df80a411c/ Changeset: 188df80a411c User: jmchilton Date: 2013-12-11 17:26:15 Summary: command_factory cleanup - split big build_command method into smaller well-named pieces. Affected #: 1 file diff -r ef3e337d1fc7d4ee61bcf91e4ca50446eb087d6a -r 188df80a411c01272d03e7143d2afa343d575f81 lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -19,59 +19,77 @@ if not commands_builder.commands: return None + __handle_version_command(commands_builder, job_wrapper) + __handle_task_splitting(commands_builder, job_wrapper) + __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params) + + if include_work_dir_outputs: + __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params) + + if include_metadata and job_wrapper.requires_setting_metadata: + __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params) + + return commands_builder.build() + + +def __handle_version_command(commands_builder, job_wrapper): # Prepend version string if job_wrapper.version_string_cmd: version_command = "%s &> %s" % ( job_wrapper.version_string_cmd, job_wrapper.get_version_string_path() ) 
commands_builder.prepend_command(version_command) + +def __handle_task_splitting(commands_builder, job_wrapper): # prepend getting input files (if defined) - if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: + if getattr(job_wrapper, 'prepare_input_files_cmds', None): commands_builder.prepend_commands(job_wrapper.prepare_input_files_cmds) + +def __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params): local_dependency_resolution = "dependency_resolution" not in remote_command_params or (remote_command_params["dependency_resolution"] == "local") # Prepend dependency injection if job_wrapper.dependency_shell_commands and local_dependency_resolution: commands_builder.prepend_commands(job_wrapper.dependency_shell_commands) + +def __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params): # Append commands to copy job outputs based on from_work_dir attribute. - if include_work_dir_outputs: - work_dir_outputs_kwds = {} - if 'working_directory' in remote_command_params: - work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] - work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) - if work_dir_outputs: - commands_builder.capture_return_code() - commands_builder.append_commands([ "if [ -f %s ] ; then cp %s %s ; fi" % - ( source_file, source_file, destination ) for ( source_file, destination ) in work_dir_outputs ]) + work_dir_outputs_kwds = {} + if 'working_directory' in remote_command_params: + work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] + work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) + if work_dir_outputs: + commands_builder.capture_return_code() + commands_builder.append_commands([ "if [ -f %s ] ; then cp %s %s ; fi" % + ( source_file, source_file, destination ) for ( source_file, destination ) in 
work_dir_outputs ]) + +def __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params): # Append metadata setting commands, we don't want to overwrite metadata # that was copied over in init_meta(), as per established behavior - if include_metadata and job_wrapper.requires_setting_metadata: - metadata_kwds = remote_command_params.get('metadata_kwds', {}) - exec_dir = metadata_kwds.get( 'exec_dir', abspath( getcwd() ) ) - tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) - dataset_files_path = metadata_kwds.get( 'dataset_files_path', runner.app.model.Dataset.file_path ) - output_fnames = metadata_kwds.get( 'output_fnames', job_wrapper.get_output_fnames() ) - config_root = metadata_kwds.get( 'config_root', None ) - config_file = metadata_kwds.get( 'config_file', None ) - datatypes_config = metadata_kwds.get( 'datatypes_config', None ) - metadata_command = job_wrapper.setup_external_metadata( - exec_dir=exec_dir, - tmp_dir=tmp_dir, - dataset_files_path=dataset_files_path, - output_fnames=output_fnames, - set_extension=False, - config_root=config_root, - config_file=config_file, - datatypes_config=datatypes_config, - kwds={ 'overwrite' : False } - ) or '' - metadata_command = metadata_command.strip() - if metadata_command: - commands_builder.capture_return_code() - commands_builder.append_command("cd %s; %s" % (exec_dir, metadata_command)) - - return commands_builder.build() + metadata_kwds = remote_command_params.get('metadata_kwds', {}) + exec_dir = metadata_kwds.get( 'exec_dir', abspath( getcwd() ) ) + tmp_dir = metadata_kwds.get( 'tmp_dir', job_wrapper.working_directory ) + dataset_files_path = metadata_kwds.get( 'dataset_files_path', runner.app.model.Dataset.file_path ) + output_fnames = metadata_kwds.get( 'output_fnames', job_wrapper.get_output_fnames() ) + config_root = metadata_kwds.get( 'config_root', None ) + config_file = metadata_kwds.get( 'config_file', None ) + datatypes_config = metadata_kwds.get( 
'datatypes_config', None ) + metadata_command = job_wrapper.setup_external_metadata( + exec_dir=exec_dir, + tmp_dir=tmp_dir, + dataset_files_path=dataset_files_path, + output_fnames=output_fnames, + set_extension=False, + config_root=config_root, + config_file=config_file, + datatypes_config=datatypes_config, + kwds={ 'overwrite' : False } + ) or '' + metadata_command = metadata_command.strip() + if metadata_command: + commands_builder.capture_return_code() + commands_builder.append_command("cd %s; %s" % (exec_dir, metadata_command)) class CommandsBuilder(object): https://bitbucket.org/galaxy/galaxy-central/commits/b513c61601ef/ Changeset: b513c61601ef User: jmchilton Date: 2013-12-11 17:26:15 Summary: command_factory cleanup - a couple more small tweaks for readability. Affected #: 1 file diff -r 188df80a411c01272d03e7143d2afa343d575f81 -r b513c61601ef6bec6713320cd4e23fc4e020359e lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -1,6 +1,9 @@ from os import getcwd from os.path import abspath +CAPTURE_RETURN_CODE = "return_code=$?" 
+YIELD_CAPTURED_CODE = 'sh -c "exit $return_code"' + def build_command( runner, job_wrapper, include_metadata=False, include_work_dir_outputs=True, remote_command_params={} ): """ @@ -46,7 +49,8 @@ def __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params): - local_dependency_resolution = "dependency_resolution" not in remote_command_params or (remote_command_params["dependency_resolution"] == "local") + local_dependency_resolution = remote_command_params.get("dependency_resolution", "local") == "local" + # Prepend dependency injection if job_wrapper.dependency_shell_commands and local_dependency_resolution: commands_builder.prepend_commands(job_wrapper.dependency_shell_commands) @@ -60,8 +64,8 @@ work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) if work_dir_outputs: commands_builder.capture_return_code() - commands_builder.append_commands([ "if [ -f %s ] ; then cp %s %s ; fi" % - ( source_file, source_file, destination ) for ( source_file, destination ) in work_dir_outputs ]) + copy_commands = map(__copy_if_exists_command, work_dir_outputs) + commands_builder.append_commands(copy_commands) def __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params): @@ -92,6 +96,11 @@ commands_builder.append_command("cd %s; %s" % (exec_dir, metadata_command)) +def __copy_if_exists_command(work_dir_output): + source_file, destination = work_dir_output + return "if [ -f %s ] ; then cp %s %s ; fi" % ( source_file, source_file, destination ) + + class CommandsBuilder(object): def __init__(self, initial_command): @@ -121,9 +130,11 @@ def capture_return_code(self): if not self.return_code_captured: self.return_code_captured = True - self.append_command("return_code=$?") + self.append_command(CAPTURE_RETURN_CODE) def build(self): if self.return_code_captured: - self.append_command('sh -c "exit $return_code"') + self.append_command(YIELD_CAPTURED_CODE) return self.commands + +__all__ = 
[build_command] https://bitbucket.org/galaxy/galaxy-central/commits/3a7dee0e35b7/ Changeset: 3a7dee0e35b7 User: jmchilton Date: 2013-12-11 17:26:15 Summary: Implement 'remote' dependency_resolution for LWR. Set LWR destination parameter 'dependency_resolution' to remote. This will disable all dependency resolution on Galaxy side when building up command line and instead pass the tool's requirements along to the LWR server for resolution on that end. This requires a very up-to-date LWR server. I think right now 'galaxy_packages' and 'modules' resolution would work but not 'tool_shed_package' resolution - I think more repository information related to the tool needs to be passed along to the LWR to enable this. Updates the LWR client code through LWR changeset 4380e65 - mostly bringing in changes to enable this but also includes other small refactorings to stager.py. Affected #: 4 files diff -r b513c61601ef6bec6713320cd4e23fc4e020359e -r 3a7dee0e35b7b83617c8df8b943e74199941ff55 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -68,6 +68,9 @@ return try: + dependency_resolution = LwrJobRunner.__dependency_resolution( client ) + remote_dependency_resolution = dependency_resolution == "remote" + requirements = job_wrapper.tool.requirements if remote_dependency_resolution else [] client_job_description = ClientJobDescription( command_line=command_line, output_files=self.get_output_files(job_wrapper), @@ -75,6 +78,7 @@ working_directory=job_wrapper.working_directory, tool=job_wrapper.tool, config_files=job_wrapper.extra_filenames, + requirements=requirements, ) job_id = lwr_submit_job(client, client_job_description, remote_job_config) log.info("lwr job submitted with job_id %s" % job_id) diff -r b513c61601ef6bec6713320cd4e23fc4e020359e -r 3a7dee0e35b7b83617c8df8b943e74199941ff55 lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ 
b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -197,7 +197,7 @@ } self._raw_execute("download_output", output_params, output_path=output_path) - def launch(self, command_line): + def launch(self, command_line, requirements=[]): """ Run or queue up the execution of the supplied `command_line` on the remote server. @@ -211,6 +211,8 @@ submit_params = self._submit_params if submit_params: launch_params['params'] = dumps(submit_params) + if requirements: + launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements]) return self._raw_execute("launch", launch_params) def kill(self): diff -r b513c61601ef6bec6713320cd4e23fc4e020359e -r 3a7dee0e35b7b83617c8df8b943e74199941ff55 lib/galaxy/jobs/runners/lwr_client/stager.py --- a/lib/galaxy/jobs/runners/lwr_client/stager.py +++ b/lib/galaxy/jobs/runners/lwr_client/stager.py @@ -3,6 +3,7 @@ from re import findall from re import compile from io import open +from contextlib import contextmanager from .action_mapper import FileActionMapper @@ -297,41 +298,58 @@ def finish_job(client, cleanup_job, job_completed_normally, working_directory, work_dir_outputs, output_files, working_directory_contents=[]): """ """ + download_failure_exceptions = [] + if job_completed_normally: + download_failure_exceptions = __download_results(client, working_directory, work_dir_outputs, output_files, working_directory_contents) + return __clean(download_failure_exceptions, cleanup_job, client) + + +def __download_results(client, working_directory, work_dir_outputs, output_files, working_directory_contents): action_mapper = FileActionMapper(client) - download_failure_exceptions = [] downloaded_working_directory_files = [] - if job_completed_normally: - # Fetch explicit working directory outputs. 
- for source_file, output_file in work_dir_outputs: - name = basename(source_file) - try: - action = action_mapper.action(output_file, 'output') - client.fetch_work_dir_output(name, working_directory, output_file, action[0]) - downloaded_working_directory_files.append(name) - except Exception as e: - download_failure_exceptions.append(e) - # Remove from full output_files list so don't try to download directly. - output_files.remove(output_file) + exception_tracker = DownloadExceptionTracker() - # Fetch output files. - for output_file in output_files: - try: - action = action_mapper.action(output_file, 'output') - client.fetch_output(output_file, working_directory=working_directory, action=action[0]) - except Exception as e: - download_failure_exceptions.append(e) + # Fetch explicit working directory outputs. + for source_file, output_file in work_dir_outputs: + name = basename(source_file) + with exception_tracker(): + action = action_mapper.action(output_file, 'output') + client.fetch_work_dir_output(name, working_directory, output_file, action[0]) + downloaded_working_directory_files.append(name) + # Remove from full output_files list so don't try to download directly. + output_files.remove(output_file) - # Fetch remaining working directory outputs of interest. - for name in working_directory_contents: - if name in downloaded_working_directory_files: - continue - if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): + # Fetch output files. + for output_file in output_files: + with exception_tracker(): + action = action_mapper.action(output_file, 'output') + client.fetch_output(output_file, working_directory=working_directory, action=action[0]) + + # Fetch remaining working directory outputs of interest. 
+ for name in working_directory_contents: + if name in downloaded_working_directory_files: + continue + if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): + with exception_tracker(): output_file = join(working_directory, name) action = action_mapper.action(output_file, 'output') client.fetch_work_dir_output(name, working_directory, output_file, action=action[0]) downloaded_working_directory_files.append(name) - return __clean(download_failure_exceptions, cleanup_job, client) + return exception_tracker.download_failure_exceptions + + +class DownloadExceptionTracker(object): + + def __init__(self): + self.download_failure_exceptions = [] + + @contextmanager + def __call__(self): + try: + yield + except Exception as e: + self.download_failure_exceptions.append(e) def __clean(download_failure_exceptions, cleanup_job, client): @@ -350,7 +368,7 @@ file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_rewritten_command_line() job_id = file_stager.job_id - client.launch(rebuilt_command_line) + client.launch(rebuilt_command_line, requirements=client_job_description.requirements) return job_id @@ -383,15 +401,17 @@ Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). working_directory : str Local path created by Galaxy for running this job. + requirements : list + List of requirements for tool execution. 
""" - def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory): + def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory, requirements): self.tool = tool self.command_line = command_line self.config_files = config_files self.input_files = input_files self.output_files = output_files self.working_directory = working_directory - + self.requirements = requirements __all__ = [submit_job, ClientJobDescription, finish_job] diff -r b513c61601ef6bec6713320cd4e23fc4e020359e -r 3a7dee0e35b7b83617c8df8b943e74199941ff55 lib/galaxy/tools/deps/requirements.py --- a/lib/galaxy/tools/deps/requirements.py +++ b/lib/galaxy/tools/deps/requirements.py @@ -15,6 +15,16 @@ self.type = type self.version = version + def to_dict( self ): + return dict(name=self.name, type=self.type, version=self.version) + + @staticmethod + def from_dict( dict ): + version = dict.get( "version", None ) + name = dict.get("name", None) + type = dict.get("type", None) + return ToolRequirement( name=name, type=type, version=version ) + def parse_requirements_from_xml( xml_root ): """ https://bitbucket.org/galaxy/galaxy-central/commits/a831eb375268/ Changeset: a831eb375268 User: jmchilton Date: 2013-12-11 17:26:15 Summary: Fill out job_conf.xml example with LWR options added over recent months. Affected #: 1 file diff -r 3a7dee0e35b7b83617c8df8b943e74199941ff55 -r a831eb375268fcf80479f7bf325ec7d8a757582c job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -52,8 +52,37 @@ <param id="private_token">123456789changeme</param><!-- Uncomment the following statement to disable file staging (e.g. if there is a shared file system between Galaxy and the LWR - server). --> + server). Alternatively action can be set to 'copy' - to replace + http transfers with file system copies. 
--><!-- <param id="default_file_action">none</param> --> + <!-- The above option is just the default, the transfer behavior + none|copy|http can be configured on a per path basis via the + following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py + for examples of how to configure this file. This is very beta + and nature of file will likely change. + --> + <!-- <param id="file_action_config">file_actions.json</param> --> + <!-- Uncomment following option to disable Galaxy tool dependency + resolution and utilize remote LWR's configuration of tool + dependency resolution instead (same options as Galaxy for + dependency resolution are available in LWR). + --> + <!-- <param id="dependency_resolution">remote</param> --> + <!-- Uncomment following option to enable setting metadata on remote + LWR server. The 'use_remote_datatypes' option is available for + determining whether to use remotely configured datatypes or local + ones (both alternatives are a little brittle). --> + <!-- <param id="remote_metadata">true</param> --> + <!-- <param id="use_remote_datatypes">false</param> --> + <!-- If remote LWR server is configured to run jobs as the real user, + uncomment the following line to pass the current Galaxy user + along. --> + <!-- <param id="submit_user">$__user_name__</param> --> + <!-- Various other submission parameters can be passed along to the LWR + whose use will depend on the remote LWR's configured job manager. + For instance: + --> + <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --></destination><destination id="ssh_torque" runner="cli"><param id="shell_plugin">SecureShell</param> Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org