commit/galaxy-central: 11 new changesets
11 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/3f8f90da2773/ Changeset: 3f8f90da2773 User: jmchilton Date: 2014-01-03 00:50:10 Summary: Fix LWR exit code handling. Affected #: 1 file diff -r 5378f0517318861997ae7be112bbbf4098bdf7ce -r 3f8f90da27736f7ae1e7dfee4bd2e43bc1cc4bfd lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -173,6 +173,7 @@ run_results = client.raw_check_complete() stdout = run_results.get('stdout', '') stderr = run_results.get('stderr', '') + exit_code = run_results.get('returncode', None) working_directory_contents = run_results.get('working_directory_contents', []) # Use LWR client code to transfer/copy files back # and cleanup job if needed. @@ -207,7 +208,7 @@ self._handle_metadata_externally( job_wrapper, resolve_requirements=True ) # Finish the job try: - job_wrapper.finish( stdout, stderr ) + job_wrapper.finish( stdout, stderr, exit_code ) except Exception: log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) https://bitbucket.org/galaxy/galaxy-central/commits/18b2967240e5/ Changeset: 18b2967240e5 User: jmchilton Date: 2014-01-03 00:50:10 Summary: Refactor JobRunner's in_directory out into galaxy.util for reuse. Affected #: 2 files diff -r 3f8f90da27736f7ae1e7dfee4bd2e43bc1cc4bfd -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -15,6 +15,7 @@ from galaxy.jobs.command_factory import build_command from galaxy import model from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size +from galaxy.util import in_directory from galaxy.jobs.runners.util.job_script import job_script log = logging.getLogger( __name__ ) @@ -154,18 +155,6 @@ if not job_working_directory: job_working_directory = os.path.abspath( job_wrapper.working_directory ) - def in_directory( file, directory ): - """ - Return true, if the common prefix of both is equal to directory - e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b - """ - - # Make both absolute. - directory = os.path.abspath( directory ) - file = os.path.abspath( file ) - - return os.path.commonprefix( [ file, directory ] ) == directory - # Set up dict of dataset id --> output path; output path can be real or # false depending on outputs_to_working_directory output_paths = {} diff -r 3f8f90da27736f7ae1e7dfee4bd2e43bc1cc4bfd -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 lib/galaxy/util/__init__.py --- a/lib/galaxy/util/__init__.py +++ b/lib/galaxy/util/__init__.py @@ -374,6 +374,20 @@ return default return out + +def in_directory( file, directory ): + """ + Return true, if the common prefix of both is equal to directory + e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b + """ + + # Make both absolute. + directory = os.path.abspath( directory ) + file = os.path.abspath( file ) + + return os.path.commonprefix( [ file, directory ] ) == directory + + class Params( object ): """ Stores and 'sanitizes' parameters. Alphanumeric characters and the https://bitbucket.org/galaxy/galaxy-central/commits/9920d006a966/ Changeset: 9920d006a966 User: jmchilton Date: 2014-01-03 00:50:11 Summary: Update LWR client through LWR changeset 8ef5299. This encompasses numerous individual LWR changesets. 
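For reference, the in_directory helper promoted into galaxy.util by changeset 18b2967240e5 above can be exercised on its own; a minimal sketch, with illustrative paths taken from the helper's docstring example:

    import os.path

    def in_directory(file, directory):
        # Same logic as the relocated helper: make both paths absolute and
        # check whether the directory is the common prefix of the two.
        directory = os.path.abspath(directory)
        file = os.path.abspath(file)
        return os.path.commonprefix([file, directory]) == directory

    assert in_directory('/a/b/c/d.rst', '/a/b')           # common prefix is /a/b
    assert not in_directory('/elsewhere/d.rst', '/a/b')   # common prefix is only /
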
The largest of these changes include extensions to: - Enable LWR client to work in a fashion where remote paths are precalculated and used during job input evaluation instead of having these inputs rewritten after the fact. (Galaxy + LWR runner require significant modifications to enable this - found in subsequent changesets). https://bitbucket.org/jmchilton/lwr/commits/a06587ff0544f3a09cf221057be1b910.... - Allow staging or arbitrary paths (not restricted to job files such as inputs, working directory files, configs, etc...). https://bitbucket.org/jmchilton/lwr/commits/63981e79696337399edb42be5614bc72.... - Vast improvements to extra_files_path handling - before it only worked for one level of input extra_files_path - now works with arbitrary nesting of inputs and outputs. https://bitbucket.org/jmchilton/lwr/commits/b5e8c4dffc3a04639550e01f3f44ac31..., https://bitbucket.org/jmchilton/lwr/commits/ab5bc61f6e0c9e11748436bd61aa1f6c.... - Handle tools with version_command defined (LWR servers running on *nix systems only). https://bitbucket.org/jmchilton/lwr/commits/a3b43baa1aa6a8167a4e8cbb195d2a96.... - Allow restircting path mapping to specific input types (input, tool, config, workdir, output, output_workdir, *default* (all previous), unstructured (new for arbitrary files), and *any* (*default* + unstructured). https://bitbucket.org/jmchilton/lwr/commits/27e678d47846c2fdf5792d0c64167d15... As well as several major refactorings to break improve the LWR client code structure - Break up stager.py into smaller modules - https://bitbucket.org/jmchilton/lwr/commits/d0efda40b2c92100161ca7f825cb754d... - Refactor complex method __download_results into class with smaller helper methods - https://bitbucket.org/jmchilton/lwr/commits/45fd16e52579c273eea5f52948ca57a1... - Introduce higher level abstrction for staging actions - https://bitbucket.org/jmchilton/lwr/commits/35ea7e2fa88714aff5a38f0e0a55e3fd.... Affected #: 10 files diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -15,6 +15,8 @@ from .lwr_client import finish_job as lwr_finish_job from .lwr_client import submit_job as lwr_submit_job from .lwr_client import ClientJobDescription +from .lwr_client import LwrOutputs +from .lwr_client import GalaxyOutputs log = logging.getLogger( __name__ ) @@ -79,6 +81,7 @@ tool=job_wrapper.tool, config_files=job_wrapper.extra_filenames, requirements=requirements, + version_file=job_wrapper.get_version_string_path(), ) job_id = lwr_submit_job(client, client_job_description, remote_job_config) log.info("lwr job submitted with job_id %s" % job_id) @@ -174,7 +177,7 @@ stdout = run_results.get('stdout', '') stderr = run_results.get('stderr', '') exit_code = run_results.get('returncode', None) - working_directory_contents = run_results.get('working_directory_contents', []) + lwr_outputs = LwrOutputs(run_results) # Use LWR client code to transfer/copy files back # and cleanup job if needed. completed_normally = \ @@ -188,13 +191,17 @@ # no need to handle them differently here. 
work_dir_outputs = [] output_files = self.get_output_files( job_wrapper ) + galaxy_outputs = GalaxyOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) finish_args = dict( client=client, - working_directory=job_wrapper.working_directory, job_completed_normally=completed_normally, cleanup_job=cleanup_job, - work_dir_outputs=work_dir_outputs, - output_files=output_files, - working_directory_contents=working_directory_contents ) + galaxy_outputs=galaxy_outputs, + lwr_outputs=lwr_outputs ) failed = lwr_finish_job( **finish_args ) if failed: diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -6,9 +6,24 @@ """ -from .stager import submit_job, finish_job, ClientJobDescription +from .staging.down import finish_job +from .staging.up import submit_job +from .staging import ClientJobDescription +from .staging import LwrOutputs +from .staging import GalaxyOutputs from .client import OutputNotFoundException from .manager import ClientManager from .destination import url_to_destination_params +from .path_mapper import PathMapper -__all__ = [ClientManager, OutputNotFoundException, url_to_destination_params, finish_job, submit_job, ClientJobDescription] +__all__ = [ + ClientManager, + OutputNotFoundException, + url_to_destination_params, + finish_job, + submit_job, + ClientJobDescription, + LwrOutputs, + GalaxyOutputs, + PathMapper, +] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/action_mapper.py --- a/lib/galaxy/jobs/runners/lwr_client/action_mapper.py +++ b/lib/galaxy/jobs/runners/lwr_client/action_mapper.py @@ -1,12 +1,49 @@ from simplejson import load from os.path import abspath -from fnmatch import fnmatch +from os.path import dirname +from os.path import join +from os.path import basename +from os.path import sep +import fnmatch from re import compile - +from re import escape +from galaxy.util.bunch import Bunch +from .util import directory_files +from .util import unique_path_prefix DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? DEFAULT_PATH_MAPPER_TYPE = 'prefix' +# Poor man's enum. +path_type = Bunch( + # Galaxy input datasets and extra files. + INPUT="input", + # Galaxy config and param files. + CONFIG="config", + # Files from tool's tool_dir (for now just wrapper if available). + TOOL="tool", + # Input work dir files - e.g. metadata files, task-split input files, etc.. + WORKDIR="workdir", + # Galaxy output datasets in their final home. + OUTPUT="output", + # Galaxy from_work_dir output paths and other files (e.g. galaxy.json) + OUTPUT_WORKDIR="output_workdir", + # Other fixed tool parameter paths (likely coming from tool data, but not + # nessecarily). Not sure this is the best name... 
+ UNSTRUCTURED="unstructured", +) + + +ACTION_DEFAULT_PATH_TYPES = [ + path_type.INPUT, + path_type.CONFIG, + path_type.TOOL, + path_type.WORKDIR, + path_type.OUTPUT, + path_type.OUTPUT_WORKDIR, +] +ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED] + class FileActionMapper(object): """ @@ -15,37 +52,61 @@ >>> json_string = r'''{"paths": [ \ {"path": "/opt/galaxy", "action": "none"}, \ {"path": "/galaxy/data", "action": "transfer"}, \ - {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "type": "glob"}, \ - {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "type": "regex"} \ + {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, \ + {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "match_type": "regex"} \ ]}''' >>> from tempfile import NamedTemporaryFile >>> from os import unlink - >>> f = NamedTemporaryFile(delete=False) - >>> write_result = f.write(json_string.encode('UTF-8')) - >>> f.close() - >>> class MockClient(): - ... default_file_action = 'none' - ... action_config_path = f.name - ... - >>> mapper = FileActionMapper(MockClient()) - >>> unlink(f.name) + >>> def mapper_for(default_action, config_contents): + ... f = NamedTemporaryFile(delete=False) + ... f.write(config_contents.encode('UTF-8')) + ... f.close() + ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name) + ... mapper = FileActionMapper(mock_client) + ... unlink(f.name) + ... return mapper + >>> mapper = mapper_for(default_action='none', config_contents=json_string) >>> # Test first config line above, implicit path prefix mapper - >>> mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input')[0] == u'none' + >>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + >>> action.action_type == u'none' True + >>> action.staging_needed + False >>> # Test another (2nd) mapper, this one with a different action - >>> mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input')[0] == u'transfer' + >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input') + >>> action.action_type == u'transfer' + True + >>> action.staging_needed True >>> # Always at least copy work_dir outputs. - >>> mapper.action('/opt/galaxy/database/working_directory/45.sh', 'work_dir')[0] == u'copy' + >>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir') + >>> action.action_type == u'copy' + True + >>> action.staging_needed True >>> # Test glob mapper (matching test) - >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input')[0] == u'copy' + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy' True >>> # Test glob mapper (non-matching test) - >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input')[0] == u'none' + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none' True >>> # Regex mapper test. 
- >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input')[0] == u'copy' + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy' + True + >>> # Doesn't map unstructured paths by default + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none' + True + >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "input"} \ + ] }''') + >>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer' + True + >>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none' + True + >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "*any*"} \ + ] }''') + >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer' True """ @@ -59,27 +120,92 @@ def __load_action_config(self, path): config = load(open(path, 'rb')) for path_config in config.get('paths', []): - map_type = path_config.get('type', DEFAULT_PATH_MAPPER_TYPE) + map_type = path_config.get('match_type', DEFAULT_PATH_MAPPER_TYPE) self.mappers.append(mappers[map_type](path_config)) - def action(self, path, type): - action = self.default_action + def action(self, path, type, mapper=None): + action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" + file_lister = DEFAULT_FILE_LISTER normalized_path = abspath(path) - for mapper in self.mappers: - if mapper.matches(normalized_path): - action = mapper.action - break - if type in ["work_dir", "output_task"] and action == "none": + if not mapper: + for query_mapper in self.mappers: + if query_mapper.matches(normalized_path, type): + mapper = query_mapper + break + if mapper: + action_type = mapper.action_type + file_lister = mapper.file_lister + if type in ["workdir", "output_workdir"] and action_type == "none": ## We are changing the working_directory relative to what ## Galaxy would use, these need to be copied over. - action = "copy" - return (action,) + action_type = "copy" + action_class = actions.get(action_type, None) + if action_class is None: + message_template = "Unknown action_type encountered %s while trying to map path %s" + message_args = (action_type, path) + raise Exception(message_template % message_args) + return action_class(path, file_lister=file_lister) + + def unstructured_mappers(self): + """ Return mappers that will map 'unstructured' files (i.e. go beyond + mapping inputs, outputs, and config files). + """ + return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + + +class BaseAction(object): + + def __init__(self, path, file_lister=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + + def unstructured_map(self): + unstructured_map = self.file_lister.unstructured_map(self.path) + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + return unstructured_map + + +class NoneAction(BaseAction): + """ This action indicates the corresponding path does not require any + additional action. This should indicate paths that are available both on + the LWR client (i.e. Galaxy server) and remote LWR server with the same + paths. 
""" + action_type = "none" + staging_needed = False + + +class TransferAction(BaseAction): + """ This actions indicates that the LWR client should initiate an HTTP + transfer of the corresponding path to the remote LWR server before + launching the job. """ + action_type = "transfer" + staging_needed = True + + +class CopyAction(BaseAction): + """ This action indicates that the LWR client should execute a file system + copy of the corresponding path to the LWR staging directory prior to + launching the corresponding job. """ + action_type = "copy" + staging_needed = True class BasePathMapper(object): def __init__(self, config): - self.action = config.get('action', DEFAULT_MAPPED_ACTION) + self.action_type = config.get('action', DEFAULT_MAPPED_ACTION) + path_types_str = config.get('path_types', "*defaults*") + path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES)) + path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES)) + self.path_types = path_types_str.split(",") + self.file_lister = FileLister(config) + + def matches(self, path, path_type): + path_type_matches = path_type in self.path_types + return path_type_matches and self._path_matches(path) class PrefixPathMapper(BasePathMapper): @@ -88,9 +214,13 @@ super(PrefixPathMapper, self).__init__(config) self.prefix_path = abspath(config['path']) - def matches(self, path): + def _path_matches(self, path): return path.startswith(self.prefix_path) + def to_pattern(self): + pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep)) + return compile(pattern_str) + class GlobPathMapper(BasePathMapper): @@ -98,8 +228,11 @@ super(GlobPathMapper, self).__init__(config) self.glob_path = config['path'] - def matches(self, path): - return fnmatch(path, self.glob_path) + def _path_matches(self, path): + return fnmatch.fnmatch(path, self.glob_path) + + def to_pattern(self): + return compile(fnmatch.translate(self.glob_path)) class RegexPathMapper(BasePathMapper): @@ -108,9 +241,32 @@ super(RegexPathMapper, self).__init__(config) self.pattern = compile(config['path']) - def matches(self, path): + def _path_matches(self, path): return self.pattern.match(path) is not None + def to_pattern(self): + return self.pattern + + +class FileLister(object): + + def __init__(self, config): + self.depth = int(config.get("depth", "0")) + + def unstructured_map(self, path): + depth = self.depth + if self.depth == 0: + return {path: basename(path)} + else: + while depth > 0: + path = dirname(path) + depth -= 1 + return dict([(join(path, f), f) for f in directory_files(path)]) + +DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) + +ACTION_CLASSES = [NoneAction, TransferAction, CopyAction] +actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) mappers = { 'prefix': PrefixPathMapper, @@ -119,4 +275,4 @@ } -__all__ = [FileActionMapper] +__all__ = [FileActionMapper, path_type] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -82,16 +82,16 @@ args = {"job_id": self.job_id, "name": name, "input_type": input_type} return self._raw_execute('input_path', args) - def put_file(self, path, input_type, name=None, contents=None, action='transfer'): + def put_file(self, path, input_type, name=None, contents=None, action_type='transfer'): if not name: name = os.path.basename(path) args = {"job_id": 
self.job_id, "name": name, "input_type": input_type} input_path = path if contents: input_path = None - if action == 'transfer': + if action_type == 'transfer': return self._upload_file(args, contents, input_path) - elif action == 'copy': + elif action_type == 'copy': lwr_path = self._raw_execute('input_path', args) self._copy(path, lwr_path) return {'path': lwr_path} @@ -105,11 +105,14 @@ ## path. Use old paths. input_type = args['input_type'] action = { - 'input': 'upload_input', - 'input_extra': 'upload_extra_input', + # For backward compatibility just target upload_input_extra for all + # inputs, it allows nested inputs. Want to do away with distinction + # inputs and extra inputs. + 'input': 'upload_extra_input', 'config': 'upload_config_file', - 'work_dir': 'upload_working_directory_file', - 'tool': 'upload_tool_file' + 'workdir': 'upload_working_directory_file', + 'tool': 'upload_tool_file', + 'unstructured': 'upload_unstructured_file', }[input_type] del args['input_type'] return action @@ -119,7 +122,23 @@ return self._raw_execute("get_output_type", {"name": name, "job_id": self.job_id}) - def fetch_output(self, path, working_directory, action='transfer'): + # Deprecated + def fetch_output_legacy(self, path, working_directory, action_type='transfer'): + # Needs to determine if output is task/working directory or standard. + name = os.path.basename(path) + + output_type = self._get_output_type(name) + if output_type == "none": + # Just make sure the file was created. + if not os.path.exists(path): + raise OutputNotFoundException(path) + return + elif output_type in ["task"]: + path = os.path.join(working_directory, name) + + self.__populate_output_path(name, path, output_type, action_type) + + def fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'): """ Download an output dataset from the remote server. @@ -130,38 +149,23 @@ working_directory : str Local working_directory for the job. """ - name = os.path.basename(path) - output_type = self._get_output_type(name) - if output_type == "none": - # Just make sure the file was created. - if not os.path.exists(path): - raise OutputNotFoundException(path) - return + if not name: + # Extra files will send in the path. + name = os.path.basename(path) - output_path = self.__output_path(path, name, working_directory, output_type) - self.__populate_output_path(name, output_path, output_type, action) + output_type = "direct" # Task/from_work_dir outputs now handled with fetch_work_dir_output + self.__populate_output_path(name, path, output_type, action_type) - def __populate_output_path(self, name, output_path, output_type, action): - if action == 'transfer': + def __populate_output_path(self, name, output_path, output_type, action_type): + self.__ensure_directory(output_path) + if action_type == 'transfer': self.__raw_download_output(name, self.job_id, output_type, output_path) - elif action == 'copy': + elif action_type == 'copy': lwr_path = self._output_path(name, self.job_id, output_type)['path'] self._copy(lwr_path, output_path) - def __output_path(self, path, name, working_directory, output_type): - """ - Preconditions: output_type is not 'none'. 
- """ - if output_type == "direct": - output_path = path - elif output_type == "task": - output_path = os.path.join(working_directory, name) - else: - raise Exception("Unknown output_type returned from LWR server %s" % output_type) - return output_path - - def fetch_work_dir_output(self, name, working_directory, output_path, action='transfer'): + def fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'): """ Download an output dataset specified with from_work_dir from the remote server. @@ -175,12 +179,18 @@ output_path : str Full path to output dataset. """ - if action == 'transfer': + self.__ensure_directory(output_path) + if action_type == 'transfer': self.__raw_download_output(name, self.job_id, "work_dir", output_path) else: # Even if action is none - LWR has a different work_dir so this needs to be copied. lwr_path = self._output_path(name, self.job_id, 'work_dir')['path'] self._copy(lwr_path, output_path) + def __ensure_directory(self, output_path): + output_path_directory = os.path.dirname(output_path) + if not os.path.exists(output_path_directory): + os.makedirs(output_path_directory) + @parseJson() def _output_path(self, name, job_id, output_type): return self._raw_execute("output_path", diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/path_mapper.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/path_mapper.py @@ -0,0 +1,97 @@ +import os.path +from .action_mapper import FileActionMapper +from .action_mapper import path_type +from .util import PathHelper + +from galaxy.util import in_directory + + +class PathMapper(object): + """ Ties together a FileActionMapper and remote job configuration returned + by the LWR setup method to pre-determine the location of files for staging + on the remote LWR server. + + This is not useful when rewrite_paths (as has traditionally been done with + the LWR) because when doing that the LWR determines the paths as files are + uploaded. When rewrite_paths is disabled however, the destination of files + needs to be determined prior to transfer so an object of this class can be + used. 
+ """ + + def __init__( + self, + client, + remote_job_config, + local_working_directory, + action_mapper=None, + ): + self.local_working_directory = local_working_directory + if not action_mapper: + action_mapper = FileActionMapper(client) + self.action_mapper = action_mapper + self.input_directory = remote_job_config["inputs_directory"] + self.output_directory = remote_job_config["outputs_directory"] + self.working_directory = remote_job_config["working_directory"] + self.unstructured_files_directory = remote_job_config["unstructured_files_directory"] + self.config_directory = remote_job_config["configs_directory"] + separator = remote_job_config["system_properties"]["separator"] + self.path_helper = PathHelper(separator) + + def remote_output_path_rewrite(self, local_path): + output_type = path_type.OUTPUT + if in_directory(local_path, self.local_working_directory): + output_type = path_type.OUTPUT_WORKDIR + remote_path = self.__remote_path_rewrite(local_path, output_type) + return remote_path + + def remote_input_path_rewrite(self, local_path): + remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT) + return remote_path + + def remote_version_path_rewrite(self, local_path): + remote_path = self.__remote_path_rewrite(local_path, path_type.OUTPUT, name="COMMAND_VERSION") + return remote_path + + def check_for_arbitrary_rewrite(self, local_path): + if not os.path.exists(local_path): + return None, [] + + path = str(local_path) # Use false_path if needed. + action = self.action_mapper.action(path, path_type.UNSTRUCTURED) + if not action.staging_needed: + return None, [] + unique_names = action.unstructured_map() + name = unique_names[path] + remote_path = self.path_helper.remote_join(self.unstructured_files_directory, name) + return remote_path, unique_names + + def __remote_path_rewrite(self, dataset_path, dataset_path_type, name=None): + """ Return remote path of this file (if staging is required) else None. + """ + path = str(dataset_path) # Use false_path if needed. + action = self.action_mapper.action(path, dataset_path_type) + remote_path_rewrite = None + if action.staging_needed: + if name is None: + name = os.path.basename(path) + remote_directory = self.__remote_directory(dataset_path_type) + remote_path_rewrite = self.path_helper.remote_join(remote_directory, name) + return remote_path_rewrite + + def __action(self, dataset_path, dataset_path_type): + path = str(dataset_path) # Use false_path if needed. 
+ action = self.action_mapper.action(path, dataset_path_type) + return action + + def __remote_directory(self, dataset_path_type): + if dataset_path_type in [path_type.OUTPUT]: + return self.output_directory + elif dataset_path_type in [path_type.WORKDIR, path_type.OUTPUT_WORKDIR]: + return self.working_directory + elif dataset_path_type in [path_type.INPUT]: + return self.input_directory + else: + message = "PathMapper cannot handle path type %s" % dataset_path_type + raise Exception(message) + +__all__ = [PathMapper] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/stager.py --- a/lib/galaxy/jobs/runners/lwr_client/stager.py +++ /dev/null @@ -1,417 +0,0 @@ -from os.path import abspath, basename, join, exists -from os import listdir, sep -from re import findall -from re import compile -from io import open -from contextlib import contextmanager - -from .action_mapper import FileActionMapper - -from logging import getLogger -log = getLogger(__name__) - -# All output files marked with from_work_dir attributes will copied or downloaded -# this pattern picks up attiditional files to copy back - such as those -# associated with multiple outputs and metadata configuration. Set to .* to just -# copy everything -COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*") - - -class JobInputs(object): - """ - Abstractions over dynamic inputs created for a given job (namely the command to - execute and created configfiles). - - **Parameters** - - command_line : str - Local command to execute for this job. (To be rewritten.) - config_files : str - Config files created for this job. (To be rewritten.) - - - >>> import tempfile - >>> tf = tempfile.NamedTemporaryFile() - >>> def setup_inputs(tf): - ... open(tf.name, "w").write(u"world /path/to/input the rest") - ... inputs = JobInputs(u"hello /path/to/input", [tf.name]) - ... return inputs - >>> inputs = setup_inputs(tf) - >>> inputs.rewrite_paths(u"/path/to/input", u'C:\\input') - >>> inputs.rewritten_command_line == u'hello C:\\\\input' - True - >>> inputs.rewritten_config_files[tf.name] == u'world C:\\\\input the rest' - True - >>> tf.close() - >>> tf = tempfile.NamedTemporaryFile() - >>> inputs = setup_inputs(tf) - >>> inputs.find_referenced_subfiles('/path/to') == [u'/path/to/input'] - True - >>> inputs.path_referenced('/path/to') - True - >>> inputs.path_referenced(u'/path/to') - True - >>> inputs.path_referenced('/path/to/input') - True - >>> inputs.path_referenced('/path/to/notinput') - False - >>> tf.close() - """ - - def __init__(self, command_line, config_files): - self.rewritten_command_line = command_line - self.rewritten_config_files = {} - for config_file in config_files or []: - config_contents = _read(config_file) - self.rewritten_config_files[config_file] = config_contents - - def find_referenced_subfiles(self, directory): - """ - Return list of files below specified `directory` in job inputs. Could - use more sophisticated logic (match quotes to handle spaces, handle - subdirectories, etc...). - - **Parameters** - - directory : str - Full path to directory to search. 
- - """ - pattern = r"(%s%s\S+)" % (directory, sep) - referenced_files = set() - for input_contents in self.__items(): - referenced_files.update(findall(pattern, input_contents)) - return list(referenced_files) - - def path_referenced(self, path): - pattern = r"%s" % path - found = False - for input_contents in self.__items(): - if findall(pattern, input_contents): - found = True - break - return found - - def rewrite_paths(self, local_path, remote_path): - """ - Rewrite references to `local_path` with `remote_path` in job inputs. - """ - self.__rewrite_command_line(local_path, remote_path) - self.__rewrite_config_files(local_path, remote_path) - - def __rewrite_command_line(self, local_path, remote_path): - self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path) - - def __rewrite_config_files(self, local_path, remote_path): - for config_file, rewritten_contents in self.rewritten_config_files.items(): - self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path) - - def __items(self): - items = [self.rewritten_command_line] - items.extend(self.rewritten_config_files.values()) - return items - - -class TransferTracker(object): - - def __init__(self, client, job_inputs): - self.client = client - self.action_mapper = FileActionMapper(client) - self.job_inputs = job_inputs - self.file_renames = {} - - def handle_transfer(self, path, type, name=None, contents=None): - if contents: - # If contents loaded in memory, no need to write out file and copy, - # just transfer. - action = ('transfer', ) - else: - if not exists(path): - message = "handle_tranfer called on non-existent file - [%s]" % path - log.warn(message) - raise Exception(message) - action = self.__action(path, type) - - if action[0] in ['transfer', 'copy']: - response = self.client.put_file(path, type, name=name, contents=contents) - self.register_rewrite(path, response['path'], type, force=True) - elif action[0] == 'none': - # No action for this file. - pass - else: - raise Exception("Unknown action type (%s) encountered for path (%s)" % (action[0], path)) - - def register_rewrite(self, local_path, remote_path, type, force=False): - action = self.__action(local_path, type) - if action[0] in ['transfer', 'copy'] or force: - self.file_renames[local_path] = remote_path - - def rewrite_input_paths(self): - """ - For each file that has been transferred and renamed, updated - command_line and configfiles to reflect that rewrite. - """ - for local_path, remote_path in self.file_renames.items(): - self.job_inputs.rewrite_paths(local_path, remote_path) - - def __action(self, path, type): - return self.action_mapper.action(path, type) - - -class FileStager(object): - """ - Objects of the FileStager class interact with an LWR client object to - stage the files required to run jobs on a remote LWR server. - - **Parameters** - - client : JobClient - LWR client object. - client_job_description : client_job_description - Description of client view of job to stage and execute remotely. 
- """ - - def __init__(self, client, client_job_description, job_config): - """ - """ - self.client = client - self.command_line = client_job_description.command_line - self.config_files = client_job_description.config_files - self.input_files = client_job_description.input_files - self.output_files = client_job_description.output_files - self.tool_id = client_job_description.tool.id - self.tool_version = client_job_description.tool.version - self.tool_dir = abspath(client_job_description.tool.tool_dir) - self.working_directory = client_job_description.working_directory - - # Setup job inputs, these will need to be rewritten before - # shipping off to remote LWR server. - self.job_inputs = JobInputs(self.command_line, self.config_files) - - self.transfer_tracker = TransferTracker(client, self.job_inputs) - - self.__handle_setup(job_config) - self.__initialize_referenced_tool_files() - self.__upload_tool_files() - self.__upload_input_files() - self.__upload_working_directory_files() - self.__initialize_output_file_renames() - self.__initialize_task_output_file_renames() - self.__initialize_config_file_renames() - self.__handle_rewrites() - self.__upload_rewritten_config_files() - - def __handle_setup(self, job_config): - if not job_config: - job_config = self.client.setup(self.tool_id, self.tool_version) - - self.new_working_directory = job_config['working_directory'] - self.new_outputs_directory = job_config['outputs_directory'] - # Default configs_directory to match remote working_directory to mimic - # behavior of older LWR servers. - self.new_configs_drectory = job_config.get('configs_directory', self.new_working_directory) - self.remote_path_separator = job_config['path_separator'] - # If remote LWR server assigned job id, use that otherwise - # just use local job_id assigned. - galaxy_job_id = self.client.job_id - self.job_id = job_config.get('job_id', galaxy_job_id) - if self.job_id != galaxy_job_id: - # Remote LWR server assigned an id different than the - # Galaxy job id, update client to reflect this. - self.client.job_id = self.job_id - - def __initialize_referenced_tool_files(self): - self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) - - def __upload_tool_files(self): - for referenced_tool_file in self.referenced_tool_files: - self.transfer_tracker.handle_transfer(referenced_tool_file, 'tool') - - def __upload_input_files(self): - for input_file in self.input_files: - self.__upload_input_file(input_file) - self.__upload_input_extra_files(input_file) - - def __upload_input_file(self, input_file): - if self.job_inputs.path_referenced(input_file): - if exists(input_file): - self.transfer_tracker.handle_transfer(input_file, 'input') - else: - message = "LWR: __upload_input_file called on empty or missing dataset." + \ - " So such file: [%s]" % input_file - log.debug(message) - - def __upload_input_extra_files(self, input_file): - # TODO: Determine if this is object store safe and what needs to be - # done if it is not. 
- files_path = "%s_files" % input_file[0:-len(".dat")] - if exists(files_path) and self.job_inputs.path_referenced(files_path): - for extra_file in listdir(files_path): - extra_file_path = join(files_path, extra_file) - relative_path = basename(files_path) - extra_file_relative_path = join(relative_path, extra_file) - self.transfer_tracker.handle_transfer(extra_file_path, 'input_extra', name=extra_file_relative_path) - - def __upload_working_directory_files(self): - # Task manager stages files into working directory, these need to be - # uploaded if present. - for working_directory_file in listdir(self.working_directory): - path = join(self.working_directory, working_directory_file) - self.transfer_tracker.handle_transfer(path, 'work_dir') - - def __initialize_output_file_renames(self): - for output_file in self.output_files: - remote_path = r'%s%s%s' % (self.new_outputs_directory, self.remote_path_separator, basename(output_file)) - self.transfer_tracker.register_rewrite(output_file, remote_path, 'output') - - def __initialize_task_output_file_renames(self): - for output_file in self.output_files: - name = basename(output_file) - task_file = join(self.working_directory, name) - remote_path = r'%s%s%s' % (self.new_working_directory, self.remote_path_separator, name) - self.transfer_tracker.register_rewrite(task_file, remote_path, 'output_task') - - def __initialize_config_file_renames(self): - for config_file in self.config_files: - remote_path = r'%s%s%s' % (self.new_configs_drectory, self.remote_path_separator, basename(config_file)) - self.transfer_tracker.register_rewrite(config_file, remote_path, 'config') - - def __handle_rewrites(self): - """ - For each file that has been transferred and renamed, updated - command_line and configfiles to reflect that rewrite. - """ - self.transfer_tracker.rewrite_input_paths() - - def __upload_rewritten_config_files(self): - for config_file, new_config_contents in self.job_inputs.rewritten_config_files.items(): - self.client.put_file(config_file, input_type='config', contents=new_config_contents) - - def get_rewritten_command_line(self): - """ - Returns the rewritten version of the command line to execute suitable - for remote host. - """ - return self.job_inputs.rewritten_command_line - - -def finish_job(client, cleanup_job, job_completed_normally, working_directory, work_dir_outputs, output_files, working_directory_contents=[]): - """ - """ - download_failure_exceptions = [] - if job_completed_normally: - download_failure_exceptions = __download_results(client, working_directory, work_dir_outputs, output_files, working_directory_contents) - return __clean(download_failure_exceptions, cleanup_job, client) - - -def __download_results(client, working_directory, work_dir_outputs, output_files, working_directory_contents): - action_mapper = FileActionMapper(client) - downloaded_working_directory_files = [] - exception_tracker = DownloadExceptionTracker() - - # Fetch explicit working directory outputs. - for source_file, output_file in work_dir_outputs: - name = basename(source_file) - with exception_tracker(): - action = action_mapper.action(output_file, 'output') - client.fetch_work_dir_output(name, working_directory, output_file, action[0]) - downloaded_working_directory_files.append(name) - # Remove from full output_files list so don't try to download directly. - output_files.remove(output_file) - - # Fetch output files. 
- for output_file in output_files: - with exception_tracker(): - action = action_mapper.action(output_file, 'output') - client.fetch_output(output_file, working_directory=working_directory, action=action[0]) - - # Fetch remaining working directory outputs of interest. - for name in working_directory_contents: - if name in downloaded_working_directory_files: - continue - if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): - with exception_tracker(): - output_file = join(working_directory, name) - action = action_mapper.action(output_file, 'output') - client.fetch_work_dir_output(name, working_directory, output_file, action=action[0]) - downloaded_working_directory_files.append(name) - - return exception_tracker.download_failure_exceptions - - -class DownloadExceptionTracker(object): - - def __init__(self): - self.download_failure_exceptions = [] - - @contextmanager - def __call__(self): - try: - yield - except Exception as e: - self.download_failure_exceptions.append(e) - - -def __clean(download_failure_exceptions, cleanup_job, client): - failed = (len(download_failure_exceptions) > 0) - if (not failed and cleanup_job != "never") or cleanup_job == "always": - try: - client.clean() - except: - log.warn("Failed to cleanup remote LWR job") - return failed - - -def submit_job(client, client_job_description, job_config=None): - """ - """ - file_stager = FileStager(client, client_job_description, job_config) - rebuilt_command_line = file_stager.get_rewritten_command_line() - job_id = file_stager.job_id - client.launch(rebuilt_command_line, requirements=client_job_description.requirements) - return job_id - - -def _read(path): - """ - Utility method to quickly read small files (config files and tool - wrappers) into memory as bytes. - """ - input = open(path, "r", encoding="utf-8") - try: - return input.read() - finally: - input.close() - - -class ClientJobDescription(object): - """ A description of how client views job - command_line, inputs, etc.. - - **Parameters** - - command_line : str - The local command line to execute, this will be rewritten for the remote server. - config_files : list - List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. - input_files : list - List of input files used by job. These will be transferred and references rewritten. - output_files : list - List of output_files produced by job. - tool_dir : str - Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). - working_directory : str - Local path created by Galaxy for running this job. - requirements : list - List of requirements for tool execution. 
- """ - - def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory, requirements): - self.tool = tool - self.command_line = command_line - self.config_files = config_files - self.input_files = input_files - self.output_files = output_files - self.working_directory = working_directory - self.requirements = requirements - -__all__ = [submit_job, ClientJobDescription, finish_job] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/staging/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/staging/__init__.py @@ -0,0 +1,120 @@ +from os.path import basename +from os.path import join +from os.path import dirname +from os import sep + +from ..util import PathHelper + +COMMAND_VERSION_FILENAME = "COMMAND_VERSION" + + +class ClientJobDescription(object): + """ A description of how client views job - command_line, inputs, etc.. + + **Parameters** + + command_line : str + The local command line to execute, this will be rewritten for the remote server. + config_files : list + List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. + input_files : list + List of input files used by job. These will be transferred and references rewritten. + output_files : list + List of output_files produced by job. + tool_dir : str + Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). + working_directory : str + Local path created by Galaxy for running this job. + requirements : list + List of requirements for tool execution. + version_file : str + Path to version file expected on the client server + arbitrary_files : dict() + Additional non-input, non-tool, non-config, non-working directory files + to transfer before staging job. This is most likely data indices but + can be anything. For now these are copied into staging working + directory but this will be reworked to find a better, more robust + location. + rewrite_paths : boolean + Indicates whether paths should be rewritten in job inputs (command_line + and config files) while staging files). + """ + + def __init__( + self, + tool, + command_line, + config_files, + input_files, + output_files, + working_directory, + requirements, + version_file=None, + arbitrary_files=None, + rewrite_paths=True, + ): + self.tool = tool + self.command_line = command_line + self.config_files = config_files + self.input_files = input_files + self.output_files = output_files + self.working_directory = working_directory + self.requirements = requirements + self.version_file = version_file + self.rewrite_paths = rewrite_paths + self.arbitrary_files = arbitrary_files or {} + + +class GalaxyOutputs(object): + """ Abstraction describing the output datasets EXPECTED by the Galaxy job + runner client. """ + + def __init__(self, working_directory, work_dir_outputs, output_files, version_file): + self.working_directory = working_directory + self.work_dir_outputs = work_dir_outputs + self.output_files = output_files + self.version_file = version_file + + +class LwrOutputs(object): + """ Abstraction describing the output files PRODUCED by the remote LWR + server. """ + + def __init__(self, complete_response): + # Default to None instead of [] to distinguish between empty contents and it not set + # by the LWR - older LWR instances will not set these in complete response. 
+ self.working_directory_contents = complete_response.get("working_directory_contents", None) + self.output_directory_contents = complete_response.get("outputs_directory_contents", None) + # Older (pre-2014) LWR servers will not include separator in response, + #so this should only be used when reasoning about outputs in + # subdirectories which was not previously supported. + self.path_helper = PathHelper(complete_response.get("system_properties", {}).get("separator", sep)) + + def has_output_file(self, output_file): + if self.output_directory_contents is None: + # Legacy LWR doesn't report this, return None indicating unsure if + # output was generated. + return None + else: + return basename(output_file) in self.output_directory_contents + + def has_output_directory_listing(self): + return self.output_directory_contents is not None + + def output_extras(self, output_file): + """ + Returns dict mapping local path to remote name. + """ + if not self.has_output_directory_listing(): + # Fetching $output.extra_files_path is not supported with legacy + # LWR (pre-2014) severs. + return {} + + output_directory = dirname(output_file) + + def local_path(name): + return join(output_directory, self.path_helper.local_name(name)) + + files_directory = "%s_files%s" % (basename(output_file)[0:-len(".dat")], self.path_helper.separator) + names = filter(lambda o: o.startswith(files_directory), self.output_directory_contents) + return dict(map(lambda name: (local_path(name), name), names)) diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/staging/down.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/staging/down.py @@ -0,0 +1,127 @@ +from os.path import join +from os.path import relpath +from re import compile +from contextlib import contextmanager + +from ..staging import COMMAND_VERSION_FILENAME +from ..action_mapper import FileActionMapper + + +from logging import getLogger +log = getLogger(__name__) + +# All output files marked with from_work_dir attributes will copied or downloaded +# this pattern picks up attiditional files to copy back - such as those +# associated with multiple outputs and metadata configuration. Set to .* to just +# copy everything +COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|dataset_\d+_files.+") + + +def finish_job(client, cleanup_job, job_completed_normally, galaxy_outputs, lwr_outputs): + """ Responsible for downloading results from remote server and cleaning up + LWR staging directory (if needed.) 
+ """ + download_failure_exceptions = [] + if job_completed_normally: + downloader = ResultsDownloader(client, galaxy_outputs, lwr_outputs) + download_failure_exceptions = downloader.download() + return __clean(download_failure_exceptions, cleanup_job, client) + + +class ResultsDownloader(object): + + def __init__(self, client, galaxy_outputs, lwr_outputs): + self.client = client + self.galaxy_outputs = galaxy_outputs + self.lwr_outputs = lwr_outputs + self.action_mapper = FileActionMapper(client) + self.downloaded_working_directory_files = [] + self.exception_tracker = DownloadExceptionTracker() + self.output_files = galaxy_outputs.output_files + self.working_directory_contents = lwr_outputs.working_directory_contents or [] + + def download(self): + self.__download_working_directory_outputs() + self.__download_outputs() + self.__download_version_file() + self.__download_other_working_directory_files() + return self.exception_tracker.download_failure_exceptions + + def __download_working_directory_outputs(self): + working_directory = self.galaxy_outputs.working_directory + # Fetch explicit working directory outputs. + for source_file, output_file in self.galaxy_outputs.work_dir_outputs: + name = relpath(source_file, working_directory) + remote_name = self.lwr_outputs.path_helper.remote_name(name) + with self.exception_tracker(): + action = self.action_mapper.action(output_file, 'output_workdir') + self.client.fetch_work_dir_output(remote_name, working_directory, output_file, action_type=action.action_type) + self.downloaded_working_directory_files.append(remote_name) + # Remove from full output_files list so don't try to download directly. + self.output_files.remove(output_file) + + def __download_outputs(self): + # Legacy LWR not returning list of files, iterate over the list of + # expected outputs for tool. + for output_file in self.output_files: + # Fetch ouptut directly... + with self.exception_tracker(): + action = self.action_mapper.action(output_file, 'output') + output_generated = self.lwr_outputs.has_output_file(output_file) + working_directory = self.galaxy_outputs.working_directory + if output_generated is None: + self.client.fetch_output_legacy(output_file, working_directory, action_type=action.action_type) + elif output_generated: + self.client.fetch_output(output_file, action_type=action.action_type) + + for local_path, remote_name in self.lwr_outputs.output_extras(output_file).iteritems(): + with self.exception_tracker(): + action = self.action_mapper.action(local_path, 'output') + self.client.fetch_output(path=local_path, name=remote_name, action_type=action.action_type) + # else not output generated, do not attempt download. + + def __download_version_file(self): + version_file = self.galaxy_outputs.version_file + # output_directory_contents may be none for legacy LWR servers. + lwr_output_directory_contents = (self.lwr_outputs.output_directory_contents or []) + if version_file and COMMAND_VERSION_FILENAME in lwr_output_directory_contents: + action = self.action_mapper.action(version_file, 'output') + self.client.fetch_output(path=version_file, name=COMMAND_VERSION_FILENAME, action_type=action.action_type) + + def __download_other_working_directory_files(self): + working_directory = self.galaxy_outputs.working_directory + # Fetch remaining working directory outputs of interest. 
+ for name in self.working_directory_contents: + if name in self.downloaded_working_directory_files: + continue + if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): + with self.exception_tracker(): + output_file = join(working_directory, self.lwr_outputs.path_helper.local_name(name)) + action = self.action_mapper.action(output_file, 'output_workdir') + self.client.fetch_work_dir_output(name, working_directory, output_file, action_type=action.action_type) + self.downloaded_working_directory_files.append(name) + + +class DownloadExceptionTracker(object): + + def __init__(self): + self.download_failure_exceptions = [] + + @contextmanager + def __call__(self): + try: + yield + except Exception as e: + self.download_failure_exceptions.append(e) + + +def __clean(download_failure_exceptions, cleanup_job, client): + failed = (len(download_failure_exceptions) > 0) + if (not failed and cleanup_job != "never") or cleanup_job == "always": + try: + client.clean() + except: + log.warn("Failed to cleanup remote LWR job") + return failed + +__all__ = [finish_job] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/staging/up.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/staging/up.py @@ -0,0 +1,373 @@ +from os.path import abspath, basename, join, exists +from os.path import dirname +from os.path import relpath +from os import listdir, sep +from re import findall +from io import open + +from ..staging import COMMAND_VERSION_FILENAME +from ..action_mapper import FileActionMapper +from ..action_mapper import path_type +from ..util import PathHelper +from ..util import directory_files + +from logging import getLogger +log = getLogger(__name__) + + +def submit_job(client, client_job_description, job_config=None): + """ + """ + file_stager = FileStager(client, client_job_description, job_config) + rebuilt_command_line = file_stager.get_command_line() + job_id = file_stager.job_id + client.launch(rebuilt_command_line, requirements=client_job_description.requirements) + return job_id + + +class FileStager(object): + """ + Objects of the FileStager class interact with an LWR client object to + stage the files required to run jobs on a remote LWR server. + + **Parameters** + + client : JobClient + LWR client object. + client_job_description : client_job_description + Description of client view of job to stage and execute remotely. + """ + + def __init__(self, client, client_job_description, job_config): + """ + """ + self.client = client + self.command_line = client_job_description.command_line + self.config_files = client_job_description.config_files + self.input_files = client_job_description.input_files + self.output_files = client_job_description.output_files + self.tool_id = client_job_description.tool.id + self.tool_version = client_job_description.tool.version + self.tool_dir = abspath(client_job_description.tool.tool_dir) + self.working_directory = client_job_description.working_directory + self.version_file = client_job_description.version_file + self.arbitrary_files = client_job_description.arbitrary_files + self.rewrite_paths = client_job_description.rewrite_paths + + # Setup job inputs, these will need to be rewritten before + # shipping off to remote LWR server. 
+ self.job_inputs = JobInputs(self.command_line, self.config_files) + + self.action_mapper = FileActionMapper(client) + self.transfer_tracker = TransferTracker(client, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) + + self.__handle_setup(job_config) + self.__initialize_referenced_tool_files() + if self.rewrite_paths: + self.__initialize_referenced_arbitrary_files() + + self.__upload_tool_files() + self.__upload_input_files() + self.__upload_working_directory_files() + self.__upload_arbitrary_files() + + if self.rewrite_paths: + self.__initialize_output_file_renames() + self.__initialize_task_output_file_renames() + self.__initialize_config_file_renames() + self.__initialize_version_file_rename() + + self.__handle_rewrites() + + self.__upload_rewritten_config_files() + + def __handle_setup(self, job_config): + if not job_config: + job_config = self.client.setup(self.tool_id, self.tool_version) + + self.new_working_directory = job_config['working_directory'] + self.new_outputs_directory = job_config['outputs_directory'] + # Default configs_directory to match remote working_directory to mimic + # behavior of older LWR servers. + self.new_configs_directory = job_config.get('configs_directory', self.new_working_directory) + self.remote_separator = self.__parse_remote_separator(job_config) + self.path_helper = PathHelper(self.remote_separator) + # If remote LWR server assigned job id, use that otherwise + # just use local job_id assigned. + galaxy_job_id = self.client.job_id + self.job_id = job_config.get('job_id', galaxy_job_id) + if self.job_id != galaxy_job_id: + # Remote LWR server assigned an id different than the + # Galaxy job id, update client to reflect this. + self.client.job_id = self.job_id + + def __parse_remote_separator(self, job_config): + separator = job_config.get("system_properties", {}).get("separator", None) + if not separator: # Legacy LWR + separator = job_config["path_separator"] # Poorly named + return separator + + def __initialize_referenced_tool_files(self): + self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) + + def __initialize_referenced_arbitrary_files(self): + referenced_arbitrary_path_mappers = dict() + for mapper in self.action_mapper.unstructured_mappers(): + mapper_pattern = mapper.to_pattern() + # TODO: Make more sophisticated, allow parent directories, + # grabbing sibbling files based on patterns, etc... 
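+            # Each unstructured mapper contributes a regex (via to_pattern())
+            # that is matched against the command line and config file contents;
+            # paths found this way (typically tool data such as indices) are added
+            # to arbitrary_files and staged as 'unstructured' uploads alongside
+            # the regular job inputs.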
+ paths = self.job_inputs.find_pattern_references(mapper_pattern) + for path in paths: + if path not in referenced_arbitrary_path_mappers: + referenced_arbitrary_path_mappers[path] = mapper + for path, mapper in referenced_arbitrary_path_mappers.iteritems(): + action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) + unstructured_map = action.unstructured_map() + self.arbitrary_files.update(unstructured_map) + + def __upload_tool_files(self): + for referenced_tool_file in self.referenced_tool_files: + self.transfer_tracker.handle_transfer(referenced_tool_file, path_type.TOOL) + + def __upload_arbitrary_files(self): + for path, name in self.arbitrary_files.iteritems(): + self.transfer_tracker.handle_transfer(path, path_type.UNSTRUCTURED, name=name) + + def __upload_input_files(self): + for input_file in self.input_files: + self.__upload_input_file(input_file) + self.__upload_input_extra_files(input_file) + + def __upload_input_file(self, input_file): + if self.__stage_input(input_file): + if exists(input_file): + self.transfer_tracker.handle_transfer(input_file, path_type.INPUT) + else: + message = "LWR: __upload_input_file called on empty or missing dataset." + \ + " So such file: [%s]" % input_file + log.debug(message) + + def __upload_input_extra_files(self, input_file): + files_path = "%s_files" % input_file[0:-len(".dat")] + if exists(files_path) and self.__stage_input(files_path): + for extra_file_name in directory_files(files_path): + extra_file_path = join(files_path, extra_file_name) + remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path))) + self.transfer_tracker.handle_transfer(extra_file_path, path_type.INPUT, name=remote_name) + + def __upload_working_directory_files(self): + # Task manager stages files into working directory, these need to be + # uploaded if present. + working_directory_files = listdir(self.working_directory) if exists(self.working_directory) else [] + for working_directory_file in working_directory_files: + path = join(self.working_directory, working_directory_file) + self.transfer_tracker.handle_transfer(path, 'workdir') + + def __initialize_version_file_rename(self): + version_file = self.version_file + if version_file: + remote_path = self.path_helper.remote_join(self.new_outputs_directory, COMMAND_VERSION_FILENAME) + self.transfer_tracker.register_rewrite(version_file, remote_path, path_type.OUTPUT) + + def __initialize_output_file_renames(self): + for output_file in self.output_files: + remote_path = self.path_helper.remote_join(self.new_outputs_directory, basename(output_file)) + self.transfer_tracker.register_rewrite(output_file, remote_path, path_type.OUTPUT) + + def __initialize_task_output_file_renames(self): + for output_file in self.output_files: + name = basename(output_file) + task_file = join(self.working_directory, name) + remote_path = self.path_helper.remote_join(self.new_working_directory, name) + self.transfer_tracker.register_rewrite(task_file, remote_path, path_type.OUTPUT_WORKDIR) + + def __initialize_config_file_renames(self): + for config_file in self.config_files: + remote_path = self.path_helper.remote_join(self.new_configs_directory, basename(config_file)) + self.transfer_tracker.register_rewrite(config_file, remote_path, path_type.CONFIG) + + def __handle_rewrites(self): + """ + For each file that has been transferred and renamed, updated + command_line and configfiles to reflect that rewrite. 
+ """ + self.transfer_tracker.rewrite_input_paths() + + def __upload_rewritten_config_files(self): + for config_file, new_config_contents in self.job_inputs.config_files.items(): + self.client.put_file(config_file, input_type='config', contents=new_config_contents) + + def get_command_line(self): + """ + Returns the rewritten version of the command line to execute suitable + for remote host. + """ + return self.job_inputs.command_line + + def __stage_input(self, file_path): + # If we have disabled path rewriting, just assume everything needs to be transferred, + # else check to ensure the file is referenced before transferring it. + return (not self.rewrite_paths) or self.job_inputs.path_referenced(file_path) + + +class JobInputs(object): + """ + Abstractions over dynamic inputs created for a given job (namely the command to + execute and created configfiles). + + **Parameters** + + command_line : str + Local command to execute for this job. (To be rewritten.) + config_files : str + Config files created for this job. (To be rewritten.) + + + >>> import tempfile + >>> tf = tempfile.NamedTemporaryFile() + >>> def setup_inputs(tf): + ... open(tf.name, "w").write(u"world /path/to/input the rest") + ... inputs = JobInputs(u"hello /path/to/input", [tf.name]) + ... return inputs + >>> inputs = setup_inputs(tf) + >>> inputs.rewrite_paths(u"/path/to/input", u'C:\\input') + >>> inputs.command_line == u'hello C:\\\\input' + True + >>> inputs.config_files[tf.name] == u'world C:\\\\input the rest' + True + >>> tf.close() + >>> tf = tempfile.NamedTemporaryFile() + >>> inputs = setup_inputs(tf) + >>> inputs.find_referenced_subfiles('/path/to') == [u'/path/to/input'] + True + >>> inputs.path_referenced('/path/to') + True + >>> inputs.path_referenced(u'/path/to') + True + >>> inputs.path_referenced('/path/to/input') + True + >>> inputs.path_referenced('/path/to/notinput') + False + >>> tf.close() + """ + + def __init__(self, command_line, config_files): + self.command_line = command_line + self.config_files = {} + for config_file in config_files or []: + config_contents = _read(config_file) + self.config_files[config_file] = config_contents + + def find_pattern_references(self, pattern): + referenced_files = set() + for input_contents in self.__items(): + referenced_files.update(findall(pattern, input_contents)) + return list(referenced_files) + + def find_referenced_subfiles(self, directory): + """ + Return list of files below specified `directory` in job inputs. Could + use more sophisticated logic (match quotes to handle spaces, handle + subdirectories, etc...). + + **Parameters** + + directory : str + Full path to directory to search. + + """ + pattern = r"(%s%s\S+)" % (directory, sep) + return self.find_pattern_references(pattern) + + def path_referenced(self, path): + pattern = r"%s" % path + found = False + for input_contents in self.__items(): + if findall(pattern, input_contents): + found = True + break + return found + + def rewrite_paths(self, local_path, remote_path): + """ + Rewrite references to `local_path` with `remote_path` in job inputs. 
+ """ + self.__rewrite_command_line(local_path, remote_path) + self.__rewrite_config_files(local_path, remote_path) + + def __rewrite_command_line(self, local_path, remote_path): + self.command_line = self.command_line.replace(local_path, remote_path) + + def __rewrite_config_files(self, local_path, remote_path): + for config_file, contents in self.config_files.items(): + self.config_files[config_file] = contents.replace(local_path, remote_path) + + def __items(self): + items = [self.command_line] + items.extend(self.config_files.values()) + return items + + +class TransferTracker(object): + + def __init__(self, client, action_mapper, job_inputs, rewrite_paths): + self.client = client + self.action_mapper = action_mapper + + self.job_inputs = job_inputs + self.rewrite_paths = rewrite_paths + self.file_renames = {} + + def handle_transfer(self, path, type, name=None, contents=None): + if contents: + # If contents loaded in memory, no need to write out file and copy, + # just transfer. + action_type = 'transfer' + else: + if not exists(path): + message = "handle_tranfer called on non-existent file - [%s]" % path + log.warn(message) + raise Exception(message) + action_type = self.__action(path, type).action_type + + if action_type in ['transfer', 'copy']: + response = self.client.put_file(path, type, name=name, contents=contents) + register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. + if register: + self.register_rewrite(path, response['path'], type, force=True) + elif action_type == 'none': + # No action for this file. + pass + else: + raise Exception("Unknown action type (%s) encountered for path (%s)" % (action_type, path)) + + def register_rewrite(self, local_path, remote_path, type, force=False): + action = self.__action(local_path, type) + if action.action_type in ['transfer', 'copy'] or force: + self.file_renames[local_path] = remote_path + + def rewrite_input_paths(self): + """ + For each file that has been transferred and renamed, updated + command_line and configfiles to reflect that rewrite. + """ + for local_path, remote_path in self.file_renames.items(): + self.job_inputs.rewrite_paths(local_path, remote_path) + + def __action(self, path, type): + return self.action_mapper.action(path, type) + + +def _read(path): + """ + Utility method to quickly read small files (config files and tool + wrappers) into memory as bytes. 
+ """ + input = open(path, "r", encoding="utf-8") + try: + return input.read() + finally: + input.close() + + +__all__ = [submit_job] diff -r 18b2967240e53d77ed87c3ef5aeaec532597bd58 -r 9920d006a96618d7729ef09e0fb861b22ff530db lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -1,5 +1,85 @@ from threading import Lock, Event from weakref import WeakValueDictionary +from os import walk +from os import curdir +from os.path import relpath +from os.path import join +import os.path +import hashlib + + +def unique_path_prefix(path): + m = hashlib.md5() + m.update(path) + return m.hexdigest() + + +def directory_files(directory): + """ + + >>> from tempfile import mkdtemp + >>> from shutil import rmtree + >>> from os.path import join + >>> from os import makedirs + >>> tempdir = mkdtemp() + >>> with open(join(tempdir, "moo"), "w") as f: pass + >>> directory_files(tempdir) + ['moo'] + >>> subdir = join(tempdir, "cow", "sub1") + >>> makedirs(subdir) + >>> with open(join(subdir, "subfile1"), "w") as f: pass + >>> with open(join(subdir, "subfile2"), "w") as f: pass + >>> sorted(directory_files(tempdir)) + ['cow/sub1/subfile1', 'cow/sub1/subfile2', 'moo'] + >>> rmtree(tempdir) + """ + contents = [] + for path, _, files in walk(directory): + relative_path = relpath(path, directory) + for name in files: + # Return file1.txt, dataset_1_files/image.png, etc... don't + # include . in path. + if relative_path != curdir: + contents.append(join(relative_path, name)) + else: + contents.append(name) + return contents + + +class PathHelper(object): + ''' + + >>> import posixpath + >>> # Forcing local path to posixpath because LWR designed to be used with + >>> # posix client. + >>> posix_path_helper = PathHelper("/", local_path_module=posixpath) + >>> windows_slash = "\\\\" + >>> len(windows_slash) + 1 + >>> nt_path_helper = PathHelper(windows_slash, local_path_module=posixpath) + >>> posix_path_helper.remote_name("moo/cow") + 'moo/cow' + >>> nt_path_helper.remote_name("moo/cow") + 'moo\\\\cow' + >>> posix_path_helper.local_name("moo/cow") + 'moo/cow' + >>> nt_path_helper.local_name("moo\\\\cow") + 'moo/cow' + ''' + + def __init__(self, separator, local_path_module=os.path): + self.separator = separator + self.local_join = local_path_module.join + self.local_sep = local_path_module.sep + + def remote_name(self, local_name): + return self.remote_join(*local_name.split(self.local_sep)) + + def local_name(self, remote_name): + return self.local_join(*remote_name.split(self.separator)) + + def remote_join(self, *args): + return self.separator.join(args) class TransferEventManager(object): https://bitbucket.org/galaxy/galaxy-central/commits/6da672723d5c/ Changeset: 6da672723d5c User: jmchilton Date: 2014-01-03 00:50:11 Summary: Fix bug in from_work_dir location checking... ... would only be exhibited if using LWR + 'remote_metadata'. Affected #: 1 file diff -r 9920d006a96618d7729ef09e0fb861b22ff530db -r 6da672723d5c6e55347abb57de399460c39b5d38 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -179,7 +179,7 @@ # TODO: move instead of copy to save time? 
source_file = os.path.join( job_working_directory, hda_tool_output.from_work_dir ) destination = job_wrapper.get_output_destination( output_paths[ dataset.dataset_id ] ) - if in_directory( source_file, job_wrapper.working_directory ): + if in_directory( source_file, job_working_directory ): output_pairs.append( ( source_file, destination ) ) log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) ) else: https://bitbucket.org/galaxy/galaxy-central/commits/4529b941cf27/ Changeset: 4529b941cf27 User: jmchilton Date: 2014-01-03 00:50:12 Summary: PEP-8 fixes for local job runner. Affected #: 1 file diff -r 6da672723d5c6e55347abb57de399460c39b5d38 -r 4529b941cf2751323d2a5cb8875dfe8bf3ccb02c lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -18,11 +18,13 @@ __all__ = [ 'LocalJobRunner' ] + class LocalJobRunner( BaseJobRunner ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling """ runner_name = "LocalRunner" + def __init__( self, app, nworkers ): """Start the job runner """ @@ -74,13 +76,13 @@ log.debug( '(%s) executing: %s' % ( job_id, command_line ) ) stdout_file = tempfile.NamedTemporaryFile( suffix='_stdout', dir=job_wrapper.working_directory ) stderr_file = tempfile.NamedTemporaryFile( suffix='_stderr', dir=job_wrapper.working_directory ) - proc = subprocess.Popen( args = command_line, - shell = True, - cwd = job_wrapper.working_directory, - stdout = stdout_file, - stderr = stderr_file, - env = self._environ, - preexec_fn = os.setpgrp ) + proc = subprocess.Popen( args=command_line, + shell=True, + cwd=job_wrapper.working_directory, + stdout=stdout_file, + stderr=stderr_file, + env=self._environ, + preexec_fn=os.setpgrp ) job_wrapper.set_job_destination(job_wrapper.job_destination, proc.pid) job_wrapper.change_state( model.Job.states.RUNNING ) job_start = datetime.datetime.now() @@ -89,7 +91,7 @@ while proc.poll() is None: i += 1 if (i % 20) == 0: - msg = job_wrapper.check_limits(runtime = datetime.datetime.now() - job_start) + msg = job_wrapper.check_limits(runtime=datetime.datetime.now() - job_start) if msg is not None: job_wrapper.fail(msg) log.debug('(%s) Terminating process group' % job_id) @@ -122,7 +124,7 @@ #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished job_ext_output_metadata = job.get_external_output_metadata() if job_ext_output_metadata: - pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them + pid = job_ext_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them else: pid = job.get_job_runner_external_id() if pid in [ None, '' ]: @@ -140,14 +142,14 @@ return # give up sleep( 2 ) if not self._check_pid( pid ): - log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.get_id(), pid, sig ) ) + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.get_id(), pid, sig ) ) return else: - log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.get_id(), pid ) ) + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.get_id(), pid ) ) def recover( self, job, job_wrapper ): # local jobs can't be recovered - job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." 
) + job_wrapper.change_state( model.Job.states.ERROR, info="This job was killed when Galaxy was restarted. Please retry the job." ) def _check_pid( self, pid ): try: @@ -157,7 +159,7 @@ if e.errno == errno.ESRCH: log.debug( "_check_pid(): PID %d is dead" % pid ) else: - log.warning( "_check_pid(): Got errno %s when attempting to check PID %d: %s" %( errno.errorcode[e.errno], pid, e.strerror ) ) + log.warning( "_check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) return False def _terminate( self, proc ): @@ -165,4 +167,4 @@ sleep( 1 ) if proc.poll() is None: os.killpg( proc.pid, 9 ) - return proc.wait() # reap + return proc.wait() # reap https://bitbucket.org/galaxy/galaxy-central/commits/18e6f346d6b1/ Changeset: 18e6f346d6b1 User: jmchilton Date: 2014-01-03 00:50:12 Summary: PEP-8 fixes for jobs/__init__.py Affected #: 1 file diff -r 4529b941cf2751323d2a5cb8875dfe8bf3ccb02c -r 18e6f346d6b13521c9757989bfb7fc06791e561b lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -58,6 +58,7 @@ if self.tags is not None: self['tags'] = [ x.strip() for x in self.tags.split(',') ] + class JobToolConfiguration( Bunch ): """ Provides details on what handler and destination a tool should use @@ -71,12 +72,14 @@ self['params'] = dict() super(JobToolConfiguration, self).__init__(**kwds) + class JobConfiguration( object ): """A parser and interface to advanced job management features. These features are configured in the job configuration, by default, ``job_conf.xml`` """ DEFAULT_NWORKERS = 4 + def __init__(self, app): """Parse the job configuration XML. """ @@ -172,17 +175,17 @@ self.tools[id].append(JobToolConfiguration(**dict(tool.items()))) self.tools[id][-1]['params'] = self.__get_params(tool) - types = dict(registered_user_concurrent_jobs = int, - anonymous_user_concurrent_jobs = int, - walltime = str, - output_size = int) + types = dict(registered_user_concurrent_jobs=int, + anonymous_user_concurrent_jobs=int, + walltime=str, + output_size=int) - self.limits = Bunch(registered_user_concurrent_jobs = None, - anonymous_user_concurrent_jobs = None, - walltime = None, - walltime_delta = None, - output_size = None, - concurrent_jobs = {}) + self.limits = Bunch(registered_user_concurrent_jobs=None, + anonymous_user_concurrent_jobs=None, + walltime=None, + walltime_delta=None, + output_size=None, + concurrent_jobs={}) # Parse job limits limits = root.find('limits') @@ -253,12 +256,12 @@ self.default_destination_id = self.app.config.default_cluster_job_runner # Set the job limits - self.limits = Bunch(registered_user_concurrent_jobs = self.app.config.registered_user_job_limit, - anonymous_user_concurrent_jobs = self.app.config.anonymous_user_job_limit, - walltime = self.app.config.job_walltime, - walltime_delta = self.app.config.job_walltime_delta, - output_size = self.app.config.output_size_limit, - concurrent_jobs = {}) + self.limits = Bunch(registered_user_concurrent_jobs=self.app.config.registered_user_job_limit, + anonymous_user_concurrent_jobs=self.app.config.anonymous_user_job_limit, + walltime=self.app.config.job_walltime, + walltime_delta=self.app.config.job_walltime_delta, + output_size=self.app.config.output_size_limit, + concurrent_jobs={}) log.debug('Done loading job configuration') @@ -533,6 +536,7 @@ else: log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (id, destination.runner)) + class JobWrapper( object ): """ Wraps a 'model.Job' with 
convenience methods for running processes and @@ -627,7 +631,7 @@ Prepare the job to run by creating the working directory and the config files. """ - self.sa_session.expunge_all() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner + self.sa_session.expunge_all() # this prevents the metadata reverting that has been seen in conjunction with the PBS job runner if not os.path.exists( self.working_directory ): os.mkdir( self.working_directory ) @@ -686,7 +690,7 @@ self.command_line = self.tool.build_command_line( param_dict ) # FIXME: for now, tools get Galaxy's lib dir in their path if self.command_line and self.command_line.startswith( 'python' ): - self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root + self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root # Shell fragment to inject dependencies self.dependency_shell_commands = self.tool.build_dependency_shell_commands() # We need command_line persisted to the db in order for Galaxy to re-queue the job @@ -719,7 +723,7 @@ job.traceback = traceback.format_exc() # Get the exception and let the tool attempt to generate # a better message - etype, evalue, tb = sys.exc_info() + etype, evalue, tb = sys.exc_info() m = self.tool.handle_job_failure_exception( evalue ) if m: message = m @@ -790,7 +794,7 @@ job.state = job.states.PAUSED self.sa_session.add( job ) - def change_state( self, state, info = False ): + def change_state( self, state, info=False ): job = self.get_job() self.sa_session.refresh( job ) for dataset_assoc in job.output_datasets + job.output_library_datasets: @@ -816,7 +820,7 @@ log.warning('set_runner() is deprecated, use set_job_destination()') self.set_job_destination(self.job_destination, external_id) - def set_job_destination(self, job_destination, external_id=None ): + def set_job_destination( self, job_destination, external_id=None ): """ Persist job destination params in the database for recovery. @@ -896,11 +900,11 @@ job.state = final_job_state return self.fail( "Job %s's output dataset(s) could not be read" % job.id ) - job_context = ExpressionContext( dict( stdout = job.stdout, stderr = job.stderr ) ) + job_context = ExpressionContext( dict( stdout=job.stdout, stderr=job.stderr ) ) for dataset_assoc in job.output_datasets + job.output_library_datasets: context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset ) #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur - for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: #need to update all associated output hdas, i.e. history was shared with job running + for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: # need to update all associated output hdas, i.e. 
history was shared with job running trynum = 0 while trynum < self.app.config.retry_job_output_collection: try: @@ -911,9 +915,9 @@ except ( OSError, ObjectNotFound ), e: trynum += 1 log.warning( 'Error accessing %s, will retry: %s', dataset.dataset.file_name, e ) - time.sleep( 2 ) + time.sleep( 2 ) dataset.blurb = 'done' - dataset.peek = 'no peek' + dataset.peek = 'no peek' dataset.info = (dataset.info or '') if context['stdout'].strip(): #Ensure white space between entries @@ -941,7 +945,7 @@ #it would be quicker to just copy the metadata from the originating output dataset, #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() if ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and self.app.config.retry_metadata_internally ): - dataset.datatype.set_meta( dataset, overwrite = False ) #call datatype.set_meta directly for the initial set_meta call during dataset creation + dataset.datatype.set_meta( dataset, overwrite=False ) # call datatype.set_meta directly for the initial set_meta call during dataset creation elif not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and job.states.ERROR != final_job_state: dataset._state = model.Dataset.states.FAILED_METADATA else: @@ -1011,19 +1015,22 @@ out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] ) out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] ) - param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows + param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows param_dict = self.tool.params_from_strings( param_dict, self.app ) # Check for and move associated_files self.tool.collect_associated_files(out_data, self.working_directory) gitd = self.sa_session.query( model.GenomeIndexToolData ).filter_by( job=job ).first() if gitd: - self.tool.collect_associated_files({'' : gitd}, self.working_directory) + self.tool.collect_associated_files({'': gitd}, self.working_directory) # Create generated output children and primary datasets and add to param_dict - collected_datasets = {'children':self.tool.collect_child_datasets(out_data, self.working_directory),'primary':self.tool.collect_primary_datasets(out_data, self.working_directory)} - param_dict.update({'__collected_datasets__':collected_datasets}) + collected_datasets = { + 'children': self.tool.collect_child_datasets(out_data, self.working_directory), + 'primary': self.tool.collect_primary_datasets(out_data, self.working_directory) + } + param_dict.update({'__collected_datasets__': collected_datasets}) # Certain tools require tasks to be completed after job execution # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). 
- self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job ) + self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job=job ) # Call 'exec_after_process' hook self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, out_data=out_data, param_dict=param_dict, @@ -1114,7 +1121,7 @@ def get_input_fnames( self ): job = self.get_job() filenames = [] - for da in job.input_datasets + job.input_library_datasets: #da is JobToInputDatasetAssociation object + for da in job.input_datasets + job.input_library_datasets: # da is JobToInputDatasetAssociation object if da.dataset: filenames.extend(self.get_input_dataset_fnames(da.dataset)) return filenames @@ -1136,11 +1143,12 @@ def compute_outputs( self ) : class DatasetPath( object ): - def __init__( self, dataset_id, real_path, false_path = None, mutable = True ): + def __init__( self, dataset_id, real_path, false_path=None, mutable=True ): self.dataset_id = dataset_id self.real_path = real_path self.false_path = false_path self.mutable = mutable + def __str__( self ): if self.false_path is None: return self.real_path @@ -1157,13 +1165,13 @@ self.output_hdas_and_paths = {} for name, hda in [ ( da.name, da.dataset ) for da in job.output_datasets + job.output_library_datasets ]: false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % hda.dataset.id ) ) - dsp = DatasetPath( hda.dataset.id, hda.dataset.file_name, false_path, mutable = hda.dataset.external_filename is None ) + dsp = DatasetPath( hda.dataset.id, hda.dataset.file_name, false_path, mutable=hda.dataset.external_filename is None ) self.output_paths.append( dsp ) self.output_hdas_and_paths[name] = hda, dsp if special: false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % special.dataset.id ) ) else: - results = [ ( da.name, da.dataset, DatasetPath( da.dataset.dataset.id, da.dataset.file_name, mutable = da.dataset.dataset.external_filename is None ) ) for da in job.output_datasets + job.output_library_datasets ] + results = [ ( da.name, da.dataset, DatasetPath( da.dataset.dataset.id, da.dataset.file_name, mutable=da.dataset.dataset.external_filename is None ) ) for da in job.output_datasets + job.output_library_datasets ] self.output_paths = [t[2] for t in results] self.output_hdas_and_paths = dict([(t[0], t[1:]) for t in results]) if special: @@ -1238,13 +1246,13 @@ datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], self.sa_session, - exec_dir = exec_dir, - tmp_dir = tmp_dir, - dataset_files_path = dataset_files_path, - config_root = config_root, - config_file = config_file, - datatypes_config = datatypes_config, - job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ), + exec_dir=exec_dir, + tmp_dir=tmp_dir, + dataset_files_path=dataset_files_path, + config_root=config_root, + config_file=config_file, + datatypes_config=datatypes_config, + job_metadata=os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ), **kwds ) @property @@ -1318,7 +1326,7 @@ just copy these files directly to the ulimate destination. 
""" return output_path - + @property def requires_setting_metadata( self ): if self.tool: @@ -1391,7 +1399,7 @@ out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] ) # DBTODO New method for generating command line for a task? # These can be passed on the command line if wanted as $userId $userEmail - if job.history and job.history.user: # check for anonymous user! + if job.history and job.history.user: # check for anonymous user! userId = '%d' % job.history.user.id userEmail = str(job.history.user.email) else: @@ -1430,7 +1438,7 @@ self.command_line = self.command_line.replace(k, v) # FIXME: for now, tools get Galaxy's lib dir in their path if self.command_line and self.command_line.startswith( 'python' ): - self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root + self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root # Shell fragment to inject dependencies self.dependency_shell_commands = self.tool.build_dependency_shell_commands() # We need command_line persisted to the db in order for Galaxy to re-queue the job @@ -1452,7 +1460,7 @@ self.status = 'error' # How do we want to handle task failure? Fail the job and let it clean up? - def change_state( self, state, info = False ): + def change_state( self, state, info=False ): task = self.get_task() self.sa_session.refresh( task ) if info: @@ -1568,11 +1576,14 @@ """ def put( self, *args, **kwargs ): return + def put_stop( self, *args ): return + def shutdown( self ): return + class ParallelismInfo(object): """ Stores the information (if any) for running multiple instances of the tool in parallel @@ -1584,7 +1595,7 @@ items = tag.iteritems() else: items = tag.attrib.items() - self.attributes = dict([item for item in items if item[0] != 'method' ]) + self.attributes = dict( [ item for item in items if item[ 0 ] != 'method' ]) if len(self.attributes) == 0: # legacy basic mode - provide compatible defaults self.attributes['split_size'] = 20 https://bitbucket.org/galaxy/galaxy-central/commits/ebbd49858ce9/ Changeset: ebbd49858ce9 User: jmchilton Date: 2014-01-03 00:50:13 Summary: PEP-8 fixes task runner. Affected #: 1 file diff -r 18e6f346d6b13521c9757989bfb7fc06791e561b -r ebbd49858ce94bd99805622456268e84bcbc5d7f lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -1,11 +1,10 @@ import logging import subprocess -from Queue import Queue -import threading from galaxy import model -import os, errno +import os +import errno from time import sleep from galaxy.jobs import TaskWrapper @@ -15,11 +14,13 @@ __all__ = [ 'TaskedJobRunner' ] + class TaskedJobRunner( BaseJobRunner ): """ Job runner backed by a finite pool of worker threads. 
FIFO scheduling """ runner_name = "TaskRunner" + def __init__( self, app, nworkers ): """Start the job runner with 'nworkers' worker threads""" super( TaskedJobRunner, self ).__init__( app, nworkers ) @@ -124,14 +125,14 @@ #this is terminate-able when output dataset/job is deleted #so that long running set_meta()s can be canceled without having to reboot the server if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior + external_metadata_script = job_wrapper.setup_external_metadata( output_fnames=job_wrapper.get_output_fnames(), + set_extension=True, + kwds={ 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) + external_metadata_proc = subprocess.Popen( args=external_metadata_script, + shell=True, + env=os.environ, + preexec_fn=os.setpgrp ) job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) external_metadata_proc.wait() log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id ) @@ -160,7 +161,7 @@ else: #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished if job.external_output_metadata: - pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them + pid = job.external_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them else: pid = job.job_runner_external_id if pid in [ None, '' ]: @@ -170,7 +171,7 @@ def recover( self, job, job_wrapper ): # DBTODO Task Recovery, this should be possible. - job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." ) + job_wrapper.change_state( model.Job.states.ERROR, info="This job was killed when Galaxy was restarted. Please retry the job." ) def _cancel_job( self, job_wrapper, task_wrappers ): """ @@ -227,7 +228,7 @@ if e.errno == errno.ESRCH: log.debug( "_check_pid(): PID %d is dead" % pid ) else: - log.warning( "_check_pid(): Got errno %s when attempting to check PID %d: %s" %( errno.errorcode[e.errno], pid, e.strerror ) ) + log.warning( "_check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) return False def _stop_pid( self, pid, job_id ): @@ -254,7 +255,7 @@ # avoid a two-second overhead using some other asynchronous method. 
sleep( 2 ) if not self._check_pid( pid ): - log.debug( "_stop_pid(): %s: PID %d successfully killed with signal %d" %( job_id, pid, sig ) ) + log.debug( "_stop_pid(): %s: PID %d successfully killed with signal %d" % ( job_id, pid, sig ) ) return else: - log.warning( "_stop_pid(): %s: PID %d refuses to die after signaling TERM/KILL" %( job_id, pid ) ) + log.warning( "_stop_pid(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job_id, pid ) ) https://bitbucket.org/galaxy/galaxy-central/commits/6facd407e247/ Changeset: 6facd407e247 User: jmchilton Date: 2014-01-03 00:50:14 Summary: PEP-8 fixes for set_metadata.py. Affected #: 1 file diff -r ebbd49858ce94bd99805622456268e84bcbc5d7f -r 6facd407e2479f56fac8530a2f413226d497dfb2 scripts/set_metadata.py --- a/scripts/set_metadata.py +++ b/scripts/set_metadata.py @@ -10,7 +10,9 @@ logging.basicConfig() log = logging.getLogger( __name__ ) -import os, sys, cPickle +import os +import sys +import cPickle # ensure supported version from check_python import check_python try: @@ -19,15 +21,15 @@ sys.exit(1) new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path +new_path.extend( sys.path[ 1: ] ) # remove scripts/ from the path sys.path = new_path from galaxy import eggs import pkg_resources pkg_resources.require("simplejson") import simplejson -import galaxy.model.mapping #need to load this before we unpickle, in order to setup properties assigned by the mappers -galaxy.model.Job() #this looks REAL stupid, but it is REQUIRED in order for SA to insert parameters into the classes defined by the mappers --> it appears that instantiating ANY mapper'ed class would suffice here +import galaxy.model.mapping # need to load this before we unpickle, in order to setup properties assigned by the mappers +galaxy.model.Job() # this looks REAL stupid, but it is REQUIRED in order for SA to insert parameters into the classes defined by the mappers --> it appears that instantiating ANY mapper'ed class would suffice here from galaxy.util import stringify_dictionary_keys from galaxy.util.json import from_json_string from sqlalchemy.orm import clear_mappers @@ -35,6 +37,7 @@ from galaxy import config import ConfigParser + def __main__(): file_path = sys.argv.pop( 1 ) tmp_dir = sys.argv.pop( 1 ) @@ -45,7 +48,7 @@ config_file_name = sys.argv.pop( 1 ) if not os.path.isabs( config_file_name ): config_file_name = os.path.join( config_root, config_file_name ) - + # Set up reference to object store # First, read in the main config file for Galaxy; this is required because # the object store configuration is stored there @@ -65,7 +68,7 @@ universe_config = config.Configuration(**conf_dict) object_store = build_object_store_from_config(universe_config) galaxy.model.Dataset.object_store = object_store - + # Set up datatypes registry datatypes_config = sys.argv.pop( 1 ) datatypes_registry = galaxy.datatypes.registry.Registry() @@ -89,32 +92,32 @@ filename_out = fields.pop( 0 ) filename_results_code = fields.pop( 0 ) dataset_filename_override = fields.pop( 0 ) - #Need to be careful with the way that these parameters are populated from the filename splitting, - #because if a job is running when the server is updated, any existing external metadata command-lines + # Need to be careful with the way that these parameters are populated from the filename splitting, + # because if a job is running when the server is updated, any existing external metadata command-lines #will not have info about the newly added 
override_metadata file if fields: override_metadata = fields.pop( 0 ) else: override_metadata = None try: - dataset = cPickle.load( open( filename_in ) ) #load DatasetInstance + dataset = cPickle.load( open( filename_in ) ) # load DatasetInstance if dataset_filename_override: dataset.dataset.external_filename = dataset_filename_override if ext_override.get( dataset.dataset.id, None ): dataset.extension = ext_override[ dataset.dataset.id ] - #Metadata FileParameter types may not be writable on a cluster node, and are therefore temporarily substituted with MetadataTempFiles + # Metadata FileParameter types may not be writable on a cluster node, and are therefore temporarily substituted with MetadataTempFiles if override_metadata: override_metadata = simplejson.load( open( override_metadata ) ) for metadata_name, metadata_file_override in override_metadata: if galaxy.datatypes.metadata.MetadataTempFile.is_JSONified_value( metadata_file_override ): metadata_file_override = galaxy.datatypes.metadata.MetadataTempFile.from_JSON( metadata_file_override ) setattr( dataset.metadata, metadata_name, metadata_file_override ) - kwds = stringify_dictionary_keys( simplejson.load( open( filename_kwds ) ) )#load kwds; need to ensure our keywords are not unicode + kwds = stringify_dictionary_keys( simplejson.load( open( filename_kwds ) ) ) # load kwds; need to ensure our keywords are not unicode dataset.datatype.set_meta( dataset, **kwds ) - dataset.metadata.to_JSON_dict( filename_out ) # write out results of set_meta - simplejson.dump( ( True, 'Metadata has been set successfully' ), open( filename_results_code, 'wb+' ) ) #setting metadata has succeeded + dataset.metadata.to_JSON_dict( filename_out ) # write out results of set_meta + simplejson.dump( ( True, 'Metadata has been set successfully' ), open( filename_results_code, 'wb+' ) ) # setting metadata has succeeded except Exception, e: - simplejson.dump( ( False, str( e ) ), open( filename_results_code, 'wb+' ) ) #setting metadata has failed somehow + simplejson.dump( ( False, str( e ) ), open( filename_results_code, 'wb+' ) ) # setting metadata has failed somehow clear_mappers() # Shut down any additional threads that might have been created via the ObjectStore object_store.shutdown() https://bitbucket.org/galaxy/galaxy-central/commits/de7ed9e75dda/ Changeset: de7ed9e75dda User: jmchilton Date: 2014-01-03 00:50:14 Summary: Work on unit test path problems. Move test/unit/tool_shed to test/unit/tool_shed_unit_tests. 
Affected #: 5 files diff -r 6facd407e2479f56fac8530a2f413226d497dfb2 -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc test/unit/tool_shed/test_fabric_util.py --- a/test/unit/tool_shed/test_fabric_util.py +++ /dev/null @@ -1,45 +0,0 @@ -from contextlib import contextmanager -from tool_shed.galaxy_install.tool_dependencies import fabric_util - - -def test_env_file_builder(): - install_dir = "/opt/galaxy/dependencies/foo/" - env_file_builder = fabric_util.EnvFileBuilder( install_dir ) - added_lines = [] - mock_return = dict(value=0) - - def mock_file_append( text, file_path, **kwds ): - added_lines.append(text) - return mock_return["value"] - - with __mock_fabric_util_method("file_append", mock_file_append): - env_file_builder.append_line( name="PATH", action="prepend_to", value="/usr/bin/local/R" ) - assert added_lines == [ "PATH=/usr/bin/local/R:$PATH; export PATH" ] - assert env_file_builder.return_code == 0 - - # Reset mock lines - del added_lines[:] - # Next time file_append will fail - mock_return["value"] = 1 - - env_file_builder.append_line( action="source", value="/usr/bin/local/R/env.sh" ) - assert added_lines == [ "if [ -f /usr/bin/local/R/env.sh ] ; then . /usr/bin/local/R/env.sh ; fi" ] - # Check failure - assert env_file_builder.return_code == 1 - - mock_return["value"] = 0 - env_file_builder.append_line( name="LD_LIBRARY_PATH", action="append_to", value="/usr/bin/local/R/lib" ) - # Verify even though last append succeeded, previous failure still recorded. - assert env_file_builder.return_code == 1 - - -## Poor man's mocking. Need to get a real mocking library as real Galaxy development -## dependnecy. -@contextmanager -def __mock_fabric_util_method(name, mock_method): - real_method = getattr(fabric_util, name) - try: - setattr(fabric_util, name, mock_method) - yield - finally: - setattr(fabric_util, name, real_method) diff -r 6facd407e2479f56fac8530a2f413226d497dfb2 -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc test/unit/tool_shed/test_td_common_util.py --- a/test/unit/tool_shed/test_td_common_util.py +++ /dev/null @@ -1,80 +0,0 @@ -from os.path import join -from contextlib import contextmanager -from galaxy.util import parse_xml_string - -from tool_shed.galaxy_install.tool_dependencies import fabric_util -from tool_shed.galaxy_install.tool_dependencies import td_common_util - - -TEST_DEPENDENCIES_DIR = "/opt/galaxy/dependencies" -TEST_INSTALL_DIR = "%s/test_install_dir" % TEST_DEPENDENCIES_DIR - - -class MockApp( object ): - - def __init__( self ): - pass - -def test_create_or_update_env_shell_file(): - test_path = "/usr/share/R/libs" - env_file_builder = fabric_util.EnvFileBuilder( test_path ) - line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="append_to", name="R_LIBS", value=test_path ) ) - assert path == join( TEST_INSTALL_DIR, "env.sh" ) - assert line == "R_LIBS=$R_LIBS:/usr/share/R/libs; export R_LIBS" - - line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="prepend_to", name="R_LIBS", value=test_path ) ) - assert path == join( TEST_INSTALL_DIR, "env.sh" ) - assert line == "R_LIBS=/usr/share/R/libs:$R_LIBS; export R_LIBS" - - line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="set_to", name="R_LIBS", value=test_path ) ) - assert path == join( TEST_INSTALL_DIR, "env.sh" ) - assert line == "R_LIBS=/usr/share/R/libs; export R_LIBS" - - line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="source", value=test_path 
) ) - assert path == join( TEST_INSTALL_DIR, "env.sh" ) - assert line == "if [ -f /usr/share/R/libs ] ; then . /usr/share/R/libs ; fi" - -def test_get_env_shell_file_paths_from_setup_environment_elem(): - xml = """<action name="setup_r_environment"> - <repository name="package_r_3_0_1" owner="bgruening" toolshed="toolshed.g2.bx.psu.edu" changeset_revision="1234567"> - <package name="R" version="3.0.1" /> - </repository> - </action> - """ - mock_app = MockApp() - action_elem = parse_xml_string( xml ) - required_for_install_env_sh = '/path/to/existing.sh' - all_env_paths = [ required_for_install_env_sh ] - action_dict = {} - - r_env_sh = '/path/to/go/env.sh' - - def mock_get_env_shell_file_paths( app, elem ): - assert app == mock_app - assert elem.get( 'name' ) == "package_r_3_0_1" - return [ r_env_sh ] - - with __mock_common_util_method( "get_env_shell_file_paths", mock_get_env_shell_file_paths ): - td_common_util.get_env_shell_file_paths_from_setup_environment_elem( mock_app, all_env_paths, action_elem, action_dict ) - ## Verify old env files weren't deleted. - assert required_for_install_env_sh in all_env_paths - ## Verify new ones added. - assert r_env_sh in all_env_paths - ## env_shell_file_paths includes everything - assert all( [ env in action_dict[ 'env_shell_file_paths' ] for env in all_env_paths ] ) - - ## action_shell_file_paths includes only env files defined in - ## inside the setup_ action element. - assert required_for_install_env_sh not in action_dict[ 'action_shell_file_paths' ] - assert r_env_sh in action_dict[ 'action_shell_file_paths' ] - -## Poor man's mocking. Need to get a real mocking library as real Galaxy development -## dependnecy. -@contextmanager -def __mock_common_util_method( name, mock_method ): - real_method = getattr( td_common_util, name ) - try: - setattr( td_common_util, name, mock_method ) - yield - finally: - setattr( td_common_util, name, real_method ) diff -r 6facd407e2479f56fac8530a2f413226d497dfb2 -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc test/unit/tool_shed_unit_tests/__init__.py --- /dev/null +++ b/test/unit/tool_shed_unit_tests/__init__.py @@ -0,0 +1,4 @@ +""" +Module cannot be called tool_shed, because this conflicts with lib/tool_shed +also at top level of path. +""" diff -r 6facd407e2479f56fac8530a2f413226d497dfb2 -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc test/unit/tool_shed_unit_tests/test_fabric_util.py --- /dev/null +++ b/test/unit/tool_shed_unit_tests/test_fabric_util.py @@ -0,0 +1,45 @@ +from contextlib import contextmanager +from tool_shed.galaxy_install.tool_dependencies import fabric_util + + +def test_env_file_builder(): + install_dir = "/opt/galaxy/dependencies/foo/" + env_file_builder = fabric_util.EnvFileBuilder( install_dir ) + added_lines = [] + mock_return = dict(value=0) + + def mock_file_append( text, file_path, **kwds ): + added_lines.append(text) + return mock_return["value"] + + with __mock_fabric_util_method("file_append", mock_file_append): + env_file_builder.append_line( name="PATH", action="prepend_to", value="/usr/bin/local/R" ) + assert added_lines == [ "PATH=/usr/bin/local/R:$PATH; export PATH" ] + assert env_file_builder.return_code == 0 + + # Reset mock lines + del added_lines[:] + # Next time file_append will fail + mock_return["value"] = 1 + + env_file_builder.append_line( action="source", value="/usr/bin/local/R/env.sh" ) + assert added_lines == [ "if [ -f /usr/bin/local/R/env.sh ] ; then . 
/usr/bin/local/R/env.sh ; fi" ] + # Check failure + assert env_file_builder.return_code == 1 + + mock_return["value"] = 0 + env_file_builder.append_line( name="LD_LIBRARY_PATH", action="append_to", value="/usr/bin/local/R/lib" ) + # Verify even though last append succeeded, previous failure still recorded. + assert env_file_builder.return_code == 1 + + +## Poor man's mocking. Need to get a real mocking library as real Galaxy development +## dependnecy. +@contextmanager +def __mock_fabric_util_method(name, mock_method): + real_method = getattr(fabric_util, name) + try: + setattr(fabric_util, name, mock_method) + yield + finally: + setattr(fabric_util, name, real_method) diff -r 6facd407e2479f56fac8530a2f413226d497dfb2 -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc test/unit/tool_shed_unit_tests/test_td_common_util.py --- /dev/null +++ b/test/unit/tool_shed_unit_tests/test_td_common_util.py @@ -0,0 +1,80 @@ +from os.path import join +from contextlib import contextmanager +from galaxy.util import parse_xml_string + +from tool_shed.galaxy_install.tool_dependencies import fabric_util +from tool_shed.galaxy_install.tool_dependencies import td_common_util + + +TEST_DEPENDENCIES_DIR = "/opt/galaxy/dependencies" +TEST_INSTALL_DIR = "%s/test_install_dir" % TEST_DEPENDENCIES_DIR + + +class MockApp( object ): + + def __init__( self ): + pass + +def test_create_or_update_env_shell_file(): + test_path = "/usr/share/R/libs" + env_file_builder = fabric_util.EnvFileBuilder( test_path ) + line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="append_to", name="R_LIBS", value=test_path ) ) + assert path == join( TEST_INSTALL_DIR, "env.sh" ) + assert line == "R_LIBS=$R_LIBS:/usr/share/R/libs; export R_LIBS" + + line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="prepend_to", name="R_LIBS", value=test_path ) ) + assert path == join( TEST_INSTALL_DIR, "env.sh" ) + assert line == "R_LIBS=/usr/share/R/libs:$R_LIBS; export R_LIBS" + + line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="set_to", name="R_LIBS", value=test_path ) ) + assert path == join( TEST_INSTALL_DIR, "env.sh" ) + assert line == "R_LIBS=/usr/share/R/libs; export R_LIBS" + + line, path = env_file_builder.create_or_update_env_shell_file( TEST_INSTALL_DIR, dict( action="source", value=test_path ) ) + assert path == join( TEST_INSTALL_DIR, "env.sh" ) + assert line == "if [ -f /usr/share/R/libs ] ; then . /usr/share/R/libs ; fi" + +def test_get_env_shell_file_paths_from_setup_environment_elem(): + xml = """<action name="setup_r_environment"> + <repository name="package_r_3_0_1" owner="bgruening" toolshed="toolshed.g2.bx.psu.edu" changeset_revision="1234567"> + <package name="R" version="3.0.1" /> + </repository> + </action> + """ + mock_app = MockApp() + action_elem = parse_xml_string( xml ) + required_for_install_env_sh = '/path/to/existing.sh' + all_env_paths = [ required_for_install_env_sh ] + action_dict = {} + + r_env_sh = '/path/to/go/env.sh' + + def mock_get_env_shell_file_paths( app, elem ): + assert app == mock_app + assert elem.get( 'name' ) == "package_r_3_0_1" + return [ r_env_sh ] + + with __mock_common_util_method( "get_env_shell_file_paths", mock_get_env_shell_file_paths ): + td_common_util.get_env_shell_file_paths_from_setup_environment_elem( mock_app, all_env_paths, action_elem, action_dict ) + ## Verify old env files weren't deleted. 
+ assert required_for_install_env_sh in all_env_paths + ## Verify new ones added. + assert r_env_sh in all_env_paths + ## env_shell_file_paths includes everything + assert all( [ env in action_dict[ 'env_shell_file_paths' ] for env in all_env_paths ] ) + + ## action_shell_file_paths includes only env files defined in + ## inside the setup_ action element. + assert required_for_install_env_sh not in action_dict[ 'action_shell_file_paths' ] + assert r_env_sh in action_dict[ 'action_shell_file_paths' ] + +## Poor man's mocking. Need to get a real mocking library as real Galaxy development +## dependnecy. +@contextmanager +def __mock_common_util_method( name, mock_method ): + real_method = getattr( td_common_util, name ) + try: + setattr( td_common_util, name, mock_method ) + yield + finally: + setattr( td_common_util, name, real_method ) https://bitbucket.org/galaxy/galaxy-central/commits/586dd135b4af/ Changeset: 586dd135b4af User: jmchilton Date: 2014-01-03 00:50:15 Summary: Fix version output redirection for pre-bash shells. Affected #: 1 file diff -r de7ed9e75dda6b8d9a364a56f9ac29a085a9e3cc -r 586dd135b4afc9d947c15aedc45c5104cc9ad94c lib/galaxy/jobs/command_factory.py --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -38,7 +38,7 @@ def __handle_version_command(commands_builder, job_wrapper): # Prepend version string if job_wrapper.version_string_cmd: - version_command = "%s &> %s" % ( job_wrapper.version_string_cmd, job_wrapper.get_version_string_path() ) + version_command = "%s > %s 2>&1" % ( job_wrapper.version_string_cmd, job_wrapper.get_version_string_path() ) commands_builder.prepend_command(version_command) https://bitbucket.org/galaxy/galaxy-central/commits/f0f7c3cd2e8a/ Changeset: f0f7c3cd2e8a User: jmchilton Date: 2014-01-03 00:50:15 Summary: Adjust GALAXY_SLOTS in local.py for pre-bash shells. Affected #: 1 file diff -r 586dd135b4afc9d947c15aedc45c5104cc9ad94c -r f0f7c3cd2e8af64243f878c04f418c1ee054bc55 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -54,9 +54,9 @@ slots = job_wrapper.job_destination.params.get( "local_slots", None ) command_line = command_line.lstrip( " ;" ) if slots: - command_line = 'export GALAXY_SLOTS="%d"; export GALAXY_SLOTS_CONFIGURED="1"; %s' % ( int( slots ), command_line ) + command_line = 'GALAXY_SLOTS="%d"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED; %s' % ( int( slots ), command_line ) else: - command_line = 'export GALAXY_SLOTS="1"; %s' % command_line + command_line = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS; %s' % command_line return command_line def queue_job( self, job_wrapper ): Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.