2 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/a658be4c5c37/ Changeset: a658be4c5c37 User: jmchilton Date: 2014-02-02 02:49:54 Summary: Search API - allow non-admins to search their own workflows.
Just makes sense, but also enables easier functional tests of search. Affected #: 1 file
diff -r a629ff293db60b781a9abb29309bc56a2107d366 -r a658be4c5c37165c790ab7bf95801cc230a1a04c lib/galaxy/webapps/galaxy/api/search.py --- a/lib/galaxy/webapps/galaxy/api/search.py +++ b/lib/galaxy/webapps/galaxy/api/search.py @@ -43,7 +43,7 @@ elif type( item ) in [ trans.app.model.Job ]: if item.used_id == trans.user or trans.user_is_admin(): append = True - elif type( item ) in [ trans.app.model.Page ]: + elif type( item ) in [ trans.app.model.Page, trans.app.model.StoredWorkflow ]: try: if self.security_check( trans, item, False, True): append = True
https://bitbucket.org/galaxy/galaxy-central/commits/89d7cf67241f/ Changeset: 89d7cf67241f User: jmchilton Date: 2014-02-02 02:53:04 Summary: Add some intial functional testing of search API.
Refactored workflow creation stuff in test_workflows to allow it to be used other places (namely test_search.py). Tests searching for workflows and filtering out deleted workflows. Affected #: 3 files
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/helpers.py --- a/test/functional/api/helpers.py +++ b/test/functional/api/helpers.py @@ -1,5 +1,9 @@ import time from json import dumps +from json import loads +from pkg_resources import resource_string + +workflow_str = resource_string( __name__, "test_workflow_1.ga" )
class TestsDatasets: @@ -56,3 +60,35 @@ history_id=history_id, **kwds ) + + +class WorkflowPopulator( object ): + # Impulse is to make this a Mixin, but probably better as an object. + + def __init__( self, api_test_case ): + self.api_test_case = api_test_case + + def load_workflow( self, name, add_pja=False ): + workflow = loads( workflow_str ) + workflow[ "name" ] = name + if add_pja: + tool_step = workflow[ "steps" ][ "2" ] + tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict( + action_type="RenameDatasetAction", + output_name="out_file1", + action_arguments=dict( newname="the_new_name" ), + ) + return workflow + + def simple_workflow( self, name, **create_kwds ): + workflow = self.load_workflow( name ) + return self.create_workflow( workflow, **create_kwds ) + + def create_workflow( self, workflow, **create_kwds ): + data = dict( + workflow=dumps( workflow ), + **create_kwds + ) + upload_response = self.api_test_case._post( "workflows/upload", data=data ) + uploaded_workflow_id = upload_response.json()[ "id" ] + return uploaded_workflow_id
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/test_search.py --- /dev/null +++ b/test/functional/api/test_search.py @@ -0,0 +1,32 @@ +from base import api +from base.interactor import delete_request + +from .helpers import WorkflowPopulator + + +class SearchApiTestCase( api.ApiTestCase ): + + def test_search_workflows( self ): + workflow_populator = WorkflowPopulator( self ) + workflow_id = workflow_populator.simple_workflow( "test_for_search" ) + search_response = self.__search( "select * from workflow" ) + assert self.__has_result_with_name( search_response, "test_for_search (imported from API)" ), search_response.json() + + # Deleted + delete_url = self._api_url( "workflows/%s" % workflow_id, use_key=True ) + delete_request( delete_url ) + + search_response = self.__search( "select * from workflow where not deleted" ) + assert not self.__has_result_with_name( search_response, "test_for_search (imported from API)" ), search_response.json() + + def __search( self, query ): + data = dict( query=query ) + search_response = self._post( "search", data=data ) + self._assert_status_code_is( search_response, 200 ) + return search_response + + def __has_result_with_name( self, search_response, name ): + search_response_object = search_response.json() + assert "results" in search_response_object, search_response_object + results = search_response_object[ "results" ] + return name in map( lambda r: r.get( "name", None ), results )
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/test_workflows.py --- a/test/functional/api/test_workflows.py +++ b/test/functional/api/test_workflows.py @@ -1,14 +1,11 @@ from base import api -from json import loads from json import dumps -from pkg_resources import resource_string import time from .helpers import TestsDatasets +from .helpers import WorkflowPopulator
from base.interactor import delete_request # requests like delete
-workflow_str = resource_string( __name__, "test_workflow_1.ga" ) -
# Workflow API TODO: # - Allow history_id as param to workflow run action. (hist_id) @@ -17,8 +14,12 @@ # - Much more testing obviously, always more testing. class WorkflowsApiTestCase( api.ApiTestCase, TestsDatasets ):
+ def setUp( self ): + super( WorkflowsApiTestCase, self ).setUp() + self.workflow_populator = WorkflowPopulator( self ) + def test_delete( self ): - workflow_id = self._simple_workflow( "test_delete" ) + workflow_id = self.workflow_populator.simple_workflow( "test_delete" ) workflow_name = "test_delete (imported from API)" self._assert_user_has_workflow_with_name( workflow_name ) workflow_url = self._api_url( "workflows/%s" % workflow_id, use_key=True ) @@ -28,7 +29,7 @@ assert workflow_name not in self.__workflow_names()
def test_other_cannot_delete( self ): - workflow_id = self._simple_workflow( "test_other_delete" ) + workflow_id = self.workflow_populator.simple_workflow( "test_other_delete" ) with self._different_user(): workflow_url = self._api_url( "workflows/%s" % workflow_id, use_key=True ) delete_response = delete_request( workflow_url ) @@ -41,14 +42,14 @@
def test_import( self ): data = dict( - workflow=dumps( self._load_workflow( name="test_import" ) ), + workflow=dumps( self.workflow_populator.load_workflow( name="test_import" ) ), ) upload_response = self._post( "workflows/upload", data=data ) self._assert_status_code_is( upload_response, 200 ) self._assert_user_has_workflow_with_name( "test_import (imported from API)" )
def test_export( self ): - uploaded_workflow_id = self._simple_workflow( "test_for_export" ) + uploaded_workflow_id = self.workflow_populator.simple_workflow( "test_for_export" ) download_response = self._get( "workflows/%s/download" % uploaded_workflow_id ) self._assert_status_code_is( download_response, 200 ) downloaded_workflow = download_response.json() @@ -58,7 +59,7 @@ assert first_input[ "name" ] == "WorkflowInput1"
def test_run_workflow( self ): - workflow = self._load_workflow( name="test_for_run" ) + workflow = self.workflow_populator.load_workflow( name="test_for_run" ) workflow_request, history_id = self._setup_workflow_run( workflow ) # TODO: This should really be a post to workflows/<workflow_id>/run or # something like that. @@ -67,8 +68,8 @@ self._wait_for_history( history_id, assert_ok=True )
def test_pja_import_export( self ): - workflow = self._load_workflow( name="test_for_pja_import", add_pja=True ) - uploaded_workflow_id = self._create_workflow( workflow ) + workflow = self.workflow_populator.load_workflow( name="test_for_pja_import", add_pja=True ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) download_response = self._get( "workflows/%s/download" % uploaded_workflow_id ) downloaded_workflow = download_response.json() self._assert_has_keys( downloaded_workflow[ "steps" ], "0", "1", "2" ) @@ -80,7 +81,7 @@ def test_post_job_action( self ): """ Tests both import and execution of post job actions. """ - workflow = self._load_workflow( name="test_for_pja_run", add_pja=True ) + workflow = self.workflow_populator.load_workflow( name="test_for_pja_run", add_pja=True ) workflow_request, history_id = self._setup_workflow_run( workflow ) run_workflow_response = self._post( "workflows", data=workflow_request ) self._assert_status_code_is( run_workflow_response, 200 ) @@ -92,7 +93,7 @@ assert "the_new_name" in map( lambda hda: hda[ "name" ], contents )
def _setup_workflow_run( self, workflow ): - uploaded_workflow_id = self._create_workflow( workflow ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) workflow_show_resposne = self._get( "workflows/%s" % uploaded_workflow_id ) self._assert_status_code_is( workflow_show_resposne, 200 ) workflow_inputs = workflow_show_resposne.json()[ "inputs" ] @@ -119,20 +120,6 @@ def _ds_entry( self, hda ): return dict( src="hda", id=hda[ "id" ] )
- def _simple_workflow( self, name, **create_kwds ): - workflow = self._load_workflow( name ) - return self._create_workflow( workflow, **create_kwds ) - - def _create_workflow( self, workflow, **create_kwds ): - data = dict( - workflow=dumps( workflow ), - **create_kwds - ) - upload_response = self._post( "workflows/upload", data=data ) - self._assert_status_code_is( upload_response, 200 ) - uploaded_workflow_id = upload_response.json()[ "id" ] - return uploaded_workflow_id - def _assert_user_has_workflow_with_name( self, name ): names = self.__workflow_names() assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names ) @@ -142,15 +129,3 @@ self._assert_status_code_is( index_response, 200 ) names = map( lambda w: w[ "name" ], index_response.json() ) return names - - def _load_workflow( self, name, add_pja=False ): - workflow = loads( workflow_str ) - workflow[ "name" ] = name - if add_pja: - tool_step = workflow[ "steps" ][ "2" ] - tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict( - action_type="RenameDatasetAction", - output_name="out_file1", - action_arguments=dict( newname="the_new_name" ), - ) - return workflow
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
galaxy-commits@lists.galaxyproject.org