2 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/a658be4c5c37/
Changeset: a658be4c5c37
User: jmchilton
Date: 2014-02-02 02:49:54
Summary: Search API - allow non-admins to search their own workflows.
Just makes sense, but also enables easier functional tests of search.
Affected #: 1 file
diff -r a629ff293db60b781a9abb29309bc56a2107d366 -r a658be4c5c37165c790ab7bf95801cc230a1a04c lib/galaxy/webapps/galaxy/api/search.py
--- a/lib/galaxy/webapps/galaxy/api/search.py
+++ b/lib/galaxy/webapps/galaxy/api/search.py
@@ -43,7 +43,7 @@
elif type( item ) in [ trans.app.model.Job ]:
if item.used_id == trans.user or trans.user_is_admin():
append = True
- elif type( item ) in [ trans.app.model.Page ]:
+ elif type( item ) in [ trans.app.model.Page, trans.app.model.StoredWorkflow ]:
try:
if self.security_check( trans, item, False, True):
append = True
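
With this change in place, a regular (non-admin) account can search its own Page and
StoredWorkflow records through the search API. As a rough illustration only, a non-admin
query might look like the sketch below; the Galaxy URL, the /api/search path, and
key-based authentication are assumptions, while the query string and the
"results"/"name" response keys mirror the functional tests added in the next changeset.

import requests

# Hypothetical instance and per-user API key -- both are assumptions for illustration.
GALAXY_URL = "http://localhost:8080"
API_KEY = "<your-user-api-key>"

# Query syntax taken from the functional tests below; the endpoint path and the
# key parameter are assumed here.
payload = dict( query="select * from workflow where not deleted", key=API_KEY )
response = requests.post( "%s/api/search" % GALAXY_URL, data=payload )
response.raise_for_status()

# Each result is expected to expose a "name" field (see __has_result_with_name below).
for result in response.json().get( "results", [] ):
    print( result.get( "name" ) )
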
https://bitbucket.org/galaxy/galaxy-central/commits/89d7cf67241f/
Changeset: 89d7cf67241f
User: jmchilton
Date: 2014-02-02 02:53:04
Summary: Add some initial functional testing of search API.
Refactored workflow creation stuff in test_workflows to allow it to be used in other
places (namely test_search.py). Tests searching for workflows and filtering out deleted
workflows.
Affected #: 3 files
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/helpers.py
--- a/test/functional/api/helpers.py
+++ b/test/functional/api/helpers.py
@@ -1,5 +1,9 @@
import time
from json import dumps
+from json import loads
+from pkg_resources import resource_string
+
+workflow_str = resource_string( __name__, "test_workflow_1.ga" )
class TestsDatasets:
@@ -56,3 +60,35 @@
history_id=history_id,
**kwds
)
+
+
+class WorkflowPopulator( object ):
+ # Impulse is to make this a Mixin, but probably better as an object.
+
+ def __init__( self, api_test_case ):
+ self.api_test_case = api_test_case
+
+ def load_workflow( self, name, add_pja=False ):
+ workflow = loads( workflow_str )
+ workflow[ "name" ] = name
+ if add_pja:
+ tool_step = workflow[ "steps" ][ "2" ]
+ tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict(
+ action_type="RenameDatasetAction",
+ output_name="out_file1",
+ action_arguments=dict( newname="the_new_name" ),
+ )
+ return workflow
+
+ def simple_workflow( self, name, **create_kwds ):
+ workflow = self.load_workflow( name )
+ return self.create_workflow( workflow, **create_kwds )
+
+ def create_workflow( self, workflow, **create_kwds ):
+ data = dict(
+ workflow=dumps( workflow ),
+ **create_kwds
+ )
+ upload_response = self.api_test_case._post( "workflows/upload", data=data )
+ uploaded_workflow_id = upload_response.json()[ "id" ]
+ return uploaded_workflow_id
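
For reference, the new WorkflowPopulator helper is meant to be driven from an API test
case roughly as in the sketch below (the test case and method names are hypothetical,
for illustration only):

from base import api
from .helpers import WorkflowPopulator


class ExampleApiTestCase( api.ApiTestCase ):  # hypothetical test case, illustration only

    def test_example( self ):
        populator = WorkflowPopulator( self )
        # Upload a copy of test_workflow_1.ga renamed to "example" and get its id back.
        workflow_id = populator.simple_workflow( "example" )
        # Or build the workflow dict first, tweak it, then upload it explicitly.
        workflow = populator.load_workflow( name="example_pja", add_pja=True )
        another_workflow_id = populator.create_workflow( workflow )
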
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/test_search.py
--- /dev/null
+++ b/test/functional/api/test_search.py
@@ -0,0 +1,32 @@
+from base import api
+from base.interactor import delete_request
+
+from .helpers import WorkflowPopulator
+
+
+class SearchApiTestCase( api.ApiTestCase ):
+
+ def test_search_workflows( self ):
+ workflow_populator = WorkflowPopulator( self )
+ workflow_id = workflow_populator.simple_workflow( "test_for_search" )
+ search_response = self.__search( "select * from workflow" )
+ assert self.__has_result_with_name( search_response, "test_for_search (imported from API)" ), search_response.json()
+
+ # Deleted
+ delete_url = self._api_url( "workflows/%s" % workflow_id, use_key=True )
+ delete_request( delete_url )
+
+ search_response = self.__search( "select * from workflow where not deleted" )
+ assert not self.__has_result_with_name( search_response, "test_for_search (imported from API)" ), search_response.json()
+
+ def __search( self, query ):
+ data = dict( query=query )
+ search_response = self._post( "search", data=data )
+ self._assert_status_code_is( search_response, 200 )
+ return search_response
+
+ def __has_result_with_name( self, search_response, name ):
+ search_response_object = search_response.json()
+ assert "results" in search_response_object, search_response_object
+ results = search_response_object[ "results" ]
+ return name in map( lambda r: r.get( "name", None ), results )
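
For clarity, __has_result_with_name only relies on the search response carrying a
"results" list whose entries expose a "name" key. A matching response body would look
roughly like the sketch below; any other fields the API may return are unknown here and
omitted.

# Rough shape of the JSON the assertions above inspect -- only "results" and the
# per-result "name" key are implied by the test.
example_search_response = dict(
    results=[
        dict( name="test_for_search (imported from API)" ),
    ],
)
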
diff -r a658be4c5c37165c790ab7bf95801cc230a1a04c -r 89d7cf67241f36e5f2c4de0aa31328fb7823e7d5 test/functional/api/test_workflows.py
--- a/test/functional/api/test_workflows.py
+++ b/test/functional/api/test_workflows.py
@@ -1,14 +1,11 @@
from base import api
-from json import loads
from json import dumps
-from pkg_resources import resource_string
import time
from .helpers import TestsDatasets
+from .helpers import WorkflowPopulator
from base.interactor import delete_request # requests like delete
-workflow_str = resource_string( __name__, "test_workflow_1.ga" )
-
# Workflow API TODO:
# - Allow history_id as param to workflow run action. (hist_id)
@@ -17,8 +14,12 @@
# - Much more testing obviously, always more testing.
class WorkflowsApiTestCase( api.ApiTestCase, TestsDatasets ):
+ def setUp( self ):
+ super( WorkflowsApiTestCase, self ).setUp()
+ self.workflow_populator = WorkflowPopulator( self )
+
def test_delete( self ):
- workflow_id = self._simple_workflow( "test_delete" )
+ workflow_id = self.workflow_populator.simple_workflow( "test_delete" )
workflow_name = "test_delete (imported from API)"
self._assert_user_has_workflow_with_name( workflow_name )
workflow_url = self._api_url( "workflows/%s" % workflow_id, use_key=True )
@@ -28,7 +29,7 @@
assert workflow_name not in self.__workflow_names()
def test_other_cannot_delete( self ):
- workflow_id = self._simple_workflow( "test_other_delete" )
+ workflow_id = self.workflow_populator.simple_workflow( "test_other_delete" )
with self._different_user():
workflow_url = self._api_url( "workflows/%s" % workflow_id, use_key=True )
delete_response = delete_request( workflow_url )
@@ -41,14 +42,14 @@
def test_import( self ):
data = dict(
- workflow=dumps( self._load_workflow( name="test_import" ) ),
+ workflow=dumps( self.workflow_populator.load_workflow( name="test_import" ) ),
)
upload_response = self._post( "workflows/upload", data=data )
self._assert_status_code_is( upload_response, 200 )
self._assert_user_has_workflow_with_name( "test_import (imported from API)" )
def test_export( self ):
- uploaded_workflow_id = self._simple_workflow( "test_for_export" )
+ uploaded_workflow_id = self.workflow_populator.simple_workflow( "test_for_export" )
download_response = self._get( "workflows/%s/download" % uploaded_workflow_id )
self._assert_status_code_is( download_response, 200 )
downloaded_workflow = download_response.json()
@@ -58,7 +59,7 @@
assert first_input[ "name" ] == "WorkflowInput1"
def test_run_workflow( self ):
- workflow = self._load_workflow( name="test_for_run" )
+ workflow = self.workflow_populator.load_workflow( name="test_for_run" )
workflow_request, history_id = self._setup_workflow_run( workflow )
# TODO: This should really be a post to workflows/<workflow_id>/run or
# something like that.
@@ -67,8 +68,8 @@
self._wait_for_history( history_id, assert_ok=True )
def test_pja_import_export( self ):
- workflow = self._load_workflow( name="test_for_pja_import", add_pja=True )
- uploaded_workflow_id = self._create_workflow( workflow )
+ workflow = self.workflow_populator.load_workflow( name="test_for_pja_import", add_pja=True )
+ uploaded_workflow_id = self.workflow_populator.create_workflow( workflow )
download_response = self._get( "workflows/%s/download" % uploaded_workflow_id )
downloaded_workflow = download_response.json()
self._assert_has_keys( downloaded_workflow[ "steps" ], "0", "1", "2" )
@@ -80,7 +81,7 @@
def test_post_job_action( self ):
""" Tests both import and execution of post job actions.
"""
- workflow = self._load_workflow( name="test_for_pja_run", add_pja=True )
+ workflow = self.workflow_populator.load_workflow( name="test_for_pja_run", add_pja=True )
workflow_request, history_id = self._setup_workflow_run( workflow )
run_workflow_response = self._post( "workflows", data=workflow_request )
self._assert_status_code_is( run_workflow_response, 200 )
@@ -92,7 +93,7 @@
assert "the_new_name" in map( lambda hda: hda[ "name" ],
contents )
def _setup_workflow_run( self, workflow ):
- uploaded_workflow_id = self._create_workflow( workflow )
+ uploaded_workflow_id = self.workflow_populator.create_workflow( workflow )
workflow_show_resposne = self._get( "workflows/%s" % uploaded_workflow_id )
self._assert_status_code_is( workflow_show_resposne, 200 )
workflow_inputs = workflow_show_resposne.json()[ "inputs" ]
@@ -119,20 +120,6 @@
def _ds_entry( self, hda ):
return dict( src="hda", id=hda[ "id" ] )
- def _simple_workflow( self, name, **create_kwds ):
- workflow = self._load_workflow( name )
- return self._create_workflow( workflow, **create_kwds )
-
- def _create_workflow( self, workflow, **create_kwds ):
- data = dict(
- workflow=dumps( workflow ),
- **create_kwds
- )
- upload_response = self._post( "workflows/upload", data=data )
- self._assert_status_code_is( upload_response, 200 )
- uploaded_workflow_id = upload_response.json()[ "id" ]
- return uploaded_workflow_id
-
def _assert_user_has_workflow_with_name( self, name ):
names = self.__workflow_names()
assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names )
@@ -142,15 +129,3 @@
self._assert_status_code_is( index_response, 200 )
names = map( lambda w: w[ "name" ], index_response.json() )
return names
-
- def _load_workflow( self, name, add_pja=False ):
- workflow = loads( workflow_str )
- workflow[ "name" ] = name
- if add_pja:
- tool_step = workflow[ "steps" ][ "2" ]
- tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict(
- action_type="RenameDatasetAction",
- output_name="out_file1",
- action_arguments=dict( newname="the_new_name" ),
- )
- return workflow
Repository URL: https://bitbucket.org/galaxy/galaxy-central/