3 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/05b757b20e06/
Changeset:   05b757b20e06
User:        jmchilton
Date:        2014-02-18 18:33:25
Summary:     Random PEP-8 fixes to some api controllers.
Affected #:  3 files

diff -r a04725d3afedb65329b13bf5ba813e6381ec082f -r 05b757b20e06ee2667a3e910cf24e64f14915953 lib/galaxy/webapps/galaxy/api/libraries.py
--- a/lib/galaxy/webapps/galaxy/api/libraries.py
+++ b/lib/galaxy/webapps/galaxy/api/libraries.py
@@ -10,6 +10,7 @@
 import logging
 log = logging.getLogger( __name__ )
 
+
 class LibrariesController( BaseAPIController ):
 
     @web.expose_api
@@ -130,7 +131,7 @@
         new_library['synopsis'] = synopsis
         new_library['id'] = encoded_id
         return new_library
-    
+
     def edit( self, trans, encoded_id, payload, **kwd ):
         """
         * PUT /api/libraries/{encoded_id}

diff -r a04725d3afedb65329b13bf5ba813e6381ec082f -r 05b757b20e06ee2667a3e910cf24e64f14915953 lib/galaxy/webapps/galaxy/api/library_contents.py
--- a/lib/galaxy/webapps/galaxy/api/library_contents.py
+++ b/lib/galaxy/webapps/galaxy/api/library_contents.py
@@ -2,7 +2,7 @@
 API operations on the contents of a library.
 """
 import logging
-from galaxy import web , exceptions
+from galaxy import web, exceptions
 from galaxy.model import ExtendedMetadata, ExtendedMetadataIndex
 from galaxy.web.base.controller import BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems
 from galaxy.web.base.controller import UsesHistoryDatasetAssociationMixin
@@ -11,6 +11,7 @@
 log = logging.getLogger( __name__ )
 
+
 class LibraryContentsController( BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems,
                                  UsesHistoryDatasetAssociationMixin ):
 
@@ -36,6 +37,7 @@
         """
         rval = []
         current_user_roles = trans.get_current_user_roles()
+
         def traverse( folder ):
             admin = trans.user_is_admin()
             rval = []
@@ -50,7 +52,9 @@
             for ld in folder.datasets:
                 if not admin:
                     can_access = trans.app.security_agent.can_access_dataset(
-                        current_user_roles, ld.library_dataset_dataset_association.dataset )
+                        current_user_roles,
+                        ld.library_dataset_dataset_association.dataset
+                    )
                 if (admin or can_access) and not ld.deleted:
                     #log.debug( "type(folder): %s" % type( folder ) )
                     #log.debug( "type(api_path): %s; folder.api_path: %s" % ( type(folder.api_path), folder.api_path ) )
@@ -73,20 +77,20 @@
             return "Invalid library id ( %s ) specified." % str( library_id )
         #log.debug( "Root folder type: %s" % type( library.root_folder ) )
         encoded_id = 'F' + trans.security.encode_id( library.root_folder.id )
-        rval.append( dict( id = encoded_id,
-                           type = 'folder',
-                           name = '/',
-                           url = url_for( 'library_content', library_id=library_id, id=encoded_id ) ) )
+        rval.append( dict( id=encoded_id,
+                           type='folder',
+                           name='/',
+                           url=url_for( 'library_content', library_id=library_id, id=encoded_id ) ) )
         #log.debug( "Root folder attributes: %s" % str(dir(library.root_folder)) )
         library.root_folder.api_path = ''
         for content in traverse( library.root_folder ):
             encoded_id = trans.security.encode_id( content.id )
             if content.api_type == 'folder':
                 encoded_id = 'F' + encoded_id
-            rval.append( dict( id = encoded_id,
-                               type = content.api_type,
-                               name = content.api_path,
-                               url = url_for( 'library_content', library_id=library_id, id=encoded_id, ) ) )
+            rval.append( dict( id=encoded_id,
+                               type=content.api_type,
+                               name=content.api_path,
+                               url=url_for( 'library_content', library_id=library_id, id=encoded_id, ) ) )
         return rval
 
     @web.expose_api

diff -r a04725d3afedb65329b13bf5ba813e6381ec082f -r 05b757b20e06ee2667a3e910cf24e64f14915953 lib/galaxy/webapps/galaxy/api/roles.py
--- a/lib/galaxy/webapps/galaxy/api/roles.py
+++ b/lib/galaxy/webapps/galaxy/api/roles.py
@@ -4,11 +4,12 @@
 import logging
 from galaxy.web.base.controller import BaseAPIController, url_for
 from galaxy import web
-from elementtree.ElementTree import XML
 
 log = logging.getLogger( __name__ )
 
+
 class RoleAPIController( BaseAPIController ):
+
     @web.expose_api
     def index( self, trans, **kwd ):
         """
@@ -61,11 +62,11 @@
         if not name or not description:
             trans.response.status = 400
             return "Enter a valid name and a description"
-        if trans.sa_session.query( trans.app.model.Role ).filter( trans.app.model.Role.table.c.name==name ).first():
+        if trans.sa_session.query( trans.app.model.Role ).filter( trans.app.model.Role.table.c.name == name ).first():
             trans.response.status = 400
             return "A role with that name already exists"
 
-        role_type = trans.app.model.Role.types.ADMIN #TODO: allow non-admins to create roles
+        role_type = trans.app.model.Role.types.ADMIN  # TODO: allow non-admins to create roles
 
         role = trans.app.model.Role( name=name, description=description, type=role_type )
         trans.sa_session.add( role )
@@ -73,12 +74,15 @@
         users = [ trans.sa_session.query( trans.model.User ).get( trans.security.decode_id( i ) ) for i in user_ids ]
         group_ids = payload.get( 'group_ids', [] )
         groups = [ trans.sa_session.query( trans.model.Group ).get( trans.security.decode_id( i ) ) for i in group_ids ]
+
         # Create the UserRoleAssociations
         for user in users:
             trans.app.security_agent.associate_user_role( user, role )
+
         # Create the GroupRoleAssociations
         for group in groups:
             trans.app.security_agent.associate_group_role( group, role )
+
         trans.sa_session.flush()
         encoded_id = trans.security.encode_id( role.id )
         item = role.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } )

https://bitbucket.org/galaxy/galaxy-central/commits/98253ff31a33/
Changeset:   98253ff31a33
User:        jmchilton
Date:        2014-02-18 18:33:28
Summary:     Add note to job_conf.xml.sample_advanced about LWR+MQ.
Affected #:  1 file

diff -r 05b757b20e06ee2667a3e910cf24e64f14915953 -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -23,6 +23,11 @@
         <!-- More information on LWR can be found at https://lwr.readthedocs.org -->
         <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) -->
         <!-- <param id="transport">curl</param> -->
+        <!-- Uncomment following parameters (second optional) to target a message
+             queue, ensure jobs_directory is specified on destinations and all
+             file actions are remote executable. -->
+        <!-- <param id="url">amqp://guest:guest@localhost:5672//</param> -->
+        <!-- <param id="manager">_default_</param> -->
     </plugin>
     <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" />
     <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" />
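For orientation, a plugin plus destination wired up this way might look
roughly like the sketch below once the sample comments are uncommented. This
is editorial, not part of the changeset: the plugin and destination ids, the
staging path, and the use of default_file_action to make every file action
remote are illustrative assumptions; only the url and manager params come
from the sample file above.

    <plugin id="lwr_mq" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner">
        <!-- AMQP URL of the message queue shared with the LWR server. -->
        <param id="url">amqp://guest:guest@localhost:5672//</param>
        <!-- Optional: name of the LWR manager to target. -->
        <param id="manager">_default_</param>
    </plugin>
    <!-- ... plugins, handlers ... -->
    <destination id="remote_cluster" runner="lwr_mq">
        <!-- Hypothetical values: staging directory on the LWR host, and a
             remote file action for all transfers. -->
        <param id="jobs_directory">/path/on/lwr/host/staging</param>
        <param id="default_file_action">remote_transfer</param>
    </destination>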
https://bitbucket.org/galaxy/galaxy-central/commits/5c3d7c234661/
Changeset:   5c3d7c234661
User:        jmchilton
Date:        2014-02-18 18:33:28
Summary:     Improvement to message queue driven LWR.

At a high level there are two core enhancements here: the ability to kill
message-queue-driven LWR jobs (though, due to regressions in the latest
Galaxy release, I don't think there is any longer a way to initiate this from
the GUI) and an additional state transition (LWR+MQ jobs now transition from
queued to running properly; previously they went straight from queued to
complete).

A lot of other small changes to the LWR client are aimed at getting the file
sizes of lwr_client/client.py and lwr_client/manager.py under control - see
the LWR commit log for more details. This updates the LWR client through LWR
changeset 2d9f333.

Affected #:  9 files

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -52,7 +52,7 @@
         try:
             client = self.get_client_from_state(job_state)
 
-            if not hasattr(client, 'get_status'):
+            if hasattr(self.client_manager, 'ensure_has_status_update_callback'):
                 # Message queue implementation.
 
                 # TODO: Very hacky now, refactor after Dannon merges in his
@@ -60,7 +60,7 @@
                 # check_watched_item like this and instead a callback needs to
                 # be issued post job recovery allowing a message queue
                 # consumer to be setup.
-                self.client_manager.ensure_has_job_completes_callback(self.__async_complete)
+                self.client_manager.ensure_has_status_update_callback(self.__async_update)
                 return job_state
 
             status = client.get_status()
@@ -69,16 +73,20 @@
             # either way we are done I guess.
             self.mark_as_finished(job_state)
             return None
-        if status == "complete":
+        job_state = self.__update_job_state_for_lwr_status(job_state, status)
+        return job_state
+
+    def __update_job_state_for_lwr_status(self, job_state, lwr_status):
+        if lwr_status == "complete":
             self.mark_as_finished(job_state)
             return None
-        if status == "running" and not job_state.running:
+        if lwr_status == "running" and not job_state.running:
             job_state.running = True
             job_state.job_wrapper.change_state( model.Job.states.RUNNING )
         return job_state
 
-    def __async_complete( self, final_status ):
-        job_id = final_status[ "job_id" ]
+    def __async_update( self, full_status ):
+        job_id = full_status[ "job_id" ]
         job_state = self.__find_watched_job( job_id )
         if not job_state:
             # Probably finished too quickly, sleep and try again.
@@ -88,9 +92,9 @@
             sleep( 2 )
             job_state = self.__find_watched_job( job_id )
         if not job_state:
-            log.warn( "Failed to find job corresponding to final status %s in %s" % ( final_status, self.watched ) )
+            log.warn( "Failed to find job corresponding to final status %s in %s" % ( full_status, self.watched ) )
         else:
-            self.mark_as_finished( job_state )
+            self.__update_job_state_for_lwr_status(job_state, full_status["status"])
 
     def __find_watched_job( self, job_id ):
         found_job = None
@@ -227,7 +231,7 @@
         job_wrapper = job_state.job_wrapper
         try:
             client = self.get_client_from_state(job_state)
-            run_results = client.final_status()
+            run_results = client.full_status()
 
             stdout = run_results.get('stdout', '')
             stderr = run_results.get('stderr', '')
@@ -251,7 +255,7 @@
         except Exception:
             message = "Failed to communicate with remote job server."
             job_wrapper.fail( message, exception=True )
-            log.exception("failure running job %d" % job_wrapper.job_id)
+            log.exception("failure finishing job %d" % job_wrapper.job_id)
             return
         if not LwrJobRunner.__remote_metadata( client ):
             self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
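To make the new life cycle concrete, here is a condensed, self-contained
sketch (editorial, not part of the changeset) of the transition logic the
runner now applies to status-update messages. The Toy classes are made-up
stand-ins for Galaxy's job state and runner; only the two transitions mirror
__async_update and __update_job_state_for_lwr_status above.

    # Message-queue-driven jobs now pass through "running" before "complete"
    # instead of jumping straight from queued to complete.

    class ToyJobState(object):
        def __init__(self, job_id):
            self.job_id = job_id
            self.running = False
            self.finished = False

    class ToyRunner(object):
        def __init__(self):
            self.watched = {}  # job_id -> ToyJobState

        def async_update(self, full_status):
            # Mirrors __async_update: look up the watched job and apply the
            # status carried in the message body.
            job_state = self.watched.get(full_status["job_id"])
            if job_state is None:
                print("no watched job for %s" % full_status)
                return
            self.update_for_lwr_status(job_state, full_status["status"])

        def update_for_lwr_status(self, job_state, lwr_status):
            # Mirrors __update_job_state_for_lwr_status.
            if lwr_status == "complete":
                job_state.finished = True  # Galaxy would mark_as_finished here
            elif lwr_status == "running" and not job_state.running:
                job_state.running = True   # Galaxy would change_state(RUNNING)

    runner = ToyRunner()
    runner.watched["42"] = ToyJobState("42")
    runner.async_update({"job_id": "42", "status": "running"})   # queued -> running
    runner.async_update({"job_id": "42", "status": "complete"})  # running -> complete
    assert runner.watched["42"].running and runner.watched["42"].finished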
diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
--- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
+++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py
@@ -40,10 +40,15 @@
         self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
         self.__timeout = timeout
 
+    @property
+    def url(self):
+        return self.__url
+
     def consume(self, queue_name, callback, check=True, connection_kwargs={}):
         queue = self.__queue(queue_name)
         with self.connection(self.__url, **connection_kwargs) as connection:
             with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']):
+                log.debug("Consuming queue %s" % queue)
                 while check:
                     try:
                         connection.drain_events(timeout=self.__timeout)
@@ -54,6 +59,7 @@
         with self.connection(self.__url) as connection:
             with pools.producers[connection].acquire() as producer:
                 key = self.__queue_name(name)
+                log.debug("Publishing with key %s and payload %s" % (key, payload))
                 producer.publish(
                     payload,
                     serializer='json',

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/client.py
--- a/lib/galaxy/jobs/runners/lwr_client/client.py
+++ b/lib/galaxy/jobs/runners/lwr_client/client.py
@@ -1,49 +1,18 @@
 import os
-import shutil
-import json
 from json import dumps
-from time import sleep
 
 from .destination import submit_params
 from .setup_handler import build as build_setup_handler
 from .job_directory import RemoteJobDirectory
+from .decorators import parseJson
+from .decorators import retry
+from .util import copy
+from .util import ensure_directory
 
 import logging
log = logging.getLogger(__name__)
 
 CACHE_WAIT_SECONDS = 3
-MAX_RETRY_COUNT = 5
-RETRY_SLEEP_TIME = 0.1
-
-
-class parseJson(object):
-
-    def __call__(self, func):
-        def replacement(*args, **kwargs):
-            response = func(*args, **kwargs)
-            return json.loads(response)
-        return replacement
-
-
-class retry(object):
-
-    def __call__(self, func):
-
-        def replacement(*args, **kwargs):
-            max_count = MAX_RETRY_COUNT
-            count = 0
-            while True:
-                count += 1
-                try:
-                    return func(*args, **kwargs)
-                except:
-                    if count >= max_count:
-                        raise
-                    else:
-                        sleep(RETRY_SLEEP_TIME)
-                        continue
-
-        return replacement
 
 
 class OutputNotFoundException(Exception):
@@ -111,17 +80,78 @@
         super(JobClient, self).__init__(destination_params, job_id)
         self.job_manager_interface = job_manager_interface
 
-    def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None):
-        return self.job_manager_interface.execute(command, args, data, input_path, output_path)
+    def launch(self, command_line, requirements=[], remote_staging=[], job_config=None):
+        """
+        Queue up the execution of the supplied `command_line` on the remote
+        server. Called launch for historical reasons, should be renamed to
+        enqueue or something like that.
 
-    @property
-    def _submit_params(self):
-        return submit_params(self.destination_params)
+        **Parameters**
+
+        command_line : str
+            Command to execute.
+        """
+        launch_params = dict(command_line=command_line, job_id=self.job_id)
+        submit_params_dict = submit_params(self.destination_params)
+        if submit_params_dict:
+            launch_params['params'] = dumps(submit_params_dict)
+        if requirements:
+            launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements])
+        if remote_staging:
+            launch_params['remote_staging'] = dumps(remote_staging)
+        if job_config and self.setup_handler.local:
+            # Setup not yet called, job properties were inferred from
+            # destination arguments. Hence, must have LWR setup job
+            # before queueing.
+            setup_params = _setup_params_from_job_config(job_config)
+            launch_params["setup_params"] = dumps(setup_params)
+        return self._raw_execute("launch", launch_params)
+
+    def full_status(self):
+        """ Return a dictionary summarizing final state of job.
+        """
+        return self.raw_check_complete()
+
+    def kill(self):
+        """
+        Cancel remote job, either removing from the queue or killing it.
+        """
+        return self._raw_execute("kill", {"job_id": self.job_id})
+
+    @retry()
+    @parseJson()
+    def raw_check_complete(self):
+        """
+        Get check_complete response from the remote server.
+        """
+        check_complete_response = self._raw_execute("check_complete", {"job_id": self.job_id})
+        return check_complete_response
+
+    def get_status(self):
+        check_complete_response = self.raw_check_complete()
+        # Older LWR instances won't set status so use 'complete', at some
+        # point drop backward compatibility.
+        status = check_complete_response.get("status", None)
+        if status in ["status", None]:
+            # LEGACY: Bug in certains older LWR instances returned literal
+            # "status".
+            complete = check_complete_response["complete"] == "true"
+            old_status = "complete" if complete else "running"
+            status = old_status
+        return status
+
+    def clean(self):
+        """
+        Cleanup the remote job.
+        """
+        self._raw_execute("clean", {"job_id": self.job_id})
 
     @parseJson()
-    def input_path(self, path, input_type, name=None):
-        args = {"job_id": self.job_id, "name": name, "input_type": input_type}
-        return self._raw_execute('input_path', args)
+    def remote_setup(self, **setup_args):
+        """
+        Setup remote LWR server to run this job.
+        """
+        return self._raw_execute("setup", setup_args)
 
     def put_file(self, path, input_type, name=None, contents=None, action_type='transfer'):
         if not name:
@@ -134,9 +164,80 @@
             return self._upload_file(args, contents, input_path)
         elif action_type == 'copy':
             lwr_path = self._raw_execute('input_path', args)
-            self._copy(path, lwr_path)
+            copy(path, lwr_path)
             return {'path': lwr_path}
 
+    def fetch_output(self, path, name, working_directory, action_type, output_type):
+        """
+        Fetch (transfer, copy, etc...) an output from the remote LWR server.
+
+        **Parameters**
+
+        path : str
+            Local path of the dataset.
+        name : str
+            Remote name of file (i.e. path relative to remote staging output
+            or working directory).
+        working_directory : str
+            Local working_directory for the job.
+        action_type : str
+            Where to find file on LWR (output_workdir or output). legacy is also
+            an option in this case LWR is asked for location - this will only be
+            used if targetting an older LWR server that didn't return statuses
+            allowing this to be inferred.
+        """
+        if output_type == 'legacy':
+            self._fetch_output_legacy(path, working_directory, action_type=action_type)
+        elif output_type == 'output_workdir':
+            self._fetch_work_dir_output(name, working_directory, path, action_type=action_type)
+        elif output_type == 'output':
+            self._fetch_output(path=path, name=name, action_type=action_type)
+        else:
+            raise Exception("Unknown output_type %s" % output_type)
+
+    def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None):
+        return self.job_manager_interface.execute(command, args, data, input_path, output_path)
+
+    # Deprecated
+    def _fetch_output_legacy(self, path, working_directory, action_type='transfer'):
+        # Needs to determine if output is task/working directory or standard.
+        name = os.path.basename(path)
+
+        output_type = self._get_output_type(name)
+        if output_type == "none":
+            # Just make sure the file was created.
+            if not os.path.exists(path):
+                raise OutputNotFoundException(path)
+            return
+        elif output_type in ["task"]:
+            path = os.path.join(working_directory, name)
+
+        self.__populate_output_path(name, path, output_type, action_type)
+
+    def _fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'):
+        if not name:
+            # Extra files will send in the path.
+            name = os.path.basename(path)
+
+        output_type = "direct"  # Task/from_work_dir outputs now handled with fetch_work_dir_output
+        self.__populate_output_path(name, path, output_type, action_type)
+
+    def _fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'):
+        ensure_directory(output_path)
+        if action_type == 'transfer':
+            self.__raw_download_output(name, self.job_id, "work_dir", output_path)
+        else:  # Even if action is none - LWR has a different work_dir so this needs to be copied.
+            lwr_path = self._output_path(name, self.job_id, 'work_dir')['path']
+            copy(lwr_path, output_path)
+
+    def __populate_output_path(self, name, output_path, output_type, action_type):
+        ensure_directory(output_path)
+        if action_type == 'transfer':
+            self.__raw_download_output(name, self.job_id, output_type, output_path)
+        elif action_type == 'copy':
+            lwr_path = self._output_path(name, self.job_id, output_type)['path']
+            copy(lwr_path, output_path)
+
     @parseJson()
     def _upload_file(self, args, contents, input_path):
         return self._raw_execute(self._upload_file_action(args), args, contents, input_path)
@@ -163,75 +264,6 @@
         return self._raw_execute("get_output_type", {"name": name, "job_id": self.job_id})
 
-    # Deprecated
-    def fetch_output_legacy(self, path, working_directory, action_type='transfer'):
-        # Needs to determine if output is task/working directory or standard.
-        name = os.path.basename(path)
-
-        output_type = self._get_output_type(name)
-        if output_type == "none":
-            # Just make sure the file was created.
-            if not os.path.exists(path):
-                raise OutputNotFoundException(path)
-            return
-        elif output_type in ["task"]:
-            path = os.path.join(working_directory, name)
-
-        self.__populate_output_path(name, path, output_type, action_type)
-
-    def fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'):
-        """
-        Download an output dataset from the remote server.
-
-        **Parameters**
-
-        path : str
-            Local path of the dataset.
-        working_directory : str
-            Local working_directory for the job.
-        """
-
-        if not name:
-            # Extra files will send in the path.
-            name = os.path.basename(path)
-
-        output_type = "direct"  # Task/from_work_dir outputs now handled with fetch_work_dir_output
-        self.__populate_output_path(name, path, output_type, action_type)
-
-    def __populate_output_path(self, name, output_path, output_type, action_type):
-        self.__ensure_directory(output_path)
-        if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, output_type, output_path)
-        elif action_type == 'copy':
-            lwr_path = self._output_path(name, self.job_id, output_type)['path']
-            self._copy(lwr_path, output_path)
-
-    def fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'):
-        """
-        Download an output dataset specified with from_work_dir from the
-        remote server.
-
-        **Parameters**
-
-        name : str
-            Path in job's working_directory to find output in.
-        working_directory : str
-            Local working_directory for the job.
-        output_path : str
-            Full path to output dataset.
-        """
-        self.__ensure_directory(output_path)
-        if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, "work_dir", output_path)
-        else:  # Even if action is none - LWR has a different work_dir so this needs to be copied.
-            lwr_path = self._output_path(name, self.job_id, 'work_dir')['path']
-            self._copy(lwr_path, output_path)
-
-    def __ensure_directory(self, output_path):
-        output_path_directory = os.path.dirname(output_path)
-        if not os.path.exists(output_path_directory):
-            os.makedirs(output_path_directory)
-
     @parseJson()
     def _output_path(self, name, job_id, output_type):
         return self._raw_execute("output_path",
@@ -248,107 +280,6 @@
         }
         self._raw_execute("download_output", output_params, output_path=output_path)
 
-    def launch(self, command_line, requirements=[], remote_staging=[], job_config=None):
-        """
-        Queue up the execution of the supplied `command_line` on the remote
-        server. Called launch for historical reasons, should be renamed to
-        enqueue or something like that.
-
-        **Parameters**
-
-        command_line : str
-            Command to execute.
-        """
-        launch_params = dict(command_line=command_line, job_id=self.job_id)
-        submit_params = self._submit_params
-        if submit_params:
-            launch_params['params'] = dumps(submit_params)
-        if requirements:
-            launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements])
-        if remote_staging:
-            launch_params['remote_staging'] = dumps(remote_staging)
-        if job_config and self.setup_handler.local:
-            # Setup not yet called, job properties were inferred from
-            # destination arguments. Hence, must have LWR setup job
-            # before queueing.
-            setup_params = _setup_params_from_job_config(job_config)
-            launch_params["setup_params"] = dumps(setup_params)
-        return self._raw_execute("launch", launch_params)
-
-    def final_status(self):
-        """ Return a dictionary summarizing final state of job.
-        """
-        return self.raw_check_complete()
-
-    def kill(self):
-        """
-        Cancel remote job, either removing from the queue or killing it.
-        """
-        return self._raw_execute("kill", {"job_id": self.job_id})
-
-    def wait(self, max_seconds=None):
-        """
-        Wait for job to finish.
- """ - i = 0 - while max_seconds is None or i < max_seconds: - complete_response = self.raw_check_complete() - if complete_response["complete"] == "true": - print complete_response - return complete_response - else: - print complete_response - sleep(1) - i += 1 - - @parseJson() - def raw_check_complete(self): - """ - Get check_complete response from the remote server. - """ - check_complete_response = self._raw_execute("check_complete", {"job_id": self.job_id}) - return check_complete_response - - def check_complete(self, response=None): - """ - Return boolean indicating whether the job is complete. - """ - if response is None: - response = self.raw_check_complete() - return response["complete"] == "true" - - @retry() - def get_status(self): - check_complete_response = self.raw_check_complete() - # Older LWR instances won't set status so use 'complete', at some - # point drop backward compatibility. - status = check_complete_response.get("status", None) - # Bug in certains older LWR instances returned literal "status". - if status in ["status", None]: - complete = self.check_complete(check_complete_response) - old_status = "complete" if complete else "running" - status = old_status - return status - - def clean(self): - """ - Cleanup the remote job. - """ - self._raw_execute("clean", {"job_id": self.job_id}) - - @parseJson() - def remote_setup(self, **setup_args): - """ - Setup remote LWR server to run this job. - """ - return self._raw_execute("setup", setup_args) - - def _copy(self, source, destination): - source = os.path.abspath(source) - destination = os.path.abspath(destination) - if source != destination: - shutil.copyfile(source, destination) - class MessageJobClient(BaseJobClient): @@ -379,16 +310,16 @@ return self.client_manager.exchange.publish("setup", launch_params) def clean(self): - del self.client_manager.final_status_cache[self.job_id] + del self.client_manager.status_cache[self.job_id] - def final_status(self): - final_status = self.client_manager.final_status_cache.get(self.job_id, None) - if final_status is None: - raise Exception("final_status() called before a final status was properly cached with cilent manager.") - return final_status + def full_status(self): + full_status = self.client_manager.status_cache.get(self.job_id, None) + if full_status is None: + raise Exception("full_status() called before a final status was properly cached with cilent manager.") + return full_status def kill(self): - log.warn("Kill not yet implemented with message queue driven LWR jobs.") + self.client_manager.exchange.publish("kill", dict(job_id=self.job_id)) class InputCachingJobClient(JobClient): @@ -443,55 +374,3 @@ tool_id=tool_id, tool_version=tool_version ) - - -class ObjectStoreClient(object): - - def __init__(self, lwr_interface): - self.lwr_interface = lwr_interface - - @parseJson() - def exists(self, **kwds): - return self._raw_execute("object_store_exists", args=self.__data(**kwds)) - - @parseJson() - def file_ready(self, **kwds): - return self._raw_execute("object_store_file_ready", args=self.__data(**kwds)) - - @parseJson() - def create(self, **kwds): - return self._raw_execute("object_store_create", args=self.__data(**kwds)) - - @parseJson() - def empty(self, **kwds): - return self._raw_execute("object_store_empty", args=self.__data(**kwds)) - - @parseJson() - def size(self, **kwds): - return self._raw_execute("object_store_size", args=self.__data(**kwds)) - - @parseJson() - def delete(self, **kwds): - return self._raw_execute("object_store_delete", 
-
-    @parseJson()
-    def get_data(self, **kwds):
-        return self._raw_execute("object_store_get_data", args=self.__data(**kwds))
-
-    @parseJson()
-    def get_filename(self, **kwds):
-        return self._raw_execute("object_store_get_filename", args=self.__data(**kwds))
-
-    @parseJson()
-    def update_from_file(self, **kwds):
-        return self._raw_execute("object_store_update_from_file", args=self.__data(**kwds))
-
-    @parseJson()
-    def get_store_usage_percent(self):
-        return self._raw_execute("object_store_get_store_usage_percent", args={})
-
-    def __data(self, **kwds):
-        return kwds
-
-    def _raw_execute(self, command, args={}):
-        return self.lwr_interface.execute(command, args, data=None, input_path=None, output_path=None)

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/decorators.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/lwr_client/decorators.py
@@ -0,0 +1,35 @@
+import time
+import json
+
+MAX_RETRY_COUNT = 5
+RETRY_SLEEP_TIME = 0.1
+
+
+class parseJson(object):
+
+    def __call__(self, func):
+        def replacement(*args, **kwargs):
+            response = func(*args, **kwargs)
+            return json.loads(response)
+        return replacement
+
+
+class retry(object):
+
+    def __call__(self, func):
+
+        def replacement(*args, **kwargs):
+            max_count = MAX_RETRY_COUNT
+            count = 0
+            while True:
+                count += 1
+                try:
+                    return func(*args, **kwargs)
+                except:
+                    if count >= max_count:
+                        raise
+                    else:
+                        time.sleep(RETRY_SLEEP_TIME)
+                        continue
+
+        return replacement
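As an aside on how the extracted decorators compose: a flaky call is retried
up to MAX_RETRY_COUNT times, and its JSON payload is decoded on the way out
(note raw_check_complete above stacks @retry() over @parseJson()). The sketch
below is editorial, not from the changeset; it inlines minimal equivalents of
the two decorators so it runs standalone, and failing_twice is a made-up
stand-in for an LWR HTTP call.

    import json
    import time

    MAX_RETRY_COUNT = 5
    RETRY_SLEEP_TIME = 0.1

    class parseJson(object):
        # Decode the wrapped callable's JSON response.
        def __call__(self, func):
            def replacement(*args, **kwargs):
                return json.loads(func(*args, **kwargs))
            return replacement

    class retry(object):
        # Re-invoke the wrapped callable on failure, up to MAX_RETRY_COUNT.
        def __call__(self, func):
            def replacement(*args, **kwargs):
                count = 0
                while True:
                    count += 1
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        if count >= MAX_RETRY_COUNT:
                            raise
                        time.sleep(RETRY_SLEEP_TIME)
            return replacement

    calls = {"n": 0}

    @retry()
    @parseJson()
    def failing_twice():
        # Hypothetical remote call: fails twice, then returns a JSON body.
        calls["n"] += 1
        if calls["n"] < 3:
            raise IOError("transient failure")
        return '{"complete": "true", "status": "complete"}'

    print(failing_twice())  # succeeds on the third attempt, returns a dict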
diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/interface.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/lwr_client/interface.py
@@ -0,0 +1,97 @@
+from abc import ABCMeta
+from abc import abstractmethod
+try:
+    from StringIO import StringIO as BytesIO
+except ImportError:
+    from io import BytesIO
+try:
+    from six import text_type
+except ImportError:
+    from galaxy.util import unicodify as text_type
+try:
+    from urllib import urlencode
+except ImportError:
+    from urllib.parse import urlencode
+
+
+class LwrInteface(object):
+    """
+    Abstract base class describes how synchronous client communicates with
+    (potentially remote) LWR procedures. Obvious implementation is HTTP based
+    but LWR objects wrapped in routes can also be directly communicated with
+    if in memory.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
+        """
+        Execute the correspond command against configured LWR job manager. Arguments are
+        method parameters and data or input_path describe essentially POST bodies. If command
+        results in a file, resulting path should be specified as output_path.
+        """
+
+
+class HttpLwrInterface(LwrInteface):
+
+    def __init__(self, destination_params, transport):
+        self.transport = transport
+        self.remote_host = destination_params.get("url")
+        assert self.remote_host is not None, "Failed to determine url for LWR client."
+        self.private_key = destination_params.get("private_token", None)
+
+    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
+        url = self.__build_url(command, args)
+        response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path)
+        return response
+
+    def __build_url(self, command, args):
+        if self.private_key:
+            args["private_key"] = self.private_key
+        arg_bytes = dict([(k, text_type(args[k]).encode('utf-8')) for k in args])
+        data = urlencode(arg_bytes)
+        url = self.remote_host + command + "?" + data
+        return url
+
+
+class LocalLwrInterface(LwrInteface):
+
+    def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None):
+        self.job_manager = job_manager
+        self.file_cache = file_cache
+        self.object_store = object_store
+
+    def __app_args(self):
+        ## Arguments that would be specified from LwrApp if running
+        ## in web server.
+        return {
+            'manager': self.job_manager,
+            'file_cache': self.file_cache,
+            'object_store': self.object_store,
+            'ip': None
+        }
+
+    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
+        # If data set, should be unicode (on Python 2) or str (on Python 3).
+        from lwr import routes
+        from lwr.framework import build_func_args
+        controller = getattr(routes, command)
+        action = controller.func
+        body_args = dict(body=self.__build_body(data, input_path))
+        args = build_func_args(action, args.copy(), self.__app_args(), body_args)
+        result = action(**args)
+        if controller.response_type != 'file':
+            return controller.body(result)
+        else:
+            # TODO: Add to Galaxy.
+            from galaxy.util import copy_to_path
+            with open(result, 'rb') as result_file:
+                copy_to_path(result_file, output_path)
+
+    def __build_body(self, data, input_path):
+        if data is not None:
+            return BytesIO(data.encode('utf-8'))
+        elif input_path is not None:
+            return open(input_path, 'rb')
+        else:
+            return None

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/manager.py
--- a/lib/galaxy/jobs/runners/lwr_client/manager.py
+++ b/lib/galaxy/jobs/runners/lwr_client/manager.py
@@ -1,27 +1,16 @@
-from abc import ABCMeta, abstractmethod
 import threading
 try:
     from Queue import Queue
 except ImportError:
     from queue import Queue
 from os import getenv
-try:
-    from urllib import urlencode
-except ImportError:
-    from urllib.parse import urlencode
-try:
-    from StringIO import StringIO as BytesIO
-except ImportError:
-    from io import BytesIO
-try:
-    from six import text_type
-except ImportError:
-    from galaxy.util import unicodify as text_type
 
 from .client import JobClient
 from .client import InputCachingJobClient
-from .client import ObjectStoreClient
 from .client import MessageJobClient
+from .interface import HttpLwrInterface
+from .interface import LocalLwrInterface
+from .object_client import ObjectStoreClient
 from .transport import get_transport
 from .util import TransferEventManager
 from .destination import url_to_destination_params
@@ -87,12 +76,12 @@
         self.url = kwds.get('url')
         self.manager_name = kwds.get("manager", "_default_")
         self.exchange = LwrExchange(self.url, self.manager_name)
-        self.final_status_cache = {}
+        self.status_cache = {}
         self.callback_lock = threading.Lock()
         self.callback_thread = None
         self.active = True
 
-    def ensure_has_job_completes_callback(self, callback):
+    def ensure_has_status_update_callback(self, callback):
         with self.callback_lock:
             if self.callback_thread is not None:
                 return
 
@@ -100,17 +89,17 @@
             def callback_wrapper(body, message):
                 try:
                     if "job_id" in body:
-                        self.final_status_cache[body["job_id"]] = body
+                        self.status_cache[body["job_id"]] = body
                     callback(body)
                 except Exception:
                     log.exception("Failure processing job status update message.")
                 message.ack()
 
             def run():
-                self.exchange.consume("complete", callback_wrapper, check=self)
+                self.exchange.consume("status_update", callback_wrapper, check=self)
 
             thread = threading.Thread(
-                name="lwr_client_%s_complete_callback" % self.manager_name,
+                name="lwr_client_%s_status_update_callback" % self.manager_name,
                 target=run
             )
             thread.daemon = False  # Lets not interrupt processing of this.
@@ -148,87 +137,6 @@
         return ObjectStoreClient(interface)
 
-
-class JobManagerInteface(object):
-    """
-    Abstract base class describes how client communicates with remote job
-    manager.
-    """
-    __metaclass__ = ABCMeta
-
-    @abstractmethod
-    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
-        """
-        Execute the correspond command against configured LWR job manager. Arguments are
-        method parameters and data or input_path describe essentially POST bodies. If command
-        results in a file, resulting path should be specified as output_path.
-        """
-
-
-class HttpLwrInterface(object):
-
-    def __init__(self, destination_params, transport):
-        self.transport = transport
-        self.remote_host = destination_params.get("url")
-        assert self.remote_host is not None, "Failed to determine url for LWR client."
-        self.private_key = destination_params.get("private_token", None)
-
-    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
-        url = self.__build_url(command, args)
-        response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path)
-        return response
-
-    def __build_url(self, command, args):
-        if self.private_key:
-            args["private_key"] = self.private_key
-        arg_bytes = dict([(k, text_type(args[k]).encode('utf-8')) for k in args])
-        data = urlencode(arg_bytes)
-        url = self.remote_host + command + "?" + data
-        return url
-
-
-class LocalLwrInterface(object):
-
-    def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None):
-        self.job_manager = job_manager
-        self.file_cache = file_cache
-        self.object_store = object_store
-
-    def __app_args(self):
-        ## Arguments that would be specified from LwrApp if running
-        ## in web server.
-        return {
-            'manager': self.job_manager,
-            'file_cache': self.file_cache,
-            'object_store': self.object_store,
-            'ip': None
-        }
-
-    def execute(self, command, args={}, data=None, input_path=None, output_path=None):
-        # If data set, should be unicode (on Python 2) or str (on Python 3).
-        from lwr import routes
-        from lwr.framework import build_func_args
-        controller = getattr(routes, command)
-        action = controller.func
-        body_args = dict(body=self.__build_body(data, input_path))
-        args = build_func_args(action, args.copy(), self.__app_args(), body_args)
-        result = action(**args)
-        if controller.response_type != 'file':
-            return controller.body(result)
-        else:
-            # TODO: Add to Galaxy.
-            from galaxy.util import copy_to_path
-            with open(result, 'rb') as result_file:
-                copy_to_path(result_file, output_path)
-
-    def __build_body(self, data, input_path):
-        if data is not None:
-            return BytesIO(data.encode('utf-8'))
-        elif input_path is not None:
-            return open(input_path, 'rb')
-        else:
-            return None
-
-
 class ClientCacher(object):
 
     def __init__(self, **kwds):

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/object_client.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/lwr_client/object_client.py
@@ -0,0 +1,53 @@
+from .decorators import parseJson
+
+
+class ObjectStoreClient(object):
+
+    def __init__(self, lwr_interface):
+        self.lwr_interface = lwr_interface
+
+    @parseJson()
+    def exists(self, **kwds):
+        return self._raw_execute("object_store_exists", args=self.__data(**kwds))
+
+    @parseJson()
+    def file_ready(self, **kwds):
+        return self._raw_execute("object_store_file_ready", args=self.__data(**kwds))
+
+    @parseJson()
+    def create(self, **kwds):
+        return self._raw_execute("object_store_create", args=self.__data(**kwds))
+
+    @parseJson()
+    def empty(self, **kwds):
+        return self._raw_execute("object_store_empty", args=self.__data(**kwds))
+
+    @parseJson()
+    def size(self, **kwds):
+        return self._raw_execute("object_store_size", args=self.__data(**kwds))
+
+    @parseJson()
+    def delete(self, **kwds):
+        return self._raw_execute("object_store_delete", args=self.__data(**kwds))
+
+    @parseJson()
+    def get_data(self, **kwds):
+        return self._raw_execute("object_store_get_data", args=self.__data(**kwds))
+
+    @parseJson()
+    def get_filename(self, **kwds):
+        return self._raw_execute("object_store_get_filename", args=self.__data(**kwds))
+
+    @parseJson()
+    def update_from_file(self, **kwds):
+        return self._raw_execute("object_store_update_from_file", args=self.__data(**kwds))
+
+    @parseJson()
+    def get_store_usage_percent(self):
+        return self._raw_execute("object_store_get_store_usage_percent", args={})
+
+    def __data(self, **kwds):
+        return kwds
+
+    def _raw_execute(self, command, args={}):
+        return self.lwr_interface.execute(command, args, data=None, input_path=None, output_path=None)

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/staging/down.py
--- a/lib/galaxy/jobs/runners/lwr_client/staging/down.py
+++ b/lib/galaxy/jobs/runners/lwr_client/staging/down.py
@@ -41,15 +41,14 @@
         if not action.staging_action_local:
             return False
 
-        path = action.path
-        if output_type == 'legacy':
-            working_directory = results_collector.client_outputs.working_directory
-            self.client.fetch_output_legacy(path, working_directory, action_type=action.action_type)
-        elif output_type == 'output_workdir':
-            working_directory = results_collector.client_outputs.working_directory
-            self.client.fetch_work_dir_output(name, working_directory, path, action_type=action.action_type)
-        elif output_type == 'output':
-            self.client.fetch_output(path=path, name=name, action_type=action.action_type)
+        working_directory = results_collector.client_outputs.working_directory
+        self.client.fetch_output(
+            path=action.path,
+            name=name,
+            working_directory=working_directory,
+            output_type=output_type,
+            action_type=action.action_type
+        )
         return True

diff -r 98253ff31a334d01eb2ee28e10c637f71c1ba6f3 -r 5c3d7c2346617774a4248b9ef990a3208e00576b lib/galaxy/jobs/runners/lwr_client/util.py
--- a/lib/galaxy/jobs/runners/lwr_client/util.py
+++ b/lib/galaxy/jobs/runners/lwr_client/util.py
@@ -6,6 +6,7 @@
 from os.path import join
 import os.path
 import hashlib
+import shutil
 
 
 def unique_path_prefix(path):
@@ -14,6 +15,22 @@
     return m.hexdigest()
 
 
+def copy(source, destination):
+    """ Copy file from source to destination if needed (skip if source
+    is destination).
+    """
+    source = os.path.abspath(source)
+    destination = os.path.abspath(destination)
+    if source != destination:
+        shutil.copyfile(source, destination)
+
+
+def ensure_directory(file_path):
+    directory = os.path.dirname(file_path)
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+
+
 def directory_files(directory):
     """

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--
This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this email.