1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/87ef26ad1876/ Changeset: 87ef26ad1876 User: jmchilton Date: 2014-12-03 03:17:25+00:00 Summary: Improvements to test/api/test_workflow_extraction.py. Fill out the dataset collection parameter test and add a new test for a workflow that includes subcollection mapping. Move toward orchestrating jobs in this file via a high-level YAML description of steps and test data as well as some higher-level methods for testing various stuff about extracted workflows (step counts, input types, tools used, connected-ness, etc...). Allow ordering jobs index API by create_time instead of update_time. Affected #: 4 files diff -r a734cbf4bd368198e2c813b5ea61e807f9a1af33 -r 87ef26ad1876d0d6162eb4b6e7c93c8d69fbc4d6 lib/galaxy/webapps/galaxy/api/jobs.py --- a/lib/galaxy/webapps/galaxy/api/jobs.py +++ b/lib/galaxy/webapps/galaxy/api/jobs.py @@ -73,9 +73,11 @@ raise exceptions.ObjectAttributeInvalidException() out = [] - for job in query.order_by( - trans.app.model.Job.update_time.desc() - ).all(): + if kwd.get( 'order_by' ) == 'create_time': + order_by = trans.app.model.Job.create_time.desc() + else: + order_by = trans.app.model.Job.update_time.desc() + for job in query.order_by( order_by ).all(): out.append( self.encode_all_ids( trans, job.to_dict( 'collection' ), True ) ) return out diff -r a734cbf4bd368198e2c813b5ea61e807f9a1af33 -r 87ef26ad1876d0d6162eb4b6e7c93c8d69fbc4d6 test/api/helpers.py --- a/test/api/helpers.py +++ b/test/api/helpers.py @@ -76,7 +76,10 @@ return run_response.json()["outputs"][0] def wait_for_history( self, history_id, assert_ok=False, timeout=DEFAULT_HISTORY_TIMEOUT ): - wait_on_state( lambda: self.galaxy_interactor.get( "histories/%s" % history_id ), assert_ok=assert_ok, timeout=timeout ) + return wait_on_state( lambda: self.galaxy_interactor.get( "histories/%s" % history_id ), assert_ok=assert_ok, timeout=timeout ) + + def wait_for_job( self, job_id, assert_ok=False, timeout=DEFAULT_HISTORY_TIMEOUT ): + return wait_on_state( lambda: self.galaxy_interactor.get( "jobs/%s" % job_id ), assert_ok=assert_ok, timeout=timeout ) def new_history( self, **kwds ): name = kwds.get( "name", "API Test History" ) @@ -296,6 +299,12 @@ ) return self.__create( payload ) + def create_list_of_pairs_in_history( self, history_id, **kwds ): + pair1 = self.create_pair_in_history( history_id, **kwds ).json()["id"] + #pair2 = self.create_pair_in_history( history_id, **kwds ).json()["id"] + #pair3 = self.create_pair_in_history( history_id, **kwds ).json()["id"] + return self.create_list_from_pairs( history_id, [ pair1 ] ) + def create_pair_in_history( self, history_id, **kwds ): payload = self.create_pair_payload( history_id, @@ -375,13 +384,13 @@ response = state_func() assert response.status_code == 200, "Failed to fetch state update while waiting." state = response.json()[ "state" ] - if state not in [ "running", "queued", "new" ]: + if state not in [ "running", "queued", "new", "ready" ]: if assert_ok: assert state == "ok", "Final state - %s - not okay." % state return state else: return None - wait_on( get_state, desc="state", timeout=timeout) + return wait_on( get_state, desc="state", timeout=timeout) def wait_on( function, desc, timeout=5 ): diff -r a734cbf4bd368198e2c813b5ea61e807f9a1af33 -r 87ef26ad1876d0d6162eb4b6e7c93c8d69fbc4d6 test/api/test_workflow_extraction.py --- a/test/api/test_workflow_extraction.py +++ b/test/api/test_workflow_extraction.py @@ -1,8 +1,12 @@ +from collections import namedtuple +import functools from json import dumps, loads import operator from .helpers import skip_without_tool +from .helpers import wait_on_state from .test_workflows import BaseWorkflowsApiTestCase +import yaml class WorkflowExtractionApiTestCase( BaseWorkflowsApiTestCase ): @@ -110,27 +114,147 @@ @skip_without_tool( "collection_paired_test" ) def test_extract_workflows_with_dataset_collections( self ): - hdca = self.dataset_collection_populator.create_pair_in_history( self.history_id ).json() - hdca_id = hdca[ "id" ] - inputs = { - "f1": dict( src="hdca", id=hdca_id ) - } - run_output = self.dataset_populator.run_tool( - tool_id="collection_paired_test", - inputs=inputs, - history_id=self.history_id, - ) - job_id = run_output[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( self.history_id, assert_ok=True ) + jobs_summary = self._run_jobs(""" +steps: + - label: text_input1 + type: input_collection + - tool_id: collection_paired_test + state: + f1: + $link: text_input1 +test_data: + text_input1: + type: paired +""") + job_id = self._job_id_for_tool( jobs_summary.jobs, "collection_paired_test" ) downloaded_workflow = self._extract_and_download_workflow( - dataset_collection_ids=[ hdca[ "hid" ] ], + dataset_collection_ids=[ jobs_summary.inputs["text_input1"]["hid"] ], job_ids=[ job_id ], ) - collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) - collection_step = collection_steps[ 0 ] + self.__check_workflow( + downloaded_workflow, + step_count=2, + verify_connected=True, + data_input_count=0, + data_collection_input_count=1, + tool_ids=["collection_paired_test"] + ) + + collection_step = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 )[ 0 ] collection_step_state = loads( collection_step[ "tool_state" ] ) self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) + def test_subcollection_mapping( self ): + jobs_summary = self._run_jobs(""" +steps: + - label: text_input1 + type: input_collection + - label: noop + tool_id: cat1 + state: + input1: + $link: text_input1 + - tool_id: cat_collection + state: + input1: + $link: noop#out_file1 +test_data: + text_input1: + type: "list:paired" + """) + job1_id = self._job_id_for_tool( jobs_summary.jobs, "cat1" ) + job2_id = self._job_id_for_tool( jobs_summary.jobs, "cat_collection" ) + downloaded_workflow = self._extract_and_download_workflow( + dataset_collection_ids=[ jobs_summary.inputs["text_input1"]["hid"] ], + job_ids=[ job1_id, job2_id ], + ) + print jobs_summary.inputs["text_input1"] + self.__check_workflow( + downloaded_workflow, + step_count=3, + verify_connected=True, + data_input_count=0, + data_collection_input_count=1, + tool_ids=["cat_collection", "cat1"], + ) + + collection_step = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 )[ 0 ] + collection_step_state = loads( collection_step[ "tool_state" ] ) + self.assertEquals( collection_step_state[ "collection_type" ], u"list:paired" ) + + def _run_jobs( self, jobs_yaml ): + history_id = self.history_id + workflow_id = self._upload_yaml_workflow( + jobs_yaml + ) + jobs_descriptions = yaml.load( jobs_yaml ) + test_data = jobs_descriptions["test_data"] + + label_map = {} + inputs = {} + for key, value in test_data.items(): + if isinstance( value, dict ): + elements_data = value.get( "elements", [] ) + elements = [] + for element_data in elements_data: + identifier = element_data[ "identifier" ] + content = element_data["content"] + elements.append( ( identifier, content ) ) + collection_type = value["type"] + if collection_type == "list:paired": + hdca = self.dataset_collection_populator.create_list_of_pairs_in_history( history_id ).json() + elif collection_type == "list": + hdca = self.dataset_collection_populator.create_list_in_history( history_id, contents=elements ).json() + else: + hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=elements ).json() + label_map[key] = self._ds_entry( hdca ) + inputs[key] = hdca + else: + hda = self.dataset_populator.new_dataset( history_id, content=value ) + label_map[key] = self._ds_entry( hda ) + inputs[key] = hda + workflow_request = dict( + history="hist_id=%s" % history_id, + workflow_id=workflow_id, + ) + workflow_request[ "inputs" ] = dumps( label_map ) + workflow_request[ "inputs_by" ] = 'name' + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + url = "workflows/%s/usage" % ( workflow_id ) + invocation_response = self._post( url, data=workflow_request ) + self._assert_status_code_is( invocation_response, 200 ) + invocation = invocation_response.json() + invocation_id = invocation[ "id" ] + # Wait for workflow to become fully scheduled and then for all jobs + # complete. + self.wait_for_invocation( workflow_id, invocation_id ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + jobs = self._history_jobs( history_id ) + return RunJobsSummary( + history_id=history_id, + workflow_id=workflow_id, + inputs=inputs, + jobs=jobs, + ) + + def wait_for_invocation( self, workflow_id, invocation_id ): + url = "workflows/%s/usage/%s" % ( workflow_id, invocation_id ) + return wait_on_state( lambda: self._get( url ) ) + + def _history_jobs( self, history_id ): + return self._get("jobs", { "history_id": history_id, "order_by": "create_time" } ).json() + + def _job_id_for_tool( self, jobs, tool_id ): + return self._job_for_tool( jobs, tool_id )[ "id" ] + + def _job_for_tool( self, jobs, tool_id ): + tool_jobs = filter( lambda j: j["tool_id"] == tool_id, jobs ) + if not tool_jobs: + assert False, "Failed to find job for tool %s" % tool_id + # if len( tool_jobs ) > 1: + # assert False, "Found multiple jobs for tool %s" % tool_id + return tool_jobs[ -1 ] + def __run_random_lines_mapped_over_pair( self, history_id ): hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() hdca_id = hdca[ "id" ] @@ -266,3 +390,45 @@ job_id = run_output1[ "jobs" ][ 0 ][ "id" ] self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) return implicit_hdca, job_id + + def __check_workflow( + self, + workflow, + step_count=None, + verify_connected=False, + data_input_count=None, + data_collection_input_count=None, + tool_ids=None, + ): + steps = workflow[ 'steps' ] + + if step_count is not None: + assert len( steps ) == step_count + if verify_connected: + self.__assert_connected( workflow, steps ) + if tool_ids is not None: + tool_steps = self._get_steps_of_type( workflow, "tool" ) + found_steps = set(map(operator.itemgetter("tool_id"), tool_steps)) + expected_steps = set(tool_ids) + assert found_steps == expected_steps + if data_input_count is not None: + self._get_steps_of_type( workflow, "data_input", expected_len=data_input_count ) + if data_collection_input_count is not None: + self._get_steps_of_type( workflow, "data_collection_input", expected_len=data_collection_input_count ) + + def __assert_connected( self, workflow, steps ): + disconnected_inputs = [] + + for key, value in steps.items(): + if value[ 'type' ] == "tool": + input_connections = value[ "input_connections" ] + if not input_connections: + disconnected_inputs.append( value ) + + if disconnected_inputs: + template = "%d step(s_ disconnected in extracted workflow - disconnectect steps are %s - workflow is %s" + message = template % ( len( disconnected_inputs ), disconnected_inputs, workflow ) + raise AssertionError( message ) + + +RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'inputs', 'jobs']) diff -r a734cbf4bd368198e2c813b5ea61e807f9a1af33 -r 87ef26ad1876d0d6162eb4b6e7c93c8d69fbc4d6 test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -95,6 +95,12 @@ workflow_inputs = workflow_show_resposne.json()[ "inputs" ] return workflow_inputs + def _invocation_details( self, workflow_id, invocation_id ): + invocation_details_response = self._get( "workflows/%s/usage/%s" % ( workflow_id, invocation_id ) ) + self._assert_status_code_is( invocation_details_response, 200 ) + invocation_details = invocation_details_response.json() + return invocation_details + # Workflow API TODO: # - Allow history_id as param to workflow run action. (hist_id) @@ -530,12 +536,6 @@ self._assert_status_code_is( step_response, 200 ) self._assert_has_keys( step_response.json(), "id", "order_index" ) - def _invocation_details( self, workflow_id, invocation_id ): - invocation_details_response = self._get( "workflows/%s/usage/%s" % ( workflow_id, invocation_id ) ) - self._assert_status_code_is( invocation_details_response, 200 ) - invocation_details = invocation_details_response.json() - return invocation_details - def _invocation_step_details( self, workflow_id, invocation_id, step_id ): invocation_step_response = self._get( "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step_id ) ) self._assert_status_code_is( invocation_step_response, 200 ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.