1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/7e9ef97e0449/ Changeset: 7e9ef97e0449 User: jmchilton Date: 2014-01-23 09:04:52 Summary: Add workflow API tests. Test import, export, index, show, run workflows, and post job actions. Affected #: 4 files diff -r f86086c92a2291e11bd1d76afa67b9166a8dc0ee -r 7e9ef97e04496434313430ee71aff7033c50c390 test/functional/api/helpers.py --- /dev/null +++ b/test/functional/api/helpers.py @@ -0,0 +1,58 @@ +import time +from json import dumps + + +class TestsDatasets: + + def _new_dataset( self, history_id, content='TestData123', **kwds ): + payload = self._upload_payload( history_id, content, **kwds ) + run_response = self._post( "tools", data=payload ) + self._assert_status_code_is( run_response, 200 ) + return run_response.json()["outputs"][0] + + def _wait_for_history( self, history_id, assert_ok=False ): + while True: + history_details_response = self._get( "histories/%s" % history_id ) + self._assert_status_code_is( history_details_response, 200 ) + history_state = history_details_response.json()[ "state" ] + if history_state not in [ "running", "queued" ]: + break + time.sleep( .1 ) + if assert_ok: + self.assertEquals( history_state, 'ok' ) + + def _new_history( self, **kwds ): + name = kwds.get( "name", "API Test History" ) + create_history_response = self._post( "histories", data=dict( name=name ) ) + self._assert_status_code_is( create_history_response, 200 ) + history_id = create_history_response.json()[ "id" ] + return history_id + + def _upload_payload( self, history_id, content, **kwds ): + name = kwds.get( "name", "Test Dataset" ) + dbkey = kwds.get( "dbkey", "?" 
) + file_type = kwds.get( "file_type", 'txt' ) + upload_params = { + 'files_0|NAME': name, + 'files_0|url_paste': content, + 'dbkey': dbkey, + 'file_type': file_type, + } + if "to_posix_lines" in kwds: + upload_params[ "files_0|to_posix_lines"] = kwds[ "to_posix_lines" ] + if "space_to_tab" in kwds: + upload_params[ "files_0|space_to_tab" ] = kwds[ "space_to_tab" ] + return self._run_tool_payload( + tool_id='upload1', + inputs=upload_params, + history_id=history_id, + upload_type='upload_dataset' + ) + + def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ): + return dict( + tool_id=tool_id, + inputs=dumps(inputs), + history_id=history_id, + **kwds + ) diff -r f86086c92a2291e11bd1d76afa67b9166a8dc0ee -r 7e9ef97e04496434313430ee71aff7033c50c390 test/functional/api/test_tools.py --- a/test/functional/api/test_tools.py +++ b/test/functional/api/test_tools.py @@ -1,13 +1,12 @@ # Test tools API. from itertools import chain -from json import dumps -import time from base import api from operator import itemgetter +from .helpers import TestsDatasets -class ToolsTestCase( api.ApiTestCase ): +class ToolsTestCase( api.ApiTestCase, TestsDatasets ): def test_index( self ): index = self._get( "tools" ) @@ -80,56 +79,3 @@ display_response = self._get( "histories/%s/contents/%s/display" % ( history_id, new_dataset[ "id" ] ) ) self._assert_status_code_is( display_response, 200 ) return display_response.content - - def _new_dataset( self, history_id, content='TestData123', **kwds ): - payload = self._upload_payload( history_id, content, **kwds ) - run_response = self._post( "tools", data=payload ) - self._assert_status_code_is( run_response, 200 ) - return run_response.json()["outputs"][0] - - def _wait_for_history( self, history_id, assert_ok=False ): - while True: - history_details_response = self._get( "histories/%s" % history_id ) - self._assert_status_code_is( history_details_response, 200 ) - history_state = history_details_response.json()[ "state" ] - if 
history_state not in [ "running", "queued" ]: - break - time.sleep( .1 ) - if assert_ok: - self.assertEquals( history_state, 'ok' ) - - def _new_history( self, **kwds ): - name = kwds.get( "name", "API Test History" ) - create_history_response = self._post( "histories", data=dict( name=name ) ) - self._assert_status_code_is( create_history_response, 200 ) - history_id = create_history_response.json()[ "id" ] - return history_id - - def _upload_payload( self, history_id, content, **kwds ): - name = kwds.get( "name", "Test Dataset" ) - dbkey = kwds.get( "dbkey", "?" ) - file_type = kwds.get( "file_type", 'txt' ) - upload_params = { - 'files_0|NAME': name, - 'files_0|url_paste': content, - 'dbkey': dbkey, - 'file_type': file_type, - } - if "to_posix_lines" in kwds: - upload_params[ "files_0|to_posix_lines"] = kwds[ "to_posix_lines" ] - if "space_to_tab" in kwds: - upload_params[ "files_0|space_to_tab" ] = kwds[ "space_to_tab" ] - return self._run_tool_payload( - tool_id='upload1', - inputs=upload_params, - history_id=history_id, - upload_type='upload_dataset' - ) - - def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ): - return dict( - tool_id=tool_id, - inputs=dumps(inputs), - history_id=history_id, - **kwds - ) diff -r f86086c92a2291e11bd1d76afa67b9166a8dc0ee -r 7e9ef97e04496434313430ee71aff7033c50c390 test/functional/api/test_workflow_1.ga --- /dev/null +++ b/test/functional/api/test_workflow_1.ga @@ -0,0 +1,87 @@ +{ + "a_galaxy_workflow": "true", + "annotation": "", + "format-version": "0.1", + "name": "TestWorkflow1", + "steps": { + "0": { + "annotation": "", + "id": 0, + "input_connections": {}, + "inputs": [ + { + "description": "", + "name": "WorkflowInput1" + } + ], + "name": "Input dataset", + "outputs": [], + "position": { + "left": 199.55555772781372, + "top": 200.66666460037231 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"WorkflowInput1\"}", + "tool_version": null, + "type": "data_input", + "user_outputs": [] 
+ }, + "1": { + "annotation": "", + "id": 1, + "input_connections": {}, + "inputs": [ + { + "description": "", + "name": "WorkflowInput2" + } + ], + "name": "Input dataset", + "outputs": [], + "position": { + "left": 206.22221422195435, + "top": 327.33335161209106 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"WorkflowInput2\"}", + "tool_version": null, + "type": "data_input", + "user_outputs": [] + }, + "2": { + "annotation": "", + "id": 2, + "input_connections": { + "input1": { + "id": 0, + "output_name": "output" + }, + "queries_0|input2": { + "id": 1, + "output_name": "output" + } + }, + "inputs": [], + "name": "Concatenate datasets", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 419.33335876464844, + "top": 200.44446563720703 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "cat1", + "tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\", \"chromInfo\": \"\\\"/home/john/workspace/galaxy-central/tool-data/shared/ucsc/chrom/?.len\\\"\", \"queries\": \"[{\\\"input2\\\": null, \\\"__index__\\\": 0}]\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + } + } +} \ No newline at end of file diff -r f86086c92a2291e11bd1d76afa67b9166a8dc0ee -r 7e9ef97e04496434313430ee71aff7033c50c390 test/functional/api/test_workflows.py --- /dev/null +++ b/test/functional/api/test_workflows.py @@ -0,0 +1,128 @@ +from base import api +from json import loads +from json import dumps +from pkg_resources import resource_string +import time +from .helpers import TestsDatasets + +workflow_str = resource_string( __name__, "test_workflow_1.ga" ) + + +# Workflow API TODO: +# - Allow history_id as param to workflow run action. (hist_id) +# - Allow post to workflows/<workflow_id>/run in addition to posting to +# /workflows with id in payload. +# - Much more testing obviously, always more testing. 
+class WorkflowsApiTestCase( api.ApiTestCase, TestsDatasets ): + + def test_index( self ): + index_response = self._get( "workflows" ) + self._assert_status_code_is( index_response, 200 ) + assert isinstance( index_response.json(), list ) + + def test_import( self ): + data = dict( + workflow=dumps( self._load_workflow( name="test_import" ) ), + ) + upload_response = self._post( "workflows/upload", data=data ) + self._assert_status_code_is( upload_response, 200 ) + self._assert_user_has_workflow_with_name( "test_import (imported from API)" ) + + def test_export( self ): + uploaded_workflow_id = self._create_workflow( self._load_workflow( name="test_for_export" ) ) + download_response = self._get( "workflows/%s/download" % uploaded_workflow_id ) + self._assert_status_code_is( download_response, 200 ) + downloaded_workflow = download_response.json() + assert downloaded_workflow[ "name" ] == "test_for_export (imported from API)" + assert len( downloaded_workflow[ "steps" ] ) == 3 + first_input = downloaded_workflow[ "steps" ][ "0" ][ "inputs" ][ 0 ] + assert first_input[ "name" ] == "WorkflowInput1" + + def test_run_workflow( self ): + workflow = self._load_workflow( name="test_for_run" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + # TODO: This should really be a post to workflows/<workflow_id>/run or + # something like that. 
+ run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + self._wait_for_history( history_id, assert_ok=True ) + + def test_pja_import_export( self ): + workflow = self._load_workflow( name="test_for_pja_import", add_pja=True ) + uploaded_workflow_id = self._create_workflow( workflow ) + download_response = self._get( "workflows/%s/download" % uploaded_workflow_id ) + downloaded_workflow = download_response.json() + self._assert_has_keys( downloaded_workflow[ "steps" ], "0", "1", "2" ) + pjas = downloaded_workflow[ "steps" ][ "2" ][ "post_job_actions" ].values() + assert len( pjas ) == 1, len( pjas ) + pja = pjas[ 0 ] + self._assert_has_keys( pja, "action_type", "output_name", "action_arguments" ) + + def test_post_job_action( self ): + """ Tests both import and execution of post job actions. + """ + workflow = self._load_workflow( name="test_for_pja_run", add_pja=True ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + self._wait_for_history( history_id, assert_ok=True ) + time.sleep(.1) # Give another little bit of time for rename (needed?) + contents = self._get( "histories/%s/contents" % history_id ).json() + # loading workflow with add_pja=True causes workflow output to be + # renamed to 'the_new_name'. 
+ assert "the_new_name" in map( lambda hda: hda[ "name" ], contents ) + + def _setup_workflow_run( self, workflow ): + uploaded_workflow_id = self._create_workflow( workflow ) + workflow_show_resposne = self._get( "workflows/%s" % uploaded_workflow_id ) + self._assert_status_code_is( workflow_show_resposne, 200 ) + workflow_inputs = workflow_show_resposne.json()[ "inputs" ] + step_1 = step_2 = None + for key, value in workflow_inputs.iteritems(): + label = value[ "label" ] + if label == "WorkflowInput1": + step_1 = key + if label == "WorkflowInput2": + step_2 = key + history_id = self._new_history() + hda1 = self._new_dataset( history_id, content="1 2 3" ) + hda2 = self._new_dataset( history_id, content="4 5 6" ) + workflow_request = dict( + history="hist_id=%s" % history_id, + workflow_id=uploaded_workflow_id, + ds_map=dumps( { + step_1: self._ds_entry(hda1), + step_2: self._ds_entry(hda2), + } ), + ) + return workflow_request, history_id + + def _ds_entry( self, hda ): + return dict( src="hda", id=hda[ "id" ] ) + + def _create_workflow( self, workflow ): + data = dict( + workflow=dumps( workflow ), + ) + upload_response = self._post( "workflows/upload", data=data ) + self._assert_status_code_is( upload_response, 200 ) + uploaded_workflow_id = upload_response.json()[ "id" ] + return uploaded_workflow_id + + def _assert_user_has_workflow_with_name( self, name ): + index_response = self._get( "workflows" ) + self._assert_status_code_is( index_response, 200 ) + names = map( lambda w: w[ "name" ], index_response.json() ) + assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names ) + + def _load_workflow( self, name, add_pja=False ): + workflow = loads( workflow_str ) + workflow[ "name" ] = name + if add_pja: + tool_step = workflow[ "steps" ][ "2" ] + tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict( + action_type="RenameDatasetAction", + output_name="out_file1", + action_arguments=dict( newname="the_new_name" ), 
+ ) + return workflow Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because the commit notification service is enabled and is configured to address the recipient of this email.