commit/galaxy-central: jmchilton: Update LWR client through LWR changeset 860ba9c.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/72bb9c96a5ba/ Changeset: 72bb9c96a5ba User: jmchilton Date: 2014-06-04 19:16:38 Summary: Update LWR client through LWR changeset 860ba9c. Allows asynchronous submission of LWR jobs to remote servers without a remote LWR service running. LWR still needs to be configured there and job updates should be configured to be sent back to Galaxy through AMQP. A long running process will be executed on the remote server to monitor the job - while this will always be needed in some cases - for others it would be great to be able to embed the update steps into the job to be submitted to the DRM itself so that this long running process can be just a submission. This is one more potential step down the road toward allowing remote execution of Galaxy jobs with ever fewer remote constraints. Extensions to this would include allowing Galaxy job updates via the jobs API (along with staging) so MQ becomes optional, allowing auto-configuration of remote LWR application if unavailable or unconfigured, more transfer options (scp), more CLI shell options (fabric, ansible, etc...). Affected #: 7 files diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -402,7 +402,10 @@ def __job_state( self, job, job_wrapper ): job_state = AsynchronousJobState() - job_state.job_id = str( job.get_job_runner_external_id() ) + # TODO: Determine why this is set when using normal message queue updates + # but not CLI submitted MQ updates... 
+ raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id + job_state.job_id = str( raw_job_id ) job_state.runner_url = job_wrapper.get_job_runner_url() job_state.job_destination = job_wrapper.job_destination job_state.job_wrapper = job_wrapper diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -8,6 +8,8 @@ from .decorators import retry from .util import copy from .util import ensure_directory +from .util import to_base64_json + import logging log = logging.getLogger(__name__) @@ -285,16 +287,25 @@ self._raw_execute("download_output", output_params, output_path=output_path) -class MessageJobClient(BaseJobClient): +class BaseMessageJobClient(BaseJobClient): def __init__(self, destination_params, job_id, client_manager): - super(MessageJobClient, self).__init__(destination_params, job_id) + super(BaseMessageJobClient, self).__init__(destination_params, job_id) if not self.job_directory: error_message = "Message-queue based LWR client requires destination define a remote job_directory to stage files into." raise Exception(error_message) self.client_manager = client_manager - def launch(self, command_line, dependencies_description=None, env=[], remote_staging=[], job_config=None): + def clean(self): + del self.client_manager.status_cache[self.job_id] + + def full_status(self): + full_status = self.client_manager.status_cache.get(self.job_id, None) + if full_status is None: + raise Exception("full_status() called before a final status was properly cached with cilent manager.") + return full_status + + def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config): """ """ launch_params = dict(command_line=command_line, job_id=self.job_id) @@ -313,21 +324,54 @@ # before queueing. 
setup_params = _setup_params_from_job_config(job_config) launch_params["setup_params"] = setup_params + return launch_params + + +class MessageJobClient(BaseMessageJobClient): + + def launch(self, command_line, dependencies_description=None, env=[], remote_staging=[], job_config=None): + """ + """ + launch_params = self._build_setup_message( + command_line, + dependencies_description=dependencies_description, + env=env, + remote_staging=remote_staging, + job_config=job_config + ) response = self.client_manager.exchange.publish("setup", launch_params) log.info("Job published to setup message queue.") return response - def clean(self): - del self.client_manager.status_cache[self.job_id] + def kill(self): + self.client_manager.exchange.publish("kill", dict(job_id=self.job_id)) - def full_status(self): - full_status = self.client_manager.status_cache.get(self.job_id, None) - if full_status is None: - raise Exception("full_status() called before a final status was properly cached with cilent manager.") - return full_status + +class MessageCLIJobClient(BaseMessageJobClient): + + def __init__(self, destination_params, job_id, client_manager, shell): + super(MessageCLIJobClient, self).__init__(destination_params, job_id, client_manager) + self.remote_lwr_path = destination_params["remote_lwr_path"] + self.shell = shell + + def launch(self, command_line, dependencies_description=None, env=[], remote_staging=[], job_config=None): + """ + """ + launch_params = self._build_setup_message( + command_line, + dependencies_description=dependencies_description, + env=env, + remote_staging=remote_staging, + job_config=job_config + ) + base64_message = to_base64_json(launch_params) + submit_command = os.path.join(self.remote_lwr_path, "scripts", "submit.bash") + # TODO: Allow configuration of manager, app, and ini path... 
+ self.shell.execute("nohup %s --base64 %s &" % (submit_command, base64_message)) def kill(self): - self.client_manager.exchange.publish("kill", dict(job_id=self.job_id)) + # TODO + pass class InputCachingJobClient(JobClient): diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/lwr_client/interface.py --- a/lib/galaxy/jobs/runners/lwr_client/interface.py +++ b/lib/galaxy/jobs/runners/lwr_client/interface.py @@ -78,8 +78,8 @@ def execute(self, command, args={}, data=None, input_path=None, output_path=None): # If data set, should be unicode (on Python 2) or str (on Python 3). - from lwr import routes - from lwr.framework import build_func_args + from lwr.web import routes + from lwr.web.framework import build_func_args controller = getattr(routes, command) action = controller.func body_args = dict(body=self.__build_body(data, input_path)) diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -8,6 +8,7 @@ from .client import JobClient from .client import InputCachingJobClient from .client import MessageJobClient +from .client import MessageCLIJobClient from .interface import HttpLwrInterface from .interface import LocalLwrInterface from .object_client import ObjectStoreClient @@ -71,6 +72,12 @@ pass +try: + from galaxy.jobs.runners.util.cli import factory as cli_factory +except ImportError: + from lwr.managers.util.cli import factory as cli_factory + + class MessageQueueClientManager(object): def __init__(self, **kwds): @@ -90,7 +97,8 @@ def callback_wrapper(body, message): try: if "job_id" in body: - self.status_cache[body["job_id"]] = body + job_id = body["job_id"] + self.status_cache[job_id] = body log.debug("Handling asynchronous status update from remote LWR.") callback(body) except Exception: @@ -116,9 +124,15 
@@ return self.active def get_client(self, destination_params, job_id, **kwargs): + if job_id is None: + raise Exception("Cannot generate LWR client for empty job_id.") destination_params = _parse_destination_params(destination_params) destination_params.update(**kwargs) - return MessageJobClient(destination_params, job_id, self) + if 'shell_plugin' in destination_params: + shell = cli_factory.get_shell(destination_params) + return MessageCLIJobClient(destination_params, job_id, self, shell) + else: + return MessageJobClient(destination_params, job_id, self) class ObjectStoreClientManager(object): diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -7,6 +7,8 @@ import os.path import hashlib import shutil +import json +import base64 def unique_path_prefix(path): @@ -70,6 +72,20 @@ if key.startswith(prefix)]) +def to_base64_json(data): + """ + + >>> x = from_base64_json(to_base64_json(dict(a=5))) + >>> x["a"] + 5 + """ + return base64.b64encode(json.dumps(data)) + + +def from_base64_json(data): + return json.loads(base64.b64decode(data)) + + class PathHelper(object): ''' diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/util/cli/__init__.py --- a/lib/galaxy/jobs/runners/util/cli/__init__.py +++ b/lib/galaxy/jobs/runners/util/cli/__init__.py @@ -43,11 +43,19 @@ Return shell and job interface defined by and configured via specified params. 
""" + shell = self.get_shell_plugin(shell_params) + job_interface = self.get_job_interface(job_params) + return shell, job_interface + + def get_shell_plugin(self, shell_params): shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN) + shell = self.cli_shells[shell_plugin](**shell_params) + return shell + + def get_job_interface(self, job_params): job_plugin = job_params['plugin'] - shell = self.cli_shells[shell_plugin](**shell_params) job_interface = self.cli_job_interfaces[job_plugin](**job_params) - return shell, job_interface + return job_interface def split_params(params): diff -r 3d6b71244ff9a708bd6eda419b50315d7aac4405 -r 72bb9c96a5bad59ca79a669ec1d43a87aa06986f lib/galaxy/jobs/runners/util/cli/factory.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/factory.py @@ -0,0 +1,18 @@ +try: + from galaxy.jobs.runners.util.cli import ( + CliInterface, + split_params + ) + code_dir = 'lib' +except ImportError: + from lwr.managers.util.cli import ( + CliInterface, + split_params + ) + code_dir = '.' + + +def get_shell(params): + cli_interface = CliInterface(code_dir=code_dir) + shell_params, _ = split_params(params) + return cli_interface.get_shell_plugin(shell_params) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org