commit/galaxy-central: natefoo: Include LWR and job running fixes from John Chilton in next-stable.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/e0607a09d9fa/ changeset: e0607a09d9fa branch: next-stable user: natefoo date: 2013-03-18 16:38:34 summary: Include LWR and job running fixes from John Chilton in next-stable. affected #: 10 files diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -7,6 +7,7 @@ <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/><plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/><plugin id="gridengine" type="runner" load="galaxy.jobs.runners.drmaa:DRMAARunner"/> + <plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr.LwrJobRunner" /><!-- https://lwr.readthedocs.org --></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -40,6 +41,11 @@ <param id="type">python</param><param id="function">foo</param></destination> + <destination id="secure_lwr" runner="lwr"> + <param id="url">https://windowshost.examle.com:8913/</param> + <!-- If set, private_token must match token remote LWR server configured with. 
--> + <param id="private_token">123456789changeme</param> + </destination></destinations><tools><!-- Tools can be configured to use specific destinations or handlers, diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -126,7 +126,12 @@ for plugin in self.__findall_with_required(plugins, 'plugin', ('id', 'type', 'load')): if plugin.get('type') == 'runner': workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS)) - self.runner_plugins.append(dict(id=plugin.get('id'), load=plugin.get('load'), workers=int(workers))) + runner_kwds = self.__get_params(plugin) + runner_info = dict(id=plugin.get('id'), + load=plugin.get('load'), + workers=int(workers), + kwds=runner_kwds) + self.runner_plugins.append(runner_info) else: log.error('Unknown plugin type: %s' % plugin.get('type')) # Load tasks if configured @@ -480,7 +485,7 @@ log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (id, runner_class.__bases__)) continue try: - rval[id] = runner_class( self.app, runner['workers'] ) + rval[id] = runner_class( self.app, runner[ 'workers' ], **runner.get( 'kwds', {} ) ) except TypeError: log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) ) rval[id] = runner_class( self.app ) @@ -833,7 +838,7 @@ log.warning('set_runner() is deprecated, use set_job_destination()') self.set_job_destination(self.job_destination, external_id) - def set_job_destination(self, job_destination, external_id): + def set_job_destination(self, job_destination, external_id=None ): """ Persist job destination params in the database for recovery. 
diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -3,11 +3,12 @@ from galaxy import model from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from galaxy.jobs import JobDestination import errno from time import sleep -from lwr_client import FileStager, Client +from lwr_client import FileStager, Client, url_to_destination_params log = logging.getLogger( __name__ ) @@ -19,11 +20,16 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers ): + def __init__( self, app, nworkers, transport=None ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) self._init_monitor_thread() self._init_worker_threads() + self.transport_type = transport + + def url_to_destination( self, url ): + """Convert a legacy URL to a job destination""" + return JobDestination( runner="lwr", params=url_to_destination_params( url ) ) def check_watched_item(self, job_state): try: @@ -42,7 +48,7 @@ def queue_job(self, job_wrapper): stderr = stdout = command_line = '' - runner_url = job_wrapper.get_job_runner_url() + job_destination = job_wrapper.job_destination try: job_wrapper.prepare() @@ -74,7 +80,7 @@ rebuilt_command_line = file_stager.get_rewritten_command_line() job_id = file_stager.job_id client.launch( rebuilt_command_line ) - job_wrapper.set_runner( runner_url, job_id ) + job_wrapper.set_job_destination( job_destination, job_id ) job_wrapper.change_state( model.Job.states.RUNNING ) except Exception, exc: @@ -87,32 +93,26 @@ lwr_job_state.job_id = job_id lwr_job_state.old_state = True lwr_job_state.running = True - lwr_job_state.runner_url = runner_url + lwr_job_state.job_destination = job_destination self.monitor_job(lwr_job_state) def get_output_files(self, job_wrapper): output_fnames = job_wrapper.get_output_fnames() return [ str( o ) for o in output_fnames ] - - def 
determine_lwr_url(self, url): - lwr_url = url[ len( 'lwr://' ) : ] - return lwr_url - def get_client_from_wrapper(self, job_wrapper): job_id = job_wrapper.job_id if hasattr(job_wrapper, 'task_id'): job_id = "%s_%s" % (job_id, job_wrapper.task_id) - return self.get_client( job_wrapper.get_job_runner_url(), job_id ) + return self.get_client( job_wrapper.job_destination.params, job_id ) def get_client_from_state(self, job_state): - job_runner = job_state.runner_url + job_destination_params = job_state.job_destination.params job_id = job_state.job_id - return self.get_client(job_runner, job_id) + return self.get_client( job_destination_params, job_id ) - def get_client(self, job_runner, job_id): - lwr_url = self.determine_lwr_url( job_runner ) - return Client(lwr_url, job_id) + def get_client( self, job_destination_params, job_id ): + return Client( job_destination_params, job_id, transport_type=self.transport_type ) def finish_job( self, job_state ): stderr = stdout = command_line = '' @@ -210,7 +210,7 @@ lwr_url = job.job_runner_name job_id = job.job_runner_external_id log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id)) - client = self.get_client(lwr_url, job_id) + client = self.get_client(job.destination_params, job_id) client.kill() @@ -219,6 +219,7 @@ job_state = AsynchronousJobState() job_state.job_id = str( job.get_job_runner_external_id() ) job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination job_wrapper.command_line = job.get_command_line() job_state.job_wrapper = job_wrapper if job.get_state() == model.Job.states.RUNNING: diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -5,502 +5,9 @@ This module contains logic for interfacing with an external LWR server. 
""" -import os -import re -import time -import urllib -import simplejson +from .stager import FileStager +from .client import Client +from .destination import url_to_destination_params -from transport import get_transport - - -class JobInputs(object): - """ - Abstractions over dynamic inputs created for a given job (namely the command to - execute and created configfiles). - - **Parameters** - - command_line : str - Local command to execute for this job. (To be rewritten.) - config_files : str - Config files created for this job. (To be rewritten.) - - - >>> import tempfile - >>> tf = tempfile.NamedTemporaryFile() - >>> def setup_inputs(tf): - ... open(tf.name, "w").write("world /path/to/input the rest") - ... inputs = JobInputs("hello /path/to/input", [tf.name]) - ... return inputs - >>> inputs = setup_inputs(tf) - >>> inputs.rewrite_paths("/path/to/input", 'C:\\input') - >>> inputs.rewritten_command_line - 'hello C:\\\\input' - >>> inputs.rewritten_config_files[tf.name] - 'world C:\\\\input the rest' - >>> tf.close() - >>> tf = tempfile.NamedTemporaryFile() - >>> inputs = setup_inputs(tf) - >>> inputs.find_referenced_subfiles('/path/to') - ['/path/to/input'] - >>> inputs.path_referenced('/path/to') - True - >>> inputs.path_referenced('/path/to/input') - True - >>> inputs.path_referenced('/path/to/notinput') - False - >>> tf.close() - """ - - def __init__(self, command_line, config_files): - self.rewritten_command_line = command_line - self.rewritten_config_files = {} - for config_file in config_files or []: - config_contents = _read(config_file) - self.rewritten_config_files[config_file] = config_contents - - def find_referenced_subfiles(self, directory): - """ - Return list of files below specified `directory` in job inputs. Could - use more sophisticated logic (match quotes to handle spaces, handle - subdirectories, etc...). - - **Parameters** - - directory : str - Full path to directory to search. 
- - """ - pattern = r"(%s%s\S+)" % (directory, os.sep) - referenced_files = set() - for input_contents in self.__items(): - referenced_files.update(re.findall(pattern, input_contents)) - return list(referenced_files) - - def path_referenced(self, path): - pattern = r"%s" % path - found = False - for input_contents in self.__items(): - if re.findall(pattern, input_contents): - found = True - break - return found - - def rewrite_paths(self, local_path, remote_path): - """ - Rewrite references to `local_path` with `remote_path` in job inputs. - """ - self.__rewrite_command_line(local_path, remote_path) - self.__rewrite_config_files(local_path, remote_path) - - def __rewrite_command_line(self, local_path, remote_path): - self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path) - - def __rewrite_config_files(self, local_path, remote_path): - for config_file, rewritten_contents in self.rewritten_config_files.iteritems(): - self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path) - - def __items(self): - items = [self.rewritten_command_line] - items.extend(self.rewritten_config_files.values()) - return items - - -class FileStager(object): - """ - Objects of the FileStager class interact with an LWR client object to - stage the files required to run jobs on a remote LWR server. - - **Parameters** - - client : Client - LWR client object. - command_line : str - The local command line to execute, this will be rewritten for the remote server. - config_files : list - List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. - input_files : list - List of input files used by job. These will be transferred and references rewritten. - output_files : list - List of output_files produced by job. - tool_dir : str - Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). 
- working_directory : str - Local path created by Galaxy for running this job. - - """ - - def __init__(self, client, tool, command_line, config_files, input_files, output_files, working_directory): - """ - """ - self.client = client - self.command_line = command_line - self.config_files = config_files - self.input_files = input_files - self.output_files = output_files - self.tool_id = tool.id - self.tool_version = tool.version - self.tool_dir = os.path.abspath(tool.tool_dir) - self.working_directory = working_directory - - # Setup job inputs, these will need to be rewritten before - # shipping off to remote LWR server. - self.job_inputs = JobInputs(self.command_line, self.config_files) - - self.file_renames = {} - - self.__handle_setup() - self.__initialize_referenced_tool_files() - self.__upload_tool_files() - self.__upload_input_files() - self.__upload_working_directory_files() - self.__initialize_output_file_renames() - self.__initialize_task_output_file_renames() - self.__initialize_config_file_renames() - self.__handle_rewrites() - self.__upload_rewritten_config_files() - - def __handle_setup(self): - job_config = self.client.setup(self.tool_id, self.tool_version) - - self.new_working_directory = job_config['working_directory'] - self.new_outputs_directory = job_config['outputs_directory'] - self.remote_path_separator = job_config['path_separator'] - # If remote LWR server assigned job id, use that otherwise - # just use local job_id assigned. - galaxy_job_id = self.client.job_id - self.job_id = job_config.get('job_id', galaxy_job_id) - if self.job_id != galaxy_job_id: - # Remote LWR server assigned an id different than the - # Galaxy job id, update client to reflect this. 
- self.client.job_id = self.job_id - - def __initialize_referenced_tool_files(self): - self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) - - def __upload_tool_files(self): - for referenced_tool_file in self.referenced_tool_files: - tool_upload_response = self.client.upload_tool_file(referenced_tool_file) - self.file_renames[referenced_tool_file] = tool_upload_response['path'] - - def __upload_input_files(self): - for input_file in self.input_files: - self.__upload_input_file(input_file) - self.__upload_input_extra_files(input_file) - - def __upload_input_file(self, input_file): - if self.job_inputs.path_referenced(input_file): - input_upload_response = self.client.upload_input(input_file) - self.file_renames[input_file] = input_upload_response['path'] - - def __upload_input_extra_files(self, input_file): - # TODO: Determine if this is object store safe and what needs to be - # done if it is not. - files_path = "%s_files" % input_file[0:-len(".dat")] - if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path): - for extra_file in os.listdir(files_path): - extra_file_path = os.path.join(files_path, extra_file) - relative_path = os.path.basename(files_path) - extra_file_relative_path = os.path.join(relative_path, extra_file) - response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) - self.file_renames[extra_file_path] = response['path'] - - def __upload_working_directory_files(self): - # Task manager stages files into working directory, these need to be - # uploaded if present. 
- for working_directory_file in os.listdir(self.working_directory): - path = os.path.join(self.working_directory, working_directory_file) - working_file_response = self.client.upload_working_directory_file(path) - self.file_renames[path] = working_file_response['path'] - - def __initialize_output_file_renames(self): - for output_file in self.output_files: - self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, - self.remote_path_separator, - os.path.basename(output_file)) - - def __initialize_task_output_file_renames(self): - for output_file in self.output_files: - name = os.path.basename(output_file) - self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory, - self.remote_path_separator, - name) - - def __initialize_config_file_renames(self): - for config_file in self.config_files: - self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, - self.remote_path_separator, - os.path.basename(config_file)) - - def __rewrite_paths(self, contents): - new_contents = contents - for local_path, remote_path in self.file_renames.iteritems(): - new_contents = new_contents.replace(local_path, remote_path) - return new_contents - - def __handle_rewrites(self): - for local_path, remote_path in self.file_renames.iteritems(): - self.job_inputs.rewrite_paths(local_path, remote_path) - - def __upload_rewritten_config_files(self): - for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems(): - self.client.upload_config_file(config_file, new_config_contents) - - def get_rewritten_command_line(self): - """ - Returns the rewritten version of the command line to execute suitable - for remote host. 
- """ - return self.job_inputs.rewritten_command_line - - -class parseJson(object): - - def __init__(self): - pass - - def __call__(self, func): - def replacement(*args, **kwargs): - response = func(*args, **kwargs) - return simplejson.loads(response) - return replacement - - -class Client(object): - """ - Objects of this client class perform low-level communication with a remote LWR server. - - **Parameters** - - remote_host : str - Remote URL of the LWR server. - job_id : str - Galaxy job/task id. - private_key : str (optional) - Secret key the remote LWR server is configured with. - """ - - def __init__(self, remote_host, job_id, private_key=None): - if not remote_host.endswith("/"): - remote_host = remote_host + "/" - ## If we don't have an explicit private_key defined, check for - ## one embedded in the URL. A URL of the form - ## https://moo@cow:8913 will try to contact https://cow:8913 - ## with a private key of moo - private_key_format = "https?://(.*)@.*/?" - private_key_match = re.match(private_key_format, remote_host) - if not private_key and private_key_match: - private_key = private_key_match.group(1) - remote_host = remote_host.replace("%s@" % private_key, '', 1) - self.remote_host = remote_host - self.job_id = job_id - self.private_key = private_key - self.transport = get_transport() - - def __build_url(self, command, args): - if self.private_key: - args["private_key"] = self.private_key - data = urllib.urlencode(args) - url = self.remote_host + command + "?" 
+ data - return url - - def __raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): - url = self.__build_url(command, args) - response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path) - return response - - @parseJson() - def __upload_file(self, action, path, name=None, contents=None): - if not name: - name = os.path.basename(path) - args = {"job_id": self.job_id, "name": name} - input_path = path - if contents: - input_path = None - return self.__raw_execute(action, args, contents, input_path) - - def upload_tool_file(self, path): - """ - Upload a tool related file (e.g. wrapper) required to run job. - - **Parameters** - - path : str - Local path tool. - """ - return self.__upload_file("upload_tool_file", path) - - def upload_input(self, path): - """ - Upload input dataset to remote server. - - **Parameters** - - path : str - Local path of input dataset. - """ - return self.__upload_file("upload_input", path) - - def upload_extra_input(self, path, relative_name): - """ - Upload extra input file to remote server. - - **Parameters** - - path : str - Extra files path of input dataset corresponding to this input. - relative_name : str - Relative path of extra file to upload relative to inputs extra files path. - """ - return self.__upload_file("upload_extra_input", path, name=relative_name) - - def upload_config_file(self, path, contents): - """ - Upload a job's config file to the remote server. - - **Parameters** - - path : str - Local path to the original config file. - contents : str - Rewritten contents of the config file to upload. - """ - return self.__upload_file("upload_config_file", path, contents=contents) - - def upload_working_directory_file(self, path): - """ - Upload the supplied file (path) from a job's working directory - to remote server. - - **Parameters** - - path : str - Path to file to upload. 
- """ - return self.__upload_file("upload_working_directory_file", path) - - @parseJson() - def _get_output_type(self, name): - return self.__raw_execute("get_output_type", {"name": name, - "job_id": self.job_id}) - - def download_work_dir_output(self, source, working_directory, output_path): - """ - Download an output dataset specified with from_work_dir from the - remote server. - - **Parameters** - - source : str - Path in job's working_directory to find output in. - working_directory : str - Local working_directory for the job. - output_path : str - Full path to output dataset. - """ - output = open(output_path, "wb") - name = os.path.basename(source) - self.__raw_download_output(name, self.job_id, "work_dir", output) - - def download_output(self, path, working_directory): - """ - Download an output dataset from the remote server. - - **Parameters** - - path : str - Local path of the dataset. - working_directory : str - Local working_directory for the job. - """ - name = os.path.basename(path) - output_type = self._get_output_type(name) - if output_type == "direct": - output_path = path - elif output_type == "task": - output_path = os.path.join(working_directory, name) - else: - raise Exception("No remote output found for dataset with path %s" % path) - self.__raw_download_output(name, self.job_id, output_type, output_path) - - def __raw_download_output(self, name, job_id, output_type, output_path): - self.__raw_execute("download_output", - {"name": name, - "job_id": self.job_id, - "output_type": output_type}, - output_path=output_path) - - def launch(self, command_line): - """ - Run or queue up the execution of the supplied - `command_line` on the remote server. - - **Parameters** - - command_line : str - Command to execute. - """ - return self.__raw_execute("launch", {"command_line": command_line, - "job_id": self.job_id}) - - def kill(self): - """ - Cancel remote job, either removing from the queue or killing it. 
- """ - return self.__raw_execute("kill", {"job_id": self.job_id}) - - def wait(self): - """ - Wait for job to finish. - """ - while True: - complete_response = self.raw_check_complete() - if complete_response["complete"] == "true": - return complete_response - time.sleep(1) - - @parseJson() - def raw_check_complete(self): - """ - Get check_complete response from the remote server. - """ - check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) - return check_complete_response - - def check_complete(self): - """ - Return boolean indicating whether the job is complete. - """ - return self.raw_check_complete()["complete"] == "true" - - def clean(self): - """ - Cleanup the remote job. - """ - self.__raw_execute("clean", {"job_id": self.job_id}) - - @parseJson() - def setup(self, tool_id=None, tool_version=None): - """ - Setup remote LWR server to run this job. - """ - setup_args = {"job_id": self.job_id} - if tool_id: - setup_args["tool_id"] = tool_id - if tool_version: - setup_args["tool_version"] = tool_version - return self.__raw_execute("setup", setup_args) - - -def _read(path): - """ - Utility method to quickly read small files (config files and tool - wrappers) into memory as strings. 
- """ - input = open(path, "r") - try: - return input.read() - finally: - input.close() +__all__ = [Client, FileStager, url_to_destination_params] diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/lwr_client/client.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -0,0 +1,235 @@ +import os +import time +import urllib +import simplejson + +from .transport import get_transport +from .destination import url_to_destination_params + + +class parseJson(object): + + def __init__(self): + pass + + def __call__(self, func): + def replacement(*args, **kwargs): + response = func(*args, **kwargs) + return simplejson.loads(response) + return replacement + + +class Client(object): + """ + Objects of this client class perform low-level communication with a remote LWR server. + + **Parameters** + + destination_params : dict or str + connection parameters, either url with dict containing url (and optionally `private_token`). + job_id : str + Galaxy job/task id. + """ + + def __init__(self, destination_params, job_id, transport_type=None): + if isinstance(destination_params, str) or isinstance(destination_params, unicode): + destination_params = url_to_destination_params(destination_params) + self.remote_host = destination_params.get("url") + assert self.remote_host != None, "Failed to determine url for LWR client." + self.private_key = destination_params.get("private_token", None) + self.job_id = job_id + self.transport = get_transport(transport_type) + + def __build_url(self, command, args): + if self.private_key: + args["private_key"] = self.private_key + data = urllib.urlencode(args) + url = self.remote_host + command + "?" 
+ data + return url + + def __raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): + url = self.__build_url(command, args) + response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path) + return response + + @parseJson() + def __upload_file(self, action, path, name=None, contents=None): + if not name: + name = os.path.basename(path) + args = {"job_id": self.job_id, "name": name} + input_path = path + if contents: + input_path = None + return self.__raw_execute(action, args, contents, input_path) + + def upload_tool_file(self, path): + """ + Upload a tool related file (e.g. wrapper) required to run job. + + **Parameters** + + path : str + Local path tool. + """ + return self.__upload_file("upload_tool_file", path) + + def upload_input(self, path): + """ + Upload input dataset to remote server. + + **Parameters** + + path : str + Local path of input dataset. + """ + return self.__upload_file("upload_input", path) + + def upload_extra_input(self, path, relative_name): + """ + Upload extra input file to remote server. + + **Parameters** + + path : str + Extra files path of input dataset corresponding to this input. + relative_name : str + Relative path of extra file to upload relative to inputs extra files path. + """ + return self.__upload_file("upload_extra_input", path, name=relative_name) + + def upload_config_file(self, path, contents): + """ + Upload a job's config file to the remote server. + + **Parameters** + + path : str + Local path to the original config file. + contents : str + Rewritten contents of the config file to upload. + """ + return self.__upload_file("upload_config_file", path, contents=contents) + + def upload_working_directory_file(self, path): + """ + Upload the supplied file (path) from a job's working directory + to remote server. + + **Parameters** + + path : str + Path to file to upload. 
+ """ + return self.__upload_file("upload_working_directory_file", path) + + @parseJson() + def _get_output_type(self, name): + return self.__raw_execute("get_output_type", {"name": name, + "job_id": self.job_id}) + + def download_work_dir_output(self, source, working_directory, output_path): + """ + Download an output dataset specified with from_work_dir from the + remote server. + + **Parameters** + + source : str + Path in job's working_directory to find output in. + working_directory : str + Local working_directory for the job. + output_path : str + Full path to output dataset. + """ + output = open(output_path, "wb") + name = os.path.basename(source) + self.__raw_download_output(name, self.job_id, "work_dir", output) + + def download_output(self, path, working_directory): + """ + Download an output dataset from the remote server. + + **Parameters** + + path : str + Local path of the dataset. + working_directory : str + Local working_directory for the job. + """ + name = os.path.basename(path) + output_type = self._get_output_type(name) + if output_type == "direct": + output_path = path + elif output_type == "task": + output_path = os.path.join(working_directory, name) + else: + raise Exception("No remote output found for dataset with path %s" % path) + self.__raw_download_output(name, self.job_id, output_type, output_path) + + def __raw_download_output(self, name, job_id, output_type, output_path): + self.__raw_execute("download_output", + {"name": name, + "job_id": self.job_id, + "output_type": output_type}, + output_path=output_path) + + def launch(self, command_line): + """ + Run or queue up the execution of the supplied + `command_line` on the remote server. + + **Parameters** + + command_line : str + Command to execute. + """ + return self.__raw_execute("launch", {"command_line": command_line, + "job_id": self.job_id}) + + def kill(self): + """ + Cancel remote job, either removing from the queue or killing it. 
+ """ + return self.__raw_execute("kill", {"job_id": self.job_id}) + + def wait(self): + """ + Wait for job to finish. + """ + while True: + complete_response = self.raw_check_complete() + if complete_response["complete"] == "true": + return complete_response + time.sleep(1) + + @parseJson() + def raw_check_complete(self): + """ + Get check_complete response from the remote server. + """ + check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) + return check_complete_response + + def check_complete(self): + """ + Return boolean indicating whether the job is complete. + """ + return self.raw_check_complete()["complete"] == "true" + + def clean(self): + """ + Cleanup the remote job. + """ + self.__raw_execute("clean", {"job_id": self.job_id}) + + @parseJson() + def setup(self, tool_id=None, tool_version=None): + """ + Setup remote LWR server to run this job. + """ + setup_args = {"job_id": self.job_id} + if tool_id: + setup_args["tool_id"] = tool_id + if tool_version: + setup_args["tool_version"] = tool_version + return self.__raw_execute("setup", setup_args) diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/lwr_client/destination.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/destination.py @@ -0,0 +1,44 @@ + +from re import match + + +def url_to_destination_params(url): + """Convert a legacy runner URL to a job destination + + >>> params_simple = url_to_destination_params("http://localhost:8913/") + >>> params_simple["url"] + 'http://localhost:8913/' + >>> params_simple["private_token"] is None + True + >>> advanced_url = "https://1234x@example.com:8914/managers/longqueue" + >>> params_advanced = url_to_destination_params(advanced_url) + >>> params_advanced["url"] + 'https://example.com:8914/managers/longqueue/' + >>> params_advanced["private_token"] + '1234x' + >>> runner_url = "lwr://http://localhost:8913/" + >>> runner_params = 
import os
from re import findall


class JobInputs(object):
    """
    Abstractions over dynamic inputs created for a given job (namely the
    command to execute and created configfiles).

    **Parameters**

    command_line : str
        Local command to execute for this job. (To be rewritten.)
    config_files : str
        Config files created for this job. (To be rewritten.)
    """

    def __init__(self, command_line, config_files):
        # Start from the local command line and config-file contents; these
        # are rewritten in place as remote paths become known.
        self.rewritten_command_line = command_line
        self.rewritten_config_files = {}
        for config_file in config_files or []:
            self.rewritten_config_files[config_file] = _read(config_file)

    def find_referenced_subfiles(self, directory):
        """
        Return list of files below specified `directory` in job inputs. Could
        use more sophisticated logic (match quotes to handle spaces, handle
        subdirectories, etc...).

        **Parameters**

        directory : str
            Full path to directory to search.
        """
        # Matches e.g. "/dir/anything-up-to-whitespace" in any job input.
        pattern = r"(%s%s\S+)" % (directory, os.sep)
        referenced_files = set()
        for input_contents in self.__items():
            referenced_files.update(findall(pattern, input_contents))
        return list(referenced_files)

    def path_referenced(self, path):
        """Return True if `path` (treated as a regex pattern) occurs in any job input."""
        pattern = r"%s" % path
        for input_contents in self.__items():
            if findall(pattern, input_contents):
                return True
        return False

    def rewrite_paths(self, local_path, remote_path):
        """
        Rewrite references to `local_path` with `remote_path` in job inputs.
        """
        self.__rewrite_command_line(local_path, remote_path)
        self.__rewrite_config_files(local_path, remote_path)

    def __rewrite_command_line(self, local_path, remote_path):
        self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path)

    def __rewrite_config_files(self, local_path, remote_path):
        # items() (not the Python-2-only iteritems()) so this also works
        # under Python 3; reassigning existing keys during iteration is safe.
        for config_file, rewritten_contents in self.rewritten_config_files.items():
            self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path)

    def __items(self):
        # All rewritable job inputs: the command line plus each config file.
        items = [self.rewritten_command_line]
        items.extend(self.rewritten_config_files.values())
        return items


class FileStager(object):
    """
    Objects of the FileStager class interact with an LWR client object to
    stage the files required to run jobs on a remote LWR server.

    **Parameters**

    client : Client
        LWR client object.
    tool : object
        Galaxy tool being run (supplies id, version, and tool_dir).
    command_line : str
        The local command line to execute, this will be rewritten for the remote server.
    config_files : list
        List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server.
    input_files : list
        List of input files used by job. These will be transferred and references rewritten.
    output_files : list
        List of output_files produced by job.
    working_directory : str
        Local path created by Galaxy for running this job.
    """

    def __init__(self, client, tool, command_line, config_files, input_files, output_files, working_directory):
        """
        Stage all job files immediately on construction; afterwards the
        rewritten command line is available via get_rewritten_command_line().
        """
        self.client = client
        self.command_line = command_line
        self.config_files = config_files
        self.input_files = input_files
        self.output_files = output_files
        self.tool_id = tool.id
        self.tool_version = tool.version
        self.tool_dir = os.path.abspath(tool.tool_dir)
        self.working_directory = working_directory

        # Setup job inputs, these will need to be rewritten before
        # shipping off to remote LWR server.
        self.job_inputs = JobInputs(self.command_line, self.config_files)

        # Maps local paths to their remote equivalents.
        self.file_renames = {}

        self.__handle_setup()
        self.__initialize_referenced_tool_files()
        self.__upload_tool_files()
        self.__upload_input_files()
        self.__upload_working_directory_files()
        self.__initialize_output_file_renames()
        self.__initialize_task_output_file_renames()
        self.__initialize_config_file_renames()
        self.__handle_rewrites()
        self.__upload_rewritten_config_files()

    def __handle_setup(self):
        # Ask the remote server to set up directories for this job.
        job_config = self.client.setup(self.tool_id, self.tool_version)

        self.new_working_directory = job_config['working_directory']
        self.new_outputs_directory = job_config['outputs_directory']
        self.remote_path_separator = job_config['path_separator']
        # If remote LWR server assigned job id, use that otherwise
        # just use local job_id assigned.
        galaxy_job_id = self.client.job_id
        self.job_id = job_config.get('job_id', galaxy_job_id)
        if self.job_id != galaxy_job_id:
            # Remote LWR server assigned an id different than the
            # Galaxy job id, update client to reflect this.
            self.client.job_id = self.job_id

    def __initialize_referenced_tool_files(self):
        self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir)

    def __upload_tool_files(self):
        for referenced_tool_file in self.referenced_tool_files:
            tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
            self.file_renames[referenced_tool_file] = tool_upload_response['path']

    def __upload_input_files(self):
        for input_file in self.input_files:
            self.__upload_input_file(input_file)
            self.__upload_input_extra_files(input_file)

    def __upload_input_file(self, input_file):
        # Only transfer inputs actually referenced by the job.
        if self.job_inputs.path_referenced(input_file):
            input_upload_response = self.client.upload_input(input_file)
            self.file_renames[input_file] = input_upload_response['path']

    def __upload_input_extra_files(self, input_file):
        # TODO: Determine if this is object store safe and what needs to be
        # done if it is not.
        files_path = "%s_files" % input_file[0:-len(".dat")]
        if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path):
            for extra_file in os.listdir(files_path):
                extra_file_path = os.path.join(files_path, extra_file)
                relative_path = os.path.basename(files_path)
                extra_file_relative_path = os.path.join(relative_path, extra_file)
                response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
                self.file_renames[extra_file_path] = response['path']

    def __upload_working_directory_files(self):
        # Task manager stages files into working directory, these need to be
        # uploaded if present.
        for working_directory_file in os.listdir(self.working_directory):
            path = os.path.join(self.working_directory, working_directory_file)
            working_file_response = self.client.upload_working_directory_file(path)
            self.file_renames[path] = working_file_response['path']

    def __initialize_output_file_renames(self):
        for output_file in self.output_files:
            self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
                                                          self.remote_path_separator,
                                                          os.path.basename(output_file))

    def __initialize_task_output_file_renames(self):
        for output_file in self.output_files:
            name = os.path.basename(output_file)
            self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
                                                                                         self.remote_path_separator,
                                                                                         name)

    def __initialize_config_file_renames(self):
        for config_file in self.config_files:
            self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
                                                          self.remote_path_separator,
                                                          os.path.basename(config_file))

    def __rewrite_paths(self, contents):
        new_contents = contents
        # items() instead of Python-2-only iteritems() for py3 compatibility.
        for local_path, remote_path in self.file_renames.items():
            new_contents = new_contents.replace(local_path, remote_path)
        return new_contents

    def __handle_rewrites(self):
        for local_path, remote_path in self.file_renames.items():
            self.job_inputs.rewrite_paths(local_path, remote_path)

    def __upload_rewritten_config_files(self):
        for config_file, new_config_contents in self.job_inputs.rewritten_config_files.items():
            self.client.upload_config_file(config_file, new_config_contents)

    def get_rewritten_command_line(self):
        """
        Returns the rewritten version of the command line to execute suitable
        for remote host.
        """
        return self.job_inputs.rewritten_command_line


def _read(path):
    """
    Utility method to quickly read small files (config files and tool
    wrappers) into memory as strings.
    """
    # Context manager guarantees the handle is closed even if read() raises.
    with open(path, "r") as input_file:
        return input_file.read()
+ """ + input = open(path, "r") + try: + return input.read() + finally: + input.close() diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/lwr_client/transport/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/transport/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/transport/__init__.py @@ -3,14 +3,24 @@ import os -def get_transport(os_module=os): - use_curl = os_module.getenv('LWR_CURL_TRANSPORT', "0") - ## If LWR_CURL_TRANSPORT is unset or set to 0, use default, - ## else use curl. - if use_curl.isdigit() and not int(use_curl): - return Urllib2Transport() +def get_transport(transport_type=None, os_module=os): + transport_type = __get_transport_type(transport_type, os_module) + if transport_type == 'urllib': + transport = Urllib2Transport() else: - return PycurlTransport() + transport = PycurlTransport() + return transport +def __get_transport_type(transport_type, os_module): + if not transport_type: + use_curl = os_module.getenv('LWR_CURL_TRANSPORT', "0") + ## If LWR_CURL_TRANSPORT is unset or set to 0, use default, + ## else use curl. + if use_curl.isdigit() and not int(use_curl): + transport_type = 'urllib' + else: + transport_type = 'curl' + return transport_type + __all__ = [get_transport] diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -173,7 +173,9 @@ return JobDestination(runner='pbs', params=params) def parse_destination_params(self, params): - """A wrapper method around __args_to_attrs() that allow administrators to define PBS params as either command-line options (as in ``qsub(1B)``) or more human-readable "long" args (as in ``pbs_submit(3B)``). 
+ """A wrapper method around __args_to_attrs() that allow administrators to define PBS + params as either command-line options (as in ``qsub(1B)``) or more human-readable "long" + args (as in ``pbs_submit(3B)``). :returns: list of dicts -- The dicts map directly to pbs attropl structs (see ``pbs_submit(3B)``) """ @@ -185,7 +187,7 @@ arg = arg.lstrip('-') args[arg] = value except: - log.warning('Unrecognized long argument in destination params: %s' % k) + log.warning('Unrecognized long argument in destination params: %s' % arg) return self.__args_to_attrs(args) # Internal stuff diff -r e58bb74b6afe3ebac548a2b2d124c24bf149a24a -r e0607a09d9fa9eb690e9b410d9c167072db0b55a lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -9,12 +9,13 @@ from time import sleep from galaxy.jobs import TaskWrapper +from galaxy.jobs.runners import BaseJobRunner log = logging.getLogger( __name__ ) __all__ = [ 'TaskedJobRunner' ] -class TaskedJobRunner( object ): +class TaskedJobRunner( BaseJobRunner ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling """ @@ -25,7 +26,7 @@ self._init_worker_threads() def queue_job( self, job_wrapper ): - super( LocalJobRunner, self ).queue_job( job_wrapper ) + super( TaskedJobRunner, self ).queue_job( job_wrapper ) if not job_wrapper.is_ready: return Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org