1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/6d45e4c17643/ Changeset: 6d45e4c17643 User: jmchilton Date: 2014-05-16 23:17:08 Summary: Update LWR client through LWR changeset f7d15f0. Adds new LWR file action type - rewrite - that allows paths to be simply rewritten during paramater replacement or command rewriting. Affected #: 5 files diff -r f61263482700499fd79238b6468c18e6915aaf08 -r 6d45e4c17643f86a5b7c572a9bd423ac46fb446e lib/galaxy/jobs/runners/lwr_client/action_mapper.py --- a/lib/galaxy/jobs/runners/lwr_client/action_mapper.py +++ b/lib/galaxy/jobs/runners/lwr_client/action_mapper.py @@ -132,6 +132,24 @@ self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) + def action(self, path, type, mapper=None): + mapper = self.__find_mapper(path, type, mapper) + action_class = self.__action_class(path, type, mapper) + file_lister = DEFAULT_FILE_LISTER + action_kwds = {} + if mapper: + file_lister = mapper.file_lister + action_kwds = mapper.action_kwds + action = action_class(path, file_lister=file_lister, **action_kwds) + self.__process_action(action, type) + return action + + def unstructured_mappers(self): + """ Return mappers that will map 'unstructured' files (i.e. go beyond + mapping inputs, outputs, and config files). + """ + return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + def to_dict(self): return dict( default_action=self.default_action, @@ -153,18 +171,19 @@ config = load(open(path, 'rb')) self.mappers = mappers_from_dicts(config.get('paths', [])) - def action(self, path, type, mapper=None): - action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" - file_lister = DEFAULT_FILE_LISTER - normalized_path = abspath(path) + def __find_mapper(self, path, type, mapper=None): if not mapper: + normalized_path = abspath(path) for query_mapper in self.mappers: if query_mapper.matches(normalized_path, type): mapper = query_mapper break + return mapper + + def __action_class(self, path, type, mapper): + action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" if mapper: action_type = mapper.action_type - file_lister = mapper.file_lister if type in ["workdir", "output_workdir"] and action_type == "none": # We are changing the working_directory relative to what # Galaxy would use, these need to be copied over. @@ -174,15 +193,7 @@ message_template = "Unknown action_type encountered %s while trying to map path %s" message_args = (action_type, path) raise Exception(message_template % message_args) - action = action_class(path, file_lister=file_lister) - self.__process_action(action, type) - return action - - def unstructured_mappers(self): - """ Return mappers that will map 'unstructured' files (i.e. go beyond - mapping inputs, outputs, and config files). - """ - return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + return action_class def __process_action(self, action, file_type): """ Extension point to populate extra action information after an @@ -198,19 +209,30 @@ url = "%s&path=%s&file_type=%s" % (url_base, action.path, file_type) action.url = url +REQUIRED_ACTION_KWD = object() + class BaseAction(object): + action_spec = {} def __init__(self, path, file_lister=None): self.path = path self.file_lister = file_lister or DEFAULT_FILE_LISTER - def unstructured_map(self): + def unstructured_map(self, path_helper): unstructured_map = self.file_lister.unstructured_map(self.path) - # To ensure uniqueness, prepend unique prefix to each name - prefix = unique_path_prefix(self.path) - for path, name in unstructured_map.iteritems(): - unstructured_map[path] = join(prefix, name) + if self.staging_needed: + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + else: + path_rewrites = {} + for path in unstructured_map: + rewrite = self.path_rewrite(path_helper, path) + if rewrite: + path_rewrites[path] = rewrite + unstructured_map = path_rewrites return unstructured_map @property @@ -230,6 +252,56 @@ action_type = "none" staging = STAGING_ACTION_NONE + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return NoneAction(path=action_dict["path"]) + + def path_rewrite(self, path_helper, path=None): + return None + + +class RewriteAction(BaseAction): + """ This actin indicates the LWR server should simply rewrite the path + to the specified file. + """ + action_spec = dict( + source_directory=REQUIRED_ACTION_KWD, + destination_directory=REQUIRED_ACTION_KWD + ) + action_type = "rewrite" + staging = STAGING_ACTION_NONE + + def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.source_directory = source_directory + self.destination_directory = destination_directory + + def to_dict(self): + return dict( + path=self.path, + action_type=self.action_type, + source_directory=self.source_directory, + destination_directory=self.destination_directory, + ) + + @classmethod + def from_dict(cls, action_dict): + return RewriteAction( + path=action_dict["path"], + source_directory=action_dict["source_directory"], + destination_directory=action_dict["destination_directory"], + ) + + def path_rewrite(self, path_helper, path=None): + if not path: + path = self.path + new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory) + return None if new_path == self.path else new_path + class TransferAction(BaseAction): """ This actions indicates that the LWR client should initiate an HTTP @@ -353,7 +425,18 @@ class BasePathMapper(object): def __init__(self, config): - self.action_type = config.get('action', DEFAULT_MAPPED_ACTION) + action_type = config.get('action', DEFAULT_MAPPED_ACTION) + action_class = actions.get(action_type, None) + action_kwds = action_class.action_spec.copy() + for key, value in action_kwds.items(): + if key in config: + action_kwds[key] = config[key] + elif value is REQUIRED_ACTION_KWD: + message_template = "action_type %s requires key word argument %s" + message = message_template % (action_type, key) + raise Exception( message ) + self.action_type = action_type + self.action_kwds = action_kwds path_types_str = config.get('path_types', "*defaults*") path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES)) path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES)) @@ -368,9 +451,10 @@ base_dict = dict( action=self.action_type, path_types=",".join(self.path_types), - match_type=self.match_type, - **self.file_lister.to_dict() + match_type=self.match_type ) + base_dict.update(self.file_lister.to_dict()) + base_dict.update(self.action_kwds) base_dict.update(**kwds) return base_dict @@ -464,10 +548,11 @@ ACTION_CLASSES = [ NoneAction, + RewriteAction, TransferAction, CopyAction, RemoteCopyAction, - RemoteTransferAction + RemoteTransferAction, ] actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) diff -r f61263482700499fd79238b6468c18e6915aaf08 -r 6d45e4c17643f86a5b7c572a9bd423ac46fb446e lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -48,6 +48,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): queue = self.__queue(queue_name) + log.debug("Consuming queue '%s'", queue) while check: try: with self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection: diff -r f61263482700499fd79238b6468c18e6915aaf08 -r 6d45e4c17643f86a5b7c572a9bd423ac46fb446e lib/galaxy/jobs/runners/lwr_client/path_mapper.py --- a/lib/galaxy/jobs/runners/lwr_client/path_mapper.py +++ b/lib/galaxy/jobs/runners/lwr_client/path_mapper.py @@ -59,7 +59,7 @@ path = str(local_path) # Use false_path if needed. action = self.action_mapper.action(path, path_type.UNSTRUCTURED) if not action.staging_needed: - return None, [] + return action.path_rewrite(self.path_helper), [] unique_names = action.unstructured_map() name = unique_names[path] remote_path = self.path_helper.remote_join(self.unstructured_files_directory, name) @@ -70,12 +70,16 @@ """ path = str(dataset_path) # Use false_path if needed. action = self.action_mapper.action(path, dataset_path_type) - remote_path_rewrite = None if action.staging_needed: if name is None: name = os.path.basename(path) remote_directory = self.__remote_directory(dataset_path_type) remote_path_rewrite = self.path_helper.remote_join(remote_directory, name) + else: + # Actions which don't require staging MUST define a path_rewrite + # method. + remote_path_rewrite = action.path_rewrite(self.path_helper) + return remote_path_rewrite def __action(self, dataset_path, dataset_path_type): diff -r f61263482700499fd79238b6468c18e6915aaf08 -r 6d45e4c17643f86a5b7c572a9bd423ac46fb446e lib/galaxy/jobs/runners/lwr_client/staging/up.py --- a/lib/galaxy/jobs/runners/lwr_client/staging/up.py +++ b/lib/galaxy/jobs/runners/lwr_client/staging/up.py @@ -78,9 +78,11 @@ self.job_inputs = JobInputs(self.command_line, self.config_files) self.action_mapper = FileActionMapper(client) - self.transfer_tracker = TransferTracker(client, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) self.__handle_setup(job_config) + + self.transfer_tracker = TransferTracker(client, self.path_helper, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) + self.__initialize_referenced_tool_files() if self.rewrite_paths: self.__initialize_referenced_arbitrary_files() @@ -142,7 +144,7 @@ referenced_arbitrary_path_mappers[path] = mapper for path, mapper in referenced_arbitrary_path_mappers.iteritems(): action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) - unstructured_map = action.unstructured_map() + unstructured_map = action.unstructured_map(self.path_helper) self.arbitrary_files.update(unstructured_map) def __upload_tool_files(self): @@ -330,8 +332,9 @@ class TransferTracker(object): - def __init__(self, client, action_mapper, job_inputs, rewrite_paths): + def __init__(self, client, path_helper, action_mapper, job_inputs, rewrite_paths): self.client = client + self.path_helper = path_helper self.action_mapper = action_mapper self.job_inputs = job_inputs @@ -357,6 +360,11 @@ register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. if register: self.register_rewrite(path, get_path(), type, force=True) + elif self.rewrite_paths: + path_rewrite = action.path_rewrite(self.path_helper) + if path_rewrite: + self.register_rewrite(path, path_rewrite, type, force=True) + # else: # No action for this file def __add_remote_staging_input(self, action, name, type): diff -r f61263482700499fd79238b6468c18e6915aaf08 -r 6d45e4c17643f86a5b7c572a9bd423ac46fb446e lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -107,6 +107,12 @@ 'moo/cow' >>> nt_path_helper.local_name("moo\\\\cow") 'moo/cow' + >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data/", "/work/galaxy/data") + '/work/galaxy/data/bowtie/hg19.fa' + >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data") + '/work/galaxy/data/bowtie/hg19.fa' + >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data/") + '/work/galaxy/data/bowtie/hg19.fa' ''' def __init__(self, separator, local_path_module=os.path): @@ -123,6 +129,22 @@ def remote_join(self, *args): return self.separator.join(args) + def from_posix_with_new_base(self, posix_path, old_base, new_base): + # TODO: Test with new_base as a windows path against nt_path_helper. + if old_base.endswith("/"): + old_base = old_base[:-1] + if not posix_path.startswith(old_base): + message_template = "Cannot compute new path for file %s, does not start with %s." + message = message_template % (posix_path, old_base) + raise Exception(message) + stripped_path = posix_path[ len(old_base): ] + while stripped_path.startswith("/"): + stripped_path = stripped_path[1:] + path_parts = stripped_path.split(self.separator) + if new_base.endswith(self.separator): + new_base = new_base[:-len(self.separator)] + return self.remote_join(new_base, *path_parts) + class TransferEventManager(object): Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.