commit/galaxy-central: 3 new changesets
3 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/d847b6985330/ Changeset: d847b6985330 User: jmchilton Date: 2014-02-20 16:37:17 Summary: Extend SecurityHelper to allow generation of different "kind"s of keys. There has been talk about including the object's class into the hash when generating keys so the ids are not overlapping between model types. This change could allow that, but my immediate desire is to just to create single-purpose per job keys to allow external job running code to access files on behalf of a given job. Affected #: 1 file diff -r 9c673e0ea8919fea6941f24fdf6936686840bb71 -r d847b69853302d9a127c31c612dd19cc7ffa9f61 lib/galaxy/web/security/__init__.py --- a/lib/galaxy/web/security/__init__.py +++ b/lib/galaxy/web/security/__init__.py @@ -1,3 +1,4 @@ +import collections import os, os.path, logging import pkg_resources @@ -37,27 +38,32 @@ self.id_secret = config['id_secret'] self.id_cipher = Blowfish.new( self.id_secret ) - def encode_id( self, obj_id ): + per_kind_id_secret_base = config.get( 'per_kind_id_secret_base', self.id_secret ) + self.id_ciphers_for_key = _cipher_cache( per_kind_id_secret_base ) + + def encode_id( self, obj_id, kind=None ): + id_cipher = self.__id_cipher( kind ) # Convert to string s = str( obj_id ) # Pad to a multiple of 8 with leading "!" s = ( "!" * ( 8 - len(s) % 8 ) ) + s # Encrypt - return self.id_cipher.encrypt( s ).encode( 'hex' ) + return id_cipher.encrypt( s ).encode( 'hex' ) - def encode_dict_ids( self, a_dict ): + def encode_dict_ids( self, a_dict, kind=None ): """ Encode all ids in dictionary. Ids are identified by (a) an 'id' key or (b) a key that ends with '_id' """ for key, val in a_dict.items(): if key == 'id' or key.endswith('_id'): - a_dict[ key ] = self.encode_id( val ) + a_dict[ key ] = self.encode_id( val, kind=kind ) return a_dict - def decode_id( self, obj_id ): - return int( self.id_cipher.decrypt( obj_id.decode( 'hex' ) ).lstrip( "!" ) ) + def decode_id( self, obj_id, kind=None ): + id_cipher = self.__id_cipher( kind ) + return int( id_cipher.decrypt( obj_id.decode( 'hex' ) ).lstrip( "!" ) ) def encode_guid( self, session_key ): # Session keys are strings @@ -73,3 +79,19 @@ def get_new_guid( self ): # Generate a unique, high entropy 128 bit random number return get_random_bytes( 16 ) + + def __id_cipher( self, kind ): + if not kind: + id_cipher = self.id_cipher + else: + id_cipher = self.id_ciphers_for_key[ kind ] + return id_cipher + + +class _cipher_cache( collections.defaultdict ): + + def __init__( self, secret_base ): + self.secret_base = secret_base + + def __missing__( self, key ): + return Blowfish.new( self.secret_base + "__" + key ) https://bitbucket.org/galaxy/galaxy-central/commits/58acdb670e4b/ Changeset: 58acdb670e4b User: jmchilton Date: 2014-02-20 16:37:17 Summary: Implement API allowing access to a job's files. Provides a mechanism for remote job execution mechanisms to read and write files on behalf of an "active" jobs. For a variety of reasons simply providing access to datasets is insuccifient for this - there are working directory files needed for metadata calculation, inputs and outputs may be of intermediate form (task split files for instance), tool files, location files, etc.... Therefore this API endpoint provides access to a jobs view of these files. Some attempt is made to verify the paths that are written to are valid for the supplied job (they either correspond to the output datasets of the job or the working directory of the job). Authorizing suchs paths for reading is much more difficult and left undone due to the unstructure and arbitrary nature of .loc files. To implement this securely and with minimal configuration required - single-purpose "job_key"s are generated to authenticate these API calls. Each key allow usage of the API only for a single, active job. Update LWR job runner to be able to leverage this API to stage files remotely via HTTP for "remote_transfer" actions (added recently to the LWR client). Affected #: 6 files diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -28,6 +28,10 @@ NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "LWR misconfiguration - LWR client configured to set metadata remotely, but remote LWR isn't properly configured with a galaxy_home directory." NO_REMOTE_DATATYPES_CONFIG = "LWR client is configured to use remote datatypes configuration when setting metadata externally, but LWR is not configured with this information. Defaulting to datatypes_conf.xml." +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + class LwrJobRunner( AsynchronousJobRunner ): """ @@ -35,13 +39,14 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None, url=None ): + def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) self.async_status_updates = dict() self._init_monitor_thread() self._init_worker_threads() client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url} + self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) def url_to_destination( self, url ): @@ -224,7 +229,21 @@ return self.get_client( job_destination_params, job_id ) def get_client( self, job_destination_params, job_id ): - return self.client_manager.get_client( job_destination_params, str( job_id ) ) + # Cannot use url_for outside of web thread. + #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) def finish_job( self, job_state ): stderr = stdout = '' diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -266,6 +266,16 @@ self.handler = None self.exit_code = None + @property + def finished( self ): + states = self.states + return self.state in [ + states.OK, + states.ERROR, + states.DELETED, + states.DELETED_NEW, + ] + # TODO: Add accessors for members defined in SQL Alchemy for the Job table and # for the mapper defined to the Job table. def get_external_output_metadata( self ): diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/web/__init__.py --- a/lib/galaxy/web/__init__.py +++ b/lib/galaxy/web/__init__.py @@ -20,3 +20,4 @@ from framework import _future_expose_api from framework import _future_expose_api_anonymous from framework import _future_expose_api_raw +from framework import _future_expose_api_raw_anonymous diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/web/framework/__init__.py --- a/lib/galaxy/web/framework/__init__.py +++ b/lib/galaxy/web/framework/__init__.py @@ -278,6 +278,10 @@ return _future_expose_api( func, to_json=False, user_required=True ) +def _future_expose_api_raw_anonymous( func ): + return _future_expose_api( func, to_json=False, user_required=False ) + + # TODO: rename as expose_api and make default. def _future_expose_api( func, to_json=True, user_required=True ): """ diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/webapps/galaxy/api/job_files.py --- /dev/null +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -0,0 +1,142 @@ +""" API for asynchronous job running mechanisms can use to fetch or put files +related to running and queued jobs. +""" +import os +import shutil + +from galaxy import exceptions +from galaxy import util +from galaxy import model +from galaxy.web.base.controller import BaseAPIController +from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous +from galaxy.web import _future_expose_api_raw_anonymous as expose_api_raw_anonymous + + +import logging +log = logging.getLogger( __name__ ) + + +class JobFilesAPIController( BaseAPIController ): + """ This job files controller allows remote job running mechanisms to + read and modify the current state of files for queued and running jobs. + It is certainly not meant to represent part of Galaxy's stable, user + facing API. + + Furthermore, even if a user key corresponds to the user running the job, + it should not be accepted for authorization - this API allows access to + low-level unfiltered files and such authorization would break Galaxy's + security model for tool execution. + """ + + @expose_api_raw_anonymous + def index( self, trans, job_id, **kwargs ): + """ + index( self, trans, job_id, **kwargs ) + * GET /api/jobs/{job_id}/files + Get a file required to staging a job (proper datasets, extra inputs, + task-split inputs, working directory files). + + :type job_id: str + :param job_id: encoded id string of the job + :type path: str + :param path: Path to file. + :type job_key: str + :param job_key: A key used to authenticate this request as acting on + behalf or a job runner for the specified job. + ..note: + This API method is intended only for consumption by job runners, + not end users. + + :rtype: binary + :returns: contents of file + """ + self.__authorize_job_access( trans, job_id, **kwargs ) + path = kwargs.get("path", None) + return open(path, 'rb') + + @expose_api_anonymous + def create( self, trans, job_id, payload, **kwargs ): + """ + create( self, trans, job_id, payload, **kwargs ) + * POST /api/jobs/{job_id}/files + Populate an output file (formal dataset, task split part, working + directory file (such as those related to metadata)). This should be + a multipart post with a 'file' parameter containing the contents of + the actual file to create. + + :type job_id: str + :param job_id: encoded id string of the job + :type payload: dict + :param payload: dictionary structure containing:: + 'job_key' = Key authenticating + 'path' = Path to file to create. + + ..note: + This API method is intended only for consumption by job runners, + not end users. + + :rtype: dict + :returns: an okay message + """ + job = self.__authorize_job_access( trans, job_id, **payload ) + path = payload.get( "path" ) + self.__check_job_can_write_to_path( trans, job, path ) + + # Is this writing an unneeded file? Should this just copy in Python? + input_file = payload.get( "file", payload.get( "__file", None ) ).file + try: + shutil.copyfile( input_file.name, path ) + finally: + input_file.close() + return {"message": "ok"} + + def __authorize_job_access(self, trans, encoded_job_id, **kwargs): + for key in [ "path", "job_key" ]: + if key not in kwargs: + error_message = "Job files action requires a valid '%s'." % key + raise exceptions.ObjectAttributeMissingException( error_message ) + + job_id = trans.security.decode_id( encoded_job_id ) + job_key = trans.security.encode_id( job_id, kind="jobs_files" ) + if not util.safe_str_cmp( kwargs[ "job_key" ], job_key ): + raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") + + # Verify job is active. Don't update the contents of complete jobs. + job = trans.sa_session.query( model.Job ).get( job_id ) + if job.finished: + error_message = "Attempting to read or modify the files of a job that has already completed." + raise exceptions.MessageException( error_message ) + return job + + def __check_job_can_write_to_path( self, trans, job, path ): + """ Verify an idealized job runner should actually be able to write to + the specified path - it must be a dataset output, a dataset "extra + file", or a some place in the working directory of this job. + + Would like similar checks for reading the unstructured nature of loc + files make this very difficult. (See abandoned work here + https://gist.github.com/jmchilton/9103619.) + """ + in_work_dir = self.__in_working_directory( job, path, trans.app ) + if not in_work_dir and not self.__is_output_dataset_path( job, path ): + raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") + + def __is_output_dataset_path( self, job, path ): + """ Check if is an output path for this job or a file in the an + output's extra files path. + """ + da_lists = [ job.output_datasets, job.output_library_datasets ] + for da_list in da_lists: + for job_dataset_association in da_list: + dataset = job_dataset_association.dataset + if not dataset: + continue + if os.path.abspath( dataset.file_name ) == os.path.abspath( path ): + return True + elif util.in_directory( path, dataset.extra_files_path ): + return True + return False + + def __in_working_directory( self, job, path, app ): + working_directory = app.object_store.get_filename(job, base_dir='job_work', dir_only=True, extra_dir=str(job.id)) + return util.in_directory( path, working_directory ) diff -r d847b69853302d9a127c31c612dd19cc7ffa9f61 -r 58acdb670e4b8ea9a35d3b19aaab1e66c5b75009 lib/galaxy/webapps/galaxy/buildapp.py --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -233,12 +233,20 @@ 'permissions', path_prefix='/api/libraries/:library_id', parent_resources=dict( member_name='library', collection_name='libraries' ) ) - - webapp.mapper.resource( 'job', - 'jobs', + + webapp.mapper.resource( 'job', + 'jobs', path_prefix='/api' ) webapp.mapper.connect( 'job_search', '/api/jobs/search', controller='jobs', action='search', conditions=dict( method=['POST'] ) ) + # Job files controllers. Only for consumption by remote job runners. + webapp.mapper.resource( 'file', + 'files', + controller="job_files", + name_prefix="job_", + path_prefix='/api/jobs/:job_id', + parent_resources=dict( member_name="job", collection_name="jobs") + ) _add_item_extended_metadata_controller( webapp, name_prefix="library_dataset_", https://bitbucket.org/galaxy/galaxy-central/commits/2727f1617161/ Changeset: 2727f1617161 User: jmchilton Date: 2014-03-17 01:54:02 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #327) Implement job files API. Affected #: 7 files diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -28,6 +28,10 @@ NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "LWR misconfiguration - LWR client configured to set metadata remotely, but remote LWR isn't properly configured with a galaxy_home directory." NO_REMOTE_DATATYPES_CONFIG = "LWR client is configured to use remote datatypes configuration when setting metadata externally, but LWR is not configured with this information. Defaulting to datatypes_conf.xml." +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + class LwrJobRunner( AsynchronousJobRunner ): """ @@ -35,13 +39,14 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None, url=None ): + def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) self.async_status_updates = dict() self._init_monitor_thread() self._init_worker_threads() client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url} + self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) def url_to_destination( self, url ): @@ -224,7 +229,21 @@ return self.get_client( job_destination_params, job_id ) def get_client( self, job_destination_params, job_id ): - return self.client_manager.get_client( job_destination_params, str( job_id ) ) + # Cannot use url_for outside of web thread. + #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) def finish_job( self, job_state ): stderr = stdout = '' diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -268,6 +268,16 @@ self.handler = None self.exit_code = None + @property + def finished( self ): + states = self.states + return self.state in [ + states.OK, + states.ERROR, + states.DELETED, + states.DELETED_NEW, + ] + # TODO: Add accessors for members defined in SQL Alchemy for the Job table and # for the mapper defined to the Job table. def get_external_output_metadata( self ): diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/web/__init__.py --- a/lib/galaxy/web/__init__.py +++ b/lib/galaxy/web/__init__.py @@ -20,3 +20,4 @@ from framework import _future_expose_api from framework import _future_expose_api_anonymous from framework import _future_expose_api_raw +from framework import _future_expose_api_raw_anonymous diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/web/framework/__init__.py --- a/lib/galaxy/web/framework/__init__.py +++ b/lib/galaxy/web/framework/__init__.py @@ -278,6 +278,10 @@ return _future_expose_api( func, to_json=False, user_required=True ) +def _future_expose_api_raw_anonymous( func ): + return _future_expose_api( func, to_json=False, user_required=False ) + + # TODO: rename as expose_api and make default. def _future_expose_api( func, to_json=True, user_required=True ): """ diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/web/security/__init__.py --- a/lib/galaxy/web/security/__init__.py +++ b/lib/galaxy/web/security/__init__.py @@ -1,3 +1,4 @@ +import collections import os, os.path, logging import pkg_resources @@ -37,27 +38,32 @@ self.id_secret = config['id_secret'] self.id_cipher = Blowfish.new( self.id_secret ) - def encode_id( self, obj_id ): + per_kind_id_secret_base = config.get( 'per_kind_id_secret_base', self.id_secret ) + self.id_ciphers_for_key = _cipher_cache( per_kind_id_secret_base ) + + def encode_id( self, obj_id, kind=None ): + id_cipher = self.__id_cipher( kind ) # Convert to string s = str( obj_id ) # Pad to a multiple of 8 with leading "!" s = ( "!" * ( 8 - len(s) % 8 ) ) + s # Encrypt - return self.id_cipher.encrypt( s ).encode( 'hex' ) + return id_cipher.encrypt( s ).encode( 'hex' ) - def encode_dict_ids( self, a_dict ): + def encode_dict_ids( self, a_dict, kind=None ): """ Encode all ids in dictionary. Ids are identified by (a) an 'id' key or (b) a key that ends with '_id' """ for key, val in a_dict.items(): if key == 'id' or key.endswith('_id'): - a_dict[ key ] = self.encode_id( val ) + a_dict[ key ] = self.encode_id( val, kind=kind ) return a_dict - def decode_id( self, obj_id ): - return int( self.id_cipher.decrypt( obj_id.decode( 'hex' ) ).lstrip( "!" ) ) + def decode_id( self, obj_id, kind=None ): + id_cipher = self.__id_cipher( kind ) + return int( id_cipher.decrypt( obj_id.decode( 'hex' ) ).lstrip( "!" ) ) def encode_guid( self, session_key ): # Session keys are strings @@ -73,3 +79,19 @@ def get_new_guid( self ): # Generate a unique, high entropy 128 bit random number return get_random_bytes( 16 ) + + def __id_cipher( self, kind ): + if not kind: + id_cipher = self.id_cipher + else: + id_cipher = self.id_ciphers_for_key[ kind ] + return id_cipher + + +class _cipher_cache( collections.defaultdict ): + + def __init__( self, secret_base ): + self.secret_base = secret_base + + def __missing__( self, key ): + return Blowfish.new( self.secret_base + "__" + key ) diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/webapps/galaxy/api/job_files.py --- /dev/null +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -0,0 +1,142 @@ +""" API for asynchronous job running mechanisms can use to fetch or put files +related to running and queued jobs. +""" +import os +import shutil + +from galaxy import exceptions +from galaxy import util +from galaxy import model +from galaxy.web.base.controller import BaseAPIController +from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous +from galaxy.web import _future_expose_api_raw_anonymous as expose_api_raw_anonymous + + +import logging +log = logging.getLogger( __name__ ) + + +class JobFilesAPIController( BaseAPIController ): + """ This job files controller allows remote job running mechanisms to + read and modify the current state of files for queued and running jobs. + It is certainly not meant to represent part of Galaxy's stable, user + facing API. + + Furthermore, even if a user key corresponds to the user running the job, + it should not be accepted for authorization - this API allows access to + low-level unfiltered files and such authorization would break Galaxy's + security model for tool execution. + """ + + @expose_api_raw_anonymous + def index( self, trans, job_id, **kwargs ): + """ + index( self, trans, job_id, **kwargs ) + * GET /api/jobs/{job_id}/files + Get a file required to staging a job (proper datasets, extra inputs, + task-split inputs, working directory files). + + :type job_id: str + :param job_id: encoded id string of the job + :type path: str + :param path: Path to file. + :type job_key: str + :param job_key: A key used to authenticate this request as acting on + behalf or a job runner for the specified job. + ..note: + This API method is intended only for consumption by job runners, + not end users. + + :rtype: binary + :returns: contents of file + """ + self.__authorize_job_access( trans, job_id, **kwargs ) + path = kwargs.get("path", None) + return open(path, 'rb') + + @expose_api_anonymous + def create( self, trans, job_id, payload, **kwargs ): + """ + create( self, trans, job_id, payload, **kwargs ) + * POST /api/jobs/{job_id}/files + Populate an output file (formal dataset, task split part, working + directory file (such as those related to metadata)). This should be + a multipart post with a 'file' parameter containing the contents of + the actual file to create. + + :type job_id: str + :param job_id: encoded id string of the job + :type payload: dict + :param payload: dictionary structure containing:: + 'job_key' = Key authenticating + 'path' = Path to file to create. + + ..note: + This API method is intended only for consumption by job runners, + not end users. + + :rtype: dict + :returns: an okay message + """ + job = self.__authorize_job_access( trans, job_id, **payload ) + path = payload.get( "path" ) + self.__check_job_can_write_to_path( trans, job, path ) + + # Is this writing an unneeded file? Should this just copy in Python? + input_file = payload.get( "file", payload.get( "__file", None ) ).file + try: + shutil.copyfile( input_file.name, path ) + finally: + input_file.close() + return {"message": "ok"} + + def __authorize_job_access(self, trans, encoded_job_id, **kwargs): + for key in [ "path", "job_key" ]: + if key not in kwargs: + error_message = "Job files action requires a valid '%s'." % key + raise exceptions.ObjectAttributeMissingException( error_message ) + + job_id = trans.security.decode_id( encoded_job_id ) + job_key = trans.security.encode_id( job_id, kind="jobs_files" ) + if not util.safe_str_cmp( kwargs[ "job_key" ], job_key ): + raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") + + # Verify job is active. Don't update the contents of complete jobs. + job = trans.sa_session.query( model.Job ).get( job_id ) + if job.finished: + error_message = "Attempting to read or modify the files of a job that has already completed." + raise exceptions.MessageException( error_message ) + return job + + def __check_job_can_write_to_path( self, trans, job, path ): + """ Verify an idealized job runner should actually be able to write to + the specified path - it must be a dataset output, a dataset "extra + file", or a some place in the working directory of this job. + + Would like similar checks for reading the unstructured nature of loc + files make this very difficult. (See abandoned work here + https://gist.github.com/jmchilton/9103619.) + """ + in_work_dir = self.__in_working_directory( job, path, trans.app ) + if not in_work_dir and not self.__is_output_dataset_path( job, path ): + raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") + + def __is_output_dataset_path( self, job, path ): + """ Check if is an output path for this job or a file in the an + output's extra files path. + """ + da_lists = [ job.output_datasets, job.output_library_datasets ] + for da_list in da_lists: + for job_dataset_association in da_list: + dataset = job_dataset_association.dataset + if not dataset: + continue + if os.path.abspath( dataset.file_name ) == os.path.abspath( path ): + return True + elif util.in_directory( path, dataset.extra_files_path ): + return True + return False + + def __in_working_directory( self, job, path, app ): + working_directory = app.object_store.get_filename(job, base_dir='job_work', dir_only=True, extra_dir=str(job.id)) + return util.in_directory( path, working_directory ) diff -r 6495ceccc87d467f73d76e21f801acc78934fc8f -r 2727f16171610e0e27420d780e9a5ed56f1908f5 lib/galaxy/webapps/galaxy/buildapp.py --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -261,6 +261,14 @@ path_prefix='/api' ) webapp.mapper.connect( 'job_search', '/api/jobs/search', controller='jobs', action='search', conditions=dict( method=['POST'] ) ) + # Job files controllers. Only for consumption by remote job runners. + webapp.mapper.resource( 'file', + 'files', + controller="job_files", + name_prefix="job_", + path_prefix='/api/jobs/:job_id', + parent_resources=dict( member_name="job", collection_name="jobs") + ) _add_item_extended_metadata_controller( webapp, name_prefix="library_dataset_", Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org