1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/423a2939e224/ Changeset: 423a2939e224 User: jmchilton Date: 2014-02-17 21:17:43 Summary: Update LWR client through LWR changeset 5c609be. The LWR server and client can be driven via message queues, though this changeset doesn't update the LWR runner itself to allow this - only the client itself. The runner however has been updated to support some more minor changes that enabled the LWR client to be message queue driven. In particular, a new action type was added "remote_copy" that does a file-system copy of files being staged in or out like "copy" - but it does so on the LWR side instead of the Galaxy client. In this scenario these actions are serialized and transferred with the "launch" command to the LWR - potentially eliminating many HTTP calls. Paired with this, more processing can happen on the client side - namely calculating remote paths and properties - and if configured in this fashion the LWR "setup" step can be eliminated - allowing the LWR client to completely specify a job with a single one-directional message (very useful for message queues). While modifying the LWR to be able to behave in this fashion was done to enable message queues - a traditional HTTP driven LWR can use these features (remote copy, client-side path calculation, etc...) and job_conf.xml.sample_advanced has been updated to reflect this. For more information on changes to the LWR client this encompasses see the following LWR changesets: https://bitbucket.org/jmchilton/lwr/commits/bacaa5c840bbf41952fcafe6e0deadd3... https://bitbucket.org/jmchilton/lwr/commits/c8254d4525fa19f6761c6a75506d7b2b... https://bitbucket.org/jmchilton/lwr/commits/1f068927a677bbfb932c475e5a4dad98... https://bitbucket.org/jmchilton/lwr/commits/bde8cbe6f5736a82d5fa20b322cb34d9... https://bitbucket.org/jmchilton/lwr/commits/993910fa0c5a0ac2be72b2d70eeaaa07... 
Affected #: 14 files diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -96,6 +96,16 @@ ones (both alternatives are a little brittle). --><!-- <param id="remote_metadata">true</param> --><!-- <param id="use_remote_datatypes">false</param> --> + <!-- Traditionally, the LWR client sends request to LWR + server to populate various system properties. This + extra step can be disabled and these calculated here + on client by uncommenting jobs_directory and + specifying any additional remote_property_ of + interest. When using message queues this is necessary, + not optional. + --> + <!-- <param id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param> --> + <!-- <param id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param> --><!-- If remote LWR server is configured to run jobs as the real user, uncomment the following line to pass the current Galaxy user along. 
--> diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -6,19 +6,19 @@ from galaxy.jobs import JobDestination from galaxy.jobs.command_factory import build_command from galaxy.util import string_as_bool_or_none -from galaxy.util import in_directory from galaxy.util.bunch import Bunch import errno from time import sleep import os -from .lwr_client import ClientManager, url_to_destination_params +from .lwr_client import build_client_manager +from .lwr_client import url_to_destination_params from .lwr_client import finish_job as lwr_finish_job from .lwr_client import submit_job as lwr_submit_job from .lwr_client import ClientJobDescription from .lwr_client import LwrOutputs -from .lwr_client import GalaxyOutputs +from .lwr_client import ClientOutputs from .lwr_client import PathMapper log = logging.getLogger( __name__ ) @@ -41,7 +41,7 @@ self._init_monitor_thread() self._init_worker_threads() client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache)} - self.client_manager = ClientManager(**client_manager_kwargs) + self.client_manager = build_client_manager(**client_manager_kwargs) def url_to_destination( self, url ): """Convert a legacy URL to a job destination""" @@ -83,13 +83,12 @@ client_job_description = ClientJobDescription( command_line=command_line, - output_files=self.get_output_files(job_wrapper), input_files=self.get_input_files(job_wrapper), + client_outputs=self.__client_outputs(client, job_wrapper), working_directory=job_wrapper.working_directory, tool=job_wrapper.tool, config_files=job_wrapper.extra_filenames, requirements=requirements, - version_file=job_wrapper.get_version_string_path(), rewrite_paths=rewrite_paths, arbitrary_files=unstructured_path_rewrites, ) @@ -197,30 +196,17 @@ stdout = run_results.get('stdout', '') stderr = run_results.get('stderr', '') exit_code = 
run_results.get('returncode', None) - lwr_outputs = LwrOutputs(run_results) + lwr_outputs = LwrOutputs.from_status_response(run_results) # Use LWR client code to transfer/copy files back # and cleanup job if needed. completed_normally = \ job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] cleanup_job = self.app.config.cleanup_job - remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) - if not remote_work_dir_copy: - work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) - else: - # They have already been copied over to look like regular outputs remotely, - # no need to handle them differently here. - work_dir_outputs = [] - output_files = self.get_output_files( job_wrapper ) - galaxy_outputs = GalaxyOutputs( - working_directory=job_wrapper.working_directory, - work_dir_outputs=work_dir_outputs, - output_files=output_files, - version_file=job_wrapper.get_version_string_path(), - ) + client_outputs = self.__client_outputs(client, job_wrapper) finish_args = dict( client=client, job_completed_normally=completed_normally, cleanup_job=cleanup_job, - galaxy_outputs=galaxy_outputs, + client_outputs=client_outputs, lwr_outputs=lwr_outputs ) failed = lwr_finish_job( **finish_args ) @@ -305,6 +291,23 @@ job_state.running = state == model.Job.states.RUNNING self.monitor_queue.put( job_state ) + def __client_outputs( self, client, job_wrapper ): + remote_work_dir_copy = LwrJobRunner.__remote_work_dir_copy( client ) + if not remote_work_dir_copy: + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + else: + # They have already been copied over to look like regular outputs remotely, + # no need to handle them differently here. 
+ work_dir_outputs = [] + output_files = self.get_output_files( job_wrapper ) + client_outputs = ClientOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) + return client_outputs + @staticmethod def __dependency_resolution( lwr_client ): dependency_resolution = lwr_client.destination_params.get( "dependency_resolution", "local" ) diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -10,20 +10,20 @@ from .staging.up import submit_job from .staging import ClientJobDescription from .staging import LwrOutputs -from .staging import GalaxyOutputs +from .staging import ClientOutputs from .client import OutputNotFoundException -from .manager import ClientManager +from .manager import build_client_manager from .destination import url_to_destination_params from .path_mapper import PathMapper __all__ = [ - ClientManager, + build_client_manager, OutputNotFoundException, url_to_destination_params, finish_job, submit_job, ClientJobDescription, LwrOutputs, - GalaxyOutputs, + ClientOutputs, PathMapper, ] diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/action_mapper.py --- a/lib/galaxy/jobs/runners/lwr_client/action_mapper.py +++ b/lib/galaxy/jobs/runners/lwr_client/action_mapper.py @@ -7,6 +7,7 @@ import fnmatch from re import compile from re import escape +import galaxy.util from galaxy.util.bunch import Bunch from .util import directory_files from .util import unique_path_prefix @@ -14,6 +15,11 @@ DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? 
DEFAULT_PATH_MAPPER_TYPE = 'prefix' +STAGING_ACTION_REMOTE = "remote" +STAGING_ACTION_LOCAL = "local" +STAGING_ACTION_NONE = None +STAGING_ACTION_DEFAULT = "default" + # Poor man's enum. path_type = Bunch( # Galaxy input datasets and extra files. @@ -63,6 +69,7 @@ ... f.close() ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name) ... mapper = FileActionMapper(mock_client) + ... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and deserialize it to make sure still works ... unlink(f.name) ... return mapper >>> mapper = mapper_for(default_action='none', config_contents=json_string) @@ -110,18 +117,33 @@ True """ - def __init__(self, client): - self.default_action = client.default_file_action + def __init__(self, client=None, config=None): + if config is None and client is None: + message = "FileActionMapper must be constructed from either a client or a config dictionary." + raise Exception(message) + if config is None: + config = self.__client_to_config(client) + self.default_action = config.get("default_action", "transfer") + self.mappers = mappers_from_dicts(config.get("paths", [])) + + def to_dict(self): + return dict( + default_action=self.default_action, + paths=map(lambda m: m.to_dict(), self.mappers) + ) + + def __client_to_config(self, client): action_config_path = client.action_config_path - self.mappers = [] if action_config_path: - self.__load_action_config(action_config_path) + config = load(open(action_config_path, 'rb')) + else: + config = dict() + config["default_action"] = client.default_file_action + return config def __load_action_config(self, path): config = load(open(path, 'rb')) - for path_config in config.get('paths', []): - map_type = path_config.get('match_type', DEFAULT_PATH_MAPPER_TYPE) - self.mappers.append(mappers[map_type](path_config)) + self.mappers = mappers_from_dicts(config.get('paths', [])) def action(self, path, type, mapper=None): action_type = self.default_action if type in 
ACTION_DEFAULT_PATH_TYPES else "none" @@ -167,6 +189,14 @@ unstructured_map[path] = join(prefix, name) return unstructured_map + @property + def staging_needed(self): + return self.staging != STAGING_ACTION_NONE + + @property + def staging_action_local(self): + return self.staging == STAGING_ACTION_LOCAL + class NoneAction(BaseAction): """ This action indicates the corresponding path does not require any @@ -174,7 +204,7 @@ the LWR client (i.e. Galaxy server) and remote LWR server with the same paths. """ action_type = "none" - staging_needed = False + staging = STAGING_ACTION_NONE class TransferAction(BaseAction): @@ -182,7 +212,7 @@ transfer of the corresponding path to the remote LWR server before launching the job. """ action_type = "transfer" - staging_needed = True + staging = STAGING_ACTION_LOCAL class CopyAction(BaseAction): @@ -190,7 +220,74 @@ copy of the corresponding path to the LWR staging directory prior to launching the corresponding job. """ action_type = "copy" - staging_needed = True + staging = STAGING_ACTION_LOCAL + + +class RemoteCopyAction(BaseAction): + """ This action indicates the LWR server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the LWR server instead of on + the client. + """ + action_type = "remote_copy" + staging = STAGING_ACTION_REMOTE + + def to_dict(self): + return dict(path=self.path, action_type=RemoteCopyAction.action_type) + + @classmethod + def from_dict(cls, action_dict): + return RemoteCopyAction(path=action_dict["path"]) + + def write_to_path(self, path): + galaxy.util.copy_to_path(open(self.path, "rb"), path) + + +class MessageAction(object): + """ Sort of pseudo action describing "files" stored in memory and + transferred via message (HTTP, Python-call, MQ, etc...) 
+ """ + action_type = "message" + staging = STAGING_ACTION_DEFAULT + + def __init__(self, contents, client=None): + self.contents = contents + self.client = client + + @property + def staging_needed(self): + return True + + @property + def staging_action_local(self): + # Ekkk, cannot be called if created through from_dict. + # Shouldn't be a problem the way it is used - but is an + # object design problem. + return self.client.prefer_local_staging + + def to_dict(self): + return dict(contents=self.contents, action_type=MessageAction.action_type) + + @classmethod + def from_dict(cls, action_dict): + return MessageAction(contents=action_dict["contents"]) + + def write_to_path(self, path): + open(path, "w").write(self.contents) + +DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, MessageAction] + + +def from_dict(action_dict): + action_type = action_dict.get("action_type", None) + target_class = None + for action_class in DICTIFIABLE_ACTION_CLASSES: + if action_type == action_class.action_type: + target_class = action_class + if not target_class: + message = "Failed to recover action from dictionary - invalid action type specified %s." 
% action_type + raise Exception(message) + return target_class.from_dict(action_dict) class BasePathMapper(object): @@ -207,8 +304,19 @@ path_type_matches = path_type in self.path_types return path_type_matches and self._path_matches(path) + def _extend_base_dict(self, **kwds): + base_dict = dict( + action=self.action_type, + path_types=",".join(self.path_types), + match_type=self.match_type, + **self.file_lister.to_dict() + ) + base_dict.update(**kwds) + return base_dict + class PrefixPathMapper(BasePathMapper): + match_type = 'prefix' def __init__(self, config): super(PrefixPathMapper, self).__init__(config) @@ -221,8 +329,12 @@ pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep)) return compile(pattern_str) + def to_dict(self): + return self._extend_base_dict(path=self.prefix_path) + class GlobPathMapper(BasePathMapper): + match_type = 'glob' def __init__(self, config): super(GlobPathMapper, self).__init__(config) @@ -234,12 +346,17 @@ def to_pattern(self): return compile(fnmatch.translate(self.glob_path)) + def to_dict(self): + return self._extend_base_dict(path=self.glob_path) + class RegexPathMapper(BasePathMapper): + match_type = 'regex' def __init__(self, config): super(RegexPathMapper, self).__init__(config) - self.pattern = compile(config['path']) + self.pattern_raw = config['path'] + self.pattern = compile(self.pattern_raw) def _path_matches(self, path): return self.pattern.match(path) is not None @@ -247,12 +364,32 @@ def to_pattern(self): return self.pattern + def to_dict(self): + return self._extend_base_dict(path=self.pattern_raw) + +MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper] +MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES)) + + +def mappers_from_dicts(mapper_def_list): + return map(lambda m: __mappper_from_dict(m), mapper_def_list) + + +def __mappper_from_dict(mapper_dict): + map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE) + return 
MAPPER_CLASS_DICT[map_type](mapper_dict) + class FileLister(object): def __init__(self, config): self.depth = int(config.get("depth", "0")) + def to_dict(self): + return dict( + depth=self.depth + ) + def unstructured_map(self, path): depth = self.depth if self.depth == 0: @@ -265,14 +402,8 @@ DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) -ACTION_CLASSES = [NoneAction, TransferAction, CopyAction] +ACTION_CLASSES = [NoneAction, TransferAction, CopyAction, RemoteCopyAction] actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) -mappers = { - 'prefix': PrefixPathMapper, - 'glob': GlobPathMapper, - 'regex': RegexPathMapper, -} - -__all__ = [FileActionMapper, path_type] +__all__ = [FileActionMapper, path_type, from_dict, MessageAction] diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -0,0 +1,83 @@ +try: + import kombu + from kombu import pools +except ImportError: + kombu = None + +import socket +import logging +log = logging.getLogger(__name__) + + +KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable" + +DEFAULT_EXCHANGE_NAME = "lwr" +DEFAULT_EXCHANGE_TYPE = "direct" +DEFAULT_TIMEOUT = 0.2 # Set timeout to periodically give up looking and check + # if polling should end. + + +class LwrExchange(object): + """ Utility for publishing and consuming structured LWR queues using kombu. + This is shared between the server and client - an exchange should be setup + for each manager (or in the case of the client, each manager one wished to + communicate with.) 
+ + Each LWR manager is defined solely by name in the scheme, so only one LWR + should target each AMQP endpoint or care should be taken that unique + manager names are used across LWR servers targeting the same AMQP endpoint - + and in particular only one such LWR should define a default manager with + name _default_. + """ + + def __init__(self, url, manager_name, timeout=DEFAULT_TIMEOUT): + """ + """ + if not kombu: + raise Exception(KOMBU_UNAVAILABLE) + self.__url = url + self.__manager_name = manager_name + self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) + self.__timeout = timeout + + def consume(self, queue_name, callback, check=True, connection_kwargs={}): + queue = self.__queue(queue_name) + with self.connection(self.__url, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): + while check: + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass + + def publish(self, name, payload): + with self.connection(self.__url) as connection: + with pools.producers[connection].acquire() as producer: + key = self.__queue_name(name) + producer.publish( + payload, + serializer='json', + exchange=self.__exchange, + declare=[self.__exchange], + routing_key=key, + ) + + def connection(self, connection_string, **kwargs): + return kombu.Connection(connection_string, **kwargs) + + def __queue(self, name): + queue_name = self.__queue_name(name) + queue = kombu.Queue(queue_name, self.__exchange, routing_key=queue_name) + return queue + + def __queue_name(self, name): + key_prefix = self.__key_prefix() + queue_name = '%s_%s' % (key_prefix, name) + return queue_name + + def __key_prefix(self): + if self.__manager_name == "_default_": + key_prefix = "lwr_" + else: + key_prefix = "lwr_%s_" % self.__manager_name + return key_prefix diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb 
lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -1,9 +1,12 @@ import os import shutil -from json import dumps, loads +import json +from json import dumps from time import sleep from .destination import submit_params +from .setup_handler import build as build_setup_handler +from .job_directory import RemoteJobDirectory CACHE_WAIT_SECONDS = 3 MAX_RETRY_COUNT = 5 @@ -15,7 +18,7 @@ def __call__(self, func): def replacement(*args, **kwargs): response = func(*args, **kwargs) - return loads(response) + return json.loads(response) return replacement @@ -49,7 +52,47 @@ return "No remote output found for path %s" % self.path -class JobClient(object): +class BaseJobClient(object): + + def __init__(self, destination_params, job_id): + self.destination_params = destination_params + self.job_id = job_id + if "jobs_directory" in (destination_params or {}): + staging_directory = destination_params["jobs_directory"] + sep = destination_params.get("remote_sep", os.sep) + job_directory = RemoteJobDirectory( + remote_staging_directory=staging_directory, + remote_id=job_id, + remote_sep=sep, + ) + else: + job_directory = None + self.job_directory = job_directory + + self.default_file_action = self.destination_params.get("default_file_action", "transfer") + self.action_config_path = self.destination_params.get("file_action_config", None) + + self.setup_handler = build_setup_handler(self, destination_params) + + def setup(self, tool_id=None, tool_version=None): + """ + Setup remote LWR server to run this job. + """ + setup_args = {"job_id": self.job_id} + if tool_id: + setup_args["tool_id"] = tool_id + if tool_version: + setup_args["tool_version"] = tool_version + return self.setup_handler.setup(**setup_args) + + @property + def prefer_local_staging(self): + # If doing a job directory is defined, calculate paths here and stage + # remotely. 
+ return self.job_directory is None + + +class JobClient(BaseJobClient): """ Objects of this client class perform low-level communication with a remote LWR server. @@ -62,12 +105,8 @@ """ def __init__(self, destination_params, job_id, job_manager_interface): + super(JobClient, self).__init__(destination_params, job_id) self.job_manager_interface = job_manager_interface - self.destination_params = destination_params - self.job_id = job_id - - self.default_file_action = self.destination_params.get("default_file_action", "transfer") - self.action_config_path = self.destination_params.get("file_action_config", None) def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): return self.job_manager_interface.execute(command, args, data, input_path, output_path) @@ -206,10 +245,11 @@ } self._raw_execute("download_output", output_params, output_path=output_path) - def launch(self, command_line, requirements=[]): + def launch(self, command_line, requirements=[], remote_staging=[], job_config=None): """ - Run or queue up the execution of the supplied - `command_line` on the remote server. + Queue up the execution of the supplied `command_line` on the remote + server. Called launch for historical reasons, should be renamed to + enqueue or something like that. **Parameters** @@ -222,6 +262,14 @@ launch_params['params'] = dumps(submit_params) if requirements: launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements]) + if remote_staging: + launch_params['remote_staging'] = dumps(remote_staging) + if job_config and self.setup_handler.local: + # Setup not yet called, job properties were inferred from + # destination arguments. Hence, must have LWR setup job + # before queueing. 
+ setup_params = _setup_params_from_job_config(job_config) + launch_params["setup_params"] = dumps(setup_params) return self._raw_execute("launch", launch_params) def kill(self): @@ -230,15 +278,20 @@ """ return self._raw_execute("kill", {"job_id": self.job_id}) - def wait(self): + def wait(self, max_seconds=None): """ Wait for job to finish. """ - while True: + i = 0 + while max_seconds is None or i < max_seconds: complete_response = self.raw_check_complete() if complete_response["complete"] == "true": + print complete_response return complete_response + else: + print complete_response sleep(1) + i += 1 @parseJson() def raw_check_complete(self): @@ -276,15 +329,10 @@ self._raw_execute("clean", {"job_id": self.job_id}) @parseJson() - def setup(self, tool_id=None, tool_version=None): + def remote_setup(self, **setup_args): """ Setup remote LWR server to run this job. """ - setup_args = {"job_id": self.job_id} - if tool_id: - setup_args["tool_id"] = tool_id - if tool_version: - setup_args["tool_version"] = tool_version return self._raw_execute("setup", setup_args) def _copy(self, source, destination): @@ -294,6 +342,35 @@ shutil.copyfile(source, destination) +class MessageJobClient(BaseJobClient): + + def __init__(self, destination_params, job_id, exchange): + super(MessageJobClient, self).__init__(destination_params, job_id) + if not self.job_directory: + error_message = "Message-queue based LWR client requires destination define a remote job_directory to stage files into." 
+ raise Exception(error_message) + self.exchange = exchange + + def launch(self, command_line, requirements=[], remote_staging=[], job_config=None): + """ + """ + launch_params = dict(command_line=command_line, job_id=self.job_id) + submit_params_dict = submit_params(self.destination_params) + if submit_params_dict: + launch_params['params'] = submit_params_dict + if requirements: + launch_params['requirements'] = [requirement.to_dict() for requirement in requirements] + if remote_staging: + launch_params['remote_staging'] = remote_staging + if job_config and self.setup_handler.local: + # Setup not yet called, job properties were inferred from + # destination arguments. Hence, must have LWR setup job + # before queueing. + setup_params = _setup_params_from_job_config(job_config) + launch_params["setup_params"] = setup_params + return self.exchange.publish("setup", launch_params) + + class InputCachingJobClient(JobClient): """ Beta client that cache's staged files to prevent duplication. 
@@ -337,6 +414,17 @@ return self._raw_execute("file_available", {"path": path}) +def _setup_params_from_job_config(job_config): + job_id = job_config.get("job_id", None) + tool_id = job_config.get("tool_id", None) + tool_version = job_config.get("tool_version", None) + return dict( + job_id=job_id, + tool_id=tool_id, + tool_version=tool_version + ) + + class ObjectStoreClient(object): def __init__(self, lwr_interface): diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/destination.py --- a/lib/galaxy/jobs/runners/lwr_client/destination.py +++ b/lib/galaxy/jobs/runners/lwr_client/destination.py @@ -1,5 +1,6 @@ from re import match +from .util import filter_destination_params SUBMIT_PREFIX = "submit_" @@ -54,7 +55,4 @@ >>> result {'native_specification': '-q batch'} """ - destination_params = destination_params or {} - return dict([(key[len(SUBMIT_PREFIX):], destination_params[key]) - for key in destination_params - if key.startswith(SUBMIT_PREFIX)]) + return filter_destination_params(destination_params, SUBMIT_PREFIX) diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/job_directory.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/job_directory.py @@ -0,0 +1,137 @@ +""" +""" +import os.path +from collections import deque +import posixpath + +from .util import PathHelper +from galaxy.util import in_directory + +from logging import getLogger +log = getLogger(__name__) + + +TYPES_TO_METHOD = dict( + input="inputs_directory", + input_extra="inputs_directory", + unstructured="unstructured_files_directory", + config="configs_directory", + tool="tool_files_directory", + work_dir="working_directory", + workdir="working_directory", + output="outputs_directory", + output_workdir="working_directory", +) + + +class RemoteJobDirectory(object): + """ Representation of a (potentially) remote LWR-style staging directory. 
+ """ + + def __init__(self, remote_staging_directory, remote_id, remote_sep): + self.path_helper = PathHelper(remote_sep) + self.job_directory = self.path_helper.remote_join( + remote_staging_directory, + remote_id + ) + + def working_directory(self): + return self._sub_dir('working') + + def inputs_directory(self): + return self._sub_dir('inputs') + + def outputs_directory(self): + return self._sub_dir('outputs') + + def configs_directory(self): + return self._sub_dir('configs') + + def tool_files_directory(self): + return self._sub_dir('tool_files') + + def unstructured_files_directory(self): + return self._sub_dir('unstructured') + + @property + def path(self): + return self.job_directory + + @property + def separator(self): + return self.path_helper.separator + + def calculate_path(self, remote_relative_path, input_type): + """ Only for used by LWR client, should override for managers to + enforce security and make the directory if needed. + """ + directory, allow_nested_files = self._directory_for_file_type(input_type) + return self.path_helper.remote_join(directory, remote_relative_path) + + def _directory_for_file_type(self, file_type): + allow_nested_files = False + # work_dir and input_extra are types used by legacy clients... + # Obviously this client won't be legacy because this is in the + # client module, but this code is reused on server which may + # serve legacy clients. 
+ allow_nested_files = file_type in ['input', 'input_extra', 'unstructured', 'output', 'output_workdir'] + directory_function = getattr(self, TYPES_TO_METHOD.get(file_type, None), None) + if not directory_function: + raise Exception("Unknown file_type specified %s" % file_type) + return directory_function(), allow_nested_files + + def _sub_dir(self, name): + return self.path_helper.remote_join(self.job_directory, name) + + +def get_mapped_file(directory, remote_path, allow_nested_files=False, local_path_module=os.path, mkdir=True): + """ + + >>> import ntpath + >>> get_mapped_file(r'C:\\lwr\\staging\\101', 'dataset_1_files/moo/cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False) + 'C:\\\\lwr\\\\staging\\\\101\\\\dataset_1_files\\\\moo\\\\cow' + >>> get_mapped_file(r'C:\\lwr\\staging\\101', 'dataset_1_files/moo/cow', allow_nested_files=False, local_path_module=ntpath) + 'C:\\\\lwr\\\\staging\\\\101\\\\cow' + >>> get_mapped_file(r'C:\\lwr\\staging\\101', '../cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False) + Traceback (most recent call last): + Exception: Attempt to read or write file outside an authorized directory. + """ + if not allow_nested_files: + name = local_path_module.basename(remote_path) + path = local_path_module.join(directory, name) + else: + local_rel_path = __posix_to_local_path(remote_path, local_path_module=local_path_module) + local_path = local_path_module.join(directory, local_rel_path) + verify_is_in_directory(local_path, directory, local_path_module=local_path_module) + local_directory = local_path_module.dirname(local_path) + if mkdir and not local_path_module.exists(local_directory): + os.makedirs(local_directory) + path = local_path + return path + + +def __posix_to_local_path(path, local_path_module=os.path): + """ + Converts a posix path (coming from Galaxy), to a local path (be it posix or Windows). 
+ + >>> import ntpath + >>> __posix_to_local_path('dataset_1_files/moo/cow', local_path_module=ntpath) + 'dataset_1_files\\\\moo\\\\cow' + >>> import posixpath + >>> __posix_to_local_path('dataset_1_files/moo/cow', local_path_module=posixpath) + 'dataset_1_files/moo/cow' + """ + partial_path = deque() + while True: + if not path or path == '/': + break + (path, base) = posixpath.split(path) + partial_path.appendleft(base) + return local_path_module.join(*partial_path) + + +def verify_is_in_directory(path, directory, local_path_module=os.path): + if not in_directory(path, directory, local_path_module): + msg = "Attempt to read or write file outside an authorized directory." + log.warn("%s Attempted path: %s, valid directory: %s" % (msg, path, directory)) + raise Exception(msg) diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -21,9 +21,11 @@ from .client import JobClient from .client import InputCachingJobClient from .client import ObjectStoreClient +from .client import MessageJobClient from .transport import get_transport from .util import TransferEventManager from .destination import url_to_destination_params +from .amqp_exchange import LwrExchange from logging import getLogger @@ -32,6 +34,15 @@ DEFAULT_TRANSFER_THREADS = 2 +def build_client_manager(**kwargs): + if 'job_manager' in kwargs: + return ClientManager(**kwargs) # TODO: Consider more separation here. 
+ elif 'url' in kwargs: + return MessageQueueClientManager(**kwargs) + else: + return ClientManager(**kwargs) + + class ClientManager(object): """ Factory to create LWR clients, used to manage potential shared @@ -60,20 +71,51 @@ self.extra_client_kwds = {} def get_client(self, destination_params, job_id): - destination_params = self.__parse_destination_params(destination_params) + destination_params = _parse_destination_params(destination_params) job_manager_interface_class = self.job_manager_interface_class job_manager_interface_args = dict(destination_params=destination_params, **self.job_manager_interface_args) job_manager_interface = job_manager_interface_class(**job_manager_interface_args) return self.client_class(destination_params, job_id, job_manager_interface, **self.extra_client_kwds) - def __parse_destination_params(self, destination_params): - try: - unicode_type = unicode - except NameError: - unicode_type = str - if isinstance(destination_params, str) or isinstance(destination_params, unicode_type): - destination_params = url_to_destination_params(destination_params) - return destination_params + def shutdown(self): + pass + + +class MessageQueueClientManager(object): + + def __init__(self, **kwds): + self.url = kwds.get('url') + self.manager_name = kwds.get("manager", "_default_") + self.exchange = LwrExchange(self.url, self.manager_name) + self.thread = None + self.active = True + + def listen_for_job_completes(self, callback): + # TODO: Call this from LWR runner. + def callback_wrapper(body, message): + callback(body) + message.ack() + + def run(): + self.exchange.consume("complete", callback_wrapper, check=self) + + t = Thread( + name="lwr_client_%s_complete_callback" % self.manager_name, + target=run + ) + t.daemon = False # Lets not interrupt processing of this. 
+ t.start() + self.thread = t + + def shutdown(self): + self.active = False + + def __nonzero__(self): + return self.active + + def get_client(self, destination_params, job_id): + destination_params = _parse_destination_params(destination_params) + return MessageJobClient(destination_params, job_id, self.exchange) class ObjectStoreClientManager(object): @@ -163,7 +205,8 @@ if controller.response_type != 'file': return controller.body(result) else: - from lwr.util import copy_to_path + # TODO: Add to Galaxy. + from galaxy.util import copy_to_path with open(result, 'rb') as result_file: copy_to_path(result_file, output_path) @@ -221,6 +264,16 @@ t.start() +def _parse_destination_params(destination_params): + try: + unicode_type = unicode + except NameError: + unicode_type = str + if isinstance(destination_params, str) or isinstance(destination_params, unicode_type): + destination_params = url_to_destination_params(destination_params) + return destination_params + + def _environ_default_int(variable, default="0"): val = getenv(variable, default) int_val = int(default) diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/setup_handler.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/setup_handler.py @@ -0,0 +1,103 @@ +import os +from .util import filter_destination_params + +REMOTE_SYSTEM_PROPERTY_PREFIX = "remote_property_" + + +def build(client, destination_args): + """ Build a SetupHandler object for client from destination parameters. + """ + # Have defined a remote job directory, lets do the setup locally. + if client.job_directory: + handler = LocalSetupHandler(client, destination_args) + else: + handler = RemoteSetupHandler(client) + return handler + + +class LocalSetupHandler(object): + """ Parse destination params to infer job setup parameters (input/output + directories, etc...). Default is to get this configuration data from the + remote LWR server. 
+ + Downside of this approach is that it requires more and more dependent + configuration of Galaxy. Upside is that it is asynchronous and thus makes + message queue driven configurations possible. + + Remote system properties (such as galaxy_home) can be specified in + destination args by prefixing property with remote_property_ (e.g. + remote_property_galaxy_home). + """ + + def __init__(self, client, destination_args): + self.client = client + system_properties = self.__build_system_properties(destination_args) + system_properties["sep"] = client.job_directory.separator + self.system_properties = system_properties + self.jobs_directory = destination_args["jobs_directory"] + + def setup(self, job_id, tool_id=None, tool_version=None): + return build_job_config( + job_id=job_id, + job_directory=self.client.job_directory, + system_properties=self.system_properties, + tool_id=tool_id, + tool_version=tool_version, + ) + + @property + def local(self): + """ + """ + return True + + def __build_system_properties(self, destination_params): + return filter_destination_params(destination_params, REMOTE_SYSTEM_PROPERTY_PREFIX) + + +class RemoteSetupHandler(object): + """ Default behavior. Fetch setup information from remote LWR server. 
+ """ + def __init__(self, client): + self.client = client + + def setup(self, **setup_args): + return self.client.remote_setup(**setup_args) + + @property + def local(self): + """ + """ + return False + + +def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, tool_version=None): + """ + """ + inputs_directory = job_directory.inputs_directory() + working_directory = job_directory.working_directory() + outputs_directory = job_directory.outputs_directory() + configs_directory = job_directory.configs_directory() + tools_directory = job_directory.tool_files_directory() + unstructured_files_directory = job_directory.unstructured_files_directory() + sep = system_properties.get("sep", os.sep) + job_config = { + "working_directory": working_directory, + "outputs_directory": outputs_directory, + "configs_directory": configs_directory, + "tools_directory": tools_directory, + "inputs_directory": inputs_directory, + "unstructured_files_directory": unstructured_files_directory, + # Poorly named legacy attribute. Drop at some point. + "path_separator": sep, + "job_id": job_id, + "system_properties": system_properties, + } + if tool_id: + job_config["tool_id"] = tool_id + if tool_version: + job_config["tool_version"] = tool_version + return job_config + + +__all__ = [build_job_config, build] diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/staging/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/staging/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/staging/__init__.py @@ -14,15 +14,20 @@ **Parameters** command_line : str - The local command line to execute, this will be rewritten for the remote server. + The local command line to execute, this will be rewritten for + the remote server. config_files : list - List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. + List of Galaxy 'configfile's produced for this job. 
These will + be rewritten and sent to remote server. input_files : list - List of input files used by job. These will be transferred and references rewritten. - output_files : list - List of output_files produced by job. + List of input files used by job. These will be transferred and + references rewritten. + client_outputs : ClientOutputs + Description of outputs produced by job (at least output files along + with optional version string and working directory outputs. tool_dir : str - Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). + Directory containing tool to execute (if a wrapper is used, it will + be transferred to remote server). working_directory : str Local path created by Galaxy for running this job. requirements : list @@ -46,10 +51,9 @@ command_line, config_files, input_files, - output_files, + client_outputs, working_directory, requirements, - version_file=None, arbitrary_files=None, rewrite_paths=True, ): @@ -57,38 +61,74 @@ self.command_line = command_line self.config_files = config_files self.input_files = input_files - self.output_files = output_files + self.client_outputs = client_outputs self.working_directory = working_directory self.requirements = requirements - self.version_file = version_file self.rewrite_paths = rewrite_paths self.arbitrary_files = arbitrary_files or {} + @property + def output_files(self): + return self.client_outputs.output_files -class GalaxyOutputs(object): + @property + def version_file(self): + return self.client_outputs.version_file + + +class ClientOutputs(object): """ Abstraction describing the output datasets EXPECTED by the Galaxy job - runner client. """ + runner client. 
+ """ - def __init__(self, working_directory, work_dir_outputs, output_files, version_file): + def __init__(self, working_directory, output_files, work_dir_outputs=None, version_file=None): self.working_directory = working_directory self.work_dir_outputs = work_dir_outputs self.output_files = output_files self.version_file = version_file + def to_dict(self): + return dict( + working_directory=self.working_directory, + work_dir_outputs=self.work_dir_outputs, + output_files=self.output_files, + version_file=self.version_file + ) + + @staticmethod + def from_dict(config_dict): + return ClientOutputs( + working_directory=config_dict.get('working_directory'), + work_dir_outputs=config_dict.get('work_dir_outputs'), + output_files=config_dict.get('output_files'), + version_file=config_dict.get('version_file'), + ) + class LwrOutputs(object): """ Abstraction describing the output files PRODUCED by the remote LWR server. """ - def __init__(self, complete_response): + def __init__(self, working_directory_contents, output_directory_contents, remote_separator=sep): + self.working_directory_contents = working_directory_contents + self.output_directory_contents = output_directory_contents + self.path_helper = PathHelper(remote_separator) + + @staticmethod + def from_status_response(complete_response): # Default to None instead of [] to distinguish between empty contents and it not set # by the LWR - older LWR instances will not set these in complete response. 
- self.working_directory_contents = complete_response.get("working_directory_contents", None) - self.output_directory_contents = complete_response.get("outputs_directory_contents", None) + working_directory_contents = complete_response.get("working_directory_contents", None) + output_directory_contents = complete_response.get("outputs_directory_contents", None) # Older (pre-2014) LWR servers will not include separator in response, - #so this should only be used when reasoning about outputs in - # subdirectories which was not previously supported. - self.path_helper = PathHelper(complete_response.get("system_properties", {}).get("separator", sep)) + # so this should only be used when reasoning about outputs in + # subdirectories (which was not previously supported prior to that). + remote_separator = complete_response.get("system_properties", {}).get("separator", sep) + return LwrOutputs( + working_directory_contents, + output_directory_contents, + remote_separator + ) def has_output_file(self, output_file): if self.output_directory_contents is None: diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/staging/down.py --- a/lib/galaxy/jobs/runners/lwr_client/staging/down.py +++ b/lib/galaxy/jobs/runners/lwr_client/staging/down.py @@ -17,111 +17,144 @@ COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|dataset_\d+_files.+") -def finish_job(client, cleanup_job, job_completed_normally, galaxy_outputs, lwr_outputs): +def finish_job(client, cleanup_job, job_completed_normally, client_outputs, lwr_outputs): """ Responsible for downloading results from remote server and cleaning up LWR staging directory (if needed.) 
""" - download_failure_exceptions = [] + collection_failure_exceptions = [] if job_completed_normally: - downloader = ResultsDownloader(client, galaxy_outputs, lwr_outputs) - download_failure_exceptions = downloader.download() - return __clean(download_failure_exceptions, cleanup_job, client) + output_collector = ClientOutputCollector(client) + action_mapper = FileActionMapper(client) + results_stager = ResultsCollector(output_collector, action_mapper, client_outputs, lwr_outputs) + collection_failure_exceptions = results_stager.collect() + __clean(collection_failure_exceptions, cleanup_job, client) + return collection_failure_exceptions -class ResultsDownloader(object): +class ClientOutputCollector(object): - def __init__(self, client, galaxy_outputs, lwr_outputs): + def __init__(self, client): self.client = client - self.galaxy_outputs = galaxy_outputs + + def collect_output(self, results_collector, output_type, action, name): + # This output should have been handled by the LWR. + if not action.staging_action_local: + return False + + path = action.path + if output_type == 'legacy': + working_directory = results_collector.client_outputs.working_directory + self.client.fetch_output_legacy(path, working_directory, action_type=action.action_type) + elif output_type == 'output_workdir': + working_directory = results_collector.client_outputs.working_directory + self.client.fetch_work_dir_output(name, working_directory, path, action_type=action.action_type) + elif output_type == 'output': + self.client.fetch_output(path=path, name=name, action_type=action.action_type) + return True + + +class ResultsCollector(object): + + def __init__(self, output_collector, action_mapper, client_outputs, lwr_outputs): + self.output_collector = output_collector + self.action_mapper = action_mapper + self.client_outputs = client_outputs self.lwr_outputs = lwr_outputs - self.action_mapper = FileActionMapper(client) self.downloaded_working_directory_files = [] self.exception_tracker = 
DownloadExceptionTracker() - self.output_files = galaxy_outputs.output_files + self.output_files = client_outputs.output_files self.working_directory_contents = lwr_outputs.working_directory_contents or [] - def download(self): - self.__download_working_directory_outputs() - self.__download_outputs() - self.__download_version_file() - self.__download_other_working_directory_files() - return self.exception_tracker.download_failure_exceptions + def collect(self): + self.__collect_working_directory_outputs() + self.__collect_outputs() + self.__collect_version_file() + self.__collect_other_working_directory_files() + return self.exception_tracker.collection_failure_exceptions - def __download_working_directory_outputs(self): - working_directory = self.galaxy_outputs.working_directory + def __collect_working_directory_outputs(self): + working_directory = self.client_outputs.working_directory # Fetch explicit working directory outputs. - for source_file, output_file in self.galaxy_outputs.work_dir_outputs: + for source_file, output_file in self.client_outputs.work_dir_outputs: name = relpath(source_file, working_directory) - remote_name = self.lwr_outputs.path_helper.remote_name(name) - with self.exception_tracker(): - action = self.action_mapper.action(output_file, 'output_workdir') - self.client.fetch_work_dir_output(remote_name, working_directory, output_file, action_type=action.action_type) - self.downloaded_working_directory_files.append(remote_name) + lwr_name = self.lwr_outputs.path_helper.remote_name(name) + if self._attempt_collect_output('output_workdir', path=output_file, name=lwr_name): + self.downloaded_working_directory_files.append(lwr_name) # Remove from full output_files list so don't try to download directly. self.output_files.remove(output_file) - def __download_outputs(self): + def __collect_outputs(self): # Legacy LWR not returning list of files, iterate over the list of # expected outputs for tool. 
for output_file in self.output_files: - # Fetch ouptut directly... - with self.exception_tracker(): - action = self.action_mapper.action(output_file, 'output') - output_generated = self.lwr_outputs.has_output_file(output_file) - working_directory = self.galaxy_outputs.working_directory - if output_generated is None: - self.client.fetch_output_legacy(output_file, working_directory, action_type=action.action_type) - elif output_generated: - self.client.fetch_output(output_file, action_type=action.action_type) + # Fetch output directly... + output_generated = self.lwr_outputs.has_output_file(output_file) + if output_generated is None: + self._attempt_collect_output('legacy', output_file) + elif output_generated: + self._attempt_collect_output('output', output_file) - for local_path, remote_name in self.lwr_outputs.output_extras(output_file).iteritems(): - with self.exception_tracker(): - action = self.action_mapper.action(local_path, 'output') - self.client.fetch_output(path=local_path, name=remote_name, action_type=action.action_type) + for galaxy_path, lwr_name in self.lwr_outputs.output_extras(output_file).iteritems(): + self._attempt_collect_output('output', path=galaxy_path, name=lwr_name) # else not output generated, do not attempt download. - def __download_version_file(self): - version_file = self.galaxy_outputs.version_file + def __collect_version_file(self): + version_file = self.client_outputs.version_file # output_directory_contents may be none for legacy LWR servers. 
lwr_output_directory_contents = (self.lwr_outputs.output_directory_contents or []) if version_file and COMMAND_VERSION_FILENAME in lwr_output_directory_contents: - action = self.action_mapper.action(version_file, 'output') - self.client.fetch_output(path=version_file, name=COMMAND_VERSION_FILENAME, action_type=action.action_type) + self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME) - def __download_other_working_directory_files(self): - working_directory = self.galaxy_outputs.working_directory + def __collect_other_working_directory_files(self): + working_directory = self.client_outputs.working_directory # Fetch remaining working directory outputs of interest. for name in self.working_directory_contents: if name in self.downloaded_working_directory_files: continue if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name): - with self.exception_tracker(): - output_file = join(working_directory, self.lwr_outputs.path_helper.local_name(name)) - action = self.action_mapper.action(output_file, 'output_workdir') - self.client.fetch_work_dir_output(name, working_directory, output_file, action_type=action.action_type) + output_file = join(working_directory, self.lwr_outputs.path_helper.local_name(name)) + if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name): self.downloaded_working_directory_files.append(name) + def _attempt_collect_output(self, output_type, path, name=None): + # path is final path on galaxy server (client) + # name is the 'name' of the file on the LWR server (possible a relative) + # path. + collected = False + with self.exception_tracker(): + # output_action_type cannot be 'legacy' but output_type may be + # eventually drop support for legacy mode (where type wasn't known) + # ahead of time. 
+ output_action_type = 'output_workdir' if output_type == 'output_workdir' else 'output' + action = self.action_mapper.action(path, output_action_type) + if self._collect_output(output_type, action, name): + collected = True + + return collected + + def _collect_output(self, output_type, action, name): + return self.output_collector.collect_output(self, output_type, action, name) + class DownloadExceptionTracker(object): def __init__(self): - self.download_failure_exceptions = [] + self.collection_failure_exceptions = [] @contextmanager def __call__(self): try: yield except Exception as e: - self.download_failure_exceptions.append(e) + self.collection_failure_exceptions.append(e) -def __clean(download_failure_exceptions, cleanup_job, client): - failed = (len(download_failure_exceptions) > 0) +def __clean(collection_failure_exceptions, cleanup_job, client): + failed = (len(collection_failure_exceptions) > 0) if (not failed and cleanup_job != "never") or cleanup_job == "always": try: client.clean() - except: + except Exception: log.warn("Failed to cleanup remote LWR job") - return failed __all__ = [finish_job] diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/staging/up.py --- a/lib/galaxy/jobs/runners/lwr_client/staging/up.py +++ b/lib/galaxy/jobs/runners/lwr_client/staging/up.py @@ -8,6 +8,7 @@ from ..staging import COMMAND_VERSION_FILENAME from ..action_mapper import FileActionMapper from ..action_mapper import path_type +from ..action_mapper import MessageAction from ..util import PathHelper from ..util import directory_files @@ -21,7 +22,24 @@ file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_command_line() job_id = file_stager.job_id - client.launch(rebuilt_command_line, requirements=client_job_description.requirements) + launch_kwds = dict( + command_line=rebuilt_command_line, + 
requirements=client_job_description.requirements, + ) + if file_stager.job_config: + launch_kwds["job_config"] = file_stager.job_config + remote_staging = {} + remote_staging_actions = file_stager.transfer_tracker.remote_staging_actions + if remote_staging_actions: + remote_staging["setup"] = remote_staging_actions + # Somehow make the following optional. + remote_staging["action_mapper"] = file_stager.action_mapper.to_dict() + remote_staging["client_outputs"] = client_job_description.client_outputs.to_dict() + + if remote_staging: + launch_kwds["remote_staging"] = remote_staging + + client.launch(**launch_kwds) return job_id @@ -100,6 +118,7 @@ # Remote LWR server assigned an id different than the # Galaxy job id, update client to reflect this. self.client.job_id = self.job_id + self.job_config = job_config def __parse_remote_separator(self, job_config): separator = job_config.get("system_properties", {}).get("separator", None) @@ -195,7 +214,7 @@ def __upload_rewritten_config_files(self): for config_file, new_config_contents in self.job_inputs.config_files.items(): - self.client.put_file(config_file, input_type='config', contents=new_config_contents) + self.transfer_tracker.handle_transfer(config_file, type='config', contents=new_config_contents) def get_command_line(self): """ @@ -317,29 +336,48 @@ self.job_inputs = job_inputs self.rewrite_paths = rewrite_paths self.file_renames = {} + self.remote_staging_actions = [] def handle_transfer(self, path, type, name=None, contents=None): + action = self.__action_for_transfer(path, type, contents) + + if action.staging_needed: + local_action = action.staging_action_local + if local_action: + response = self.client.put_file(path, type, name=name, contents=contents) + get_path = lambda: response['path'] + else: + job_directory = self.client.job_directory + assert job_directory, "job directory required for action %s" % action + if not name: + name = basename(path) + self.__add_remote_staging_input(action, name, type) + 
get_path = lambda: job_directory.calculate_path(name, type) + register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. + if register: + self.register_rewrite(path, get_path(), type, force=True) + # else: # No action for this file + + def __add_remote_staging_input(self, action, name, type): + input_dict = dict( + name=name, + type=type, + action=action.to_dict(), + ) + self.remote_staging_actions.append(input_dict) + + def __action_for_transfer(self, path, type, contents): if contents: # If contents loaded in memory, no need to write out file and copy, # just transfer. - action_type = 'transfer' + action = MessageAction(contents=contents, client=self.client) else: if not exists(path): message = "handle_tranfer called on non-existent file - [%s]" % path log.warn(message) raise Exception(message) - action_type = self.__action(path, type).action_type - - if action_type in ['transfer', 'copy']: - response = self.client.put_file(path, type, name=name, contents=contents) - register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. - if register: - self.register_rewrite(path, response['path'], type, force=True) - elif action_type == 'none': - # No action for this file. 
- pass - else: - raise Exception("Unknown action type (%s) encountered for path (%s)" % (action_type, path)) + action = self.__action(path, type) + return action def register_rewrite(self, local_path, remote_path, type, force=False): action = self.__action(local_path, type) diff -r 11744e84d4d2389a28ff8e1929d3b4f5eafb21f0 -r 423a2939e224a294ded5842daa485da0a007fcbb lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -46,6 +46,13 @@ return contents +def filter_destination_params(destination_params, prefix): + destination_params = destination_params or {} + return dict([(key[len(prefix):], destination_params[key]) + for key in destination_params + if key.startswith(prefix)]) + + class PathHelper(object): ''' Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.