commit/galaxy-central: 6 new changesets
6 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/6ae144fd51f7/ Changeset: 6ae144fd51f7 User: jmchilton Date: 2015-01-15 14:30:00+00:00

Summary: Allow tools to output collections with static or determinable structure. By "static" I mean tools such as a FASTQ de-interlacer that would produce a "paired" collection with two datasets every time. By "determinable" I mean tools that perform N->N operations within the same job - such as a tool that needs to normalize a bunch of datasets all at once and not in separate jobs. (For N->N collection operations that should or can be done in N separate jobs, tool authors should just write tools that operate over a dataset and produce a dataset, and let the end user 'map over' that operation.) There are still large classes of operations where the structure of the output collection cannot be pre-determined - such as splitting files (e.g. bam files by read group) - that are not implemented in this commit.

Model: The models have been updated to do a more thorough job of tracking collection outputs. Jobs just producing HistoryDatasetCollectionAssociations work fine for simple jobs producing collections - but you don't want to map a list over a tool that produces a pair and produce a bunch of pair HDCAs and a list:pair HDCA - you just want a bunch of pieces and the one list:pair at the top.

Workflow: Workflows containing such operations can be executed - but the workflow editor has not been updated to handle this complexity (and it will require a significant overhaul), so such tools are not available in the workflow editor.

Tool Testing: This commit also introduces a new tool XML syntax for describing tests on output collections. See files test/functional/tools/collection_creates_list.xml and test/functional/tools/collection_creates_pair.xml for examples.

Tests: Includes two tools to test this - one that uses explicit pair output names and one that iterates over the structure of the input list to produce an output list. Includes several new tool API tests that exercise the tools described above via the API, as well as implicit mapping over such tools. Includes two new workflow API tests - one that verifies a simple workflow with output collections works and one that verifies mapping collections over workflow steps works.

Affected #: 25 files

diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -216,7 +216,7 @@ job_tool = job_wrapper.tool for (joda, dataset) in self._walk_dataset_outputs( job ): if joda and job_tool: - hda_tool_output = job_tool.outputs.get( joda.name, None ) + hda_tool_output = job_tool.find_output_def( joda.name ) if hda_tool_output and hda_tool_output.from_work_dir: # Copy from working dir to HDA. # TODO: move instead of copy to save time?
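The find_output_def() lookup introduced above in lib/galaxy/jobs/runners/__init__.py is needed because datasets that belong to a tool's output collection are attached to the job under composite output names of the form "<collection name>|__part__|<element identifier>" (see ToolOutputCollectionPart further down in the lib/galaxy/tools/__init__.py diff), so a plain tool.outputs.get( name ) can no longer resolve every job output. The following is a minimal, self-contained sketch of that naming convention; the helper names mirror this changeset, but the example collection name, element identifier, and dictionaries are made up for illustration.

    # Sketch of the composite naming used for collection part outputs.
    # The "|__part__|" separator and the lookup logic follow
    # ToolOutputCollectionPart / Tool.find_output_def in this changeset;
    # "paired_output" and "forward" below are illustrative only.
    SEPARATOR = "|__part__|"

    def effective_output_name( collection_name, element_identifier ):
        # e.g. effective_output_name( "paired_output", "forward" )
        # -> "paired_output|__part__|forward"
        return "%s%s%s" % ( collection_name, SEPARATOR, element_identifier )

    def find_output_def( outputs, output_collections, name ):
        # Composite names are resolved against the owning collection's
        # outputs; plain names against the regular outputs dictionary.
        if SEPARATOR in name:
            collection_name, part = name.split( SEPARATOR )
            collection_def = output_collections.get( collection_name )
            return collection_def.outputs.get( part ) if collection_def else None
        return outputs.get( name )

In other words, a "forward" dataset created for the "paired_output" collection is registered on the job under the composite name and later resolved back to its output definition (rather than via the tool's top-level outputs dictionary).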
diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/managers/collections.py --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -72,7 +72,11 @@ for input_name, input_collection in implicit_collection_info[ "implicit_inputs" ]: dataset_collection_instance.add_implicit_input_collection( input_name, input_collection ) for output_dataset in implicit_collection_info.get( "outputs" ): - output_dataset.hidden_beneath_collection_instance = dataset_collection_instance + if isinstance( output_dataset, model.HistoryDatasetCollectionAssociation ): + dataset_collection_instance.add_implicit_input_collection( input_name, input_collection ) + else: + # dataset collection, don't need to do anything... + pass trans.sa_session.add( output_dataset ) dataset_collection_instance.implicit_output_name = implicit_collection_info[ "implicit_output_name" ] @@ -92,6 +96,18 @@ return self.__persist( dataset_collection_instance ) + def create_dataset_collection( + self, + trans, + collection_type, + elements=None, + ): + return self.__create_dataset_collection( + trans=trans, + collection_type=collection_type, + elements=elements, + ) + def __create_dataset_collection( self, trans, diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -329,6 +329,7 @@ self.input_datasets = [] self.output_datasets = [] self.input_dataset_collections = [] + self.output_dataset_collection_instances = [] self.output_dataset_collections = [] self.input_library_datasets = [] self.output_library_datasets = [] @@ -472,8 +473,10 @@ self.output_datasets.append( JobToOutputDatasetAssociation( name, dataset ) ) def add_input_dataset_collection( self, name, dataset ): self.input_dataset_collections.append( JobToInputDatasetCollectionAssociation( name, dataset ) ) - def add_output_dataset_collection( self, name, dataset ): - self.output_dataset_collections.append( JobToOutputDatasetCollectionAssociation( name, dataset ) ) + def add_output_dataset_collection( self, name, dataset_collection_instance ): + self.output_dataset_collection_instances.append( JobToOutputDatasetCollectionAssociation( name, dataset_collection_instance ) ) + def add_implicit_output_dataset_collection( self, name, dataset_collection ): + self.output_dataset_collections.append( JobToImplicitOutputDatasetCollectionAssociation( name, dataset_collection ) ) def add_input_library_dataset( self, name, dataset ): self.input_library_datasets.append( JobToInputLibraryDatasetAssociation( name, dataset ) ) def add_output_library_dataset( self, name, dataset ): @@ -734,7 +737,18 @@ self.dataset = dataset +# Many jobs may map to one HistoryDatasetCollection using these for a given +# tool output (if mapping over an input collection). class JobToOutputDatasetCollectionAssociation( object ): + def __init__( self, name, dataset_collection_instance ): + self.name = name + self.dataset_collection_instance = dataset_collection_instance + + +# A DatasetCollection will be mapped to at most one job per tool output +# using these. (You can think of many of these models as going into the +# creation of a JobToOutputDatasetCollectionAssociation.) 
+class JobToImplicitOutputDatasetCollectionAssociation( object ): def __init__( self, name, dataset_collection ): self.name = name self.dataset_collection = dataset_collection diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -448,6 +448,12 @@ Column( "dataset_collection_id", Integer, ForeignKey( "history_dataset_collection_association.id" ), index=True ), Column( "name", Unicode(255) ) ) +model.JobToImplicitOutputDatasetCollectionAssociation.table = Table( "job_to_implicit_output_dataset_collection", metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "dataset_collection_id", Integer, ForeignKey( "dataset_collection.id" ), index=True ), + Column( "name", Unicode(255) ) ) + model.JobToOutputDatasetCollectionAssociation.table = Table( "job_to_output_dataset_collection", metadata, Column( "id", Integer, primary_key=True ), Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), @@ -1809,8 +1815,13 @@ mapper( model.JobToOutputDatasetCollectionAssociation, model.JobToOutputDatasetCollectionAssociation.table, properties=dict( + job=relation( model.Job ), dataset_collection_instance=relation( + model.HistoryDatasetCollectionAssociation, lazy=False, backref="output_dataset_collection_instances" ) ) ) + +mapper( model.JobToImplicitOutputDatasetCollectionAssociation, + model.JobToImplicitOutputDatasetCollectionAssociation.table, properties=dict( job=relation( model.Job ), dataset_collection=relation( - model.HistoryDatasetCollectionAssociation, lazy=False ) ) ) + model.DatasetCollection, backref="output_dataset_collections" ) ) ) mapper( model.JobToInputLibraryDatasetAssociation, model.JobToInputLibraryDatasetAssociation.table, properties=dict( @@ -1895,6 +1906,8 @@ parameters=relation( model.JobParameter, lazy=False ), input_datasets=relation( model.JobToInputDatasetAssociation ), output_datasets=relation( model.JobToOutputDatasetAssociation ), + output_dataset_collection_instances=relation( model.JobToOutputDatasetCollectionAssociation ), + output_dataset_collections=relation( model.JobToImplicitOutputDatasetCollectionAssociation ), post_job_actions=relation( model.PostJobActionAssociation, lazy=False ), input_library_datasets=relation( model.JobToInputLibraryDatasetAssociation ), output_library_datasets=relation( model.JobToOutputLibraryDatasetAssociation ), diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/model/migrate/versions/0127_output_collection_adjustments.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0127_output_collection_adjustments.py @@ -0,0 +1,63 @@ +""" +Migration script updating collections tables for output collections. 
+""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * +from galaxy.model.custom_types import * + +import datetime +now = datetime.datetime.utcnow + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData() + +JobToImplicitOutputDatasetCollectionAssociation_table = Table( + "job_to_implicit_output_dataset_collection", metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "dataset_collection_id", Integer, ForeignKey( "dataset_collection.id" ), index=True ), + Column( "name", Unicode(255) ) +) + + +TABLES = [ + JobToImplicitOutputDatasetCollectionAssociation_table, +] + + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + for table in TABLES: + __create(table) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + for table in TABLES: + __drop(table) + + +def __create(table): + try: + table.create() + except Exception as e: + print str(e) + log.exception("Creating %s table failed: %s" % (table.name, str( e ) ) ) + + +def __drop(table): + try: + table.drop() + except Exception as e: + print str(e) + log.exception("Dropping %s table failed: %s" % (table.name, str( e ) ) ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -222,7 +222,18 @@ return new_state -class ToolOutput( object, Dictifiable ): +class ToolOutputBase( object, Dictifiable ): + + def __init__( self, name, label=None, filters=None, hidden=False ): + super( ToolOutputBase, self ).__init__() + self.name = name + self.label = label + self.filters = filters or [] + self.hidden = hidden + self.collection = False + + +class ToolOutput( ToolOutputBase ): """ Represents an output datasets produced by a tool. For backward compatibility this behaves as if it were the tuple:: @@ -233,16 +244,18 @@ dict_collection_visible_keys = ( 'name', 'format', 'label', 'hidden' ) def __init__( self, name, format=None, format_source=None, metadata_source=None, - parent=None, label=None, filters=None, actions=None, hidden=False ): - self.name = name + parent=None, label=None, filters=None, actions=None, hidden=False, + implicit=False ): + super( ToolOutput, self ).__init__( name, label=label, filters=filters, hidden=hidden ) self.format = format self.format_source = format_source self.metadata_source = metadata_source self.parent = parent - self.label = label - self.filters = filters or [] self.actions = actions - self.hidden = hidden + + # Initialize default values + self.change_format = [] + self.implicit = implicit # Tuple emulation @@ -263,6 +276,80 @@ return iter( ( self.format, self.metadata_source, self.parent ) ) +class ToolOutputCollection( ToolOutputBase ): + """ + Represents a HistoryDatasetCollectionAssociation of output datasets produced by a tool. 
+ <outputs> + <dataset_collection type="list" label="${tool.name} on ${on_string} fasta"> + <discover_datasets pattern="__name__" ext="fasta" visible="True" directory="outputFiles" /> + </dataset_collection> + <dataset_collection type="paired" label="${tool.name} on ${on_string} paired reads"> + <data name="forward" format="fastqsanger" /> + <data name="reverse" format="fastqsanger"/> + </dataset_collection> + <outputs> + """ + + def __init__( self, name, structure, label=None, filters=None, hidden=False, default_format="data" ): + super( ToolOutputCollection, self ).__init__( name, label=label, filters=filters, hidden=hidden ) + self.collection = True + self.default_format = default_format + self.structure = structure + self.outputs = odict() + + def known_outputs( self, inputs ): + def to_part( ( element_identifier, output ) ): + return ToolOutputCollectionPart( self, element_identifier, output ) + + # This line is probably not right - should verify structured_like + # or have outputs and all outputs have name. + if not self.structure.structured_like and self.outputs: + outputs = self.outputs + else: + # TODO: Handle nested structures. + input_collection = inputs[ self.structure.structured_like ] + outputs = odict() + for element in input_collection.collection.elements: + name = element.element_identifier + output = ToolOutput( name, format=self.default_format, implicit=True ) + outputs[ element.element_identifier ] = output + + return map( to_part, outputs.items() ) + + +class ToolOutputCollectionStructure( object ): + + def __init__( self, collection_type=None, structured_like=None ): + self.collection_type = collection_type + self.structured_like = structured_like + if collection_type is None and structured_like is None: + raise ValueError( "Output collection types must be specify type of structured_like" ) + + +class ToolOutputCollectionPart( object ): + + def __init__( self, output_collection_def, element_identifier, output_def ): + self.output_collection_def = output_collection_def + self.element_identifier = element_identifier + self.output_def = output_def + + @property + def effective_output_name( self ): + name = self.output_collection_def.name + part_name = self.element_identifier + effective_output_name = "%s|__part__|%s" % ( name, part_name ) + return effective_output_name + + @staticmethod + def is_named_collection_part_name( name ): + return "|__part__|" in name + + @staticmethod + def split_output_name( name ): + assert ToolOutputCollectionPart.is_named_collection_part_name( name ) + return name.split("|__part__|") + + class Tool( object, Dictifiable ): """ Represents a computational tool that can be executed through Galaxy. @@ -700,9 +787,7 @@ """ Parse <outputs> elements and fill in self.outputs (keyed by name) """ - self.outputs = odict() - for output in tool_source.parse_outputs(self): - self.outputs[ output.name ] = output + self.outputs, self.output_collections = tool_source.parse_outputs(self) # TODO: Include the tool's name in any parsing warnings. def parse_stdio( self, tool_source ): @@ -930,6 +1015,19 @@ while len( self.__help_by_page ) < self.npages: self.__help_by_page.append( self.__help ) + def find_output_def( self, name ): + # name is JobToOutputDatasetAssociation name. + # TODO: to defensive, just throw IndexError and catch somewhere + # up that stack. 
+ if ToolOutputCollectionPart.is_named_collection_part_name( name ): + collection_name, part = ToolOutputCollectionPart.split_output_name( name ) + collection_def = self.output_collections.get( collection_name, None ) + if not collection_def: + return None + return collection_def.outputs.get( part, None ) + else: + return self.outputs.get( name, None ) + def check_workflow_compatible( self, tool_source ): """ Determine if a tool can be used in workflows. External tools and the @@ -1109,6 +1207,7 @@ num_jobs=len( execution_tracker.successful_jobs ), job_errors=execution_tracker.execution_errors, jobs=execution_tracker.successful_jobs, + output_collections=execution_tracker.output_collections, implicit_collections=execution_tracker.implicit_collections, ) else: @@ -1122,14 +1221,14 @@ def __should_refresh_state( self, incoming ): return not( 'runtool_btn' in incoming or 'URL' in incoming or 'ajax_upload' in incoming ) - def handle_single_execution( self, trans, rerun_remap_job_id, params, history ): + def handle_single_execution( self, trans, rerun_remap_job_id, params, history, mapping_over_collection ): """ Return a pair with whether execution is successful as well as either resulting output data or an error message indicating the problem. """ try: params = self.__remove_meta_properties( params ) - job, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id ) + job, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id, mapping_over_collection=mapping_over_collection ) except httpexceptions.HTTPFound, e: #if it's a paste redirect exception, pass it up the stack raise e @@ -2756,6 +2855,15 @@ return inputs +class TestCollectionOutputDef( object ): + + def __init__( self, name, attrib, element_tests ): + self.name = name + self.collection_type = attrib.get( "type", None ) + self.attrib = attrib + self.element_tests = element_tests + + def json_fix( val ): if isinstance( val, list ): return [ json_fix( v ) for v in val ] diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -149,7 +149,7 @@ tool.visit_inputs( param_values, visitor ) return input_dataset_collections - def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None, job_params=None, rerun_remap_job_id=None): + def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None, job_params=None, rerun_remap_job_id=None, mapping_over_collection=False): """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. If history is not specified, use @@ -160,6 +160,8 @@ history = tool.get_default_history_by_trans( trans, create=True ) out_data = odict() + out_collections = {} + out_collection_instances = {} # Track input dataset collections - but replace with simply lists so collect # input datasets can process these normally. 
inp_dataset_collections = self.collect_input_dataset_collections( tool, incoming ) @@ -264,10 +266,50 @@ output.actions.apply_action( data, output_action_params ) # Store all changes to database trans.sa_session.flush() + return data for name, output in tool.outputs.items(): if not filter_output(output, incoming): - handle_output( name, output ) + if output.collection: + collections_manager = trans.app.dataset_collections_service + + # As far as I can tell - this is always true - but just verify + assert set_output_history, "Cannot create dataset collection for this kind of tool." + + elements = odict() + input_collections = dict( [ (k, v[0]) for k, v in inp_dataset_collections.iteritems() ] ) + known_outputs = output.known_outputs( input_collections ) + # Just to echo TODO elsewhere - this should be restructured to allow + # nested collections. + for output_part_def in known_outputs: + effective_output_name = output_part_def.effective_output_name + element = handle_output( effective_output_name, output_part_def.output_def ) + # Following hack causes dataset to no be added to history... + child_dataset_names.add( effective_output_name ) + + elements[ output_part_def.element_identifier ] = element + + if mapping_over_collection: + dc = collections_manager.create_dataset_collection( + trans, + collection_type=output.structure.collection_type, + elements=elements, + ) + out_collections[ name ] = dc + else: + hdca_name = self.get_output_name( output, None, tool, on_text, trans, incoming, history, wrapped_params.params, job_params ) + hdca = collections_manager.create( + trans, + history, + name=hdca_name, + collection_type=output.structure.collection_type, + elements=elements, + ) + # name here is name of the output element - not name + # of the hdca. + out_collection_instances[ name ] = hdca + else: + handle_output( name, output ) # Add all the top-level (non-child) datasets to the history unless otherwise specified for name in out_data.keys(): if name not in child_dataset_names and name not in incoming: # don't add children; or already existing datasets, i.e. async created @@ -322,6 +364,10 @@ job.add_input_dataset( name, None ) for name, dataset in out_data.iteritems(): job.add_output_dataset( name, dataset ) + for name, dataset_collection in out_collections.iteritems(): + job.add_implicit_output_dataset_collection( name, dataset_collection ) + for name, dataset_collection_instance in out_collection_instances.iteritems(): + job.add_output_dataset_collection( name, dataset_collection_instance ) job.object_store_id = object_store_populator.object_store_id if job_params: job.params = dumps( job_params ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/evaluation.py --- a/lib/galaxy/tools/evaluation.py +++ b/lib/galaxy/tools/evaluation.py @@ -59,6 +59,9 @@ inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] ) out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] ) + out_collections = dict( [ ( obj.name, obj.dataset_collection_instance ) for obj in job.output_dataset_collection_instances ] ) + out_collections.update( [ ( obj.name, obj.dataset_collection ) for obj in job.output_dataset_collections ] ) + if get_special: # Set up output dataset association for export history jobs. 
Because job @@ -84,6 +87,7 @@ incoming, inp_data, out_data, + output_collections=out_collections, output_paths=compute_environment.output_paths(), job_working_directory=compute_environment.working_directory(), input_paths=compute_environment.input_paths() @@ -98,7 +102,7 @@ self.param_dict = param_dict - def build_param_dict( self, incoming, input_datasets, output_datasets, output_paths, job_working_directory, input_paths=[] ): + def build_param_dict( self, incoming, input_datasets, output_datasets, output_collections, output_paths, job_working_directory, input_paths=[] ): """ Build the dictionary of parameters for substituting into the command line. Each value is wrapped in a `InputValueWrapper`, which allows @@ -115,6 +119,7 @@ self.__populate_wrappers(param_dict, input_dataset_paths) self.__populate_input_dataset_wrappers(param_dict, input_datasets, input_dataset_paths) self.__populate_output_dataset_wrappers(param_dict, output_datasets, output_paths, job_working_directory) + self.__populate_output_collection_wrappers(param_dict, output_collections, output_paths, job_working_directory) self.__populate_unstructured_path_rewrites(param_dict) # Call param dict sanitizer, before non-job params are added, as we don't want to sanitize filenames. self.__sanitize_param_dict( param_dict ) @@ -269,6 +274,35 @@ for child in data.children: param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child ) + def __populate_output_collection_wrappers(self, param_dict, output_collections, output_paths, job_working_directory): + output_dataset_paths = dataset_path_rewrites( output_paths ) + tool = self.tool + for name, out_collection in output_collections.items(): + if name not in tool.output_collections: + continue + #message_template = "Name [%s] not found in tool.output_collections %s" + #message = message_template % ( name, tool.output_collections ) + #raise AssertionError( message ) + + wrapper_kwds = dict( + datatypes_registry=self.app.datatypes_registry, + dataset_paths=output_dataset_paths, + tool=tool, + name=name + ) + wrapper = DatasetCollectionWrapper( + out_collection, + **wrapper_kwds + ) + param_dict[ name ] = wrapper + # TODO: Handle nested collections... + output_def = tool.output_collections[ name ] + for element_identifier, output_def in output_def.outputs.items(): + if not output_def.implicit: + dataset_wrapper = wrapper[ element_identifier ] + param_dict[ output_def.name ] = dataset_wrapper + log.info("Updating param_dict for %s with %s" % (output_def.name, dataset_wrapper) ) + def __populate_output_dataset_wrappers(self, param_dict, output_datasets, output_paths, job_working_directory): output_dataset_paths = dataset_path_rewrites( output_paths ) for name, hda in output_datasets.items(): diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/execute.py --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -4,6 +4,7 @@ collections from matched collections. """ import collections +import galaxy.tools from galaxy.tools.actions import on_text_for_names import logging @@ -23,7 +24,7 @@ # Only workflow invocation code gets to set this, ignore user supplied # values or rerun parameters. 
del params[ '__workflow_invocation_uuid__' ] - job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history ) + job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history, collection_info ) if job: execution_tracker.record_success( job, result ) else: @@ -46,6 +47,7 @@ self.failed_jobs = 0 self.execution_errors = [] self.output_datasets = [] + self.output_collections = [] self.outputs_by_output_name = collections.defaultdict(list) self.implicit_collections = {} @@ -53,7 +55,15 @@ self.successful_jobs.append( job ) self.output_datasets.extend( outputs ) for output_name, output_dataset in outputs: + if galaxy.tools.ToolOutputCollectionPart.is_named_collection_part_name( output_name ): + # Skip known collection outputs, these will be covered by + # output collections. + continue self.outputs_by_output_name[ output_name ].append( output_dataset ) + for job_output in job.output_dataset_collections: + self.outputs_by_output_name[ job_output.name ].append( job_output.dataset_collection ) + for job_output in job.output_dataset_collection_instances: + self.output_collections.append( ( job_output.name, job_output.dataset_collection_instance ) ) def record_error( self, error ): self.failed_jobs += 1 @@ -120,6 +130,11 @@ collection_type=collection_type, implicit_collection_info=implicit_collection_info, ) + for job in self.successful_jobs: + # TODO: Think through this, may only want this for output + # collections - or we may be already recording data in some + # other way. + job.add_output_dataset_collection( output_name, collection ) collections[ output_name ] = collection self.implicit_collections = collections diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/parameters/basic.py --- a/lib/galaxy/tools/parameters/basic.py +++ b/lib/galaxy/tools/parameters/basic.py @@ -2088,7 +2088,6 @@ for instance should I just be checking dynamic options). """ allow = True - # TODO: allow should be false in some in cases... return allow def _options_filter_attribute( self, value ): diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/parser/interface.py --- a/lib/galaxy/tools/parser/interface.py +++ b/lib/galaxy/tools/parser/interface.py @@ -130,7 +130,8 @@ @abstractmethod def parse_outputs(self, tool): - """ Return a list of ToolOutput objects. + """ Return a pair of output and output collections ordered + dictionaries for use by Tool. 
""" @abstractmethod diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/parser/xml.py --- a/lib/galaxy/tools/parser/xml.py +++ b/lib/galaxy/tools/parser/xml.py @@ -12,6 +12,7 @@ InputSource, ) from galaxy.util import string_as_bool, xml_text, xml_to_string +from galaxy.util.odict import odict from galaxy.tools.deps import requirements import galaxy.tools from galaxy.tools.parameters import output_collect @@ -133,17 +134,54 @@ def parse_outputs(self, tool): out_elem = self.root.find("outputs") + outputs = odict() + output_collections = odict() if out_elem is None: - return [] + return outputs, output_collections - def _parse(data_elem): - return self._parse_output(data_elem, tool) + data_dict = odict() - return map(_parse, out_elem.findall("data")) + def _parse(data_elem, default_format="data"): + output_def = self._parse_output(data_elem, tool, default_format=default_format) + data_dict[output_def.name] = output_def + return output_def - def _parse_output(self, data_elem, tool): + map(_parse, out_elem.findall("data")) + + for collection_elem in out_elem.findall("collection"): + name = collection_elem.get( "name" ) + default_format = collection_elem.get( "format", "data" ) + collection_type = collection_elem.get( "type", None ) + structured_like = collection_elem.get( "structured_like", None ) + structure = galaxy.tools.ToolOutputCollectionStructure( + collection_type=collection_type, + structured_like=structured_like, + ) + output_collection = galaxy.tools.ToolOutputCollection( + name, + structure, + default_format=default_format + ) + outputs[output_collection.name] = output_collection + + for data_elem in collection_elem.findall("data"): + _parse( data_elem, default_format=default_format ) + + for data_elem in collection_elem.findall("data"): + output_name = data_elem.get("name") + data = data_dict[output_name] + assert data + del data_dict[output_name] + output_collection.outputs[output_name] = data + output_collections[ name ] = output_collection + + for output_def in data_dict.values(): + outputs[output_def.name] = output_def + return outputs, output_collections + + def _parse_output(self, data_elem, tool, default_format): output = galaxy.tools.ToolOutput( data_elem.get("name") ) - output.format = data_elem.get("format", "data") + output.format = data_elem.get("format", default_format) output.change_format = data_elem.findall("change_format") output.format_source = data_elem.get("format_source", None) output.metadata_source = data_elem.get("metadata_source", "") @@ -185,6 +223,7 @@ def _test_elem_to_dict(test_elem, i): rval = dict( outputs=__parse_output_elems(test_elem), + output_collections=__parse_output_collection_elems(test_elem), inputs=__parse_input_elems(test_elem, i), command=__parse_assert_list_from_elem( test_elem.find("assert_command") ), stdout=__parse_assert_list_from_elem( test_elem.find("assert_stdout") ), @@ -232,6 +271,29 @@ return __parse_assert_list_from_elem( assert_elem ) +def __parse_output_collection_elems( test_elem ): + output_collections = [] + for output_collection_elem in test_elem.findall( "output_collection" ): + output_collection_def = __parse_output_collection_elem( output_collection_elem ) + output_collections.append( output_collection_def ) + return output_collections + + +def __parse_output_collection_elem( output_collection_elem ): + attrib = dict( output_collection_elem.attrib ) + name = attrib.pop( 'name', None ) + if name is None: + raise Exception( "Test output collection does not have a 
'name'" ) + element_tests = {} + for element in output_collection_elem.findall("element"): + element_attrib = dict( element.attrib ) + identifier = element_attrib.pop( 'name', None ) + if identifier is None: + raise Exception( "Test primary dataset does not have a 'identifier'" ) + element_tests[ identifier ] = __parse_test_attributes( element, element_attrib ) + return galaxy.tools.TestCollectionOutputDef( name, attrib, element_tests ) + + def __parse_test_attributes( output_elem, attrib ): assert_list = __parse_assert_list( output_elem ) file = attrib.pop( 'file', None ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/parser/yaml.py --- a/lib/galaxy/tools/parser/yaml.py +++ b/lib/galaxy/tools/parser/yaml.py @@ -5,6 +5,7 @@ from galaxy.tools.deps import requirements from galaxy.tools.parameters import output_collect +from galaxy.util.odict import odict import galaxy.tools @@ -76,7 +77,11 @@ output_defs = [] for name, output_dict in outputs.items(): output_defs.append(self._parse_output(tool, name, output_dict)) - return output_defs + outputs = odict() + for output in output_defs: + outputs[output.name] = output + # TODO: parse outputs collections + return output_defs, odict() def _parse_output(self, tool, name, output_dict): # TODO: handle filters, actions, change_format diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/test.py --- a/lib/galaxy/tools/test.py +++ b/lib/galaxy/tools/test.py @@ -123,6 +123,7 @@ self.inputs = self.__process_raw_inputs( self.tool.inputs, test_dict["inputs"] ) self.outputs = test_dict["outputs"] + self.output_collections = test_dict["output_collections"] num_outputs = test_dict.get( 'expect_num_outputs', None ) if num_outputs: num_outputs = int( num_outputs ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/tools/wrappers.py --- a/lib/galaxy/tools/wrappers.py +++ b/lib/galaxy/tools/wrappers.py @@ -292,10 +292,13 @@ # It is a HistoryDatasetCollectionAssociation collection = has_collection.collection self.name = has_collection.name - else: + elif hasattr( has_collection, "child_collection" ): # It is a DatasetCollectionElement instance referencing another collection collection = has_collection.child_collection self.name = has_collection.element_identifier + else: + collection = has_collection + self.name = None elements = collection.elements element_instances = odict.odict() diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/webapps/galaxy/api/tools.py --- a/lib/galaxy/webapps/galaxy/api/tools.py +++ b/lib/galaxy/webapps/galaxy/api/tools.py @@ -192,6 +192,7 @@ output_datasets = vars.get( 'out_data', [] ) rval = { "outputs": [], + "output_collections": [], "jobs": [], "implicit_collections": [], } @@ -214,6 +215,12 @@ for job in vars.get('jobs', []): rval[ 'jobs' ].append( self.encode_all_ids( trans, job.to_dict( view='collection' ), recursive=True ) ) + for output_name, collection_instance in vars.get('output_collections', []): + history = target_history or trans.history + output_dict = dictify_dataset_collection_instance( collection_instance, security=trans.security, parent=history ) + output_dict[ 'output_name' ] = output_name + rval[ 'output_collections' ].append( output_dict ) + for output_name, collection_instance in vars.get( 'implicit_collections', {} ).iteritems(): history = target_history or trans.history output_dict = 
dictify_dataset_collection_instance( collection_instance, security=trans.security, parent=history ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -821,6 +821,7 @@ step_outputs = dict( execution_tracker.implicit_collections ) else: step_outputs = dict( execution_tracker.output_datasets ) + step_outputs.update( execution_tracker.output_collections ) progress.set_step_outputs( step, step_outputs ) jobs = execution_tracker.successful_jobs for job in jobs: diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/api/test_tools.py --- a/test/api/test_tools.py +++ b/test/api/test_tools.py @@ -215,6 +215,67 @@ contents = self.dataset_populator.get_history_dataset_content( history_id, hid=4 ) assert contents.strip() == "123\n456", contents + @skip_without_tool( "collection_creates_pair" ) + def test_paired_collection_output( self ): + history_id = self.dataset_populator.new_history() + new_dataset1 = self.dataset_populator.new_dataset( history_id, content='123\n456\n789\n0ab' ) + inputs = { + "input1": {"src": "hda", "id": new_dataset1["id"]}, + } + # TODO: shouldn't need this wait + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + create = self._run( "collection_creates_pair", history_id, inputs, assert_ok=True ) + jobs = create[ 'jobs' ] + implicit_collections = create[ 'implicit_collections' ] + collections = create[ 'output_collections' ] + + self.assertEquals( len( jobs ), 1 ) + self.assertEquals( len( implicit_collections ), 0 ) + self.assertEquals( len( collections ), 1 ) + + output_collection = collections[ 0 ] + elements = output_collection[ "elements" ] + assert len( elements ) == 2 + element0, element1 = elements + assert element0[ "element_identifier" ] == "forward" + assert element1[ "element_identifier" ] == "reverse" + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + contents0 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element0["object"]["id"]) + assert contents0 == "123\n789\n", contents0 + contents1 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element1["object"]["id"]) + assert contents1 == "456\n0ab\n", contents1 + + @skip_without_tool( "collection_creates_list" ) + def test_list_collection_output( self ): + history_id = self.dataset_populator.new_history() + create_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"] ) + hdca_id = create_response.json()[ "id" ] + inputs = { + "input1": { "src": "hdca", "id": hdca_id }, + } + # TODO: real problem here - shouldn't have to have this wait. 
+ self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + create = self._run( "collection_creates_list", history_id, inputs, assert_ok=True ) + jobs = create[ 'jobs' ] + implicit_collections = create[ 'implicit_collections' ] + collections = create[ 'output_collections' ] + + self.assertEquals( len( jobs ), 1 ) + self.assertEquals( len( implicit_collections ), 0 ) + self.assertEquals( len( collections ), 1 ) + + output_collection = collections[ 0 ] + elements = output_collection[ "elements" ] + assert len( elements ) == 2 + element0, element1 = elements + assert element0[ "element_identifier" ] == "data1" + assert element1[ "element_identifier" ] == "data2" + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + contents0 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element0["object"]["id"]) + assert contents0 == "0\n", contents0 + contents1 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element1["object"]["id"]) + assert contents1 == "1\n", contents1 + @skip_without_tool( "cat1" ) def test_run_cat1_with_two_inputs( self ): # Run tool with an multiple data parameter and grouping (repeat) @@ -535,6 +596,52 @@ self.assertEquals( len( response_object[ 'jobs' ] ), 2 ) self.assertEquals( len( response_object[ 'implicit_collections' ] ), 1 ) + @skip_without_tool( "collection_creates_pair" ) + def test_map_over_collection_output( self ): + history_id = self.dataset_populator.new_history() + create_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"] ) + hdca_id = create_response.json()[ "id" ] + inputs = { + "input1|__collection_multirun__": {"src": "hdca", "id": hdca_id}, + } + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + create = self._run( "collection_creates_pair", history_id, inputs, assert_ok=True ) + jobs = create[ 'jobs' ] + implicit_collections = create[ 'implicit_collections' ] + self.assertEquals( len( jobs ), 2 ) + self.assertEquals( len( implicit_collections ), 1 ) + implicit_collection = implicit_collections[ 0 ] + assert implicit_collection[ "collection_type" ] == "list:paired", implicit_collection + outer_elements = implicit_collection[ "elements" ] + assert len( outer_elements ) == 2 + element0, element1 = outer_elements + assert element0[ "element_identifier" ] == "data1" + assert element1[ "element_identifier" ] == "data2" + + pair0, pair1 = element0["object"], element1["object"] + pair00, pair01 = pair0["elements"] + pair10, pair11 = pair1["elements"] + + for pair in pair0, pair1: + assert "collection_type" in pair, pair + assert pair["collection_type"] == "paired", pair + + pair_ids = [] + for pair_element in pair00, pair01, pair10, pair11: + assert "object" in pair_element + pair_ids.append(pair_element["object"]["id"]) + + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + expected_contents = [ + "a\nc\n", + "b\nd\n", + "e\ng\n", + "f\nh\n", + ] + for i in range(4): + contents = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=pair_ids[i]) + self.assertEquals(expected_contents[i], contents) + @skip_without_tool( "cat1" ) def test_cannot_map_over_incompatible_collections( self ): history_id = self.dataset_populator.new_history() diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -487,6 +487,57 @@ 
self._assert_status_code_is( run_workflow_response, 200 ) self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + def test_workflow_run_output_collections(self): + workflow_id = self._upload_yaml_workflow(""" +- label: text_input + type: input +- label: split_up + tool_id: collection_creates_pair + state: + input1: + $link: text_input +- tool_id: collection_paired_test + state: + f1: + $link: split_up#paired_output +""") + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="a\nb\nc\nd\n" ) + inputs = { + '0': self._ds_entry(hda1), + } + self.__invoke_workflow( history_id, workflow_id, inputs ) + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self.assertEquals("a\nc\nb\nd\n", self.dataset_populator.get_history_dataset_content( history_id, hid=0 ) ) + + def test_workflow_run_output_collection_mapping(self): + workflow_id = self._upload_yaml_workflow(""" +- type: input_collection +- tool_id: collection_creates_pair + state: + input1: + $link: 0 +- tool_id: collection_paired_test + state: + f1: + $link: 1#paired_output +- tool_id: cat_list + state: + input1: + $link: 2#out1 +""") + history_id = self.dataset_populator.new_history() + hdca1 = self.dataset_collection_populator.create_list_in_history( history_id, contents=["a\nb\nc\nd\n", "e\nf\ng\nh\n"] ).json() + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + inputs = { + '0': self._ds_entry(hdca1), + } + self.__invoke_workflow( history_id, workflow_id, inputs ) + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self.assertEquals("a\nc\nb\nd\ne\ng\nf\nh\n", self.dataset_populator.get_history_dataset_content( history_id, hid=0 ) ) + def test_workflow_request( self ): workflow = self.workflow_populator.load_workflow( name="test_for_queue" ) workflow_request, history_id = self._setup_workflow_run( workflow ) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/base/interactor.py --- a/test/base/interactor.py +++ b/test/base/interactor.py @@ -214,6 +214,7 @@ return Bunch( inputs=inputs_tree, outputs=self.__dictify_outputs( submit_response_object ), + output_collections=self.__dictify_output_collections( submit_response_object ), jobs=submit_response_object[ 'jobs' ], ) except KeyError: @@ -247,6 +248,12 @@ element_identifiers.append( element ) return element_identifiers + def __dictify_output_collections( self, submit_response ): + output_collections_dict = odict() + for output_collection in submit_response[ 'output_collections' ]: + output_collections_dict[ output_collection.get("output_name") ] = output_collection + return output_collections_dict + def __dictify_outputs( self, datasets_object ): ## Convert outputs list to a dictionary that can be accessed by ## output_name so can be more flexiable about ordering of outputs diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/functional/test_toolbox.py --- a/test/functional/test_toolbox.py +++ b/test/functional/test_toolbox.py @@ -48,6 +48,7 @@ try: tool_response = galaxy_interactor.run_tool( testdef, test_history ) data_list, jobs, tool_inputs = tool_response.outputs, tool_response.jobs, tool_response.inputs + data_collection_list = tool_response.output_collections except RunToolException as e: tool_inputs = e.inputs tool_execution_exception = e @@ -56,10 +57,10 @@ tool_execution_exception = e raise e - self.assertTrue( 
data_list ) + self.assertTrue( data_list or data_collection_list ) try: - job_stdio = self._verify_outputs( testdef, test_history, jobs, shed_tool_id, data_list, galaxy_interactor ) + job_stdio = self._verify_outputs( testdef, test_history, jobs, shed_tool_id, data_list, data_collection_list, galaxy_interactor ) except JobOutputsError as e: job_stdio = e.job_stdio job_output_exceptions = e.output_exceptions @@ -92,7 +93,7 @@ else: raise Exception( "Test parse failure" ) - def _verify_outputs( self, testdef, history, jobs, shed_tool_id, data_list, galaxy_interactor ): + def _verify_outputs( self, testdef, history, jobs, shed_tool_id, data_list, data_collection_list, galaxy_interactor ): assert len(jobs) == 1, "Test framework logic error, somehow tool test resulted in more than one job." job = jobs[ 0 ] @@ -106,6 +107,14 @@ raise Exception( message ) found_exceptions = [] + def register_exception(e): + if not found_exceptions: + # Only print this stuff out once. + for stream in ['stdout', 'stderr']: + if stream in job_stdio: + print >>sys.stderr, self._format_stream( job_stdio[ stream ], stream=stream, format=True ) + found_exceptions.append(e) + if testdef.expect_failure: if testdef.outputs: raise Exception("Cannot specify outputs in a test expecting failure.") @@ -120,18 +129,18 @@ if not testdef.expect_failure: found_exceptions.append(e) + job_stdio = galaxy_interactor.get_job_stdio( job[ 'id' ] ) + if not job_failed and testdef.expect_failure: error = AssertionError("Expected job to fail but Galaxy indicated the job successfully completed.") - found_exceptions.append(error) - - job_stdio = galaxy_interactor.get_job_stdio( job[ 'id' ] ) + register_exception(error) expect_exit_code = testdef.expect_exit_code if expect_exit_code is not None: exit_code = job_stdio["exit_code"] if str(expect_exit_code) != str(exit_code): error = AssertionError("Expected job to complete with exit code %s, found %s" % (expect_exit_code, exit_code)) - found_exceptions.append(error) + register_exception(error) for output_index, output_tuple in enumerate(testdef.outputs): # Get the correct hid @@ -151,12 +160,7 @@ try: galaxy_interactor.verify_output( history, jobs, output_data, output_testdef=output_testdef, shed_tool_id=shed_tool_id, maxseconds=maxseconds ) except Exception as e: - if not found_exceptions: - # Only print this stuff out once. - for stream in ['stdout', 'stderr']: - if stream in job_stdio: - print >>sys.stderr, self._format_stream( job_stdio[ stream ], stream=stream, format=True ) - found_exceptions.append(e) + register_exception(e) other_checks = { "command_line": "Command produced by the job", @@ -171,7 +175,48 @@ except AssertionError, err: errmsg = '%s different than expected\n' % description errmsg += str( err ) - found_exceptions.append( AssertionError( errmsg ) ) + register_exception( AssertionError( errmsg ) ) + + for output_collection_def in testdef.output_collections: + try: + name = output_collection_def.name + # TODO: data_collection_list is clearly a bad name for dictionary. + if name not in data_collection_list: + template = "Failed to find output [%s], tool outputs include [%s]" + message = template % (name, ",".join(data_collection_list.keys())) + raise AssertionError(message) + + # Data collection returned from submission, elements may have been populated after + # the job completed so re-hit the API for more information. 
+ data_collection_returned = data_collection_list[ name ] + data_collection = galaxy_interactor._get( "dataset_collections/%s" % data_collection_returned[ "id" ], data={"instance_type": "history"} ).json() + elements = data_collection[ "elements" ] + element_dict = dict( map(lambda e: (e["element_identifier"], e["object"]), elements) ) + + expected_collection_type = output_collection_def.collection_type + if expected_collection_type: + collection_type = data_collection[ "collection_type"] + if expected_collection_type != collection_type: + template = "Expected output collection [%s] to be of type [%s], was of type [%s]." + message = template % (name, expected_collection_type, collection_type) + raise AssertionError(message) + + for element_identifier, ( element_outfile, element_attrib ) in output_collection_def.element_tests.items(): + if element_identifier not in element_dict: + template = "Failed to find identifier [%s] for testing, tool generated collection with identifiers [%s]" + message = template % (element_identifier, ",".join(element_dict.keys())) + raise AssertionError(message) + hda = element_dict[ element_identifier ] + + galaxy_interactor.verify_output_dataset( + history, + hda_id=hda["id"], + outfile=element_outfile, + attributes=element_attrib, + shed_tool_id=shed_tool_id + ) + except Exception as e: + register_exception(e) if found_exceptions: raise JobOutputsError(found_exceptions, job_stdio) diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/functional/tools/collection_creates_list.xml --- /dev/null +++ b/test/functional/tools/collection_creates_list.xml @@ -0,0 +1,37 @@ +<tool id="collection_creates_list" name="collection_creates_list" version="0.1.0"> + <command> + #for $key in $list_output.keys()# + echo "identifier is $key" > "$list_output[$key]"; + #end for# + echo 'ensure not empty'; + </command> + <inputs> + <param name="input1" type="data_collection" collection_type="list" label="Input" help="Input collection..." format="txt" /> + </inputs> + <outputs> + <collection name="list_output" type="list" label="Duplicate List" structured_like="input1" format="txt"> + </collection> + </outputs> + <tests> + <test> + <param name="input1"> + <collection type="list"> + <element name="l11" value="simple_line.txt" /> + <element name="l12" value="simple_line.txt" /> + </collection> + </param> + <output_collection name="list_output" type="list"> + <element name="l11"> + <assert_contents> + <has_text_matching expression="^identifier is l11\n$" /> + </assert_contents> + </element> + <element name="l12"> + <assert_contents> + <has_text_matching expression="^identifier is l12\n$" /> + </assert_contents> + </element> + </output_collection> + </test> + </tests> +</tool> diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/functional/tools/collection_creates_pair.xml --- /dev/null +++ b/test/functional/tools/collection_creates_pair.xml @@ -0,0 +1,33 @@ +<tool id="collection_creates_pair" name="collection_creates_pair" version="0.1.0"> + <command> + sed 'n;d' $input1 > $forward ; + sed -n 'g;n;p' $input1 > "reverse.txt"; + </command> + <inputs> + <param name="input1" type="data" label="Input" help="Input to be split." /> + </inputs> + <outputs> + <collection name="paired_output" type="paired" label="Split Pair"> + <!-- can reference parts directly or find via from_work_dir. 
--> + <data name="forward" format="txt" /> + <data name="reverse" format="txt" from_work_dir="reverse.txt" /> + </collection> + </outputs> + <tests> + <test> + <param name="input1" value="simple_lines_interleaved.txt" /> + <output_collection name="paired_output" type="paired"> + <element name="forward"> + <assert_contents> + <has_text_matching expression="^This is a line of text.\nThis is a line of text.\n$" /> + </assert_contents> + </element> + <element name="reverse"> + <assert_contents> + <has_text_matching expression="^This is a different line of text.\nThis is a different line of text.\n$" /> + </assert_contents> + </element> + </output_collection> + </test> + </tests> +</tool> diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/functional/tools/samples_tool_conf.xml --- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -34,6 +34,8 @@ <tool file="collection_nested_test.xml" /><tool file="collection_mixed_param.xml" /><tool file="collection_two_paired.xml" /> + <tool file="collection_creates_pair.xml" /> + <tool file="collection_creates_list.xml" /><tool file="collection_optional_param.xml" /><tool file="multiple_versions_v01.xml" /> diff -r 6d3099596550b23ed9c21384e35ffb3a7af57a64 -r 6ae144fd51f756031e1691268369dcbe5440a16b test/unit/tools/test_parsing.py --- a/test/unit/tools/test_parsing.py +++ b/test/unit/tools/test_parsing.py @@ -167,8 +167,9 @@ assert containers[0].identifier == "mycool/bwa" def test_outputs(self): - outputs = self._tool_source.parse_outputs(object()) + outputs, output_collections = self._tool_source.parse_outputs(object()) assert len(outputs) == 1 + assert len(output_collections) == 0 def test_stdio(self): exit, regexes = self._tool_source.parse_stdio() @@ -273,8 +274,9 @@ assert containers[0].identifier == "awesome/bowtie" def test_outputs(self): - outputs = self._tool_source.parse_outputs(object()) + outputs, output_collections = self._tool_source.parse_outputs(object()) assert len(outputs) == 1 + assert len(output_collections) == 0 def test_stdio(self): exit, regexes = self._tool_source.parse_stdio() https://bitbucket.org/galaxy/galaxy-central/commits/99b60d99df38/ Changeset: 99b60d99df38 User: jmchilton Date: 2015-01-15 14:30:00+00:00 Summary: Allow tools to output collections with a dynamic number of datasets. Models: Track whether dataset collections have been populated yet. Dataset collections are still effectively immutable once populated - but dynamic output collections require them to be sort of like `final` fields in Java (analogy courtesy of JJ) - allowing them to be declared before they are initialized or populated. This is tracked by the `populated_state` field. Tools: Output collections can now describe `discover_datasets` elements just like datasets - except in this case instead of dynamically populating new datasets in the history - they will comprise the collection. `designation` has been reused to serve as the element_identifier for the collection element corresponding to the dataset. See Pull Request 356 for more information on the discover_datasets tag https://bitbucket.org/galaxy/galaxy-central/pull-request/356/enhancements-fo.... Workflows: Update workflow execution and recovery for dynamic output collections. 
Galaxy workflow data flow before collections * - * - * - * - * - * Galaxy workflow data flow after collections (iteration 1) * - * - * \ * - * - * * - * - * / \ * - * - * * - * - * \ / * - * - * * - * - * / Galaxy workflow data flow after this commit / * - * \ * - * * - * / \ * - * / \ / \ / \ / / * - * \ \ * - * -- * - * * - * -- * - * \ \ * - * / / \ / \ / \ / * - * \ / * - * * - * \ * - * /

Affected #: 17 files

diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1202,6 +1202,11 @@ out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] ) out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] ) + + # TODO: eliminate overlap with tools/evaluation.py + out_collections = dict( [ ( obj.name, obj.dataset_collection_instance ) for obj in job.output_dataset_collection_instances ] ) + out_collections.update( [ ( obj.name, obj.dataset_collection ) for obj in job.output_dataset_collections ] ) + input_ext = 'data' for _, data in inp_data.items(): # For loop odd, but sort simulating behavior in galaxy.tools.actions @@ -1218,6 +1223,12 @@ 'children': self.tool.collect_child_datasets(out_data, self.working_directory), 'primary': self.tool.collect_primary_datasets(out_data, self.working_directory, input_ext) } + self.tool.collect_dynamic_collections( + out_collections, + job_working_directory=self.working_directory, + inp_data=inp_data, + job=job, + ) param_dict.update({'__collected_datasets__': collected_datasets}) # Certain tools require tasks to be completed after job execution # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/managers/collections.py --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -28,6 +28,7 @@ Abstraction for interfacing with dataset collections instance - ideally abstarcts out model and plugin details. """ + ELEMENTS_UNINITIALIZED = object() def __init__( self, app ): self.type_registry = DatasetCollectionTypesRegistry( app ) @@ -129,9 +130,24 @@ elements = self.__load_elements( trans, element_identifiers ) # else if elements is set, it better be an ordered dict!
+ if elements is not self.ELEMENTS_UNINITIALIZED: + type_plugin = collection_type_description.rank_type_plugin() + dataset_collection = builder.build_collection( type_plugin, elements ) + else: + dataset_collection = model.DatasetCollection( populated=False ) + dataset_collection.collection_type = collection_type + return dataset_collection + + def set_collection_elements( self, dataset_collection, dataset_instances ): + if dataset_collection.populated: + raise Exception("Cannot reset elements of an already populated dataset collection.") + + collection_type = dataset_collection.collection_type + collection_type_description = self.collection_type_descriptions.for_collection_type( collection_type ) type_plugin = collection_type_description.rank_type_plugin() - dataset_collection = builder.build_collection( type_plugin, elements ) - dataset_collection.collection_type = collection_type + builder.set_collection_elements( dataset_collection, type_plugin, dataset_instances ) + dataset_collection.mark_as_populated() + return dataset_collection def delete( self, trans, instance_type, id ): diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/managers/collections_util.py --- a/lib/galaxy/managers/collections_util.py +++ b/lib/galaxy/managers/collections_util.py @@ -77,7 +77,9 @@ # TODO: Work in progress - this end-point is not right yet... dict_value[ 'url' ] = web.url_for( 'library_content', library_id=encoded_library_id, id=encoded_id, folder_id=encoded_folder_id ) if view == "element": - dict_value[ 'elements' ] = map( dictify_element, dataset_colleciton_instance.collection.elements ) + collection = dataset_colleciton_instance.collection + dict_value[ 'elements' ] = map( dictify_element, collection.elements ) + dict_value[ 'populated' ] = collection.populated security.encode_all_ids( dict_value, recursive=True ) # TODO: Use Kyle's recusrive formulation of this. return dict_value @@ -87,7 +89,9 @@ object_detials = element.element_object.to_dict() if element.child_collection: # Recursively yield elements for each nested collection... 
- object_detials[ "elements" ] = map( dictify_element, element.child_collection.elements ) + child_collection = element.child_collection + object_detials[ "elements" ] = map( dictify_element, child_collection.elements ) + object_detials[ "populated" ] = child_collection.populated dictified[ "object" ] = object_detials return dictified diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -2676,14 +2676,37 @@ """ dict_collection_visible_keys = ( 'id', 'collection_type' ) dict_element_visible_keys = ( 'id', 'collection_type' ) + populated_states = Bunch( + NEW='new', # New dataset collection, unpopulated elements + OK='ok', # Collection elements populated (HDAs may or may not have errors) + FAILED='failed', # some problem populating state, won't be populated + ) def __init__( self, id=None, collection_type=None, + populated=True, ): self.id = id self.collection_type = collection_type + if not populated: + self.populated_state = DatasetCollection.populated_states.NEW + + @property + def populated( self ): + return self.populated_state == DatasetCollection.populated_states.OK + + @property + def waiting_for_elements( self ): + return self.populated_state == DatasetCollection.populated_states.NEW + + def mark_as_populated( self ): + self.populated_state = DatasetCollection.populated_states.OK + + def handle_population_failed( self, message ): + self.populated_state = DatasetCollection.populated_states.FAILED + self.populated_state_message = message @property def dataset_instances( self ): diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -621,6 +621,8 @@ model.DatasetCollection.table = Table( "dataset_collection", metadata, Column( "id", Integer, primary_key=True ), Column( "collection_type", Unicode(255), nullable=False ), + Column( "populated_state", TrimmedString( 64 ), default='ok', nullable=False ), + Column( "populated_state_message", TEXT ), Column( "create_time", DateTime, default=now ), Column( "update_time", DateTime, default=now, onupdate=now ), ) diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/model/migrate/versions/0127_output_collection_adjustments.py --- a/lib/galaxy/model/migrate/versions/0127_output_collection_adjustments.py +++ b/lib/galaxy/model/migrate/versions/0127_output_collection_adjustments.py @@ -38,6 +38,18 @@ for table in TABLES: __create(table) + try: + dataset_collection_table = Table( "dataset_collection", metadata, autoload=True ) + # need server_default because column in non-null + populated_state_column = Column( 'populated_state', TrimmedString( 64 ), default='ok', server_default="ok", nullable=False ) + populated_state_column.create( dataset_collection_table ) + + populated_message_column = Column( 'populated_state_message', TEXT, nullable=True ) + populated_message_column.create( dataset_collection_table ) + except Exception as e: + print str(e) + log.exception( "Creating dataset collection populated column failed." 
) + def downgrade(migrate_engine): metadata.bind = migrate_engine @@ -46,6 +58,16 @@ for table in TABLES: __drop(table) + try: + dataset_collection_table = Table( "dataset_collection", metadata, autoload=True ) + populated_state_column = dataset_collection_table.c.populated_state + populated_state_column.drop() + populated_message_column = dataset_collection_table.c.populated_state_message + populated_message_column.drop() + except Exception as e: + print str(e) + log.exception( "Dropping dataset collection populated_state/ column failed." ) + def __create(table): try: diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -297,7 +297,13 @@ self.structure = structure self.outputs = odict() + # TODO: + self.metadata_source = None + def known_outputs( self, inputs ): + if self.dynamic_structure: + return [] + def to_part( ( element_identifier, output ) ): return ToolOutputCollectionPart( self, element_identifier, output ) @@ -316,14 +322,33 @@ return map( to_part, outputs.items() ) + @property + def dynamic_structure(self): + return self.structure.dynamic + + @property + def dataset_collectors(self): + if not self.dynamic_structure: + raise Exception("dataset_collectors called for output collection with static structure") + return self.structure.dataset_collectors + class ToolOutputCollectionStructure( object ): - def __init__( self, collection_type=None, structured_like=None ): + def __init__( + self, + collection_type, + structured_like, + dataset_collectors, + ): self.collection_type = collection_type self.structured_like = structured_like - if collection_type is None and structured_like is None: + self.dataset_collectors = dataset_collectors + if collection_type is None and structured_like is None and dataset_collectors is None: raise ValueError( "Output collection types must be specify type of structured_like" ) + if dataset_collectors and structured_like: + raise ValueError( "Cannot specify dynamic structure (discovered_datasets) and structured_like attribute." ) + self.dynamic = dataset_collectors is not None class ToolOutputCollectionPart( object ): @@ -2146,6 +2171,11 @@ """ return output_collect.collect_primary_datasets( self, output, job_working_directory, input_ext ) + def collect_dynamic_collections( self, output, **kwds ): + """ Find files corresponding to dynamically structured collections. + """ + return output_collect.collect_dynamic_collections( self, output, **kwds ) + def to_dict( self, trans, link_details=False, io_details=False ): """ Returns dict of tool. 
""" diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -289,6 +289,10 @@ elements[ output_part_def.element_identifier ] = element + if output.dynamic_structure: + assert not elements # known_outputs must have been empty + elements = collections_manager.ELEMENTS_UNINITIALIZED + if mapping_over_collection: dc = collections_manager.create_dataset_collection( trans, diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/tools/parameters/dataset_matcher.py --- a/lib/galaxy/tools/parameters/dataset_matcher.py +++ b/lib/galaxy/tools/parameters/dataset_matcher.py @@ -162,6 +162,11 @@ return self.dataset_collection_match( dataset_collection ) def dataset_collection_match( self, dataset_collection ): + # If dataset collection not yet populated, cannot determine if it + # would be a valid match for this parameter. + if not dataset_collection.populated: + return False + valid = True for element in dataset_collection.elements: if not self.__valid_element( element ): diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/tools/parameters/output_collect.py --- a/lib/galaxy/tools/parameters/output_collect.py +++ b/lib/galaxy/tools/parameters/output_collect.py @@ -12,6 +12,176 @@ DATASET_ID_TOKEN = "DATASET_ID" DEFAULT_EXTRA_FILENAME_PATTERN = r"primary_DATASET_ID_(?P<designation>[^_]+)_(?P<visible>[^_]+)_(?P<ext>[^_]+)(_(?P<dbkey>[^_]+))?" +import logging +log = logging.getLogger( __name__ ) + + +def collect_dynamic_collections( + tool, + output_collections, + job_working_directory, + inp_data={}, + job=None, +): + collections_service = tool.app.dataset_collections_service + job_context = JobContext( + tool, + job, + job_working_directory, + inp_data, + ) + + for name, has_collection in output_collections.items(): + if name not in tool.output_collections: + continue + output_collection_def = tool.output_collections[ name ] + if not output_collection_def.dynamic_structure: + continue + + # Could be HDCA for normal jobs or a DC for mapping + # jobs. 
+ if hasattr(has_collection, "collection"): + collection = has_collection.collection + else: + collection = has_collection + + try: + elements = job_context.build_collection_elements( + collection, + output_collection_def, + ) + collections_service.set_collection_elements( + collection, + elements + ) + except Exception: + log.info("Problem gathering output collection.") + collection.handle_population_failed("Problem building datasets for collection.") + + +class JobContext( object ): + + def __init__( self, tool, job, job_working_directory, inp_data ): + self.inp_data = inp_data + self.app = tool.app + self.sa_session = tool.sa_session + self.job = job + self.job_working_directory = job_working_directory + + @property + def permissions( self ): + inp_data = self.inp_data + existing_datasets = [ inp for inp in inp_data.values() if inp ] + if existing_datasets: + permissions = self.app.security_agent.guess_derived_permissions_for_datasets( existing_datasets ) + else: + # No valid inputs, we will use history defaults + permissions = self.app.security_agent.history_get_default_permissions( self.job.history ) + return permissions + + def find_files( self, collection, dataset_collectors ): + filenames = odict.odict() + for path, extra_file_collector in walk_over_extra_files( dataset_collectors, self.job_working_directory, collection ): + filenames[ path ] = extra_file_collector + return filenames + + def build_collection_elements( self, collection, output_collection_def ): + datasets = self.create_datasets( + collection, + output_collection_def, + ) + + elements = odict.odict() + # TODO: allow configurable sorting. + # <sort by="lexical" /><!-- default --> + # <sort by="reverse_lexical" /> + # <sort regex="example.(\d+).fastq" by="1:numerical" /> + # <sort regex="part_(\d+)_sample_([^_]+).fastq" by="2:lexical,1:numerical" /> + # TODO: allow nested structure + for designation in datasets.keys(): + elements[ designation ] = datasets[ designation ] + + return elements + + def create_datasets( self, collection, output_collection_def ): + dataset_collectors = output_collection_def.dataset_collectors + filenames = self.find_files( collection, dataset_collectors ) + + datasets = {} + for filename, extra_file_collector in filenames.iteritems(): + fields_match = extra_file_collector.match( collection, os.path.basename( filename ) ) + if not fields_match: + raise Exception( "Problem parsing metadata fields for file %s" % filename ) + designation = fields_match.designation + visible = fields_match.visible + ext = fields_match.ext + dbkey = fields_match.dbkey + # Create new primary dataset + name = fields_match.name or designation + + dataset = self.create_dataset( + ext=ext, + designation=designation, + visible=visible, + dbkey=dbkey, + name=name, + filename=filename, + metadata_source_name=output_collection_def.metadata_source, + ) + + datasets[ designation ] = dataset + return datasets + + def create_dataset( + self, + ext, + designation, + visible, + dbkey, + name, + filename, + metadata_source_name, + ): + app = self.app + sa_session = self.sa_session + + # Copy metadata from one of the inputs if requested. 
+ metadata_source = None + if metadata_source_name: + metadata_source = self.inp_data[ metadata_source_name ] + + # Create new primary dataset + primary_data = app.model.HistoryDatasetAssociation( extension=ext, + designation=designation, + visible=visible, + dbkey=dbkey, + create_dataset=True, + sa_session=sa_session ) + app.security_agent.set_all_dataset_permissions( primary_data.dataset, self.permissions ) + sa_session.add( primary_data ) + sa_session.flush() + # Move data from temp location to dataset location + app.object_store.update_from_file(primary_data.dataset, file_name=filename, create=True) + primary_data.set_size() + # If match specified a name use otherwise generate one from + # designation. + primary_data.name = name + + if metadata_source: + primary_data.init_meta( copy_from=metadata_source ) + else: + primary_data.init_meta() + + # Associate new dataset with job + if self.job: + assoc = app.model.JobToOutputDatasetAssociation( '__new_primary_file_%s|%s__' % ( name, designation ), primary_data ) + assoc.job = self.job + sa_session.add( assoc ) + sa_session.flush() + + primary_data.state = 'ok' + return primary_data + def collect_primary_datasets( tool, output, job_working_directory, input_ext ): app = tool.app diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/tools/parser/xml.py --- a/lib/galaxy/tools/parser/xml.py +++ b/lib/galaxy/tools/parser/xml.py @@ -153,9 +153,13 @@ default_format = collection_elem.get( "format", "data" ) collection_type = collection_elem.get( "type", None ) structured_like = collection_elem.get( "structured_like", None ) + dataset_collectors = None + if collection_elem.find( "discover_datasets" ) is not None: + dataset_collectors = output_collect.dataset_collectors_from_elem( collection_elem ) structure = galaxy.tools.ToolOutputCollectionStructure( collection_type=collection_type, structured_like=structured_like, + dataset_collectors=dataset_collectors, ) output_collection = galaxy.tools.ToolOutputCollection( name, diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -901,6 +901,11 @@ if replacement_value.hidden_beneath_collection_instance: replacement_value = replacement_value.hidden_beneath_collection_instance outputs[ replacement_name ] = replacement_value + for job_output_collection in job_0.output_dataset_collection_instances: + replacement_name = job_output_collection.name + replacement_value = job_output_collection.dataset_collection_instance + outputs[ replacement_name ] = replacement_value + progress.set_step_outputs( step, outputs ) diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -251,7 +251,18 @@ step_outputs = self.outputs[ connection.output_step.id ] if step_outputs is STEP_OUTPUT_DELAYED: raise modules.DelayedWorkflowEvaluation() - return step_outputs[ connection.output_name ] + replacement = step_outputs[ connection.output_name ] + if isinstance( replacement, model.HistoryDatasetCollectionAssociation ): + if not replacement.collection.populated: + if not replacement.collection.waiting_for_elements: + # If we are not waiting for elements, there was some + # problem creating the collection. Collection will never + # be populated. + # TODO: consider distinguish between cancelled and failed? 
+ raise modules.CancelWorkflowEvaluation() + + raise modules.DelayedWorkflowEvaluation() + return replacement def set_outputs_for_input( self, step, outputs={} ): if self.inputs_by_step_id: diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 test/api/test_tools.py --- a/test/api/test_tools.py +++ b/test/api/test_tools.py @@ -276,6 +276,40 @@ contents1 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element1["object"]["id"]) assert contents1 == "1\n", contents1 + @skip_without_tool( "collection_split_on_column" ) + def test_dynamic_list_output( self ): + history_id = self.dataset_populator.new_history() + new_dataset1 = self.dataset_populator.new_dataset( history_id, content='samp1\t1\nsamp1\t3\nsamp2\t2\nsamp2\t4\n' ) + inputs = { + 'input1': dataset_to_param( new_dataset1 ), + } + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + create = self._run( "collection_split_on_column", history_id, inputs, assert_ok=True ) + + jobs = create[ 'jobs' ] + implicit_collections = create[ 'implicit_collections' ] + collections = create[ 'output_collections' ] + + self.assertEquals( len( jobs ), 1 ) + job_id = jobs[ 0 ][ "id" ] + self.assertEquals( len( implicit_collections ), 0 ) + self.assertEquals( len( collections ), 1 ) + + output_collection = collections[0] + self._assert_has_keys( output_collection, "id", "name", "elements", "populated" ) + assert not output_collection[ "populated" ] + assert len( output_collection[ "elements" ] ) == 0 + + self.dataset_populator.wait_for_job( job_id, assert_ok=True ) + + get_collection_response = self._get( "dataset_collections/%s" % output_collection[ "id" ], data={"instance_type": "history"} ) + self._assert_status_code_is( get_collection_response, 200 ) + + output_collection = get_collection_response.json() + self._assert_has_keys( output_collection, "id", "name", "elements", "populated" ) + assert output_collection[ "populated" ] + assert len( output_collection[ "elements" ] ) == 2 + @skip_without_tool( "cat1" ) def test_run_cat1_with_two_inputs( self ): # Run tool with an multiple data parameter and grouping (repeat) diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -538,6 +538,44 @@ self.dataset_populator.wait_for_history( history_id, assert_ok=True ) self.assertEquals("a\nc\nb\nd\ne\ng\nf\nh\n", self.dataset_populator.get_history_dataset_content( history_id, hid=0 ) ) + def test_workflow_run_dynamic_output_collections(self): + history_id = self.dataset_populator.new_history() + workflow_id = self._upload_yaml_workflow(""" +- label: text_input1 + type: input +- label: text_input2 + type: input +- label: cat_inputs + tool_id: cat1 + state: + input1: + $link: text_input1 + queries: + - input2: + $link: text_input2 +- label: split_up + tool_id: collection_split_on_column + state: + input1: + $link: cat_inputs#out_file1 +- tool_id: cat_list + state: + input1: + $link: split_up#split_output +""") + hda1 = self.dataset_populator.new_dataset( history_id, content="samp1\t10.0\nsamp2\t20.0\n" ) + hda2 = self.dataset_populator.new_dataset( history_id, content="samp1\t30.0\nsamp2\t40.0\n" ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + inputs = { + '0': self._ds_entry(hda1), + '1': self._ds_entry(hda2), + } + self.__invoke_workflow( history_id, workflow_id, inputs ) + # TODO: wait on workflow invocations + 
time.sleep(10) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self.assertEquals("10.0\n30.0\n20.0\n40.0\n", self.dataset_populator.get_history_dataset_content( history_id, hid=0 ) ) + def test_workflow_request( self ): workflow = self.workflow_populator.load_workflow( name="test_for_queue" ) workflow_request, history_id = self._setup_workflow_run( workflow ) diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 test/functional/tools/collection_split_on_column.xml --- /dev/null +++ b/test/functional/tools/collection_split_on_column.xml @@ -0,0 +1,30 @@ +<tool id="collection_split_on_column" name="collection_split_on_column" version="0.1.0"> + <command> + mkdir outputs; cd outputs; awk '{ print \$2 > \$1 ".tabular" }' $input1 + </command> + <inputs> + <param name="input1" type="data" label="Input Table" help="Table to split on first column" format="tabular" /> + </inputs> + <outputs> + <collection name="split_output" type="list" label="Table split on first column"> + <discover_datasets pattern="__name_and_ext__" directory="outputs" /> + </collection> + </outputs> + <tests> + <test> + <param name="input1" value="tinywga.fam" /> + <output_collection name="split_output" type="list"> + <element name="101"> + <assert_contents> + <has_text_matching expression="^1\n2\n3\n$" /> + </assert_contents> + </element> + <element name="1334"> + <assert_contents> + <has_text_matching expression="^1\n10\n11\n12\n13\n2\n$" /> + </assert_contents> + </element> + </output_collection> + </test> + </tests> +</tool> diff -r 6ae144fd51f756031e1691268369dcbe5440a16b -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 test/functional/tools/samples_tool_conf.xml --- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -37,6 +37,7 @@ <tool file="collection_creates_pair.xml" /><tool file="collection_creates_list.xml" /><tool file="collection_optional_param.xml" /> + <tool file="collection_split_on_column.xml" /><tool file="multiple_versions_v01.xml" /><tool file="multiple_versions_v02.xml" /> https://bitbucket.org/galaxy/galaxy-central/commits/9468f580cc4a/ Changeset: 9468f580cc4a User: jmchilton Date: 2015-01-15 14:30:00+00:00 Summary: More configurable format and metadata handling for output collections. Improvements to testing code.
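As a rough sketch of the new attributes (attribute names follow the test tools updated below; the input names input1 and header are illustrative): a collection output that is structured_like an input collection can inherit the format of the corresponding input elements, or it can name a regular dataset input as its format/metadata source.

    <outputs>
        <!-- each output element takes the format of the matching input element -->
        <collection name="list_output" type="list" structured_like="input1" inherit_format="true" />
        <!-- or derive format and metadata from a named dataset input -->
        <collection name="list_output_2" type="list" structured_like="input1"
                    format_source="header" metadata_source="header" />
    </outputs>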
Affected #: 10 files diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -290,15 +290,30 @@ <outputs> """ - def __init__( self, name, structure, label=None, filters=None, hidden=False, default_format="data" ): + def __init__( + self, + name, + structure, + label=None, + filters=None, + hidden=False, + default_format="data", + default_format_source=None, + default_metadata_source=None, + inherit_format=False, + inherit_metadata=False + ): super( ToolOutputCollection, self ).__init__( name, label=label, filters=filters, hidden=hidden ) self.collection = True self.default_format = default_format self.structure = structure self.outputs = odict() - # TODO: - self.metadata_source = None + self.inherit_format = inherit_format + self.inherit_metadata = inherit_metadata + + self.metadata_source = default_metadata_source + self.format_source = default_format_source def known_outputs( self, inputs ): if self.dynamic_structure: @@ -317,7 +332,19 @@ outputs = odict() for element in input_collection.collection.elements: name = element.element_identifier - output = ToolOutput( name, format=self.default_format, implicit=True ) + format = self.default_format + if self.inherit_format: + format = element.dataset_instance.ext + output = ToolOutput( + name, + format=format, + format_source=self.format_source, + metadata_source=self.metadata_source, + implicit=True, + ) + if self.inherit_metadata: + output.metadata_source = element.dataset_instance + outputs[ element.element_identifier ] = output return map( to_part, outputs.items() ) diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -245,8 +245,16 @@ # This may not be neccesary with the new parent/child associations data.designation = name # Copy metadata from one of the inputs if requested. - if output.metadata_source: - data.init_meta( copy_from=inp_data[output.metadata_source] ) + + # metadata source can be either a string referencing an input + # or an actual object to copy. + metadata_source = output.metadata_source + if metadata_source: + if isinstance( metadata_source, basestring ): + metadata_source = inp_data[metadata_source] + + if metadata_source is not None: + data.init_meta( copy_from=metadata_source ) else: data.init_meta() # Take dbkey from LAST input @@ -287,6 +295,11 @@ # Following hack causes dataset to no be added to history... 
child_dataset_names.add( effective_output_name ) + if set_output_history: + history.add_dataset( element, set_hid=set_output_hid ) + trans.sa_session.add( element ) + trans.sa_session.flush() + elements[ output_part_def.element_identifier ] = element if output.dynamic_structure: diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec lib/galaxy/tools/parameters/output_collect.py --- a/lib/galaxy/tools/parameters/output_collect.py +++ b/lib/galaxy/tools/parameters/output_collect.py @@ -174,6 +174,8 @@ # Associate new dataset with job if self.job: + self.job.history.add_dataset( primary_data ) + assoc = app.model.JobToOutputDatasetAssociation( '__new_primary_file_%s|%s__' % ( name, designation ), primary_data ) assoc.job = self.job sa_session.add( assoc ) diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec lib/galaxy/tools/parser/xml.py --- a/lib/galaxy/tools/parser/xml.py +++ b/lib/galaxy/tools/parser/xml.py @@ -141,8 +141,8 @@ data_dict = odict() - def _parse(data_elem, default_format="data"): - output_def = self._parse_output(data_elem, tool, default_format=default_format) + def _parse(data_elem, **kwds): + output_def = self._parse_output(data_elem, tool, **kwds) data_dict[output_def.name] = output_def return output_def @@ -153,6 +153,14 @@ default_format = collection_elem.get( "format", "data" ) collection_type = collection_elem.get( "type", None ) structured_like = collection_elem.get( "structured_like", None ) + inherit_format = False + inherit_metadata = False + if structured_like: + inherit_format = string_as_bool( collection_elem.get( "inherit_format", None ) ) + inherit_metadata = string_as_bool( collection_elem.get( "inherit_metadata", None ) ) + default_format_source = collection_elem.get( "format_source", None ) + default_metadata_source = collection_elem.get( "metadata_source", "" ) + dataset_collectors = None if collection_elem.find( "discover_datasets" ) is not None: dataset_collectors = output_collect.dataset_collectors_from_elem( collection_elem ) @@ -164,12 +172,21 @@ output_collection = galaxy.tools.ToolOutputCollection( name, structure, - default_format=default_format + default_format=default_format, + inherit_format=inherit_format, + inherit_metadata=inherit_metadata, + default_format_source=default_format_source, + default_metadata_source=default_metadata_source, ) outputs[output_collection.name] = output_collection for data_elem in collection_elem.findall("data"): - _parse( data_elem, default_format=default_format ) + _parse( + data_elem, + default_format=default_format, + default_format_source=default_format_source, + default_metadata_source=default_metadata_source, + ) for data_elem in collection_elem.findall("data"): output_name = data_elem.get("name") @@ -183,12 +200,19 @@ outputs[output_def.name] = output_def return outputs, output_collections - def _parse_output(self, data_elem, tool, default_format): + def _parse_output( + self, + data_elem, + tool, + default_format="data", + default_format_source=None, + default_metadata_source="", + ): output = galaxy.tools.ToolOutput( data_elem.get("name") ) output.format = data_elem.get("format", default_format) output.change_format = data_elem.findall("change_format") - output.format_source = data_elem.get("format_source", None) - output.metadata_source = data_elem.get("metadata_source", "") + output.format_source = data_elem.get("format_source", default_format_source) + output.metadata_source = data_elem.get("metadata_source", 
default_metadata_source) output.parent = data_elem.get("parent", None) output.label = xml_text( data_elem, "label" ) output.count = int( data_elem.get("count", 1) ) diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/api/helpers.py --- a/test/api/helpers.py +++ b/test/api/helpers.py @@ -134,7 +134,7 @@ def get_history_dataset_details( self, history_id, **kwds ): dataset_id = self.__history_dataset_id( history_id, **kwds ) - details_response = self.__get_contents_request( history_id, "/%s" % dataset_id ) + details_response = self.__get_contents_request( history_id, "/datasets/%s" % dataset_id ) assert details_response.status_code == 200 return details_response.json() diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/api/test_tools.py --- a/test/api/test_tools.py +++ b/test/api/test_tools.py @@ -225,25 +225,11 @@ # TODO: shouldn't need this wait self.dataset_populator.wait_for_history( history_id, assert_ok=True ) create = self._run( "collection_creates_pair", history_id, inputs, assert_ok=True ) - jobs = create[ 'jobs' ] - implicit_collections = create[ 'implicit_collections' ] - collections = create[ 'output_collections' ] - - self.assertEquals( len( jobs ), 1 ) - self.assertEquals( len( implicit_collections ), 0 ) - self.assertEquals( len( collections ), 1 ) - - output_collection = collections[ 0 ] - elements = output_collection[ "elements" ] - assert len( elements ) == 2 - element0, element1 = elements - assert element0[ "element_identifier" ] == "forward" - assert element1[ "element_identifier" ] == "reverse" + output_collection = self._assert_one_job_one_collection_run( create ) + element0, element1 = self._assert_elements_are( output_collection, "forward", "reverse" ) self.dataset_populator.wait_for_history( history_id, assert_ok=True ) - contents0 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element0["object"]["id"]) - assert contents0 == "123\n789\n", contents0 - contents1 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element1["object"]["id"]) - assert contents1 == "456\n0ab\n", contents1 + self._verify_element( history_id, element0, contents="123\n789\n", file_ext="txt" ) + self._verify_element( history_id, element1, contents="456\n0ab\n", file_ext="txt" ) @skip_without_tool( "collection_creates_list" ) def test_list_collection_output( self ): @@ -256,25 +242,31 @@ # TODO: real problem here - shouldn't have to have this wait. 
self.dataset_populator.wait_for_history( history_id, assert_ok=True ) create = self._run( "collection_creates_list", history_id, inputs, assert_ok=True ) - jobs = create[ 'jobs' ] - implicit_collections = create[ 'implicit_collections' ] - collections = create[ 'output_collections' ] + output_collection = self._assert_one_job_one_collection_run( create ) + element0, element1 = self._assert_elements_are( output_collection, "data1", "data2" ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self._verify_element( history_id, element0, contents="identifier is data1\n", file_ext="txt" ) + self._verify_element( history_id, element1, contents="identifier is data2\n", file_ext="txt" ) - self.assertEquals( len( jobs ), 1 ) - self.assertEquals( len( implicit_collections ), 0 ) - self.assertEquals( len( collections ), 1 ) - - output_collection = collections[ 0 ] - elements = output_collection[ "elements" ] - assert len( elements ) == 2 - element0, element1 = elements - assert element0[ "element_identifier" ] == "data1" - assert element1[ "element_identifier" ] == "data2" + @skip_without_tool( "collection_creates_list_2" ) + def test_list_collection_output_format_source( self ): + # test using format_source with a tool + history_id = self.dataset_populator.new_history() + new_dataset1 = self.dataset_populator.new_dataset( history_id, content='#col1\tcol2' ) + create_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=["a\tb\nc\td", "e\tf\ng\th"] ) + hdca_id = create_response.json()[ "id" ] + inputs = { + "header": { "src": "hda", "id": new_dataset1["id"] }, + "input_collect": { "src": "hdca", "id": hdca_id }, + } + # TODO: real problem here - shouldn't have to have this wait. self.dataset_populator.wait_for_history( history_id, assert_ok=True ) - contents0 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element0["object"]["id"]) - assert contents0 == "0\n", contents0 - contents1 = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=element1["object"]["id"]) - assert contents1 == "1\n", contents1 + create = self._run( "collection_creates_list_2", history_id, inputs, assert_ok=True ) + output_collection = self._assert_one_job_one_collection_run( create ) + element0, element1 = self._assert_elements_are( output_collection, "data1", "data2" ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self._verify_element( history_id, element0, contents="#col1\tcol2\na\tb\nc\td\n", file_ext="txt" ) + self._verify_element( history_id, element1, contents="#col1\tcol2\ne\tf\ng\th\n", file_ext="txt" ) @skip_without_tool( "collection_split_on_column" ) def test_dynamic_list_output( self ): @@ -286,21 +278,12 @@ self.dataset_populator.wait_for_history( history_id, assert_ok=True ) create = self._run( "collection_split_on_column", history_id, inputs, assert_ok=True ) - jobs = create[ 'jobs' ] - implicit_collections = create[ 'implicit_collections' ] - collections = create[ 'output_collections' ] - - self.assertEquals( len( jobs ), 1 ) - job_id = jobs[ 0 ][ "id" ] - self.assertEquals( len( implicit_collections ), 0 ) - self.assertEquals( len( collections ), 1 ) - - output_collection = collections[0] + output_collection = self._assert_one_job_one_collection_run( create ) self._assert_has_keys( output_collection, "id", "name", "elements", "populated" ) assert not output_collection[ "populated" ] assert len( output_collection[ "elements" ] ) == 0 - self.dataset_populator.wait_for_job( 
job_id, assert_ok=True ) + self.dataset_populator.wait_for_job( create["jobs"][0]["id"], assert_ok=True ) get_collection_response = self._get( "dataset_collections/%s" % output_collection[ "id" ], data={"instance_type": "history"} ) self._assert_status_code_is( get_collection_response, 200 ) @@ -309,6 +292,7 @@ self._assert_has_keys( output_collection, "id", "name", "elements", "populated" ) assert output_collection[ "populated" ] assert len( output_collection[ "elements" ] ) == 2 + # TODO: verify element identifiers @skip_without_tool( "cat1" ) def test_run_cat1_with_two_inputs( self ): @@ -443,6 +427,42 @@ assert "123\n0ab" in outputs_contents assert "456\n789" in outputs_contents + def _assert_one_job_one_collection_run( self, create ): + jobs = create[ 'jobs' ] + implicit_collections = create[ 'implicit_collections' ] + collections = create[ 'output_collections' ] + + self.assertEquals( len( jobs ), 1 ) + self.assertEquals( len( implicit_collections ), 0 ) + self.assertEquals( len( collections ), 1 ) + + output_collection = collections[ 0 ] + return output_collection + + def _assert_elements_are( self, collection, *args ): + elements = collection["elements"] + self.assertEquals(len(elements), len(args)) + for index, element in enumerate(elements): + arg = args[index] + self.assertEquals(arg, element["element_identifier"]) + return elements + + def _verify_element( self, history_id, element, **props ): + object_id = element["object"]["id"] + + if "contents" in props: + expected_contents = props["contents"] + + contents = self.dataset_populator.get_history_dataset_content( history_id, dataset_id=object_id) + self.assertEquals( contents, expected_contents ) + + del props["contents"] + + if props: + details = self.dataset_populator.get_history_dataset_details( history_id, dataset_id=object_id) + for key, value in props.items(): + self.assertEquals( details[key], value ) + def _setup_repeat_multirun( self ): history_id = self.dataset_populator.new_history() new_dataset1 = self.dataset_populator.new_dataset( history_id, content='123' ) diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/functional/tools/collection_creates_list.xml --- a/test/functional/tools/collection_creates_list.xml +++ b/test/functional/tools/collection_creates_list.xml @@ -9,7 +9,9 @@ <param name="input1" type="data_collection" collection_type="list" label="Input" help="Input collection..." format="txt" /></inputs><outputs> - <collection name="list_output" type="list" label="Duplicate List" structured_like="input1" format="txt"> + <collection name="list_output" type="list" label="Duplicate List" structured_like="input1" inherit_format="true"> + <!-- inherit_format can be used in conjunction with structured_like + to perserve format. --></collection></outputs><tests> diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/functional/tools/collection_creates_list_2.xml --- /dev/null +++ b/test/functional/tools/collection_creates_list_2.xml @@ -0,0 +1,22 @@ +<tool id="collection_creates_list_2" name="collection_creates_list_2" version="0.1.0"> + <!-- go through and a header to each item in a collection - should use implicit + mapping the non-collectiony add header tool to do this in a real analysis. 
+ --> + <command> + #for $key in $list_output.keys()# + cat "$header" > "$list_output[$key]"; + cat "$input_collect[$key]" >> "$list_output[$key]"; + #end for# + echo 'ensure not empty'; + </command> + <inputs> + <param name="header" type="data" label="Input Data" help="Input data..." /> + <param name="input_collect" type="data_collection" collection_type="list" label="Input Collect" help="Input collection..." /> + </inputs> + <outputs> + <collection name="list_output" type="list" label="Duplicate List" structured_like="input_collect" format_source="header"> + </collection> + </outputs> + <tests> + </tests> +</tool> diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/functional/tools/collection_creates_pair.xml --- a/test/functional/tools/collection_creates_pair.xml +++ b/test/functional/tools/collection_creates_pair.xml @@ -8,9 +8,13 @@ </inputs><outputs><collection name="paired_output" type="paired" label="Split Pair"> - <!-- can reference parts directly or find via from_work_dir. --> + <!-- command can reference parts directly or find via from_work_dir. --><data name="forward" format="txt" /> - <data name="reverse" format="txt" from_work_dir="reverse.txt" /> + <data name="reverse" format_source="input1" from_work_dir="reverse.txt" /> + <!-- data elements can use format, format_source, metadata_from, + from_work_dir. The format="input" idiom is not supported, + it should be considered deprecated and format_source is superior. + --></collection></outputs><tests> diff -r 99b60d99df388fb919df5e6dadc791c0e6be90d3 -r 9468f580cc4a9c45d1770477e5175e963afd3aec test/functional/tools/samples_tool_conf.xml --- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -36,6 +36,7 @@ <tool file="collection_two_paired.xml" /><tool file="collection_creates_pair.xml" /><tool file="collection_creates_list.xml" /> + <tool file="collection_creates_list_2.xml" /><tool file="collection_optional_param.xml" /><tool file="collection_split_on_column.xml" /> https://bitbucket.org/galaxy/galaxy-central/commits/0bcbf45df979/ Changeset: 0bcbf45df979 User: jmchilton Date: 2015-01-15 14:30:00+00:00 Summary: A simpler way to configure output pairs (exploiting static structure). See example and comments in test/functional/tools/collection_creates_pair_from_type.xml. 
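Condensed from the example file referenced above (the full collection_creates_pair_from_type.xml diff follows), the key idea is that a paired output's structure is known in advance, so the command can address the parts directly and no explicit <data> children are required:

    <command>
        sed 'n;d' $input1 > $paired_output.forward;
        sed -n 'g;n;p' $input1 > $paired_output.reverse;
    </command>
    <outputs>
        <collection name="paired_output" type="paired" label="Split Pair" format_source="input1" />
    </outputs>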
Affected #: 6 files diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 lib/galaxy/dataset_collections/registry.py --- a/lib/galaxy/dataset_collections/registry.py +++ b/lib/galaxy/dataset_collections/registry.py @@ -1,6 +1,6 @@ from .types import list from .types import paired - +from galaxy import model PLUGIN_CLASSES = [list.ListDatasetCollectionType, paired.PairedDatasetCollectionType] @@ -12,3 +12,13 @@ def get( self, plugin_type ): return self.__plugins[ plugin_type ] + + def prototype( self, plugin_type ): + plugin_type_object = self.get( plugin_type ) + if not hasattr( plugin_type_object, 'prototype_elements' ): + raise Exception( "Cannot pre-determine structure for collection of type %s" % plugin_type ) + + dataset_collection = model.DatasetCollection() + elements = [ e for e in plugin_type_object.prototype_elements() ] + dataset_collection.elements = elements + return dataset_collection diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 lib/galaxy/dataset_collections/types/paired.py --- a/lib/galaxy/dataset_collections/types/paired.py +++ b/lib/galaxy/dataset_collections/types/paired.py @@ -1,6 +1,6 @@ from ..types import BaseDatasetCollectionType -from galaxy.model import DatasetCollectionElement +from galaxy.model import DatasetCollectionElement, HistoryDatasetAssociation FORWARD_IDENTIFIER = "forward" REVERSE_IDENTIFIER = "reverse" @@ -32,3 +32,15 @@ ) yield left_association yield right_association + + def prototype_elements( self ): + left_association = DatasetCollectionElement( + element=HistoryDatasetAssociation(), + element_identifier=FORWARD_IDENTIFIER, + ) + right_association = DatasetCollectionElement( + element=HistoryDatasetAssociation(), + element_identifier=REVERSE_IDENTIFIER, + ) + yield left_association + yield right_association diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -315,7 +315,7 @@ self.metadata_source = default_metadata_source self.format_source = default_format_source - def known_outputs( self, inputs ): + def known_outputs( self, inputs, type_registry ): if self.dynamic_structure: return [] @@ -324,13 +324,17 @@ # This line is probably not right - should verify structured_like # or have outputs and all outputs have name. - if not self.structure.structured_like and self.outputs: + if len( self.outputs ) > 1: outputs = self.outputs else: + # either must have specified structured_like or something worse + if self.structure.structured_like: + collection_prototype = inputs[ self.structure.structured_like ].collection + else: + collection_prototype = type_registry.prototype( self.structure.collection_type ) # TODO: Handle nested structures. 
- input_collection = inputs[ self.structure.structured_like ] outputs = odict() - for element in input_collection.collection.elements: + for element in collection_prototype.elements: name = element.element_identifier format = self.default_format if self.inherit_format: diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -286,7 +286,7 @@ elements = odict() input_collections = dict( [ (k, v[0]) for k, v in inp_dataset_collections.iteritems() ] ) - known_outputs = output.known_outputs( input_collections ) + known_outputs = output.known_outputs( input_collections, collections_manager.type_registry ) # Just to echo TODO elsewhere - this should be restructured to allow # nested collections. for output_part_def in known_outputs: diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 test/functional/tools/collection_creates_pair_from_type.xml --- /dev/null +++ b/test/functional/tools/collection_creates_pair_from_type.xml @@ -0,0 +1,35 @@ +<tool id="collection_creates_pair_from_type" name="collection_creates_pair_from_type" version="0.1.0"> + <command> + sed 'n;d' $input1 > $paired_output.forward; + sed -n 'g;n;p' $input1 > $paired_output.reverse; + </command> + <inputs> + <param name="input1" type="data" label="Input" help="Input to be split." /> + </inputs> + <outputs> + <!-- unlike lists - structure of paired outputs can be predetermined + so we can use $output.identifier in command block, see + collection_creates_pair.xml for examples of actually labelling + pieces and allowing stuff like from_work_dir. + --> + <collection name="paired_output" type="paired" label="Split Pair" format_source="input1"> + </collection> + </outputs> + <tests> + <test> + <param name="input1" value="simple_lines_interleaved.txt" ftype="txt" /> + <output_collection name="paired_output" type="paired"> + <element name="forward" ftype="txt"> + <assert_contents> + <has_text_matching expression="^This is a line of text.\nThis is a line of text.\n$" /> + </assert_contents> + </element> + <element name="reverse" ftype="txt"> + <assert_contents> + <has_text_matching expression="^This is a different line of text.\nThis is a different line of text.\n$" /> + </assert_contents> + </element> + </output_collection> + </test> + </tests> +</tool> diff -r 9468f580cc4a9c45d1770477e5175e963afd3aec -r 0bcbf45df97932e14522bb255f457d39aec16bf2 test/functional/tools/samples_tool_conf.xml --- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -35,6 +35,7 @@ <tool file="collection_mixed_param.xml" /><tool file="collection_two_paired.xml" /><tool file="collection_creates_pair.xml" /> + <tool file="collection_creates_pair_from_type.xml" /><tool file="collection_creates_list.xml" /><tool file="collection_creates_list_2.xml" /><tool file="collection_optional_param.xml" /> https://bitbucket.org/galaxy/galaxy-central/commits/42f2532c7e95/ Changeset: 42f2532c7e95 User: jmchilton Date: 2015-01-15 14:30:00+00:00 Summary: Update workflow extraction backend for output collections. 
Affected #: 5 files diff -r 0bcbf45df97932e14522bb255f457d39aec16bf2 -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -314,6 +314,7 @@ self.metadata_source = default_metadata_source self.format_source = default_format_source + self.change_format = [] # TODO def known_outputs( self, inputs, type_registry ): if self.dynamic_structure: diff -r 0bcbf45df97932e14522bb255f457d39aec16bf2 -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b lib/galaxy/workflow/extract.py --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -11,6 +11,7 @@ SelectToolParameter, UnvalidatedValue ) +from galaxy.tools import ToolOutputCollectionPart from galaxy.tools.parameters.grouping import ( Conditional, Repeat @@ -115,6 +116,8 @@ input_collection = an_implicit_output_collection.find_implicit_input_collection( input_name ) if input_collection: other_hid = input_collection.hid + else: + log.info("Cannot find implicit input collection for %s" % input_name) if other_hid in hid_to_output_pair: other_step, other_name = hid_to_output_pair[ other_hid ] conn = model.WorkflowStepConnection() @@ -126,18 +129,26 @@ steps.append( step ) steps_by_job_id[ job_id ] = step # Store created dataset hids - for assoc in job.output_datasets: + for assoc in (job.output_datasets + job.output_dataset_collection_instances): + assoc_name = assoc.name + if ToolOutputCollectionPart.is_named_collection_part_name( assoc_name ): + continue if job in summary.implicit_map_jobs: hid = None for implicit_pair in jobs[ job ]: query_assoc_name, dataset_collection = implicit_pair - if query_assoc_name == assoc.name: + if query_assoc_name == assoc_name: hid = dataset_collection.hid if hid is None: - log.warn("Failed to find matching implicit job.") + template = "Failed to find matching implicit job - job is %s, jobs are %s, assoc_name is %s." + message = template % ( job.id, jobs, assoc.name ) + log.warn( message ) raise Exception( "Failed to extract job." ) else: - hid = assoc.dataset.hid + if hasattr( assoc, "dataset" ): + hid = assoc.dataset.hid + else: + hid = assoc.dataset_collection_instance.hid hid_to_output_pair[ hid ] = ( step, assoc.name ) return steps @@ -210,11 +221,18 @@ dataset_collection = content hid = content.hid self.collection_types[ hid ] = content.collection.collection_type - if not content.implicit_output_name: - job = DatasetCollectionCreationJob( content ) - self.jobs[ job ] = [ ( None, content ) ] - else: - dataset_collection = content + if content.creating_job_associations: + for assoc in content.creating_job_associations: + job = assoc.job + if job not in self.jobs or self.jobs[ job ][ 0 ][ 1 ].history_content_type == "dataset": + self.jobs[ job ] = [ ( assoc.name, dataset_collection ) ] + if content.implicit_output_name: + self.implicit_map_jobs.append( job ) + else: + self.jobs[ job ].append( ( assoc.name, dataset_collection ) ) + # This whole elif condition may no longer be needed do to additional + # tracking with creating_job_associations. Will delete at some point. 
+ elif content.implicit_output_name: # TODO: Optimize db call dataset_instance = dataset_collection.collection.dataset_instances[ 0 ] if not self.__check_state( dataset_instance ): @@ -235,6 +253,9 @@ self.implicit_map_jobs.append( job ) else: self.jobs[ job ].append( ( assoc.name, dataset_collection ) ) + else: + job = DatasetCollectionCreationJob( content ) + self.jobs[ job ] = [ ( None, content ) ] def __summarize_dataset( self, dataset ): if not self.__check_state( dataset ): diff -r 0bcbf45df97932e14522bb255f457d39aec16bf2 -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -649,7 +649,9 @@ data_outputs = [] data_inputs = None for name, tool_output in self.tool.outputs.iteritems(): - if tool_output.format_source != None: + if tool_output.collection: + formats = [ 'input' ] + elif tool_output.format_source != None: formats = [ 'input' ] # default to special name "input" which remove restrictions on connections if data_inputs == None: data_inputs = self.get_data_inputs() diff -r 0bcbf45df97932e14522bb255f457d39aec16bf2 -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b test/api/test_workflow_extraction.py --- a/test/api/test_workflow_extraction.py +++ b/test/api/test_workflow_extraction.py @@ -182,6 +182,97 @@ collection_step_state = loads( collection_step[ "tool_state" ] ) self.assertEquals( collection_step_state[ "collection_type" ], u"list:paired" ) + def test_extract_workflow_with_output_collections( self ): + jobs_summary = self._run_jobs(""" +steps: + - label: text_input1 + type: input + - label: text_input2 + type: input + - label: cat_inputs + tool_id: cat1 + state: + input1: + $link: text_input1 + queries: + - input2: + $link: text_input2 + - label: split_up + tool_id: collection_split_on_column + state: + input1: + $link: cat_inputs#out_file1 + - tool_id: cat_list + state: + input1: + $link: split_up#split_output +test_data: + text_input1: "samp1\t10.0\nsamp2\t20.0\n" + text_input2: "samp1\t30.0\nsamp2\t40.0\n" +""") + tool_ids = [ "cat1", "collection_split_on_column", "cat_list" ] + job_ids = map( functools.partial(self._job_id_for_tool, jobs_summary.jobs ), tool_ids ) + downloaded_workflow = self._extract_and_download_workflow( + dataset_ids=[ "1", "2" ], + job_ids=job_ids, + ) + self.__check_workflow( + downloaded_workflow, + step_count=5, + verify_connected=True, + data_input_count=2, + data_collection_input_count=0, + tool_ids=tool_ids, + ) + + def test_extract_with_mapped_output_collections( self ): + jobs_summary = self._run_jobs(""" +steps: + - label: text_input1 + type: input_collection + - label: cat_inputs + tool_id: cat1 + state: + input1: + $link: text_input1 + - label: pair_off + tool_id: collection_creates_pair + state: + input1: + $link: cat_inputs#out_file1 + - label: cat_pairs + tool_id: cat_collection + state: + input1: + $link: pair_off#paired_output + - tool_id: cat_list + state: + input1: + $link: cat_pairs#out_file1 +test_data: + text_input1: + type: list + elements: + - identifier: samp1 + content: "samp1\t10.0\nsamp2\t20.0\n" + - identifier: samp2 + content: "samp1\t30.0\nsamp2\t40.0\n" +""") + tool_ids = [ "cat1", "collection_creates_pair", "cat_collection", "cat_list" ] + job_ids = map( functools.partial(self._job_id_for_tool, jobs_summary.jobs ), tool_ids ) + downloaded_workflow = self._extract_and_download_workflow( + dataset_collection_ids=[ "3" ], + job_ids=job_ids, + ) + self.__check_workflow( + downloaded_workflow, + step_count=5, + 
verify_connected=True, + data_input_count=0, + data_collection_input_count=1, + tool_ids=tool_ids, + ) + def _job_id_for_tool( self, jobs, tool_id ): return self._job_for_tool( jobs, tool_id )[ "id" ] diff -r 0bcbf45df97932e14522bb255f457d39aec16bf2 -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b test/unit/workflows/test_extract_summary.py --- a/test/unit/workflows/test_extract_summary.py +++ b/test/unit/workflows/test_extract_summary.py @@ -135,6 +135,7 @@ self.implicit_output_name = implicit_output_name self.hid = 1 self.collection = model.DatasetCollection() + self.creating_job_associations = [] element = model.DatasetCollectionElement( collection=self.collection, element=model.HistoryDatasetAssociation(), https://bitbucket.org/galaxy/galaxy-central/commits/0fd00352949c/ Changeset: 0fd00352949c User: jmchilton Date: 2015-01-15 14:30:00+00:00 Summary: Hide tools producing collections in workflow editor. Output collections will require significant overhaul to workflow editor logic. Affected #: 1 file diff -r 42f2532c7e95237ce419c2f2f7cf6ff2c603170b -r 0fd00352949c5d407e36f5d1ff0ffbcd699931c6 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -496,6 +496,10 @@ self.installed_changeset_revision ) return None + @property + def produces_collections( self ): + return any( o.collection for o in self.outputs.values() ) + def __get_job_tool_configuration(self, job_params=None): """Generalized method for getting this tool's job configuration. @@ -1099,10 +1103,15 @@ if self.tool_type.startswith( 'data_source' ): return False + if self.produces_collections: + # Someday we will get there! + return False + if hasattr( tool_source, "root"): root = tool_source.root if not string_as_bool( root.get( "workflow_compatible", "True" ) ): return False + # TODO: Anyway to capture tools that dynamically change their own # outputs? return True Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.