18 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/1037d12c3005/ Changeset: 1037d12c3005 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: PEP-8 fixes for lib/galaxy/tools/parameters/output_collect.py Affected #: 1 file diff -r 5393f6389105d22bc8791f908075c152f11a4d2e -r 1037d12c30058f318014efea19898b209dc8ba8a lib/galaxy/tools/parameters/output_collect.py --- a/lib/galaxy/tools/parameters/output_collect.py +++ b/lib/galaxy/tools/parameters/output_collect.py @@ -119,7 +119,8 @@ for root, dirs, files in os.walk( extra_files_path_joined ): extra_dir = os.path.join( primary_data.extra_files_path, root.replace( extra_files_path_joined, '', 1 ).lstrip( os.path.sep ) ) for f in files: - app.object_store.update_from_file( primary_data.dataset, + app.object_store.update_from_file( + primary_data.dataset, extra_dir=extra_dir, alt_name=f, file_name=os.path.join( root, f ), @@ -127,7 +128,7 @@ dir_only=True, preserve_symlinks=True ) - # FIXME: + # FIXME: # since these are placed into the job working dir, let the standard # Galaxy cleanup methods handle this (for now?) # there was an extra_files_path dir, attempt to remove it https://bitbucket.org/galaxy/galaxy-central/commits/fd043fbe00ab/ Changeset: fd043fbe00ab User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Refactor output_collect for reuse. Reduces cyclomatic complexity slightly as well. Affected #: 1 file diff -r 1037d12c30058f318014efea19898b209dc8ba8a -r fd043fbe00abd3f2ee6b6b2f8ffc3271c44cbce0 lib/galaxy/tools/parameters/output_collect.py --- a/lib/galaxy/tools/parameters/output_collect.py +++ b/lib/galaxy/tools/parameters/output_collect.py @@ -43,20 +43,8 @@ for filename in glob.glob(os.path.join(app.config.new_file_path, "primary_%i_*" % outdata.id) ): filenames[ filename ] = DEFAULT_DATASET_COLLECTOR if 'job_working_directory' in app.config.collect_outputs_from: - for extra_file_collector in dataset_collectors: - directory = job_working_directory - if extra_file_collector.directory: - directory = os.path.join( directory, extra_file_collector.directory ) - if not util.in_directory( directory, job_working_directory ): - raise Exception( "Problem with tool configuration, attempting to pull in datasets from outside working directory." ) - if not os.path.isdir( directory ): - continue - for filename in sorted( os.listdir( directory ) ): - path = os.path.join( directory, filename ) - if not os.path.isfile( path ): - continue - if extra_file_collector.match( outdata, filename ): - filenames[ path ] = extra_file_collector + for path, extra_file_collector in walk_over_extra_files( dataset_collectors, job_working_directory, outdata ): + filenames[ path ] = extra_file_collector for filename_index, ( filename, extra_file_collector ) in enumerate( filenames.iteritems() ): fields_match = extra_file_collector.match( outdata, os.path.basename( filename ) ) if not fields_match: @@ -163,6 +151,22 @@ return primary_datasets +def walk_over_extra_files( extra_file_collectors, job_working_directory, matchable ): + for extra_file_collector in extra_file_collectors: + directory = job_working_directory + if extra_file_collector.directory: + directory = os.path.join( directory, extra_file_collector.directory ) + if not util.in_directory( directory, job_working_directory ): + raise Exception( "Problem with tool configuration, attempting to pull in datasets from outside working directory." 
) + if not os.path.isdir( directory ): + continue + for filename in sorted( os.listdir( directory ) ): + path = os.path.join( directory, filename ) + if not os.path.isfile( path ): + continue + if extra_file_collector.match( matchable, filename ): + yield path, extra_file_collector + # XML can describe custom patterns, but these literals describe named # patterns that will be replaced. NAMED_PATTERNS = { https://bitbucket.org/galaxy/galaxy-central/commits/2022044556ca/ Changeset: 2022044556ca User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Refactor small helper out of get_work_dir_outputs for reuse. Leave comment about not understanding the complexity :). Affected #: 1 file diff -r fd043fbe00abd3f2ee6b6b2f8ffc3271c44cbce0 -r 2022044556ca064d879cacbdeae07f2a8ae6f1fc lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -214,23 +214,31 @@ # Walk job's output associations to find and use from_work_dir attributes. job = job_wrapper.get_job() job_tool = job_wrapper.tool + for (joda, dataset) in self._walk_dataset_outputs( job ): + if joda and job_tool: + hda_tool_output = job_tool.outputs.get( joda.name, None ) + if hda_tool_output and hda_tool_output.from_work_dir: + # Copy from working dir to HDA. + # TODO: move instead of copy to save time? + source_file = os.path.join( job_working_directory, hda_tool_output.from_work_dir ) + destination = job_wrapper.get_output_destination( output_paths[ dataset.dataset_id ] ) + if in_directory( source_file, job_working_directory ): + output_pairs.append( ( source_file, destination ) ) + else: + # Security violation. + log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) + return output_pairs + + def _walk_dataset_outputs( self, job ): for dataset_assoc in job.output_datasets + job.output_library_datasets: for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: if isinstance( dataset, self.app.model.HistoryDatasetAssociation ): joda = self.sa_session.query( self.app.model.JobToOutputDatasetAssociation ).filter_by( job=job, dataset=dataset ).first() - if joda and job_tool: - hda_tool_output = job_tool.outputs.get( joda.name, None ) - if hda_tool_output and hda_tool_output.from_work_dir: - # Copy from working dir to HDA. - # TODO: move instead of copy to save time? - source_file = os.path.join( job_working_directory, hda_tool_output.from_work_dir ) - destination = job_wrapper.get_output_destination( output_paths[ dataset.dataset_id ] ) - if in_directory( source_file, job_working_directory ): - output_pairs.append( ( source_file, destination ) ) - else: - # Security violation. - log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) - return output_pairs + yield (joda, dataset) + # TODO: why is this not just something easy like: + # for dataset_assoc in job.output_datasets + job.output_library_datasets: + # yield (dataset_assoc, dataset_assoc.dataset) + # I don't understand the reworking it backwards. -John def _handle_metadata_externally( self, job_wrapper, resolve_requirements=False ): """ https://bitbucket.org/galaxy/galaxy-central/commits/691584d72c7c/ Changeset: 691584d72c7c User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Rename variable for clarity post output collections. 
After tools can explicitly create collections the term created_collections becomes ambigious - these are history collection instances (HDCA) implicitly created by mapping over collections so renaming this field implicit_collections. Affected #: 3 files diff -r 2022044556ca064d879cacbdeae07f2a8ae6f1fc -r 691584d72c7c74e92d5756358a723b8f2064ef91 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -2216,7 +2216,7 @@ num_jobs=len( execution_tracker.successful_jobs ), job_errors=execution_tracker.execution_errors, jobs=execution_tracker.successful_jobs, - implicit_collections=execution_tracker.created_collections, + implicit_collections=execution_tracker.implicit_collections, ) else: template = 'message.mako' diff -r 2022044556ca064d879cacbdeae07f2a8ae6f1fc -r 691584d72c7c74e92d5756358a723b8f2064ef91 lib/galaxy/tools/execute.py --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -47,7 +47,7 @@ self.execution_errors = [] self.output_datasets = [] self.outputs_by_output_name = collections.defaultdict(list) - self.created_collections = {} + self.implicit_collections = {} def record_success( self, job, outputs ): self.successful_jobs.append( job ) @@ -122,6 +122,6 @@ ) collections[ output_name ] = collection - self.created_collections = collections + self.implicit_collections = collections __all__ = [ execute ] diff -r 2022044556ca064d879cacbdeae07f2a8ae6f1fc -r 691584d72c7c74e92d5756358a723b8f2064ef91 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -784,7 +784,7 @@ workflow_invocation_uuid=invocation.uuid.hex ) if collection_info: - step_outputs = dict( execution_tracker.created_collections ) + step_outputs = dict( execution_tracker.implicit_collections ) else: step_outputs = dict( execution_tracker.output_datasets ) progress.set_step_outputs( step, step_outputs ) https://bitbucket.org/galaxy/galaxy-central/commits/1e2fbca80715/ Changeset: 1e2fbca80715 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Rename variable for clarity. This method does not just return datasets. Affected #: 1 file diff -r 691584d72c7c74e92d5756358a723b8f2064ef91 -r 1e2fbca807151860c54f84b14ba6285b39aaf6a7 test/base/interactor.py --- a/test/base/interactor.py +++ b/test/base/interactor.py @@ -207,12 +207,12 @@ if isinstance(value, list) and len(value) == 1: inputs_tree[key] = value[0] - datasets = self.__submit_tool( history_id, tool_id=testdef.tool.id, tool_input=inputs_tree ) - datasets_object = datasets.json() + submit_response = self.__submit_tool( history_id, tool_id=testdef.tool.id, tool_input=inputs_tree ) + submit_response_object = submit_response.json() try: - return self.__dictify_outputs( datasets_object ), datasets_object[ 'jobs' ] + return self.__dictify_outputs( submit_response_object ), submit_response_object[ 'jobs' ] except KeyError: - raise Exception( datasets_object[ 'message' ] ) + raise Exception( submit_response_object[ 'message' ] ) def _create_collection( self, history_id, collection_def ): create_payload = dict( https://bitbucket.org/galaxy/galaxy-central/commits/cd9eb06bb5f7/ Changeset: cd9eb06bb5f7 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Add file missed with pull request 505. 
Affected #: 1 file diff -r 1e2fbca807151860c54f84b14ba6285b39aaf6a7 -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e test/api/test_workflow_map_reduce_pause.ga --- /dev/null +++ b/test/api/test_workflow_map_reduce_pause.ga @@ -0,0 +1,198 @@ +{ + "a_galaxy_workflow": "true", + "annotation": "", + "format-version": "0.1", + "name": "map_reduce_pause", + "steps": { + "0": { + "annotation": "", + "id": 0, + "input_connections": {}, + "inputs": [ + { + "description": "", + "name": "Input Dataset" + } + ], + "name": "Input dataset", + "outputs": [], + "position": { + "left": 172.83680772781372, + "top": 200.96180772781372 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"Input Dataset\"}", + "tool_version": null, + "type": "data_input", + "user_outputs": [] + }, + "1": { + "annotation": "", + "id": 1, + "input_connections": {}, + "inputs": [ + { + "description": "", + "name": "Input Dataset Collection" + } + ], + "name": "Input dataset collection", + "outputs": [], + "position": { + "left": 161.46528673171997, + "top": 422.5764012336731 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"collection_type\": \"list\", \"name\": \"Input Dataset Collection\"}", + "tool_version": null, + "type": "data_collection_input", + "user_outputs": [] + }, + "2": { + "annotation": "", + "id": 2, + "input_connections": { + "input": { + "id": 0, + "output_name": "output" + } + }, + "inputs": [], + "name": "Select first", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 410.9444432258606, + "top": 195.05903673171997 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "head", + "tool_state": "{\"__job_resource\": \"{\\\"__job_resource__select\\\": \\\"no\\\", \\\"__current_case__\\\": 0}\", \"input\": \"null\", \"__page__\": 0, \"__rerun_remap_job_id__\": null, \"lineNum\": \"\\\"1\\\"\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + }, + "3": { + "annotation": "", + "id": 3, + "input_connections": { + "input": { + "id": 1, + "output_name": "output" + } + }, + "inputs": [], + "name": "Select first", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 632.9756932258606, + "top": 360.57988023757935 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "head", + "tool_state": "{\"__job_resource\": \"{\\\"__job_resource__select\\\": \\\"no\\\", \\\"__current_case__\\\": 0}\", \"input\": \"null\", \"__page__\": 0, \"__rerun_remap_job_id__\": null, \"lineNum\": \"\\\"1\\\"\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + }, + "4": { + "annotation": "", + "id": 4, + "input_connections": { + "input": { + "id": 2, + "output_name": "out_file1" + } + }, + "inputs": [], + "name": "Pause for dataset review", + "outputs": [], + "position": { + "left": 657.0903172492981, + "top": 197.71528673171997 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"Pause for Dataset Review\"}", + "tool_version": null, + "type": "pause", + "user_outputs": [] + }, + "5": { + "annotation": "", + "id": 5, + "input_connections": { + "input1": { + "id": 4, + "output_name": "output" + }, + "queries_0|input2": { + "id": 3, + "output_name": "out_file1" + } + }, + "inputs": [], + "name": "Concatenate datasets (for test workflows)", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 977.8889012336731, + "top": 228.01042222976685 + }, + "post_job_actions": {}, + "tool_errors": null, + 
"tool_id": "cat", + "tool_state": "{\"__job_resource\": \"{\\\"__job_resource__select\\\": \\\"no\\\", \\\"__current_case__\\\": 0}\", \"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\", \"queries\": \"[{\\\"input2\\\": null, \\\"__index__\\\": 0}]\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + }, + "6": { + "annotation": "", + "id": 6, + "input_connections": { + "input1": { + "id": 5, + "output_name": "out_file1" + } + }, + "inputs": [], + "name": "Concatenate multiple datasets (for test workflows)", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 1342.545150756836, + "top": 233.55210876464844 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "cat_list", + "tool_state": "{\"__job_resource\": \"{\\\"__job_resource__select\\\": \\\"no\\\", \\\"__current_case__\\\": 0}\", \"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + } + }, + "uuid": "a27044e7-2efe-4661-9a09-00708d3520f1" +} \ No newline at end of file https://bitbucket.org/galaxy/galaxy-central/commits/0a809bab2180/ Changeset: 0a809bab2180 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Refactor dataset collection creation into reusable pieces. In downstream output collection work this allows creating elements after initial construction - which is useful for output collections composed of dynamically discovered datasets after a job has completed. Affected #: 5 files diff -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd lib/galaxy/dataset_collections/builder.py --- /dev/null +++ b/lib/galaxy/dataset_collections/builder.py @@ -0,0 +1,26 @@ +from galaxy import model + + +def build_collection( type, dataset_instances ): + """ + Build DatasetCollection with populated DatasetcollectionElement objects + corresponding to the supplied dataset instances or throw exception if + this is not a valid collection of the specified type. + """ + dataset_collection = model.DatasetCollection( ) + set_collection_elements( dataset_collection, type, dataset_instances ) + return dataset_collection + + +def set_collection_elements( dataset_collection, type, dataset_instances ): + element_index = 0 + elements = [] + for element in type.generate_elements( dataset_instances ): + element.element_index = element_index + element.collection = dataset_collection + elements.append( element ) + + element_index += 1 + + dataset_collection.elements = elements + return dataset_collection diff -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd lib/galaxy/dataset_collections/types/__init__.py --- a/lib/galaxy/dataset_collections/types/__init__.py +++ b/lib/galaxy/dataset_collections/types/__init__.py @@ -12,10 +12,9 @@ __metaclass__ = ABCMeta @abstractmethod - def build_collection( self, dataset_instances ): - """ - Build DatasetCollection with populated DatasetcollectionElement objects - corresponding to the supplied dataset instances or throw exception if + def generate_elements( self, dataset_instances ): + """ Generate DatasetCollectionElements with corresponding + to the supplied dataset instances or throw exception if this is not a valid collection of the specified type. 
""" @@ -24,11 +23,3 @@ def _validation_failed( self, message ): raise exceptions.ObjectAttributeInvalidException( message ) - - def _new_collection_for_elements( self, elements ): - dataset_collection = model.DatasetCollection( ) - for index, element in enumerate( elements ): - element.element_index = index - element.collection = dataset_collection - dataset_collection.elements = elements - return dataset_collection diff -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd lib/galaxy/dataset_collections/types/list.py --- a/lib/galaxy/dataset_collections/types/list.py +++ b/lib/galaxy/dataset_collections/types/list.py @@ -11,13 +11,10 @@ def __init__( self ): pass - def build_collection( self, elements ): - associations = [] + def generate_elements( self, elements ): for identifier, element in elements.iteritems(): association = DatasetCollectionElement( element=element, element_identifier=identifier, ) - associations.append( association ) - - return self._new_collection_for_elements( associations ) + yield association diff -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd lib/galaxy/dataset_collections/types/paired.py --- a/lib/galaxy/dataset_collections/types/paired.py +++ b/lib/galaxy/dataset_collections/types/paired.py @@ -17,7 +17,7 @@ def __init__( self ): pass - def build_collection( self, elements ): + def generate_elements( self, elements ): forward_dataset = elements.get( FORWARD_IDENTIFIER, None ) reverse_dataset = elements.get( REVERSE_IDENTIFIER, None ) if not forward_dataset or not reverse_dataset: @@ -30,4 +30,5 @@ element=reverse_dataset, element_identifier=REVERSE_IDENTIFIER, ) - return self._new_collection_for_elements([left_association, right_association]) + yield left_association + yield right_association diff -r cd9eb06bb5f7da306031ef62f6c538a5816ef08e -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd lib/galaxy/managers/collections.py --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -1,6 +1,7 @@ from galaxy.dataset_collections.registry import DatasetCollectionTypesRegistry from galaxy.dataset_collections.matching import MatchingCollections from galaxy.dataset_collections.type_description import CollectionTypeDescriptionFactory +from galaxy.dataset_collections import builder from galaxy import model from galaxy.exceptions import MessageException @@ -113,7 +114,7 @@ # else if elements is set, it better be an ordered dict! type_plugin = collection_type_description.rank_type_plugin() - dataset_collection = type_plugin.build_collection( elements ) + dataset_collection = builder.build_collection( type_plugin, elements ) dataset_collection.collection_type = collection_type return dataset_collection https://bitbucket.org/galaxy/galaxy-central/commits/9fba811dd8f9/ Changeset: 9fba811dd8f9 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Rearrange verify method to make duplication more obvious. Affected #: 1 file diff -r 0a809bab2180137fd7c6cf25a53bfa93fe1b0dcd -r 9fba811dd8f919d6ae24795f17e1f269556cee9e test/base/interactor.py --- a/test/base/interactor.py +++ b/test/base/interactor.py @@ -64,6 +64,7 @@ fetcher = self.__dataset_fetcher( history_id ) ## TODO: Twill version verifys dataset is 'ok' in here. 
self.twill_test_case.verify_hid( outfile, hda_id=hid, attributes=attributes, dataset_fetcher=fetcher, shed_tool_id=shed_tool_id ) + self._verify_metadata( history_id, hid, attributes ) primary_datasets = attributes.get( 'primary_datasets', {} ) if primary_datasets: @@ -86,8 +87,6 @@ self.twill_test_case.verify_hid( primary_outfile, hda_id=primary_hda_id, attributes=primary_attributes, dataset_fetcher=fetcher, shed_tool_id=shed_tool_id ) self._verify_metadata( history_id, primary_hda_id, primary_attributes ) - self._verify_metadata( history_id, hid, attributes ) - def _verify_metadata( self, history_id, hid, attributes ): metadata = attributes.get( 'metadata', {} ).copy() for key, value in metadata.copy().iteritems(): https://bitbucket.org/galaxy/galaxy-central/commits/4ac25385dfc6/ Changeset: 4ac25385dfc6 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Break up a couple test methods for reuse downstream. Remove duplication related to verifying a single dataset in tests - allows reuse in downstream when verifying the content of explicitly defined tool output collections. Refactor out waiting for jobs - seems wrong place to put that logic this should allow easier moving. Affected #: 1 file diff -r 9fba811dd8f919d6ae24795f17e1f269556cee9e -r 4ac25385dfc69a72744e8593200696c499d60eaa test/base/interactor.py --- a/test/base/interactor.py +++ b/test/base/interactor.py @@ -58,13 +58,10 @@ outfile = output_testdef.outfile attributes = output_testdef.attributes name = output_testdef.name - for job in jobs: - self.wait_for_job( job[ 'id' ], history_id, maxseconds ) + self.wait_for_jobs( history_id, jobs, maxseconds ) hid = self.__output_id( output_data ) - fetcher = self.__dataset_fetcher( history_id ) ## TODO: Twill version verifys dataset is 'ok' in here. - self.twill_test_case.verify_hid( outfile, hda_id=hid, attributes=attributes, dataset_fetcher=fetcher, shed_tool_id=shed_tool_id ) - self._verify_metadata( history_id, hid, attributes ) + self.verify_output_dataset( history_id=history_id, hda_id=hid, outfile=outfile, attributes=attributes, shed_tool_id=shed_tool_id ) primary_datasets = attributes.get( 'primary_datasets', {} ) if primary_datasets: @@ -84,8 +81,16 @@ raise Exception( msg_template % msg_args ) primary_hda_id = primary_output[ "dataset" ][ "id" ] - self.twill_test_case.verify_hid( primary_outfile, hda_id=primary_hda_id, attributes=primary_attributes, dataset_fetcher=fetcher, shed_tool_id=shed_tool_id ) - self._verify_metadata( history_id, primary_hda_id, primary_attributes ) + self.verify_output_dataset( history_id, primary_hda_id, primary_outfile, primary_attributes, shed_tool_id=shed_tool_id ) + + def wait_for_jobs( self, history_id, jobs, maxseconds ): + for job in jobs: + self.wait_for_job( job[ 'id' ], history_id, maxseconds ) + + def verify_output_dataset( self, history_id, hda_id, outfile, attributes, shed_tool_id ): + fetcher = self.__dataset_fetcher( history_id ) + self.twill_test_case.verify_hid( outfile, hda_id=hda_id, attributes=attributes, dataset_fetcher=fetcher, shed_tool_id=shed_tool_id ) + self._verify_metadata( history_id, hda_id, attributes ) def _verify_metadata( self, history_id, hid, attributes ): metadata = attributes.get( 'metadata', {} ).copy() https://bitbucket.org/galaxy/galaxy-central/commits/e5be58d2ae87/ Changeset: e5be58d2ae87 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Script to build workflows from simpler YAML description. 
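The script converts a terse YAML step list into the verbose native .ga JSON that the workflow API expects. A minimal sketch of the call pattern, mirroring how the new test helper below drives it (the flat import path here is an assumption; the test suite imports it relatively as .yaml_to_workflow):

from json import dumps

from yaml_to_workflow import yaml_to_workflow

has_yaml = """
- type: input
- tool_id: cat1
  state:
    input1:
      $link: 0
"""

workflow = yaml_to_workflow(has_yaml)     # dict in native .ga layout with a "steps" mapping
workflow_str = dumps(workflow, indent=4)  # JSON string suitable for POST to /api/workflows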
Affected #: 3 files diff -r 4ac25385dfc69a72744e8593200696c499d60eaa -r e5be58d2ae874cdeb697e1a367ea18652ca93fdc test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -8,24 +8,54 @@ from .helpers import DatasetCollectionPopulator from .helpers import skip_without_tool +from .yaml_to_workflow import yaml_to_workflow + from requests import delete from requests import put from galaxy.exceptions import error_codes +class BaseWorkflowsApiTestCase( api.ApiTestCase ): + # TODO: Find a new file for this class. + + def setUp( self ): + super( BaseWorkflowsApiTestCase, self ).setUp() + self.workflow_populator = WorkflowPopulator( self.galaxy_interactor ) + self.dataset_populator = DatasetPopulator( self.galaxy_interactor ) + self.dataset_collection_populator = DatasetCollectionPopulator( self.galaxy_interactor ) + + def _assert_user_has_workflow_with_name( self, name ): + names = self._workflow_names() + assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names ) + + def _workflow_names( self ): + index_response = self._get( "workflows" ) + self._assert_status_code_is( index_response, 200 ) + names = map( lambda w: w[ "name" ], index_response.json() ) + return names + + def _upload_yaml_workflow(self, has_yaml): + workflow = yaml_to_workflow(has_yaml) + workflow_str = dumps(workflow, indent=4) + data = { + 'workflow': workflow_str + } + upload_response = self._post( "workflows", data=data ) + self._assert_status_code_is( upload_response, 200 ) + self._assert_user_has_workflow_with_name( "%s (imported from API)" % ( workflow[ "name" ] ) ) + return upload_response.json()[ "id" ] + + # Workflow API TODO: # - Allow history_id as param to workflow run action. (hist_id) # - Allow post to workflows/<workflow_id>/run in addition to posting to # /workflows with id in payload. # - Much more testing obviously, always more testing. -class WorkflowsApiTestCase( api.ApiTestCase ): +class WorkflowsApiTestCase( BaseWorkflowsApiTestCase ): def setUp( self ): super( WorkflowsApiTestCase, self ).setUp() - self.workflow_populator = WorkflowPopulator( self.galaxy_interactor ) - self.dataset_populator = DatasetPopulator( self.galaxy_interactor ) - self.dataset_collection_populator = DatasetCollectionPopulator( self.galaxy_interactor ) def test_show_invalid_key_is_400( self ): show_response = self._get( "workflows/%s" % self._random_key() ) @@ -45,7 +75,7 @@ delete_response = delete( workflow_url ) self._assert_status_code_is( delete_response, 200 ) # Make sure workflow is no longer in index by default. 
- assert workflow_name not in self.__workflow_names() + assert workflow_name not in self._workflow_names() def test_other_cannot_delete( self ): workflow_id = self.workflow_populator.simple_workflow( "test_other_delete" ) @@ -825,10 +855,6 @@ src = 'hdca' return dict( src=src, id=hda[ "id" ] ) - def _assert_user_has_workflow_with_name( self, name ): - names = self.__workflow_names() - assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names ) - def __review_paused_steps( self, uploaded_workflow_id, invocation_id, order_index, action=True ): invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) invocation_steps = invocation[ "steps" ] @@ -863,12 +889,6 @@ else: return invocation_response - def __workflow_names( self ): - index_response = self._get( "workflows" ) - self._assert_status_code_is( index_response, 200 ) - names = map( lambda w: w[ "name" ], index_response.json() ) - return names - def __import_workflow( self, workflow_id, deprecated_route=False ): if deprecated_route: route = "workflows/import" diff -r 4ac25385dfc69a72744e8593200696c499d60eaa -r e5be58d2ae874cdeb697e1a367ea18652ca93fdc test/api/test_workflows_from_yaml.py --- /dev/null +++ b/test/api/test_workflows_from_yaml.py @@ -0,0 +1,32 @@ +import json + +from .test_workflows import BaseWorkflowsApiTestCase + + +class WorkflowsFromYamlApiTestCase( BaseWorkflowsApiTestCase ): + + def setUp( self ): + super( WorkflowsFromYamlApiTestCase, self ).setUp() + + def test_simple_upload(self): + workflow_id = self._upload_yaml_workflow(""" +- type: input +- tool_id: cat1 + state: + input1: + $link: 0 +- tool_id: cat1 + state: + input1: + $link: 1#out_file1 +- tool_id: random_lines1 + state: + num_lines: 10 + input: + $link: 2#out_file1 + seed_source: + seed_source_selector: set_seed + seed: asdf + __current_case__: 1 +""") + self._get("workflows/%s/download" % workflow_id).content diff -r 4ac25385dfc69a72744e8593200696c499d60eaa -r e5be58d2ae874cdeb697e1a367ea18652ca93fdc test/api/yaml_to_workflow.py --- /dev/null +++ b/test/api/yaml_to_workflow.py @@ -0,0 +1,220 @@ +import sys + +import yaml +import json + +try: + from collections import OrderedDict +except ImportError: + from galaxy.util.odict import odict as OrderedDict + + +STEP_TYPE_ALIASES = { + 'input': 'data_input', + 'input_collection': 'data_collection_input', +} + + +def yaml_to_workflow(has_yaml): + as_python = yaml.load(has_yaml) + + if isinstance(as_python, list): + as_python = {"steps": as_python} + + __ensure_defaults(as_python, { + "a_galaxy_workflow": "true", + "format-version": "0.1", + "annotation": "", + "name": "Workflow" + }) + + steps = as_python["steps"] + + conversion_context = ConversionContext() + if isinstance(steps, list): + steps_as_dict = OrderedDict() + for i, step in enumerate(steps): + steps_as_dict[ str(i) ] = step + if "id" not in step: + step["id"] = i + + if "label" in step: + label = step["label"] + conversion_context.labels[label] = i + + if "position" not in step: + # TODO: this really should be optional in Galaxy API. 
+ step["position"] = { + "left": 10 * i, + "top": 10 * i + } + + as_python["steps"] = steps_as_dict + steps = steps_as_dict + + for i, step in steps.iteritems(): + step_type = step.get("type", "tool") + step_type = STEP_TYPE_ALIASES.get(step_type, step_type) + if step_type not in [ "data_input", "data_collection_input", "tool", "pause"]: + raise Exception("Unknown step type encountered %s" % step_type) + step["type"] = step_type + eval("transform_%s" % step_type)(conversion_context, step) + + return as_python + + +def transform_data_input(context, step): + transform_input(context, step, default_name="Input dataset") + + +def transform_data_collection_input(context, step): + transform_input(context, step, default_name="Input dataset collection") + + +def transform_input(context, step, default_name): + default_name = step.get("label", default_name) + __ensure_defaults( step, { + "annotation": "", + }) + + __ensure_inputs_connections(step) + + if not "inputs" in step: + step["inputs"] = [{}] + + step_inputs = step["inputs"][0] + if "name" in step_inputs: + name = step_inputs["name"] + else: + name = default_name + + __ensure_defaults( step_inputs, { + "name": name, + "description": "", + }) + tool_state = { + "name": name + } + if "collection_type" in step: + tool_state["collection_type"] = step["collection_type"] + + __populate_tool_state(step, tool_state) + + +def transform_tool(context, step): + if "tool_id" not in step: + raise Exception("Tool steps must define a tool_id.") + + __ensure_defaults( step, { + "annotation": "", + } ) + __ensure_inputs_connections(step) + + tool_state = { + # TODO: Galaxy should not require tool state actually specify a __page__. + "__page__": 0, + } + + if "connect" not in step: + step["connect"] = {} + + connect = step["connect"] + del step["connect"] + + def append_link(key, value): + if key not in connect: + connect[key] = [] + connect[key].append(value["$link"]) + + def replace_links(value, key=""): + if __is_link(value): + append_link(key, value) + return None + if isinstance(value, dict): + new_values = {} + for k, v in value.iteritems(): + new_key = __join_prefix(key, k) + new_values[k] = replace_links(v, new_key) + return new_values + elif isinstance(value, list): + new_values = [] + for i, v in enumerate(value): + # If we are a repeat we need to modify the key + # but not if values are actually $links. 
+ if __is_link(v): + append_link(key, v) + new_values.append(None) + else: + new_key = "%s_%d" % ( key, i ) + new_values.append(replace_links(v, new_key)) + return new_values + else: + return value + + if "state" in step: + step_state = step["state"] + step_state = replace_links(step_state) + + for key, value in step_state.iteritems(): + tool_state[key] = json.dumps(value) + del step["state"] + + for key, values in connect.iteritems(): + input_connection_value = [] + if not isinstance(values, list): + values = [ values ] + for value in values: + if not isinstance(value, dict): + value_parts = str(value).split("#") + if len(value_parts) == 1: + value_parts.append("output") + id = value_parts[0] + if id in context.labels: + id = context.labels[id] + value = {"id": int(id), "output_name": value_parts[1]} + input_connection_value.append(value) + # TODO: this should be a list + step["input_connections"][key] = input_connection_value[0] + + __populate_tool_state(step, tool_state) + + +class ConversionContext(object): + + def __init__(self): + self.labels = {} + + +def __is_link(value): + return isinstance(value, dict) and "$link" in value + + +def __join_prefix(prefix, key): + if prefix: + new_key = "%s|%s" % (prefix, key) + else: + new_key = key + return new_key + + +def __ensure_inputs_connections(step): + if "input_connections" not in step: + step["input_connections"] = {} + + +def __ensure_defaults(in_dict, defaults): + for key, value in defaults.items(): + if key not in in_dict: + in_dict[ key ] = value + + +def __populate_tool_state(step, tool_state): + step["tool_state"] = json.dumps(tool_state) + + +def main(argv): + print json.dumps(yaml_to_workflow(argv[0])) + + +if __name__ == "__main__": + main(sys.argv) https://bitbucket.org/galaxy/galaxy-central/commits/c214c2f07f3b/ Changeset: c214c2f07f3b User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Split all workflow extraction testing out of test/api/test_workflows.py. test_workflows.py was pretty unwieldy and the workflow extraction tests were very different than the other tests in that file (e.g. very different helper functions). 
Affected #: 2 files diff -r e5be58d2ae874cdeb697e1a367ea18652ca93fdc -r c214c2f07f3bf7d86db8ee14188765c9aabdb22e test/api/test_workflow_extraction.py --- /dev/null +++ b/test/api/test_workflow_extraction.py @@ -0,0 +1,269 @@ +from json import dumps, loads +import operator + +from .helpers import skip_without_tool +from .test_workflows import BaseWorkflowsApiTestCase + + +class WorkflowExtractionApiTestCase( BaseWorkflowsApiTestCase ): + + def setUp( self ): + super( WorkflowExtractionApiTestCase, self ).setUp() + + @skip_without_tool( "cat1" ) + def test_extract_from_history( self ): + history_id = self.dataset_populator.new_history() + # Run the simple test workflow and extract it back out from history + cat1_job_id = self.__setup_and_run_cat1_workflow( history_id=history_id ) + contents_response = self._get( "histories/%s/contents" % history_id ) + input_hids = map( lambda c: c[ "hid" ], contents_response.json()[ 0:2 ] ) + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_ids=dumps( input_hids ), + job_ids=dumps( [ cat1_job_id ] ), + workflow_name="test import from history", + ) + self.assertEquals( downloaded_workflow[ "name" ], "test import from history" ) + self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) + + def test_extract_with_copied_inputs( self ): + old_history_id = self.dataset_populator.new_history() + # Run the simple test workflow and extract it back out from history + self.__setup_and_run_cat1_workflow( history_id=old_history_id ) + + history_id = self.dataset_populator.new_history() + + # Bug cannot mess up hids or these don't extract correctly. See Trello card here: + # https://trello.com/c/mKzLbM2P + # # create dummy dataset to complicate hid mapping + # self.dataset_populator.new_dataset( history_id, content="dummydataset" ) + # offset = 1 + + offset = 0 + old_contents = self._get( "histories/%s/contents" % old_history_id ).json() + for old_dataset in old_contents: + self.__copy_content_to_history( history_id, old_dataset ) + new_contents = self._get( "histories/%s/contents" % history_id ).json() + input_hids = map( lambda c: c[ "hid" ], new_contents[ (offset + 0):(offset + 2) ] ) + cat1_job_id = self.__job_id( history_id, new_contents[ (offset + 2) ][ "id" ] ) + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_ids=dumps( input_hids ), + job_ids=dumps( [ cat1_job_id ] ), + workflow_name="test import from history", + ) + self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) + + @skip_without_tool( "random_lines1" ) + def test_extract_mapping_workflow_from_history( self ): + history_id = self.dataset_populator.new_history() + hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( history_id ) + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), + job_ids=dumps( [ job_id1, job_id2 ] ), + workflow_name="test import from mapping history", + ) + self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) + + def test_extract_copied_mapping_from_history( self ): + old_history_id = self.dataset_populator.new_history() + hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( old_history_id ) + + history_id = self.dataset_populator.new_history() + old_contents = self._get( "histories/%s/contents" % old_history_id ).json() + for old_content in old_contents: + self.__copy_content_to_history( history_id, old_content ) + 
# API test is somewhat contrived since there is no good way + # to retrieve job_id1, job_id2 like this for copied dataset + # collections I don't think. + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), + job_ids=dumps( [ job_id1, job_id2 ] ), + workflow_name="test import from history", + ) + self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) + + @skip_without_tool( "random_lines1" ) + @skip_without_tool( "multi_data_param" ) + def test_extract_reduction_from_history( self ): + history_id = self.dataset_populator.new_history() + hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() + hdca_id = hdca[ "id" ] + inputs1 = { + "input": { "batch": True, "values": [ { "src": "hdca", "id": hdca_id } ] }, + "num_lines": 2 + } + implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs1 ) + inputs2 = { + "f1": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, + "f2": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, + } + reduction_run_output = self.dataset_populator.run_tool( + tool_id="multi_data_param", + inputs=inputs2, + history_id=history_id, + ) + job_id2 = reduction_run_output[ "jobs" ][ 0 ][ "id" ] + self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), + job_ids=dumps( [ job_id1, job_id2 ] ), + workflow_name="test import reduction", + ) + assert len( downloaded_workflow[ "steps" ] ) == 3 + collect_step_idx = self._assert_first_step_is_paired_input( downloaded_workflow ) + tool_steps = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=2 ) + random_lines_map_step = tool_steps[ 0 ] + reduction_step = tool_steps[ 1 ] + random_lines_input = random_lines_map_step[ "input_connections" ][ "input" ] + assert random_lines_input[ "id" ] == collect_step_idx + reduction_step_input = reduction_step[ "input_connections" ][ "f1" ] + assert reduction_step_input[ "id"] == random_lines_map_step[ "id" ] + + @skip_without_tool( "collection_paired_test" ) + def test_extract_workflows_with_dataset_collections( self ): + history_id = self.dataset_populator.new_history() + hdca = self.dataset_collection_populator.create_pair_in_history( history_id ).json() + hdca_id = hdca[ "id" ] + inputs = { + "f1": dict( src="hdca", id=hdca_id ) + } + run_output = self.dataset_populator.run_tool( + tool_id="collection_paired_test", + inputs=inputs, + history_id=history_id, + ) + job_id = run_output[ "jobs" ][ 0 ][ "id" ] + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + downloaded_workflow = self._extract_and_download_workflow( + from_history_id=history_id, + dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), + job_ids=dumps( [ job_id ] ), + workflow_name="test import from history", + ) + collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) + collection_step = collection_steps[ 0 ] + collection_step_state = loads( collection_step[ "tool_state" ] ) + self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) + + def __run_random_lines_mapped_over_pair( self, history_id ): + hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() + hdca_id = 
hdca[ "id" ] + inputs1 = { + "input": { "batch": True, "values": [ { "src": "hdca", "id": hdca_id } ] }, + "num_lines": 2 + } + implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs1 ) + inputs2 = { + "input": { "batch": True, "values": [ { "src": "hdca", "id": implicit_hdca1[ "id" ] } ] }, + "num_lines": 1 + } + _, job_id2 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs2 ) + return hdca, job_id1, job_id2 + + def __assert_looks_like_randomlines_mapping_workflow( self, downloaded_workflow ): + # Assert workflow is input connected to a tool step with one output + # connected to another tool step. + assert len( downloaded_workflow[ "steps" ] ) == 3 + collect_step_idx = self._assert_first_step_is_paired_input( downloaded_workflow ) + tool_steps = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=2 ) + tool_step_idxs = [] + tool_input_step_idxs = [] + for tool_step in tool_steps: + self._assert_has_key( tool_step[ "input_connections" ], "input" ) + input_step_idx = tool_step[ "input_connections" ][ "input" ][ "id" ] + tool_step_idxs.append( tool_step[ "id" ] ) + tool_input_step_idxs.append( input_step_idx ) + + assert collect_step_idx not in tool_step_idxs + assert tool_input_step_idxs[ 0 ] == collect_step_idx + assert tool_input_step_idxs[ 1 ] == tool_step_idxs[ 0 ] + + def __assert_looks_like_cat1_example_workflow( self, downloaded_workflow ): + assert len( downloaded_workflow[ "steps" ] ) == 3 + input_steps = self._get_steps_of_type( downloaded_workflow, "data_input", expected_len=2 ) + tool_step = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=1 )[ 0 ] + + input1 = tool_step[ "input_connections" ][ "input1" ] + input2 = tool_step[ "input_connections" ][ "queries_0|input2" ] + + self.assertEquals( input_steps[ 0 ][ "id" ], input1[ "id" ] ) + self.assertEquals( input_steps[ 1 ][ "id" ], input2[ "id" ] ) + + def __copy_content_to_history( self, history_id, content ): + if content[ "history_content_type" ] == "dataset": + payload = dict( + source="hda", + content=content["id"] + ) + response = self._post( "histories/%s/contents/datasets" % history_id, payload ) + + else: + payload = dict( + source="hdca", + content=content["id"] + ) + response = self._post( "histories/%s/contents/dataset_collections" % history_id, payload ) + self._assert_status_code_is( response, 200 ) + return response.json() + + def __setup_and_run_cat1_workflow( self, history_id ): + workflow = self.workflow_populator.load_workflow( name="test_for_extract" ) + workflow_request, history_id = self._setup_workflow_run( workflow, history_id=history_id ) + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + + self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=10 ) + return self.__cat_job_id( history_id ) + + def _assert_first_step_is_paired_input( self, downloaded_workflow ): + collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) + collection_step = collection_steps[ 0 ] + collection_step_state = loads( collection_step[ "tool_state" ] ) + self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) + collect_step_idx = collection_step[ "id" ] + return collect_step_idx + + def _extract_and_download_workflow( self, **extract_payload ): + create_workflow_response = self._post( "workflows", data=extract_payload ) + 
self._assert_status_code_is( create_workflow_response, 200 ) + + new_workflow_id = create_workflow_response.json()[ "id" ] + download_response = self._get( "workflows/%s/download" % new_workflow_id ) + self._assert_status_code_is( download_response, 200 ) + downloaded_workflow = download_response.json() + return downloaded_workflow + + def _get_steps_of_type( self, downloaded_workflow, type, expected_len=None ): + steps = [ s for s in downloaded_workflow[ "steps" ].values() if s[ "type" ] == type ] + if expected_len is not None: + n = len( steps ) + assert n == expected_len, "Expected %d steps of type %s, found %d" % ( expected_len, type, n ) + return sorted( steps, key=operator.itemgetter("id") ) + + def __job_id( self, history_id, dataset_id ): + url = "histories/%s/contents/%s/provenance" % ( history_id, dataset_id ) + prov_response = self._get( url, data=dict( follow=False ) ) + self._assert_status_code_is( prov_response, 200 ) + return prov_response.json()[ "job_id" ] + + def __cat_job_id( self, history_id ): + data = dict( history_id=history_id, tool_id="cat1" ) + jobs_response = self._get( "jobs", data=data ) + self._assert_status_code_is( jobs_response, 200 ) + cat1_job_id = jobs_response.json()[ 0 ][ "id" ] + return cat1_job_id + + def _run_tool_get_collection_and_job_id( self, history_id, tool_id, inputs ): + run_output1 = self.dataset_populator.run_tool( + tool_id=tool_id, + inputs=inputs, + history_id=history_id, + ) + implicit_hdca = run_output1[ "implicit_collections" ][ 0 ] + job_id = run_output1[ "jobs" ][ 0 ][ "id" ] + self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) + return implicit_hdca, job_id diff -r e5be58d2ae874cdeb697e1a367ea18652ca93fdc -r c214c2f07f3bf7d86db8ee14188765c9aabdb22e test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -1,7 +1,5 @@ from base import api from json import dumps -from json import loads -import operator import time from .helpers import WorkflowPopulator from .helpers import DatasetPopulator @@ -46,6 +44,57 @@ self._assert_user_has_workflow_with_name( "%s (imported from API)" % ( workflow[ "name" ] ) ) return upload_response.json()[ "id" ] + def _setup_workflow_run( self, workflow, inputs_by='step_id', history_id=None ): + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) + if not history_id: + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="1 2 3" ) + hda2 = self.dataset_populator.new_dataset( history_id, content="4 5 6" ) + workflow_request = dict( + history="hist_id=%s" % history_id, + workflow_id=uploaded_workflow_id, + ) + label_map = { + 'WorkflowInput1': self._ds_entry(hda1), + 'WorkflowInput2': self._ds_entry(hda2) + } + if inputs_by == 'step_id': + ds_map = self._build_ds_map( uploaded_workflow_id, label_map ) + workflow_request[ "ds_map" ] = ds_map + elif inputs_by == "step_index": + index_map = { + '0': self._ds_entry(hda1), + '1': self._ds_entry(hda2) + } + workflow_request[ "inputs" ] = dumps( index_map ) + workflow_request[ "inputs_by" ] = 'step_index' + elif inputs_by == "name": + workflow_request[ "inputs" ] = dumps( label_map ) + workflow_request[ "inputs_by" ] = 'name' + + return workflow_request, history_id + + def _build_ds_map( self, workflow_id, label_map ): + workflow_inputs = self._workflow_inputs( workflow_id ) + ds_map = {} + for key, value in workflow_inputs.iteritems(): + label = value[ "label" ] + if label in label_map: + ds_map[ key ] = 
label_map[ label ] + return dumps( ds_map ) + + def _ds_entry( self, hda ): + src = 'hda' + if 'history_content_type' in hda and hda[ 'history_content_type' ] == "dataset_collection": + src = 'hdca' + return dict( src=src, id=hda[ "id" ] ) + + def _workflow_inputs( self, uploaded_workflow_id ): + workflow_show_resposne = self._get( "workflows/%s" % uploaded_workflow_id ) + self._assert_status_code_is( workflow_show_resposne, 200 ) + workflow_inputs = workflow_show_resposne.json()[ "inputs" ] + return workflow_inputs + # Workflow API TODO: # - Allow history_id as param to workflow run action. (hist_id) @@ -358,109 +407,6 @@ step_map[step_index] = step["inputs"][0]["name"] return step_map - @skip_without_tool( "cat1" ) - def test_extract_from_history( self ): - history_id = self.dataset_populator.new_history() - # Run the simple test workflow and extract it back out from history - cat1_job_id = self.__setup_and_run_cat1_workflow( history_id=history_id ) - contents_response = self._get( "histories/%s/contents" % history_id ) - input_hids = map( lambda c: c[ "hid" ], contents_response.json()[ 0:2 ] ) - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_ids=dumps( input_hids ), - job_ids=dumps( [ cat1_job_id ] ), - workflow_name="test import from history", - ) - self.assertEquals( downloaded_workflow[ "name" ], "test import from history" ) - self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) - - def test_extract_with_copied_inputs( self ): - old_history_id = self.dataset_populator.new_history() - # Run the simple test workflow and extract it back out from history - self.__setup_and_run_cat1_workflow( history_id=old_history_id ) - - history_id = self.dataset_populator.new_history() - - # Bug cannot mess up hids or these don't extract correctly. 
See Trello card here: - # https://trello.com/c/mKzLbM2P - # # create dummy dataset to complicate hid mapping - # self.dataset_populator.new_dataset( history_id, content="dummydataset" ) - # offset = 1 - - offset = 0 - old_contents = self._get( "histories/%s/contents" % old_history_id ).json() - for old_dataset in old_contents: - self.__copy_content_to_history( history_id, old_dataset ) - new_contents = self._get( "histories/%s/contents" % history_id ).json() - input_hids = map( lambda c: c[ "hid" ], new_contents[ (offset + 0):(offset + 2) ] ) - cat1_job_id = self.__job_id( history_id, new_contents[ (offset + 2) ][ "id" ] ) - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_ids=dumps( input_hids ), - job_ids=dumps( [ cat1_job_id ] ), - workflow_name="test import from history", - ) - self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) - - def __assert_looks_like_cat1_example_workflow( self, downloaded_workflow ): - assert len( downloaded_workflow[ "steps" ] ) == 3 - input_steps = self._get_steps_of_type( downloaded_workflow, "data_input", expected_len=2 ) - tool_step = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=1 )[ 0 ] - - input1 = tool_step[ "input_connections" ][ "input1" ] - input2 = tool_step[ "input_connections" ][ "queries_0|input2" ] - - self.assertEquals( input_steps[ 0 ][ "id" ], input1[ "id" ] ) - self.assertEquals( input_steps[ 1 ][ "id" ], input2[ "id" ] ) - - def __setup_and_run_cat1_workflow( self, history_id ): - workflow = self.workflow_populator.load_workflow( name="test_for_extract" ) - workflow_request, history_id = self._setup_workflow_run( workflow, history_id=history_id ) - run_workflow_response = self._post( "workflows", data=workflow_request ) - self._assert_status_code_is( run_workflow_response, 200 ) - - self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=10 ) - return self.__cat_job_id( history_id ) - - def __cat_job_id( self, history_id ): - data = dict( history_id=history_id, tool_id="cat1" ) - jobs_response = self._get( "jobs", data=data ) - self._assert_status_code_is( jobs_response, 200 ) - cat1_job_id = jobs_response.json()[ 0 ][ "id" ] - return cat1_job_id - - def __job_id( self, history_id, dataset_id ): - url = "histories/%s/contents/%s/provenance" % ( history_id, dataset_id ) - prov_response = self._get( url, data=dict( follow=False ) ) - self._assert_status_code_is( prov_response, 200 ) - return prov_response.json()[ "job_id" ] - - @skip_without_tool( "collection_paired_test" ) - def test_extract_workflows_with_dataset_collections( self ): - history_id = self.dataset_populator.new_history() - hdca = self.dataset_collection_populator.create_pair_in_history( history_id ).json() - hdca_id = hdca[ "id" ] - inputs = { - "f1": dict( src="hdca", id=hdca_id ) - } - run_output = self.dataset_populator.run_tool( - tool_id="collection_paired_test", - inputs=inputs, - history_id=history_id, - ) - job_id = run_output[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( history_id, assert_ok=True ) - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id ] ), - workflow_name="test import from history", - ) - collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) - collection_step = collection_steps[ 0 ] - collection_step_state = loads( collection_step[ "tool_state" ] ) - 
self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) - def test_empty_create( self ): response = self._post( "workflows" ) self._assert_status_code_is( response, 400 ) @@ -475,161 +421,6 @@ self._assert_status_code_is( response, 400 ) self._assert_error_code_is( response, error_codes.USER_REQUEST_INVALID_PARAMETER ) - @skip_without_tool( "random_lines1" ) - def test_extract_mapping_workflow_from_history( self ): - history_id = self.dataset_populator.new_history() - hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( history_id ) - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import from mapping history", - ) - self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) - - def test_extract_copied_mapping_from_history( self ): - old_history_id = self.dataset_populator.new_history() - hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( old_history_id ) - - history_id = self.dataset_populator.new_history() - old_contents = self._get( "histories/%s/contents" % old_history_id ).json() - for old_content in old_contents: - self.__copy_content_to_history( history_id, old_content ) - # API test is somewhat contrived since there is no good way - # to retrieve job_id1, job_id2 like this for copied dataset - # collections I don't think. - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import from history", - ) - self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) - - @skip_without_tool( "random_lines1" ) - @skip_without_tool( "multi_data_param" ) - def test_extract_reduction_from_history( self ): - history_id = self.dataset_populator.new_history() - hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() - hdca_id = hdca[ "id" ] - inputs1 = { - "input": { "batch": True, "values": [ { "src": "hdca", "id": hdca_id } ] }, - "num_lines": 2 - } - implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs1 ) - inputs2 = { - "f1": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, - "f2": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, - } - reduction_run_output = self.dataset_populator.run_tool( - tool_id="multi_data_param", - inputs=inputs2, - history_id=history_id, - ) - job_id2 = reduction_run_output[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) - downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import reduction", - ) - assert len( downloaded_workflow[ "steps" ] ) == 3 - collect_step_idx = self._assert_first_step_is_paired_input( downloaded_workflow ) - tool_steps = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=2 ) - random_lines_map_step = tool_steps[ 0 ] - reduction_step = tool_steps[ 1 ] - random_lines_input = random_lines_map_step[ "input_connections" ][ "input" ] - assert random_lines_input[ "id" ] == collect_step_idx - reduction_step_input = reduction_step[ "input_connections" ][ "f1" ] - assert reduction_step_input[ "id"] == 
random_lines_map_step[ "id" ] - - def __copy_content_to_history( self, history_id, content ): - if content[ "history_content_type" ] == "dataset": - payload = dict( - source="hda", - content=content["id"] - ) - response = self._post( "histories/%s/contents/datasets" % history_id, payload ) - - else: - payload = dict( - source="hdca", - content=content["id"] - ) - response = self._post( "histories/%s/contents/dataset_collections" % history_id, payload ) - self._assert_status_code_is( response, 200 ) - return response.json() - - def __run_random_lines_mapped_over_pair( self, history_id ): - hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() - hdca_id = hdca[ "id" ] - inputs1 = { - "input": { "batch": True, "values": [ { "src": "hdca", "id": hdca_id } ] }, - "num_lines": 2 - } - implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs1 ) - inputs2 = { - "input": { "batch": True, "values": [ { "src": "hdca", "id": implicit_hdca1[ "id" ] } ] }, - "num_lines": 1 - } - _, job_id2 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs2 ) - return hdca, job_id1, job_id2 - - def __assert_looks_like_randomlines_mapping_workflow( self, downloaded_workflow ): - # Assert workflow is input connected to a tool step with one output - # connected to another tool step. - assert len( downloaded_workflow[ "steps" ] ) == 3 - collect_step_idx = self._assert_first_step_is_paired_input( downloaded_workflow ) - tool_steps = self._get_steps_of_type( downloaded_workflow, "tool", expected_len=2 ) - tool_step_idxs = [] - tool_input_step_idxs = [] - for tool_step in tool_steps: - self._assert_has_key( tool_step[ "input_connections" ], "input" ) - input_step_idx = tool_step[ "input_connections" ][ "input" ][ "id" ] - tool_step_idxs.append( tool_step[ "id" ] ) - tool_input_step_idxs.append( input_step_idx ) - - assert collect_step_idx not in tool_step_idxs - assert tool_input_step_idxs[ 0 ] == collect_step_idx - assert tool_input_step_idxs[ 1 ] == tool_step_idxs[ 0 ] - - def _run_tool_get_collection_and_job_id( self, history_id, tool_id, inputs ): - run_output1 = self.dataset_populator.run_tool( - tool_id=tool_id, - inputs=inputs, - history_id=history_id, - ) - implicit_hdca = run_output1[ "implicit_collections" ][ 0 ] - job_id = run_output1[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) - return implicit_hdca, job_id - - def _assert_first_step_is_paired_input( self, downloaded_workflow ): - collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) - collection_step = collection_steps[ 0 ] - collection_step_state = loads( collection_step[ "tool_state" ] ) - self.assertEquals( collection_step_state[ "collection_type" ], u"paired" ) - collect_step_idx = collection_step[ "id" ] - return collect_step_idx - - def _extract_and_download_workflow( self, **extract_payload ): - create_workflow_response = self._post( "workflows", data=extract_payload ) - self._assert_status_code_is( create_workflow_response, 200 ) - - new_workflow_id = create_workflow_response.json()[ "id" ] - download_response = self._get( "workflows/%s/download" % new_workflow_id ) - self._assert_status_code_is( download_response, 200 ) - downloaded_workflow = download_response.json() - return downloaded_workflow - - def _get_steps_of_type( self, downloaded_workflow, type, expected_len=None ): - steps = [ s 
for s in downloaded_workflow[ "steps" ].values() if s[ "type" ] == type ] - if expected_len is not None: - n = len( steps ) - assert n == expected_len, "Expected %d steps of type %s, found %d" % ( expected_len, type, n ) - return sorted( steps, key=operator.itemgetter("id") ) - @skip_without_tool( "cat1" ) def test_run_with_pja( self ): workflow = self.workflow_populator.load_workflow( name="test_for_pja_run", add_pja=True ) @@ -776,45 +567,6 @@ assert len( usages ) == 1 return workflow_id, usages[ 0 ] - def _setup_workflow_run( self, workflow, inputs_by='step_id', history_id=None ): - uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) - if not history_id: - history_id = self.dataset_populator.new_history() - hda1 = self.dataset_populator.new_dataset( history_id, content="1 2 3" ) - hda2 = self.dataset_populator.new_dataset( history_id, content="4 5 6" ) - workflow_request = dict( - history="hist_id=%s" % history_id, - workflow_id=uploaded_workflow_id, - ) - label_map = { - 'WorkflowInput1': self._ds_entry(hda1), - 'WorkflowInput2': self._ds_entry(hda2) - } - if inputs_by == 'step_id': - ds_map = self._build_ds_map( uploaded_workflow_id, label_map ) - workflow_request[ "ds_map" ] = ds_map - elif inputs_by == "step_index": - index_map = { - '0': self._ds_entry(hda1), - '1': self._ds_entry(hda2) - } - workflow_request[ "inputs" ] = dumps( index_map ) - workflow_request[ "inputs_by" ] = 'step_index' - elif inputs_by == "name": - workflow_request[ "inputs" ] = dumps( label_map ) - workflow_request[ "inputs_by" ] = 'name' - - return workflow_request, history_id - - def _build_ds_map( self, workflow_id, label_map ): - workflow_inputs = self._workflow_inputs( workflow_id ) - ds_map = {} - for key, value in workflow_inputs.iteritems(): - label = value[ "label" ] - if label in label_map: - ds_map[ key ] = label_map[ label ] - return dumps( ds_map ) - def _setup_random_x2_workflow_steps( self, name ): workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_step_params" ) random_line_steps = self._random_lines_steps( workflow_request ) @@ -843,18 +595,6 @@ ) return workflow_request, history_id - def _workflow_inputs( self, uploaded_workflow_id ): - workflow_show_resposne = self._get( "workflows/%s" % uploaded_workflow_id ) - self._assert_status_code_is( workflow_show_resposne, 200 ) - workflow_inputs = workflow_show_resposne.json()[ "inputs" ] - return workflow_inputs - - def _ds_entry( self, hda ): - src = 'hda' - if 'history_content_type' in hda and hda[ 'history_content_type' ] == "dataset_collection": - src = 'hdca' - return dict( src=src, id=hda[ "id" ] ) - def __review_paused_steps( self, uploaded_workflow_id, invocation_id, order_index, action=True ): invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) invocation_steps = invocation[ "steps" ] https://bitbucket.org/galaxy/galaxy-central/commits/415221281212/ Changeset: 415221281212 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Simplify new test_workflow_extraction.py. Now makes sense to assume all tests consume a history. 
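The diff that follows moves per-test history creation into setUp so each test can simply reference self.history_id. As a rough, self-contained illustration of that fixture pattern (FakeDatasetPopulator and the test class here are invented stand-ins, not Galaxy's real API-test helpers):

import unittest


class FakeDatasetPopulator(object):
    """Hypothetical stand-in for the API tests' dataset populator."""

    def __init__(self):
        self._counter = 0

    def new_history(self):
        # Pretend to call the histories API and return the new history id.
        self._counter += 1
        return "history-%d" % self._counter


class WorkflowExtractionCaseSketch(unittest.TestCase):

    def setUp(self):
        # Every extraction test consumes a history, so build one per test
        # here instead of repeating new_history() in each test method.
        self.dataset_populator = FakeDatasetPopulator()
        self.history_id = self.dataset_populator.new_history()

    def test_reuses_fixture_history(self):
        # Test methods now reference self.history_id directly.
        self.assertTrue(self.history_id.startswith("history-"))


if __name__ == "__main__":
    unittest.main()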
Affected #: 1 file diff -r c214c2f07f3bf7d86db8ee14188765c9aabdb22e -r 415221281212483cdad6213e284c03e0698dbf63 test/api/test_workflow_extraction.py --- a/test/api/test_workflow_extraction.py +++ b/test/api/test_workflow_extraction.py @@ -9,16 +9,16 @@ def setUp( self ): super( WorkflowExtractionApiTestCase, self ).setUp() + self.history_id = self.dataset_populator.new_history() @skip_without_tool( "cat1" ) def test_extract_from_history( self ): - history_id = self.dataset_populator.new_history() # Run the simple test workflow and extract it back out from history - cat1_job_id = self.__setup_and_run_cat1_workflow( history_id=history_id ) - contents_response = self._get( "histories/%s/contents" % history_id ) + cat1_job_id = self.__setup_and_run_cat1_workflow( history_id=self.history_id ) + contents_response = self._get( "histories/%s/contents" % self.history_id ) input_hids = map( lambda c: c[ "hid" ], contents_response.json()[ 0:2 ] ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_ids=dumps( input_hids ), job_ids=dumps( [ cat1_job_id ] ), workflow_name="test import from history", @@ -31,8 +31,6 @@ # Run the simple test workflow and extract it back out from history self.__setup_and_run_cat1_workflow( history_id=old_history_id ) - history_id = self.dataset_populator.new_history() - # Bug cannot mess up hids or these don't extract correctly. See Trello card here: # https://trello.com/c/mKzLbM2P # # create dummy dataset to complicate hid mapping @@ -42,12 +40,12 @@ offset = 0 old_contents = self._get( "histories/%s/contents" % old_history_id ).json() for old_dataset in old_contents: - self.__copy_content_to_history( history_id, old_dataset ) - new_contents = self._get( "histories/%s/contents" % history_id ).json() + self.__copy_content_to_history( self.history_id, old_dataset ) + new_contents = self._get( "histories/%s/contents" % self.history_id ).json() input_hids = map( lambda c: c[ "hid" ], new_contents[ (offset + 0):(offset + 2) ] ) - cat1_job_id = self.__job_id( history_id, new_contents[ (offset + 2) ][ "id" ] ) + cat1_job_id = self.__job_id( self.history_id, new_contents[ (offset + 2) ][ "id" ] ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_ids=dumps( input_hids ), job_ids=dumps( [ cat1_job_id ] ), workflow_name="test import from history", @@ -56,10 +54,9 @@ @skip_without_tool( "random_lines1" ) def test_extract_mapping_workflow_from_history( self ): - history_id = self.dataset_populator.new_history() - hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( history_id ) + hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( self.history_id ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), job_ids=dumps( [ job_id1, job_id2 ] ), workflow_name="test import from mapping history", @@ -70,15 +67,14 @@ old_history_id = self.dataset_populator.new_history() hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( old_history_id ) - history_id = self.dataset_populator.new_history() old_contents = self._get( "histories/%s/contents" % old_history_id ).json() for old_content in old_contents: - self.__copy_content_to_history( history_id, old_content ) + self.__copy_content_to_history( self.history_id, old_content ) # API test is somewhat contrived since there is no good 
way # to retrieve job_id1, job_id2 like this for copied dataset # collections I don't think. downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), job_ids=dumps( [ job_id1, job_id2 ] ), workflow_name="test import from history", @@ -88,14 +84,13 @@ @skip_without_tool( "random_lines1" ) @skip_without_tool( "multi_data_param" ) def test_extract_reduction_from_history( self ): - history_id = self.dataset_populator.new_history() - hdca = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() + hdca = self.dataset_collection_populator.create_pair_in_history( self.history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"] ).json() hdca_id = hdca[ "id" ] inputs1 = { "input": { "batch": True, "values": [ { "src": "hdca", "id": hdca_id } ] }, "num_lines": 2 } - implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( history_id, "random_lines1", inputs1 ) + implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id( self.history_id, "random_lines1", inputs1 ) inputs2 = { "f1": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, "f2": { "src": "hdca", "id": implicit_hdca1[ "id" ] }, @@ -103,12 +98,12 @@ reduction_run_output = self.dataset_populator.run_tool( tool_id="multi_data_param", inputs=inputs2, - history_id=history_id, + history_id=self.history_id, ) job_id2 = reduction_run_output[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( history_id, assert_ok=True, timeout=20 ) + self.dataset_populator.wait_for_history( self.history_id, assert_ok=True, timeout=20 ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), job_ids=dumps( [ job_id1, job_id2 ] ), workflow_name="test import reduction", @@ -125,8 +120,7 @@ @skip_without_tool( "collection_paired_test" ) def test_extract_workflows_with_dataset_collections( self ): - history_id = self.dataset_populator.new_history() - hdca = self.dataset_collection_populator.create_pair_in_history( history_id ).json() + hdca = self.dataset_collection_populator.create_pair_in_history( self.history_id ).json() hdca_id = hdca[ "id" ] inputs = { "f1": dict( src="hdca", id=hdca_id ) @@ -134,12 +128,12 @@ run_output = self.dataset_populator.run_tool( tool_id="collection_paired_test", inputs=inputs, - history_id=history_id, + history_id=self.history_id, ) job_id = run_output[ "jobs" ][ 0 ][ "id" ] - self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self.dataset_populator.wait_for_history( self.history_id, assert_ok=True ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=history_id, + from_history_id=self.history_id, dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), job_ids=dumps( [ job_id ] ), workflow_name="test import from history", https://bitbucket.org/galaxy/galaxy-central/commits/b8a2150cfea0/ Changeset: b8a2150cfea0 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Simplify interface to test helper _extract_and_download_workflow. Makes tests in this file marginally more readable. 
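The diff that follows teaches the helper to default from_history_id and workflow_name and to JSON-encode list-valued id parameters, so callers can pass plain lists. A minimal standalone sketch of that defaulting-and-encoding idea (build_extract_payload is an illustrative name, not the real helper, which also posts to the workflows API):

from json import dumps


def build_extract_payload(history_id, **extract_payload):
    # Fill in the keys that nearly every caller passed identically...
    extract_payload.setdefault("from_history_id", history_id)
    extract_payload.setdefault("workflow_name", "test import from history")
    # ...and accept plain lists for the id parameters, encoding them here.
    for key in ("job_ids", "dataset_ids", "dataset_collection_ids"):
        value = extract_payload.get(key)
        if isinstance(value, list):
            extract_payload[key] = dumps(value)
    return extract_payload


payload = build_extract_payload("abc123", job_ids=["j1"], dataset_ids=[3, 4])
assert payload["from_history_id"] == "abc123"
assert payload["workflow_name"] == "test import from history"
assert payload["job_ids"] == '["j1"]'
assert payload["dataset_ids"] == "[3, 4]"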
Affected #: 1 file diff -r 415221281212483cdad6213e284c03e0698dbf63 -r b8a2150cfea0a925cd21f7de71e1547b3b118025 test/api/test_workflow_extraction.py --- a/test/api/test_workflow_extraction.py +++ b/test/api/test_workflow_extraction.py @@ -18,10 +18,8 @@ contents_response = self._get( "histories/%s/contents" % self.history_id ) input_hids = map( lambda c: c[ "hid" ], contents_response.json()[ 0:2 ] ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_ids=dumps( input_hids ), - job_ids=dumps( [ cat1_job_id ] ), - workflow_name="test import from history", + dataset_ids=input_hids, + job_ids=[ cat1_job_id ], ) self.assertEquals( downloaded_workflow[ "name" ], "test import from history" ) self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) @@ -45,10 +43,8 @@ input_hids = map( lambda c: c[ "hid" ], new_contents[ (offset + 0):(offset + 2) ] ) cat1_job_id = self.__job_id( self.history_id, new_contents[ (offset + 2) ][ "id" ] ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_ids=dumps( input_hids ), - job_ids=dumps( [ cat1_job_id ] ), - workflow_name="test import from history", + dataset_ids=input_hids, + job_ids=[ cat1_job_id ], ) self.__assert_looks_like_cat1_example_workflow( downloaded_workflow ) @@ -56,10 +52,8 @@ def test_extract_mapping_workflow_from_history( self ): hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( self.history_id ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import from mapping history", + dataset_collection_ids=[ hdca[ "hid" ] ], + job_ids=[ job_id1, job_id2 ], ) self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) @@ -74,10 +68,8 @@ # to retrieve job_id1, job_id2 like this for copied dataset # collections I don't think. 
downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import from history", + dataset_collection_ids=[ hdca[ "hid" ] ], + job_ids=[ job_id1, job_id2 ], ) self.__assert_looks_like_randomlines_mapping_workflow( downloaded_workflow ) @@ -103,10 +95,8 @@ job_id2 = reduction_run_output[ "jobs" ][ 0 ][ "id" ] self.dataset_populator.wait_for_history( self.history_id, assert_ok=True, timeout=20 ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id1, job_id2 ] ), - workflow_name="test import reduction", + dataset_collection_ids=[ hdca[ "hid" ] ], + job_ids=[ job_id1, job_id2 ], ) assert len( downloaded_workflow[ "steps" ] ) == 3 collect_step_idx = self._assert_first_step_is_paired_input( downloaded_workflow ) @@ -133,10 +123,8 @@ job_id = run_output[ "jobs" ][ 0 ][ "id" ] self.dataset_populator.wait_for_history( self.history_id, assert_ok=True ) downloaded_workflow = self._extract_and_download_workflow( - from_history_id=self.history_id, - dataset_collection_ids=dumps( [ hdca[ "hid" ] ] ), - job_ids=dumps( [ job_id ] ), - workflow_name="test import from history", + dataset_collection_ids=[ hdca[ "hid" ] ], + job_ids=[ job_id ], ) collection_steps = self._get_steps_of_type( downloaded_workflow, "data_collection_input", expected_len=1 ) collection_step = collection_steps[ 0 ] @@ -222,6 +210,18 @@ return collect_step_idx def _extract_and_download_workflow( self, **extract_payload ): + if "from_history_id" not in extract_payload: + extract_payload[ "from_history_id" ] = self.history_id + + if "workflow_name" not in extract_payload: + extract_payload[ "workflow_name" ] = "test import from history" + + for key in "job_ids", "dataset_ids", "dataset_collection_ids": + if key in extract_payload: + value = extract_payload[ key ] + if isinstance(value, list): + extract_payload[ key ] = dumps( value ) + create_workflow_response = self._post( "workflows", data=extract_payload ) self._assert_status_code_is( create_workflow_response, 200 ) https://bitbucket.org/galaxy/galaxy-central/commits/47195be80a9f/ Changeset: 47195be80a9f User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Introduce another helper to simplify test_workflow_extraction.py. 
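The change below factors the repeated "histories/%s/contents" GETs into a _history_contents accessor that defaults to the fixture history. A rough standalone sketch of that accessor pattern (ApiClientStub is invented for illustration; the real helper issues an API GET through the test client):

class ApiClientStub(object):
    """Illustrative stand-in that serves canned history contents."""

    def __init__(self, history_id, contents_by_history):
        self.history_id = history_id
        self._contents_by_history = contents_by_history

    def _history_contents(self, history_id=None):
        # Fall back to the fixture history created in setUp.
        if history_id is None:
            history_id = self.history_id
        return self._contents_by_history.get(history_id, [])


client = ApiClientStub("h1", {"h1": [{"hid": 1}], "h2": [{"hid": 7}]})
assert client._history_contents() == [{"hid": 1}]      # fixture history
assert client._history_contents("h2") == [{"hid": 7}]  # explicit history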
Affected #: 1 file diff -r b8a2150cfea0a925cd21f7de71e1547b3b118025 -r 47195be80a9fa06c4777103d74dcbb2a18a8c33b test/api/test_workflow_extraction.py --- a/test/api/test_workflow_extraction.py +++ b/test/api/test_workflow_extraction.py @@ -15,8 +15,8 @@ def test_extract_from_history( self ): # Run the simple test workflow and extract it back out from history cat1_job_id = self.__setup_and_run_cat1_workflow( history_id=self.history_id ) - contents_response = self._get( "histories/%s/contents" % self.history_id ) - input_hids = map( lambda c: c[ "hid" ], contents_response.json()[ 0:2 ] ) + contents = self._history_contents() + input_hids = map( lambda c: c[ "hid" ], contents[ 0:2 ] ) downloaded_workflow = self._extract_and_download_workflow( dataset_ids=input_hids, job_ids=[ cat1_job_id ], @@ -36,10 +36,10 @@ # offset = 1 offset = 0 - old_contents = self._get( "histories/%s/contents" % old_history_id ).json() + old_contents = self._history_contents( old_history_id ) for old_dataset in old_contents: self.__copy_content_to_history( self.history_id, old_dataset ) - new_contents = self._get( "histories/%s/contents" % self.history_id ).json() + new_contents = self._history_contents() input_hids = map( lambda c: c[ "hid" ], new_contents[ (offset + 0):(offset + 2) ] ) cat1_job_id = self.__job_id( self.history_id, new_contents[ (offset + 2) ][ "id" ] ) downloaded_workflow = self._extract_and_download_workflow( @@ -61,7 +61,7 @@ old_history_id = self.dataset_populator.new_history() hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair( old_history_id ) - old_contents = self._get( "histories/%s/contents" % old_history_id ).json() + old_contents = self._history_contents( old_history_id ) for old_content in old_contents: self.__copy_content_to_history( self.history_id, old_content ) # API test is somewhat contrived since there is no good way @@ -175,6 +175,11 @@ self.assertEquals( input_steps[ 0 ][ "id" ], input1[ "id" ] ) self.assertEquals( input_steps[ 1 ][ "id" ], input2[ "id" ] ) + def _history_contents( self, history_id=None ): + if history_id is None: + history_id = self.history_id + return self._get( "histories/%s/contents" % history_id ).json() + def __copy_content_to_history( self, history_id, content ): if content[ "history_content_type" ] == "dataset": payload = dict( https://bitbucket.org/galaxy/galaxy-central/commits/d0dc9272a481/ Changeset: d0dc9272a481 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Fix for running test_extract_workflow.py stand-alone. Affected #: 1 file diff -r 47195be80a9fa06c4777103d74dcbb2a18a8c33b -r d0dc9272a4819e2fb8867b45b782ac2e6d3d8861 test/unit/workflows/test_extract_summary.py --- a/test/unit/workflows/test_extract_summary.py +++ b/test/unit/workflows/test_extract_summary.py @@ -1,5 +1,6 @@ import unittest +import galaxy.model.mapping from galaxy import model from galaxy.workflow import extract https://bitbucket.org/galaxy/galaxy-central/commits/bc4086a04bef/ Changeset: bc4086a04bef User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Remove dated TODO from workflow extraction code. Affected #: 1 file diff -r d0dc9272a4819e2fb8867b45b782ac2e6d3d8861 -r bc4086a04beff9b1530e3093a15e15ffd8faa6b0 lib/galaxy/workflow/extract.py --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -205,8 +205,6 @@ else: dataset_collection = content # TODO: Optimize db call - # TODO: Ensure this is deterministic, must get same job - # for each dataset collection. 
dataset_instance = dataset_collection.collection.dataset_instances[ 0 ] if not self.__check_state( dataset_instance ): # Just checking the state of one instance, don't need more but https://bitbucket.org/galaxy/galaxy-central/commits/172c82454647/ Changeset: 172c82454647 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: More unit tests for workflow extraction summarize method. Affected #: 1 file diff -r bc4086a04beff9b1530e3093a15e15ffd8faa6b0 -r 172c824546478705dfc108310bd5db88254f5798 test/unit/workflows/test_extract_summary.py --- a/test/unit/workflows/test_extract_summary.py +++ b/test/unit/workflows/test_extract_summary.py @@ -46,7 +46,7 @@ assert len( job_dict ) == 1 self.assertEquals( job_dict[ hda.job ], [ ('out1', derived_hda_2 ) ] ) - def test_fake_job( self ): + def test_fake_job_hda( self ): """ Fakes job if creating_job_associations is empty. """ hda = MockHda( job=UNDEFINED_JOB ) @@ -59,6 +59,28 @@ datasets = job_dict.values()[ 0 ] assert datasets == [ ( None, hda ) ] + def test_fake_job_hdca( self ): + hdca = MockHdca( ) + self.history.active_datasets.append( hdca ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + fake_job = job_dict.keys()[ 0 ] + assert fake_job.id.startswith( "fake_" ) + assert fake_job.is_fake + content_instances = job_dict.values()[ 0 ] + assert content_instances == [ ( None, hdca ) ] + + def test_implicit_map_job_hdca( self ): + creating_job = model.Job() + hdca = MockHdca( implicit_output_name="out1", job=creating_job ) + self.history.active_datasets.append( hdca ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + job = job_dict.keys()[ 0 ] + assert job is creating_job + def test_warns_and_skips_datasets_if_not_finished( self ): hda = MockHda( state='queued' ) self.history.active_datasets.append( hda ) @@ -102,3 +124,31 @@ self.creating_job_associations = [ assoc ] else: self.creating_job_associations = [] + + +class MockHdca( object ): + + def __init__( self, implicit_output_name=None, job=None, hid=1 ): + self.id = 124 + self.copied_from_history_dataset_collection_association = None + self.history_content_type = "dataset_collection" + self.implicit_output_name = implicit_output_name + self.hid = 1 + self.collection = model.DatasetCollection() + element = model.DatasetCollectionElement( + collection=self.collection, + element=model.HistoryDatasetAssociation(), + element_index=0, + element_identifier="moocow", + ) + element.dataset_instance.dataset = model.Dataset() + element.dataset_instance.dataset.state = "ok" + creating = model.JobToOutputDatasetAssociation( + implicit_output_name, + element.dataset_instance, + ) + creating.job = job + element.dataset_instance.creating_job_associations = [ + creating, + ] + self.collection.elements = [element] https://bitbucket.org/galaxy/galaxy-central/commits/74e82c604d02/ Changeset: 74e82c604d02 User: jmchilton Date: 2014-12-02 02:18:56+00:00 Summary: Refactor lib/galaxy/workflow/extract.py to breakup big loop. Affected #: 1 file diff -r 172c824546478705dfc108310bd5db88254f5798 -r 74e82c604d026307fa8d72a7850ddd33c5f9259b lib/galaxy/workflow/extract.py --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -195,39 +195,48 @@ # needed because cannot allow selection of individual datasets from an implicit # mapping during extraction - you get the collection or nothing. 
for content in self.history.active_contents: - if content.history_content_type == "dataset_collection": - hid = content.hid - content = self.__original_hdca( content ) - self.collection_types[ hid ] = content.collection.collection_type - if not content.implicit_output_name: - job = DatasetCollectionCreationJob( content ) - self.jobs[ job ] = [ ( None, content ) ] + self.__summarize_content( content ) + + def __summarize_content( self, content ): + # Update internal state for history content (either an HDA or + # an HDCA). + if content.history_content_type == "dataset_collection": + self.__summarize_dataset_collection( content ) + else: + self.__summarize_dataset( content ) + + def __summarize_dataset_collection( self, content ): + content = self.__original_hdca( content ) + dataset_collection = content + hid = content.hid + self.collection_types[ hid ] = content.collection.collection_type + if not content.implicit_output_name: + job = DatasetCollectionCreationJob( content ) + self.jobs[ job ] = [ ( None, content ) ] + else: + dataset_collection = content + # TODO: Optimize db call + dataset_instance = dataset_collection.collection.dataset_instances[ 0 ] + if not self.__check_state( dataset_instance ): + # Just checking the state of one instance, don't need more but + # makes me wonder if even need this check at all? + return + + job_hda = self.__original_hda( dataset_instance ) + if not job_hda.creating_job_associations: + log.warn( "An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!" ) + job = DatasetCollectionCreationJob( dataset_collection ) + self.jobs[ job ] = [ ( None, dataset_collection ) ] + + for assoc in job_hda.creating_job_associations: + job = assoc.job + if job not in self.jobs or self.jobs[ job ][ 0 ][ 1 ].history_content_type == "dataset": + self.jobs[ job ] = [ ( assoc.name, dataset_collection ) ] + self.implicit_map_jobs.append( job ) else: - dataset_collection = content - # TODO: Optimize db call - dataset_instance = dataset_collection.collection.dataset_instances[ 0 ] - if not self.__check_state( dataset_instance ): - # Just checking the state of one instance, don't need more but - # makes me wonder if even need this check at all? - continue + self.jobs[ job ].append( ( assoc.name, dataset_collection ) ) - job_hda = self.__original_hda( dataset_instance ) - if not job_hda.creating_job_associations: - log.warn( "An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!" ) - job = DatasetCollectionCreationJob( dataset_collection ) - self.jobs[ job ] = [ ( None, dataset_collection ) ] - - for assoc in job_hda.creating_job_associations: - job = assoc.job - if job not in self.jobs or self.jobs[ job ][ 0 ][ 1 ].history_content_type == "dataset": - self.jobs[ job ] = [ ( assoc.name, dataset_collection ) ] - self.implicit_map_jobs.append( job ) - else: - self.jobs[ job ].append( ( assoc.name, dataset_collection ) ) - else: - self.__append_dataset( content ) - - def __append_dataset( self, dataset ): + def __summarize_dataset( self, dataset ): if not self.__check_state( dataset ): return Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
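As a closing note on the last changeset above (74e82c604d02): the refactor replaces one long loop body in extract.py with a small dispatch on history_content_type plus one method per content type. A hedged, self-contained sketch of that dispatch shape (the class, method bodies, and dict-shaped contents here are illustrative only, not the real extract.py logic):

class SummarySketch(object):
    """Toy version of the per-content-type dispatch introduced by the refactor."""

    def __init__(self):
        self.jobs = {}

    def summarize(self, history_contents):
        # One small loop that dispatches per content type, instead of a
        # single large loop body handling both cases inline.
        for content in history_contents:
            self.__summarize_content(content)
        return self.jobs

    def __summarize_content(self, content):
        if content["history_content_type"] == "dataset_collection":
            self.__summarize_dataset_collection(content)
        else:
            self.__summarize_dataset(content)

    def __summarize_dataset_collection(self, content):
        self.jobs.setdefault("collections", []).append(content["hid"])

    def __summarize_dataset(self, content):
        self.jobs.setdefault("datasets", []).append(content["hid"])


sketch = SummarySketch()
jobs = sketch.summarize([
    {"history_content_type": "dataset", "hid": 1},
    {"history_content_type": "dataset_collection", "hid": 2},
])
assert jobs == {"datasets": [1], "collections": [2]}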