commit/galaxy-central: jmchilton: Merged in jmchilton/galaxy-central-fork-1 (pull request #452)
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/de2e33813275/ Changeset: de2e33813275 User: jmchilton Date: 2014-08-09 15:15:11 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #452) More RESTful workflow API (attempt 3) Affected #: 4 files diff -r fcbd2849ebd17a80e66e0fbdac9df7bb37f88fa2 -r de2e33813275b89a04e7de4e95b21f6d2e100d4e lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -1664,7 +1664,7 @@ session.flush() return imported_stored - def _workflow_from_dict( self, trans, data, source=None, add_to_menu=False ): + def _workflow_from_dict( self, trans, data, source=None, add_to_menu=False, publish=False ): """ Creates a workflow from a dict. Created workflow is stored in the database and returned. """ @@ -1752,6 +1752,7 @@ workflow.stored_workflow = stored stored.latest_workflow = workflow stored.user = trans.user + stored.published = publish if data[ 'annotation' ]: self.add_item_annotation( trans.sa_session, stored.user, stored, data[ 'annotation' ] ) diff -r fcbd2849ebd17a80e66e0fbdac9df7bb37f88fa2 -r de2e33813275b89a04e7de4e95b21f6d2e100d4e lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -6,11 +6,13 @@ import logging from sqlalchemy import desc, or_ -from galaxy import exceptions, util, web +from galaxy import exceptions, util from galaxy.model.item_attrs import UsesAnnotations +from galaxy.managers import histories from galaxy.web import _future_expose_api as expose_api from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin from galaxy.web.base.controller import UsesHistoryMixin +from galaxy.web.base.controller import SharableMixin from galaxy.workflow.extract import extract_workflow from galaxy.workflow.run import invoke from galaxy.workflow.run import WorkflowRunConfig @@ -18,9 +20,13 @@ log = 
logging.getLogger(__name__) -class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin, UsesAnnotations): +class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin, UsesAnnotations, SharableMixin): - @web.expose_api + def __init__( self, app ): + super( BaseAPIController, self ).__init__( app ) + self.history_manager = histories.HistoryManager() + + @expose_api def index(self, trans, **kwd): """ GET /api/workflows @@ -54,30 +60,21 @@ rval.append(item) return rval - @web.expose_api + @expose_api def show(self, trans, id, **kwd): """ GET /api/workflows/{encoded_workflow_id} Displays information needed to run a workflow from the command line. """ - workflow_id = id - try: - decoded_workflow_id = trans.security.decode_id(workflow_id) - except TypeError: - trans.response.status = 400 - return "Malformed workflow id ( %s ) specified, unable to decode." % str(workflow_id) - try: - stored_workflow = trans.sa_session.query(trans.app.model.StoredWorkflow).get(decoded_workflow_id) - if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin(): - if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: - trans.response.status = 400 - return("Workflow is neither importable, nor owned by or shared with current user") - except: - trans.response.status = 400 - return "That workflow does not exist." 
+ stored_workflow = self.__get_stored_workflow( trans, id ) + if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin(): + if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: + message = "Workflow is neither importable, nor owned by or shared with current user" + raise exceptions.ItemAccessibilityException( message ) + item = stored_workflow.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } ) - item['url'] = url_for('workflow', id=workflow_id) + item['url'] = url_for('workflow', id=id) item['owner'] = stored_workflow.user.username latest_workflow = stored_workflow.latest_workflow inputs = {} @@ -114,7 +111,7 @@ item['steps'] = steps return item - @web.expose_api + @expose_api def create(self, trans, payload, **kwd): """ POST /api/workflows @@ -161,10 +158,20 @@ :param workflow_name: If from_history_id is set - name of the workflow to create when extracting a workflow from history :type workflow_name: str """ + ways_to_create = set( [ + 'workflow_id', + 'installed_repository_file', + 'from_history_id', + 'shared_workflow_id', + 'workflow', + ] ).intersection( payload ) + if len( ways_to_create ) == 0: + message = "One parameter among - %s - must be specified" % ", ".join( ways_to_create ) + raise exceptions.RequestParameterMissingException( message ) - if len( set( ['workflow_id', 'installed_repository_file', 'from_history_id'] ).intersection( payload ) ) > 1: - trans.response.status = 403 - return "Only one among 'workflow_id', 'installed_repository_file', 'from_history_id' must be specified" + if len( ways_to_create ) > 1: + message = "Only one parameter among - %s - must be specified" % ", ".join( ways_to_create ) + raise exceptions.RequestParameterInvalidException( message ) if 'installed_repository_file' in payload: workflow_controller = trans.webapp.controllers[ 'workflow' ] @@ -193,10 
+200,17 @@ item[ 'url' ] = url_for( 'workflow', id=item[ 'id' ] ) return item + if 'shared_workflow_id' in payload: + workflow_id = payload[ 'shared_workflow_id' ] + return self.__api_import_shared_workflow( trans, workflow_id, payload ) + + if 'workflow' in payload: + return self.__api_import_new_workflow( trans, payload, **kwd ) + workflow_id = payload.get( 'workflow_id', None ) if not workflow_id: - trans.response.status = 403 - return "Either workflow_id, installed_repository_file or from_history_id must be specified" + message = "Invalid workflow_id specified." + raise exceptions.RequestParameterInvalidException( message ) # Pull other parameters out of payload. param_map = payload.get( 'parameters', {} ) @@ -225,34 +239,24 @@ history_param = payload.get('history', '') # Get workflow + accessibility check. - stored_workflow = trans.sa_session.query(self.app.model.StoredWorkflow).get(trans.security.decode_id(workflow_id)) - if stored_workflow.user != trans.user and not trans.user_is_admin(): - if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: - trans.response.status = 400 - return("Workflow is not owned by or shared with current user") + stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id ) workflow = stored_workflow.latest_workflow # Sanity checks. - if not workflow: - trans.response.status = 400 - return "Workflow not found." 
if len( workflow.steps ) == 0: - trans.response.status = 400 - return "Workflow cannot be run because it does not have any steps" + raise exceptions.MessageException( "Workflow cannot be run because it does not have any steps" ) if workflow.has_cycles: - trans.response.status = 400 - return "Workflow cannot be run because it contains cycles" + raise exceptions.MessageException( "Workflow cannot be run because it contains cycles" ) if workflow.has_errors: - trans.response.status = 400 - return "Workflow cannot be run because of validation errors in some steps" + message = "Workflow cannot be run because of validation errors in some steps" + raise exceptions.MessageException( message ) # Get target history. if history_param.startswith('hist_id='): # Passing an existing history to use. - history = trans.sa_session.query(self.app.model.History).get(trans.security.decode_id(history_param[8:])) - if history.user != trans.user and not trans.user_is_admin(): - trans.response.status = 400 - return "Invalid History specified." + encoded_history_id = history_param[ 8: ] + history_id = self.__decode_id( trans, encoded_history_id, model_type="history" ) + history = self.history_manager.get( trans, history_id, check_ownership=True ) else: # Send workflow outputs to new history. history = self.app.model.History(name=history_param, user=trans.user) @@ -284,8 +288,8 @@ inputs[k]['id'] ) else: - trans.response.status = 400 - return "Unknown dataset source '%s' specified." % inputs[k]['src'] + message = "Unknown workflow input source '%s' specified." 
% inputs[k]['src'] + raise exceptions.RequestParameterInvalidException( message ) if add_to_history and content.history != history: content = content.copy() if isinstance( content, self.app.model.HistoryDatasetAssociation ): @@ -294,8 +298,8 @@ history.add_dataset_collection( content ) inputs[k]['hda'] = content # TODO: rename key to 'content', prescreen input ensure not populated explicitly except AssertionError: - trans.response.status = 400 - return "Invalid Dataset '%s' Specified" % inputs[k]['id'] + message = "Invalid workflow input '%s' specified" % inputs[k]['id'] + raise exceptions.ItemAccessibilityException( message ) # Run each step, connecting outputs to inputs replacement_dict = payload.get('replacement_params', {}) @@ -308,6 +312,8 @@ param_map=param_map, ) + # invoke may throw MessageExceptions on tool erors, failure + # to match up inputs, etc... outputs = invoke( trans=trans, workflow=workflow, @@ -327,30 +333,22 @@ return rval - @web.expose_api + @expose_api def workflow_dict( self, trans, workflow_id, **kwd ): """ GET /api/workflows/{encoded_workflow_id}/download Returns a selected workflow as a json dictionary. 
""" - try: - stored_workflow = trans.sa_session.query(self.app.model.StoredWorkflow).get(trans.security.decode_id(workflow_id)) - except Exception, e: - return ("Workflow with ID='%s' can not be found\n Exception: %s") % (workflow_id, str( e )) - # check to see if user has permissions to selected workflow - if stored_workflow.user != trans.user and not trans.user_is_admin(): - if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: - trans.response.status = 400 - return("Workflow is not owned by or shared with current user") + stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id ) ret_dict = self._workflow_to_dict( trans, stored_workflow ) if not ret_dict: # This workflow has a tool that's missing from the distribution - trans.response.status = 400 - return "Workflow cannot be exported due to missing tools." + message = "Workflow cannot be exported due to missing tools." + raise exceptions.MessageException( message ) return ret_dict - @web.expose_api + @expose_api def delete( self, trans, id, **kwd ): """ DELETE /api/workflows/{encoded_workflow_id} @@ -379,19 +377,39 @@ # TODO: Unsure of response message to let api know that a workflow was successfully deleted return ( "Workflow '%s' successfully deleted" % stored_workflow.name ) - @web.expose_api - def import_new_workflow(self, trans, payload, **kwd): + @expose_api + def import_new_workflow_deprecated(self, trans, payload, **kwd): """ POST /api/workflows/upload Importing dynamic workflows from the api. Return newly generated workflow id. Author: rpark # currently assumes payload['workflow'] is a json representation of a workflow to be inserted into the database + + Deprecated in favor to POST /api/workflows with encoded 'workflow' in + payload the same way. 
""" + return self.__api_import_new_workflow( trans, payload, **kwd ) + def __api_import_new_workflow( self, trans, payload, **kwd ): data = payload['workflow'] - workflow, missing_tool_tups = self._workflow_from_dict( trans, data, source="API" ) + publish = util.string_as_bool( payload.get( "publish", False ) ) + # If 'publish' set, default to importable. + importable = util.string_as_bool( payload.get( "importable", publish ) ) + + if publish and not importable: + raise exceptions.RequestParameterInvalidException( "Published workflow must be importable." ) + + from_dict_kwds = dict( + source="API", + publish=publish, + ) + workflow, missing_tool_tups = self._workflow_from_dict( trans, data, **from_dict_kwds ) + + if importable: + self._make_item_accessible( trans.sa_session, workflow ) + trans.sa_session.flush() # galaxy workflow newly created id workflow_id = workflow.id @@ -409,7 +427,7 @@ return item @expose_api - def import_shared_workflow(self, trans, payload, **kwd): + def import_shared_workflow_deprecated(self, trans, payload, **kwd): """ POST /api/workflows/import Import a workflow shared by other users. @@ -423,14 +441,17 @@ workflow_id = payload.get('workflow_id', None) if workflow_id is None: raise exceptions.ObjectAttributeMissingException( "Missing required parameter 'workflow_id'." ) + self.__api_import_shared_workflow( trans, workflow_id, payload ) + + def __api_import_shared_workflow( self, trans, workflow_id, payload, **kwd ): try: stored_workflow = self.get_stored_workflow( trans, workflow_id, check_ownership=False ) except: raise exceptions.ObjectNotFound( "Malformed workflow id ( %s ) specified." % workflow_id ) if stored_workflow.importable is False: - raise exceptions.MessageException( 'The owner of this workflow has disabled imports via this link.' ) + raise exceptions.ItemAccessibilityException( 'The owner of this workflow has disabled imports via this link.' 
) elif stored_workflow.deleted: - raise exceptions.MessageException( "You can't import this workflow because it has been deleted." ) + raise exceptions.ItemDeletionException( "You can't import this workflow because it has been deleted." ) imported_workflow = self._import_shared_workflow( trans, stored_workflow ) item = imported_workflow.to_dict( value_mapper={ 'id': trans.security.encode_id } ) encoded_id = trans.security.encode_id(imported_workflow.id) @@ -491,3 +512,32 @@ if out is not None: return self.encode_all_ids( trans, out.to_dict('element'), True) return None + + def __get_stored_accessible_workflow( self, trans, workflow_id ): + stored_workflow = self.__get_stored_workflow( trans, workflow_id ) + + # check to see if user has permissions to selected workflow + if stored_workflow.user != trans.user and not trans.user_is_admin(): + if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: + message = "Workflow is not owned by or shared with current user" + raise exceptions.ItemAccessibilityException( message ) + + return stored_workflow + + def __get_stored_workflow( self, trans, workflow_id ): + try: + workflow_id = self.__decode_id( trans, workflow_id ) + query = trans.sa_session.query( trans.app.model.StoredWorkflow ) + stored_workflow = query.get( workflow_id ) + except Exception: + raise exceptions.ObjectNotFound( "No such workflow found - invalid workflow identifier." ) + if stored_workflow is None: + raise exceptions.ObjectNotFound( "No such workflow found." 
) + return stored_workflow + + def __decode_id( self, trans, workflow_id, model_type="workflow" ): + try: + return trans.security.decode_id( workflow_id ) + except Exception: + message = "Malformed %s id ( %s ) specified, unable to decode" % ( model_type, workflow_id ) + raise exceptions.MalformedId( message ) diff -r fcbd2849ebd17a80e66e0fbdac9df7bb37f88fa2 -r de2e33813275b89a04e7de4e95b21f6d2e100d4e lib/galaxy/webapps/galaxy/buildapp.py --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -213,13 +213,13 @@ webapp.add_route( '/visualization/show/:visualization_name', controller='visualization', action='render', visualization_name=None ) - # "POST /api/workflows/import" => ``workflows.import_workflow()``. - # Defines a named route "import_workflow". - webapp.mapper.connect( 'import_workflow', '/api/workflows/upload', controller='workflows', action='import_new_workflow', conditions=dict( method=['POST'] ) ) + # Deprecated in favor of POST /api/workflows with 'workflow' in payload. + webapp.mapper.connect( 'import_workflow_deprecated', '/api/workflows/upload', controller='workflows', action='import_new_workflow_deprecated', conditions=dict( method=['POST'] ) ) webapp.mapper.connect( 'workflow_dict', '/api/workflows/{workflow_id}/download', controller='workflows', action='workflow_dict', conditions=dict( method=['GET'] ) ) # Preserve the following download route for now for dependent applications -- deprecate at some point webapp.mapper.connect( 'workflow_dict', '/api/workflows/download/{workflow_id}', controller='workflows', action='workflow_dict', conditions=dict( method=['GET'] ) ) - webapp.mapper.connect( 'import_shared_workflow', '/api/workflows/import', controller='workflows', action='import_shared_workflow', conditions=dict( method=['POST'] ) ) + # Deprecated in favor of POST /api/workflows with shared_workflow_id in payload. 
+ webapp.mapper.connect( 'import_shared_workflow_deprecated', '/api/workflows/import', controller='workflows', action='import_shared_workflow_deprecated', conditions=dict( method=['POST'] ) ) webapp.mapper.connect( 'workflow_usage', '/api/workflows/{workflow_id}/usage', controller='workflows', action='workflow_usage', conditions=dict(method=['GET'])) webapp.mapper.connect( 'workflow_usage_contents', '/api/workflows/{workflow_id}/usage/{usage_id}', controller='workflows', action='workflow_usage_contents', conditions=dict(method=['GET'])) diff -r fcbd2849ebd17a80e66e0fbdac9df7bb37f88fa2 -r de2e33813275b89a04e7de4e95b21f6d2e100d4e test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -9,6 +9,7 @@ from .helpers import skip_without_tool from base.interactor import delete_request # requests like delete +from galaxy.exceptions import error_codes # Workflow API TODO: @@ -24,6 +25,16 @@ self.dataset_populator = DatasetPopulator( self.galaxy_interactor ) self.dataset_collection_populator = DatasetCollectionPopulator( self.galaxy_interactor ) + def test_show_invalid_is_404( self ): + show_response = self._get( "workflow/%s" % self._random_key() ) + self._assert_status_code_is( show_response, 404 ) + + def test_cannot_show_private_workflow( self ): + workflow_id = self.workflow_populator.simple_workflow( "test_not_importportable" ) + with self._different_user(): + show_response = self._get( "workflows/%s" % workflow_id ) + self._assert_status_code_is( show_response, 403 ) + def test_delete( self ): workflow_id = self.workflow_populator.simple_workflow( "test_delete" ) workflow_name = "test_delete (imported from API)" @@ -46,14 +57,44 @@ self._assert_status_code_is( index_response, 200 ) assert isinstance( index_response.json(), list ) - def test_import( self ): + def test_upload( self ): + self.__test_upload( use_deprecated_route=False ) + + def test_upload_deprecated( self ): + self.__test_upload( use_deprecated_route=True ) + + def 
__test_upload( self, use_deprecated_route ): data = dict( workflow=dumps( self.workflow_populator.load_workflow( name="test_import" ) ), ) - upload_response = self._post( "workflows/upload", data=data ) + if use_deprecated_route: + route = "workflows/upload" + else: + route = "workflows" + upload_response = self._post( route, data=data ) self._assert_status_code_is( upload_response, 200 ) self._assert_user_has_workflow_with_name( "test_import (imported from API)" ) + def test_import_deprecated( self ): + workflow_id = self.workflow_populator.simple_workflow( "test_import_published_deprecated", publish=True ) + with self._different_user(): + other_import_response = self.__import_workflow( workflow_id ) + self._assert_status_code_is( other_import_response, 200 ) + self._assert_user_has_workflow_with_name( "imported: test_import_published_deprecated (imported from API)") + + def test_not_importable_prevents_import( self ): + workflow_id = self.workflow_populator.simple_workflow( "test_not_importportable" ) + with self._different_user(): + other_import_response = self.__import_workflow( workflow_id ) + self._assert_status_code_is( other_import_response, 403 ) + + def test_import_published( self ): + workflow_id = self.workflow_populator.simple_workflow( "test_import_published", publish=True ) + with self._different_user(): + other_import_response = self.__import_workflow( workflow_id, deprecated_route=True ) + self._assert_status_code_is( other_import_response, 200 ) + self._assert_user_has_workflow_with_name( "imported: test_import_published (imported from API)") + def test_export( self ): uploaded_workflow_id = self.workflow_populator.simple_workflow( "test_for_export" ) download_response = self._get( "workflows/%s/download" % uploaded_workflow_id ) @@ -85,6 +126,29 @@ self._assert_status_code_is( run_workflow_response, 200 ) self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + def test_cannot_run_inaccessible_workflow( self ): + workflow = 
self.workflow_populator.load_workflow( name="test_for_run_cannot_access" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + with self._different_user(): + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 403 ) + + def test_404_on_invalid_workflow( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_run_does_not_exist" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + workflow_request[ "workflow_id" ] = self._random_key() + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 404 ) + + def test_cannot_run_against_other_users_history( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_run_does_not_exist" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + with self._different_user(): + other_history_id = self.dataset_populator.new_history() + workflow_request[ "history" ] = "hist_id=%s" % other_history_id + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 403 ) + @skip_without_tool( "cat1" ) @skip_without_tool( "collection_two_paired" ) def test_run_workflow_collection_params( self ): @@ -233,6 +297,20 @@ collection_step_state = loads( collection_step[ "tool_state" ] ) self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) + def test_empty_create( self ): + response = self._post( "workflows" ) + self._assert_status_code_is( response, 400 ) + self._assert_error_code_is( response, error_codes.USER_REQUEST_MISSING_PARAMETER ) + + def test_invalid_create_multiple_types( self ): + data = { + 'shared_workflow_id': '1234567890abcdef', + 'from_history_id': '1234567890abcdef' + } + response = self._post( "workflows", data ) + self._assert_status_code_is( response, 400 ) + self._assert_error_code_is( response, 
error_codes.USER_REQUEST_INVALID_PARAMETER ) + @skip_without_tool( "random_lines1" ) def test_extract_mapping_workflow_from_history( self ): history_id = self.dataset_populator.new_history() @@ -551,3 +629,16 @@ self._assert_status_code_is( index_response, 200 ) names = map( lambda w: w[ "name" ], index_response.json() ) return names + + def __import_workflow( self, workflow_id, deprecated_route=False ): + if deprecated_route: + route = "workflows/import" + import_data = dict( + workflow_id=workflow_id, + ) + else: + route = "workflows" + import_data = dict( + shared_workflow_id=workflow_id, + ) + return self._post( route, import_data ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
Participants (1):
- commits-noreply@bitbucket.org